diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java index f79db31bf7ec..0a9680185626 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; @@ -63,6 +64,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; import org.apache.commons.lang3.ClassUtils; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Utility methods for translating schemas. */ @Experimental(Kind.SCHEMAS) @@ -71,6 +74,7 @@ "rawtypes" }) public class SchemaTranslation { + private static final Logger LOG = LoggerFactory.getLogger(SchemaTranslation.class); private static final String URN_BEAM_LOGICAL_DECIMAL = FixedPrecisionNumeric.BASE_IDENTIFIER; private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:logical_type:javasdk:v1"; @@ -124,8 +128,8 @@ private static SchemaApi.Field fieldToProto( .build(); } - private static SchemaApi.FieldType fieldTypeToProto( - FieldType fieldType, boolean serializeLogicalType) { + @VisibleForTesting + static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean serializeLogicalType) { SchemaApi.FieldType.Builder builder = SchemaApi.FieldType.newBuilder(); switch (fieldType.getTypeName()) { case ROW: @@ -297,7 +301,8 @@ private static Field fieldFromProto(SchemaApi.Field protoField) { .withDescription(protoField.getDescription()); } - private static FieldType fieldTypeFromProto(SchemaApi.FieldType protoFieldType) { + @VisibleForTesting + static FieldType fieldTypeFromProto(SchemaApi.FieldType protoFieldType) { FieldType fieldType = fieldTypeFromProtoWithoutNullable(protoFieldType); if (protoFieldType.getNullable()) { @@ -426,26 +431,32 @@ private static FieldType fieldTypeFromProtoWithoutNullable(SchemaApi.FieldType p return FieldType.DATETIME; } else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) { return FieldType.DECIMAL; - } else if (urn.equals(URN_BEAM_LOGICAL_JAVASDK)) { - return FieldType.logicalType( - (LogicalType) - SerializableUtils.deserializeFromByteArray( - logicalType.getPayload().toByteArray(), "logicalType")); - } else { - @Nullable FieldType argumentType = null; - @Nullable Object argumentValue = null; - if (logicalType.hasArgumentType()) { - argumentType = fieldTypeFromProto(logicalType.getArgumentType()); - argumentValue = fieldValueFromProto(argumentType, logicalType.getArgument()); + } else if (urn.startsWith("beam:logical_type:")) { + try { + return FieldType.logicalType( + (LogicalType) + SerializableUtils.deserializeFromByteArray( + logicalType.getPayload().toByteArray(), "logicalType")); + } catch (IllegalArgumentException e) { + LOG.warn( + "Unable to deserialize the logical type {} from proto. Mark as UnknownLogicalType.", + urn); } - return FieldType.logicalType( - new UnknownLogicalType( - urn, - logicalType.getPayload().toByteArray(), - argumentType, - argumentValue, - fieldTypeFromProto(logicalType.getRepresentation()))); } + // assemble an UnknownLogicalType + @Nullable FieldType argumentType = null; + @Nullable Object argumentValue = null; + if (logicalType.hasArgumentType()) { + argumentType = fieldTypeFromProto(logicalType.getArgumentType()); + argumentValue = fieldValueFromProto(argumentType, logicalType.getArgument()); + } + return FieldType.logicalType( + new UnknownLogicalType( + urn, + logicalType.getPayload().toByteArray(), + argumentType, + argumentValue, + fieldTypeFromProto(logicalType.getRepresentation()))); default: throw new IllegalArgumentException( "Unexpected type_info: " + protoFieldType.getTypeInfoCase()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java index 2c1ed474a076..a648e5d662ef 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java @@ -40,9 +40,16 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.DateTime; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; +import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; +import org.apache.beam.sdk.schemas.logicaltypes.FixedString; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable; import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; +import org.apache.beam.sdk.schemas.logicaltypes.VariableString; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; @@ -395,6 +402,45 @@ public void typeInfoNotSet() { } } + /** Test schema translation of logical types. */ + @RunWith(Parameterized.class) + public static class LogicalTypesTest { + @Parameters(name = "{index}: {0}") + public static Iterable data() { + return ImmutableList.builder() + .add(FieldType.logicalType(SqlTypes.DATE)) + .add(FieldType.logicalType(SqlTypes.TIME)) + .add(FieldType.logicalType(SqlTypes.DATETIME)) + .add(FieldType.logicalType(SqlTypes.TIMESTAMP)) + .add(FieldType.logicalType(new NanosInstant())) + .add(FieldType.logicalType(new NanosDuration())) + .add(FieldType.logicalType(FixedBytes.of(10))) + .add(FieldType.logicalType(VariableBytes.of(10))) + .add(FieldType.logicalType(FixedString.of(10))) + .add(FieldType.logicalType(VariableString.of(10))) + .add(FieldType.logicalType(FixedPrecisionNumeric.of(10))) + .build(); + } + + @Parameter(0) + public Schema.FieldType fieldType; + + @Test + public void testPortableLogicalTypeSerializeDeserilizeCorrectly() { + SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, true); + Schema.FieldType translated = SchemaTranslation.fieldTypeFromProto(proto); + + assertThat( + translated.getLogicalType().getClass(), equalTo(fieldType.getLogicalType().getClass())); + assertThat( + translated.getLogicalType().getArgumentType(), + equalTo(fieldType.getLogicalType().getArgumentType())); + assertThat( + translated.getLogicalType().getArgument(), + equalTo(fieldType.getLogicalType().getArgument())); + } + } + /** A simple logical type that has no argument. */ private static class NullArgumentLogicalType implements Schema.LogicalType { public static final String IDENTIFIER = "beam:logical_type:null_argument:v1";