From 95489e7ecb1157e7eec27b9c116f346b187afd6b Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 5 Jan 2023 18:27:28 -0500 Subject: [PATCH 1/2] Attempt deserialize all non-standard logical types from proto * Fixes portable and not-yet-standard logical type get deserialized to UnknownLogicalType --- .../beam/sdk/schemas/SchemaTranslation.java | 54 +++++++++++-------- .../sdk/schemas/SchemaTranslationTest.java | 42 +++++++++++++++ 2 files changed, 75 insertions(+), 21 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java index f79db31bf7ec..fddaaab8ec80 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; @@ -63,6 +64,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; import org.apache.commons.lang3.ClassUtils; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Utility methods for translating schemas. */ @Experimental(Kind.SCHEMAS) @@ -71,6 +74,7 @@ "rawtypes" }) public class SchemaTranslation { + private static final Logger LOG = LoggerFactory.getLogger(SchemaTranslation.class); private static final String URN_BEAM_LOGICAL_DECIMAL = FixedPrecisionNumeric.BASE_IDENTIFIER; private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:logical_type:javasdk:v1"; @@ -124,8 +128,8 @@ private static SchemaApi.Field fieldToProto( .build(); } - private static SchemaApi.FieldType fieldTypeToProto( - FieldType fieldType, boolean serializeLogicalType) { + @VisibleForTesting + static SchemaApi.FieldType fieldTypeToProto(FieldType fieldType, boolean serializeLogicalType) { SchemaApi.FieldType.Builder builder = SchemaApi.FieldType.newBuilder(); switch (fieldType.getTypeName()) { case ROW: @@ -297,7 +301,8 @@ private static Field fieldFromProto(SchemaApi.Field protoField) { .withDescription(protoField.getDescription()); } - private static FieldType fieldTypeFromProto(SchemaApi.FieldType protoFieldType) { + @VisibleForTesting + static FieldType fieldTypeFromProto(SchemaApi.FieldType protoFieldType) { FieldType fieldType = fieldTypeFromProtoWithoutNullable(protoFieldType); if (protoFieldType.getNullable()) { @@ -426,26 +431,33 @@ private static FieldType fieldTypeFromProtoWithoutNullable(SchemaApi.FieldType p return FieldType.DATETIME; } else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) { return FieldType.DECIMAL; - } else if (urn.equals(URN_BEAM_LOGICAL_JAVASDK)) { - return FieldType.logicalType( - (LogicalType) - SerializableUtils.deserializeFromByteArray( - logicalType.getPayload().toByteArray(), "logicalType")); - } else { - @Nullable FieldType argumentType = null; - @Nullable Object argumentValue = null; - if (logicalType.hasArgumentType()) { - argumentType = fieldTypeFromProto(logicalType.getArgumentType()); - argumentValue = fieldValueFromProto(argumentType, logicalType.getArgument()); + } else if (urn.startsWith("beam:logical_type:")) { + try { + return FieldType.logicalType( + (LogicalType) + SerializableUtils.deserializeFromByteArray( + logicalType.getPayload().toByteArray(), "logicalType")); + } catch (IllegalArgumentException e) { + LOG.warn( + String.format( + "Unable to deserialize the logical type %s from proto. Mark as UnknownLogicalType", + urn)); } - return FieldType.logicalType( - new UnknownLogicalType( - urn, - logicalType.getPayload().toByteArray(), - argumentType, - argumentValue, - fieldTypeFromProto(logicalType.getRepresentation()))); } + // assemble an UnknownLogicalType + @Nullable FieldType argumentType = null; + @Nullable Object argumentValue = null; + if (logicalType.hasArgumentType()) { + argumentType = fieldTypeFromProto(logicalType.getArgumentType()); + argumentValue = fieldValueFromProto(argumentType, logicalType.getArgument()); + } + return FieldType.logicalType( + new UnknownLogicalType( + urn, + logicalType.getPayload().toByteArray(), + argumentType, + argumentValue, + fieldTypeFromProto(logicalType.getRepresentation()))); default: throw new IllegalArgumentException( "Unexpected type_info: " + protoFieldType.getTypeInfoCase()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java index 2c1ed474a076..2c7d781d33b7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java @@ -40,9 +40,16 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.DateTime; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; +import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; +import org.apache.beam.sdk.schemas.logicaltypes.FixedString; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration; +import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable; import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; +import org.apache.beam.sdk.schemas.logicaltypes.VariableString; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets; @@ -395,6 +402,41 @@ public void typeInfoNotSet() { } } + @RunWith(JUnit4.class) + public static class LogicalTypesTest { + @Test + public void testPortableLogicalTypeSerializeDeserilizeCorrectly() { + List testCases = + ImmutableList.builder() + .add(FieldType.logicalType(SqlTypes.DATE)) + .add(FieldType.logicalType(SqlTypes.TIME)) + .add(FieldType.logicalType(SqlTypes.DATETIME)) + .add(FieldType.logicalType(SqlTypes.TIMESTAMP)) + .add(FieldType.logicalType(new NanosInstant())) + .add(FieldType.logicalType(new NanosDuration())) + .add(FieldType.logicalType(FixedBytes.of(10))) + .add(FieldType.logicalType(VariableBytes.of(10))) + .add(FieldType.logicalType(FixedString.of(10))) + .add(FieldType.logicalType(VariableString.of(10))) + .add(FieldType.logicalType(FixedPrecisionNumeric.of(10))) + .build(); + + for (Schema.FieldType fieldType : testCases) { + SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, true); + Schema.FieldType translated = SchemaTranslation.fieldTypeFromProto(proto); + + assertThat( + translated.getLogicalType().getClass(), equalTo(fieldType.getLogicalType().getClass())); + assertThat( + translated.getLogicalType().getArgumentType(), + equalTo(fieldType.getLogicalType().getArgumentType())); + assertThat( + translated.getLogicalType().getArgument(), + equalTo(fieldType.getLogicalType().getArgument())); + } + } + } + /** A simple logical type that has no argument. */ private static class NullArgumentLogicalType implements Schema.LogicalType { public static final String IDENTIFIER = "beam:logical_type:null_argument:v1"; From 0025ed33396691cfd3f545a3c25608f10fae1991 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 6 Jan 2023 10:06:10 -0500 Subject: [PATCH 2/2] LOG message fix; Use Parameterized test pattern --- .../beam/sdk/schemas/SchemaTranslation.java | 5 +- .../sdk/schemas/SchemaTranslationTest.java | 62 ++++++++++--------- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java index fddaaab8ec80..0a9680185626 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java @@ -439,9 +439,8 @@ private static FieldType fieldTypeFromProtoWithoutNullable(SchemaApi.FieldType p logicalType.getPayload().toByteArray(), "logicalType")); } catch (IllegalArgumentException e) { LOG.warn( - String.format( - "Unable to deserialize the logical type %s from proto. Mark as UnknownLogicalType", - urn)); + "Unable to deserialize the logical type {} from proto. Mark as UnknownLogicalType.", + urn); } } // assemble an UnknownLogicalType diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java index 2c7d781d33b7..a648e5d662ef 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java @@ -402,38 +402,42 @@ public void typeInfoNotSet() { } } - @RunWith(JUnit4.class) + /** Test schema translation of logical types. */ + @RunWith(Parameterized.class) public static class LogicalTypesTest { + @Parameters(name = "{index}: {0}") + public static Iterable data() { + return ImmutableList.builder() + .add(FieldType.logicalType(SqlTypes.DATE)) + .add(FieldType.logicalType(SqlTypes.TIME)) + .add(FieldType.logicalType(SqlTypes.DATETIME)) + .add(FieldType.logicalType(SqlTypes.TIMESTAMP)) + .add(FieldType.logicalType(new NanosInstant())) + .add(FieldType.logicalType(new NanosDuration())) + .add(FieldType.logicalType(FixedBytes.of(10))) + .add(FieldType.logicalType(VariableBytes.of(10))) + .add(FieldType.logicalType(FixedString.of(10))) + .add(FieldType.logicalType(VariableString.of(10))) + .add(FieldType.logicalType(FixedPrecisionNumeric.of(10))) + .build(); + } + + @Parameter(0) + public Schema.FieldType fieldType; + @Test public void testPortableLogicalTypeSerializeDeserilizeCorrectly() { - List testCases = - ImmutableList.builder() - .add(FieldType.logicalType(SqlTypes.DATE)) - .add(FieldType.logicalType(SqlTypes.TIME)) - .add(FieldType.logicalType(SqlTypes.DATETIME)) - .add(FieldType.logicalType(SqlTypes.TIMESTAMP)) - .add(FieldType.logicalType(new NanosInstant())) - .add(FieldType.logicalType(new NanosDuration())) - .add(FieldType.logicalType(FixedBytes.of(10))) - .add(FieldType.logicalType(VariableBytes.of(10))) - .add(FieldType.logicalType(FixedString.of(10))) - .add(FieldType.logicalType(VariableString.of(10))) - .add(FieldType.logicalType(FixedPrecisionNumeric.of(10))) - .build(); - - for (Schema.FieldType fieldType : testCases) { - SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, true); - Schema.FieldType translated = SchemaTranslation.fieldTypeFromProto(proto); - - assertThat( - translated.getLogicalType().getClass(), equalTo(fieldType.getLogicalType().getClass())); - assertThat( - translated.getLogicalType().getArgumentType(), - equalTo(fieldType.getLogicalType().getArgumentType())); - assertThat( - translated.getLogicalType().getArgument(), - equalTo(fieldType.getLogicalType().getArgument())); - } + SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, true); + Schema.FieldType translated = SchemaTranslation.fieldTypeFromProto(proto); + + assertThat( + translated.getLogicalType().getClass(), equalTo(fieldType.getLogicalType().getClass())); + assertThat( + translated.getLogicalType().getArgumentType(), + equalTo(fieldType.getLogicalType().getArgumentType())); + assertThat( + translated.getLogicalType().getArgument(), + equalTo(fieldType.getLogicalType().getArgument())); } }