diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java
index 229268830b86..b8b8efa457d1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java
@@ -32,7 +32,7 @@
 import java.util.Collection;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
+import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
 import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer;
 import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer.NullBehavior;
 import org.apache.beam.sdk.util.RowJson.RowJsonSerializer;
@@ -133,12 +133,10 @@ private static Object[] makeFlatRowTestCase() {
   private static Object[] makeLogicalTypeTestCase() {
     Schema schema =
         Schema.builder()
-            .addLogicalTypeField(
-                "f_passThroughString",
-                new PassThroughLogicalType(
-                    "SqlCharType", FieldType.STRING, "", FieldType.STRING) {})
+            .addLogicalTypeField("f_passThroughString", VariableString.of(10))
             .build();
 
+    // fixed string will do padding
     String rowString = "{\n" + "\"f_passThroughString\" : \"hello\"\n" + "}";
 
     Row expectedRow = Row.withSchema(schema).addValues("hello").build();
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index 368bf8dc4645..7ef55ffef7b8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -49,12 +49,13 @@
 import org.apache.beam.sdk.extensions.sql.impl.ScalarFunctionImpl;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeWithLocalTzType;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
+import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
@@ -407,15 +408,10 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu
         }
         return toBeamRow((List) value, fieldType.getRowSchema(), verifyValues);
       case LOGICAL_TYPE:
-        String identifier = fieldType.getLogicalType().getIdentifier();
-        if (CharType.IDENTIFIER.equals(identifier)
-            || FixedString.IDENTIFIER.equals(identifier)
-            || VariableString.IDENTIFIER.equals(identifier)) {
-          return (String) value;
-        } else if (FixedBytes.IDENTIFIER.equals(identifier)
-            || VariableBytes.IDENTIFIER.equals(identifier)) {
-          return (byte[]) value;
-        } else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
+        LogicalType logicalType = fieldType.getLogicalType();
+        assert logicalType != null;
+        String identifier = logicalType.getIdentifier();
+        if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
           return Instant.ofEpochMilli(((Number) value).longValue());
         } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
           if (value instanceof Date) {
@@ -440,6 +436,9 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu
               LocalTime.ofNanoOfDay(
                   (((Number) value).longValue() % MILLIS_PER_DAY) * NANOS_PER_MILLISECOND));
         } else {
+          if (logicalType instanceof PassThroughLogicalType) {
+            return toBeamObject(value, logicalType.getBaseType(), verifyValues);
+          }
           throw new UnsupportedOperationException("Unable to convert logical type " + identifier);
         }
       default:
@@ -561,8 +560,7 @@ private static Expression getBeamField(
         break;
       case LOGICAL_TYPE:
         String identifier = fieldType.getLogicalType().getIdentifier();
-        if (CharType.IDENTIFIER.equals(identifier)
-            || FixedString.IDENTIFIER.equals(identifier)
+        if (FixedString.IDENTIFIER.equals(identifier)
             || VariableString.IDENTIFIER.equals(identifier)) {
           value = Expressions.call(expression, "getString", fieldName);
         } else if (FixedBytes.IDENTIFIER.equals(identifier)
@@ -643,8 +641,7 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType)
         return nullOr(value, toCalciteRow(value, fieldType.getRowSchema()));
       case LOGICAL_TYPE:
         String identifier = fieldType.getLogicalType().getIdentifier();
-        if (CharType.IDENTIFIER.equals(identifier)
-            || FixedString.IDENTIFIER.equals(identifier)
+        if (FixedString.IDENTIFIER.equals(identifier)
             || VariableString.IDENTIFIER.equals(identifier)) {
           return Expressions.convert_(value, String.class);
         } else if (FixedBytes.IDENTIFIER.equals(identifier)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
index 23281ac1261e..64c3fe300023 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
@@ -37,7 +37,6 @@
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
@@ -50,6 +49,8 @@
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -318,7 +319,9 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
 
     switch (type.getTypeName()) {
       case LOGICAL_TYPE:
-        String logicalId = type.getLogicalType().getIdentifier();
+        LogicalType logicalType = type.getLogicalType();
+        assert logicalType != null;
+        String logicalId = logicalType.getIdentifier();
         if (SqlTypes.TIME.getIdentifier().equals(logicalId)) {
           if (beamValue instanceof Long) { // base type
             return (Long) beamValue;
@@ -331,7 +334,7 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
           } else { // input type
             return (int) ((LocalDate) beamValue).toEpochDay();
           }
-        } else if (CharType.IDENTIFIER.equals(logicalId)) {
+        } else if (logicalType instanceof PassThroughLogicalType) {
           return beamValue;
         } else {
           throw new UnsupportedOperationException("Unknown DateTime type " + logicalId);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index 4f8d57a4fbc5..153acce03e1b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -57,15 +57,6 @@ public TimeWithLocalTzType() {
     }
   }
 
-  /** A LogicalType corresponding to CHAR. */
-  public static class CharType extends PassThroughLogicalType {
-    public static final String IDENTIFIER = "SqlCharType";
-
-    public CharType() {
-      super(IDENTIFIER, FieldType.STRING, "", FieldType.STRING);
-    }
-  }
-
   /** Returns true if the type is any of the various date time types. */
   public static boolean isDateTimeType(FieldType fieldType) {
     if (fieldType.getTypeName() == TypeName.DATETIME) {
@@ -90,10 +81,9 @@ public static boolean isStringType(FieldType fieldType) {
     }
 
     if (fieldType.getTypeName().isLogicalType()) {
-      Schema.LogicalType logicalType = fieldType.getLogicalType();
-      Preconditions.checkArgumentNotNull(logicalType);
-      String logicalId = logicalType.getIdentifier();
-      return logicalId.equals(CharType.IDENTIFIER);
+      Schema.LogicalType logicalType = fieldType.getLogicalType();
+      return logicalType instanceof PassThroughLogicalType
+          && logicalType.getBaseType().getTypeName() == TypeName.STRING;
     }
     return false;
   }
@@ -107,9 +97,10 @@ public static boolean isStringType(FieldType fieldType) {
   public static final FieldType DOUBLE = FieldType.DOUBLE;
   public static final FieldType DECIMAL = FieldType.DECIMAL;
   public static final FieldType BOOLEAN = FieldType.BOOLEAN;
+  // TODO(https://github.com/apache/beam/issues/24019) Support sql types with arguments
   public static final FieldType VARBINARY = FieldType.BYTES;
   public static final FieldType VARCHAR = FieldType.STRING;
-  public static final FieldType CHAR = FieldType.logicalType(new CharType());
+  public static final FieldType CHAR = FieldType.STRING;
   public static final FieldType DATE = FieldType.logicalType(SqlTypes.DATE);
   public static final FieldType NULLABLE_DATE =
       FieldType.logicalType(SqlTypes.DATE).withNullable(true);
@@ -136,7 +127,6 @@ public static boolean isStringType(FieldType fieldType) {
           .put(BOOLEAN, SqlTypeName.BOOLEAN)
           .put(VARBINARY, SqlTypeName.VARBINARY)
           .put(VARCHAR, SqlTypeName.VARCHAR)
-          .put(CHAR, SqlTypeName.CHAR)
          .put(DATE, SqlTypeName.DATE)
          .put(TIME, SqlTypeName.TIME)
          .put(TIME_WITH_LOCAL_TZ, SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE)
@@ -154,6 +144,9 @@ public static boolean isStringType(FieldType fieldType) {
           .put(SqlTypeName.DOUBLE, DOUBLE)
           .put(SqlTypeName.DECIMAL, DECIMAL)
           .put(SqlTypeName.BOOLEAN, BOOLEAN)
+          // TODO(https://github.com/apache/beam/issues/24019) Support sql types with arguments
+          // Handle Calcite VARBINARY/BINARY/VARCHAR/CHAR with
+          // VariableBinary/FixedBinary/VariableString/FixedString logical types.
           .put(SqlTypeName.VARBINARY, VARBINARY)
           .put(SqlTypeName.BINARY, VARBINARY)
           .put(SqlTypeName.VARCHAR, VARCHAR)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index e152beb623d7..6a340496122b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -61,8 +61,10 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
@@ -262,7 +264,6 @@ public abstract static class Builder {
           .put(SqlTypes.TIME.getIdentifier(), StandardSQLTypeName.TIME)
           .put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME)
           .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
-          .put("SqlCharType", StandardSQLTypeName.STRING)
           .put("Enum", StandardSQLTypeName.STRING)
           .build();
 
@@ -280,6 +281,9 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
           Preconditions.checkArgumentNotNull(fieldType.getLogicalType());
       ret = BEAM_TO_BIGQUERY_LOGICAL_MAPPING.get(logicalType.getIdentifier());
       if (ret == null) {
+        if (logicalType instanceof PassThroughLogicalType) {
+          return toStandardSQLTypeName(logicalType.getBaseType());
+        }
         throw new IllegalArgumentException(
             "Cannot convert Beam logical type: "
                 + logicalType.getIdentifier()
@@ -718,7 +722,6 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso
 
   // TODO: BigQuery shouldn't know about SQL internal logical types.
   private static final Set SQL_DATE_TIME_TYPES = ImmutableSet.of("SqlTimeWithLocalTzType");
-  private static final Set SQL_STRING_TYPES = ImmutableSet.of("SqlCharType");
 
   /**
    * Tries to convert an Avro decoded value to a Beam field value based on the target type of the
@@ -766,7 +769,9 @@ public static Object convertAvroFormat(
       case ARRAY:
         return convertAvroArray(beamFieldType, avroValue, options);
       case LOGICAL_TYPE:
-        String identifier = beamFieldType.getLogicalType().getIdentifier();
+        LogicalType logicalType = beamFieldType.getLogicalType();
+        assert logicalType != null;
+        String identifier = logicalType.getIdentifier();
         if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
           return convertAvroDate(avroValue);
         } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
@@ -784,8 +789,8 @@ public static Object convertAvroFormat(
                 String.format(
                     "Unknown timestamp truncation option: %s", options.getTruncateTimestamps()));
           }
-        } else if (SQL_STRING_TYPES.contains(identifier)) {
-          return convertAvroPrimitiveTypes(TypeName.STRING, avroValue);
+        } else if (logicalType instanceof PassThroughLogicalType) {
+          return convertAvroFormat(logicalType.getBaseType(), avroValue, options);
        } else {
          throw new RuntimeException("Unknown logical type " + identifier);
        }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParser.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParser.java
index 1560fccadf2e..3b1ff015d7e7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParser.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/CellValueParser.java
@@ -25,6 +25,8 @@
 import com.google.protobuf.ByteString;
 import java.io.Serializable;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Shorts;
@@ -95,11 +97,12 @@ ByteString valueToByteString(Object value, Schema.FieldType type) {
       case DATETIME:
         return byteString(value.toString().getBytes(UTF_8));
       case LOGICAL_TYPE:
-        String identifier = checkArgumentNotNull(type.getLogicalType()).getIdentifier();
-        if ("SqlCharType".equals(identifier)) {
-          return byteString(((String) value).getBytes(UTF_8));
+        LogicalType logicalType = checkArgumentNotNull(type.getLogicalType());
+        if (logicalType instanceof PassThroughLogicalType) {
+          return valueToByteString(value, logicalType.getBaseType());
         } else {
-          throw new IllegalStateException("Unsupported logical type: " + identifier);
+          throw new IllegalStateException(
+              "Unsupported logical type: " + logicalType.getIdentifier());
         }
       default:
         throw new IllegalStateException("Unsupported type: " + type.getTypeName());
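Note: the recurring pattern in this patch is that each converter stops matching the "SqlCharType" identifier and instead unwraps any PassThroughLogicalType by recursing on its base FieldType. The Java sketch below is illustrative only and not part of the patch; the ZipCode type and the toExternalValue helper are hypothetical, but the PassThroughLogicalType constructor form mirrors the one used by the removed RowJsonTest code, and the unwrapping step mirrors the new branches above.

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;

public class PassThroughUnwrapExample {

  // Hypothetical pass-through logical type: values stay plain Strings over a STRING base type,
  // declared the same way the removed CharType was.
  static final PassThroughLogicalType<String> ZIP_CODE =
      new PassThroughLogicalType<String>("ZipCode", FieldType.STRING, "", FieldType.STRING) {};

  // Generic unwrapping in the style of the patched branches: a pass-through logical type keeps
  // its value in the base type's representation, so conversion can delegate to getBaseType()
  // instead of enumerating identifiers.
  static Object toExternalValue(Object value, FieldType fieldType) {
    if (fieldType.getTypeName().isLogicalType()) {
      Schema.LogicalType<?, ?> logicalType = fieldType.getLogicalType();
      if (logicalType instanceof PassThroughLogicalType) {
        return toExternalValue(value, logicalType.getBaseType());
      }
    }
    // Non-logical (base) types need no unwrapping in this sketch.
    return value;
  }

  public static void main(String[] args) {
    FieldType zip = FieldType.logicalType(ZIP_CODE);
    System.out.println(toExternalValue("94105", zip)); // prints 94105
  }
}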