Eliminate CalciteUtils.CharType logical type (#24013)
* Eliminate CalciteUtils.CharType logical type

* Replace CalciteUtils.CharType with String
  Note that CalciteUtils still omits the precision of BINARY/VARBINARY/CHAR/VARCHAR,
  as it did originally. Supporting the precision of these Calcite types involves
  making use of the overloaded method RelDataTypeFactory.createSqlType(var1, var2);
  see the precision sketch after this list.

* Replace every reference to CalciteUtils.CharType with a generic
  PassThroughLogicalType check, as shown in the first sketch after this list

* Add TODO to support SQL types with arguments

* Use VariableString in LogicalTypeTestCase
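
The generic check that replaces the old SqlCharType identifier comparisons is, in essence, an instanceof test followed by a dispatch on the wrapped base type. Below is a minimal sketch of that pattern, mirroring the CalciteUtils.isStringType change further down; the helper name isStringBacked is illustrative and not part of the commit.

import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;

/** Illustrative helper, not part of this commit: is the field backed by a plain STRING? */
static boolean isStringBacked(FieldType fieldType) {
  if (fieldType.getTypeName() == TypeName.STRING) {
    return true;
  }
  if (fieldType.getTypeName().isLogicalType()) {
    LogicalType<?, ?> logicalType = fieldType.getLogicalType();
    // Any pass-through wrapper (e.g. the removed SqlCharType, which wrapped
    // FieldType.STRING) is classified by the base type it carries; instanceof
    // also handles a null logical type safely.
    return logicalType instanceof PassThroughLogicalType
        && logicalType.getBaseType().getTypeName() == TypeName.STRING;
  }
  return false;
}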
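
For the precision that CalciteUtils still drops for BINARY/VARBINARY/CHAR/VARCHAR, the note above points at the two-argument overload RelDataTypeFactory.createSqlType(SqlTypeName, int). A rough sketch of a length-carrying mapping follows, using plain Calcite imports for readability (Beam's SQL module uses its vendored Calcite packages); the helper name and wiring are assumptions, not part of the commit.

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;

/** Sketch only: map a Beam FixedString/VariableString length to a sized Calcite type. */
static RelDataType toCalciteCharType(RelDataTypeFactory typeFactory, boolean fixed, int length) {
  // The two-argument overload carries the precision (here, the character length)
  // instead of falling back to the default precision of the one-argument form.
  return typeFactory.createSqlType(fixed ? SqlTypeName.CHAR : SqlTypeName.VARCHAR, length);
}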
Abacn authored Nov 15, 2022
1 parent f349f41 commit 85df5f2
Showing 6 changed files with 45 additions and 46 deletions.
@@ -32,7 +32,7 @@
import java.util.Collection;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer;
import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer.NullBehavior;
import org.apache.beam.sdk.util.RowJson.RowJsonSerializer;
@@ -133,12 +133,10 @@ private static Object[] makeFlatRowTestCase() {
private static Object[] makeLogicalTypeTestCase() {
Schema schema =
Schema.builder()
.addLogicalTypeField(
"f_passThroughString",
new PassThroughLogicalType<String>(
"SqlCharType", FieldType.STRING, "", FieldType.STRING) {})
.addLogicalTypeField("f_passThroughString", VariableString.of(10))
.build();

// fixed string will do padding
String rowString = "{\n" + "\"f_passThroughString\" : \"hello\"\n" + "}";

Row expectedRow = Row.withSchema(schema).addValues("hello").build();
@@ -49,12 +49,13 @@
import org.apache.beam.sdk.extensions.sql.impl.ScalarFunctionImpl;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeWithLocalTzType;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
@@ -407,15 +408,10 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu
}
return toBeamRow((List<Object>) value, fieldType.getRowSchema(), verifyValues);
case LOGICAL_TYPE:
String identifier = fieldType.getLogicalType().getIdentifier();
if (CharType.IDENTIFIER.equals(identifier)
|| FixedString.IDENTIFIER.equals(identifier)
|| VariableString.IDENTIFIER.equals(identifier)) {
return (String) value;
} else if (FixedBytes.IDENTIFIER.equals(identifier)
|| VariableBytes.IDENTIFIER.equals(identifier)) {
return (byte[]) value;
} else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
LogicalType<?, ?> logicalType = fieldType.getLogicalType();
assert logicalType != null;
String identifier = logicalType.getIdentifier();
if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
return Instant.ofEpochMilli(((Number) value).longValue());
} else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
if (value instanceof Date) {
@@ -440,6 +436,9 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu
LocalTime.ofNanoOfDay(
(((Number) value).longValue() % MILLIS_PER_DAY) * NANOS_PER_MILLISECOND));
} else {
if (logicalType instanceof PassThroughLogicalType) {
return toBeamObject(value, logicalType.getBaseType(), verifyValues);
}
throw new UnsupportedOperationException("Unable to convert logical type " + identifier);
}
default:
@@ -561,8 +560,7 @@ private static Expression getBeamField(
break;
case LOGICAL_TYPE:
String identifier = fieldType.getLogicalType().getIdentifier();
if (CharType.IDENTIFIER.equals(identifier)
|| FixedString.IDENTIFIER.equals(identifier)
if (FixedString.IDENTIFIER.equals(identifier)
|| VariableString.IDENTIFIER.equals(identifier)) {
value = Expressions.call(expression, "getString", fieldName);
} else if (FixedBytes.IDENTIFIER.equals(identifier)
@@ -643,8 +641,7 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType)
return nullOr(value, toCalciteRow(value, fieldType.getRowSchema()));
case LOGICAL_TYPE:
String identifier = fieldType.getLogicalType().getIdentifier();
if (CharType.IDENTIFIER.equals(identifier)
|| FixedString.IDENTIFIER.equals(identifier)
if (FixedString.IDENTIFIER.equals(identifier)
|| VariableString.IDENTIFIER.equals(identifier)) {
return Expressions.convert_(value, String.class);
} else if (FixedBytes.IDENTIFIER.equals(identifier)
@@ -37,7 +37,6 @@
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
@@ -50,6 +49,8 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
@@ -318,7 +319,9 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {

switch (type.getTypeName()) {
case LOGICAL_TYPE:
String logicalId = type.getLogicalType().getIdentifier();
LogicalType<?, ?> logicalType = type.getLogicalType();
assert logicalType != null;
String logicalId = logicalType.getIdentifier();
if (SqlTypes.TIME.getIdentifier().equals(logicalId)) {
if (beamValue instanceof Long) { // base type
return (Long) beamValue;
@@ -331,7 +334,7 @@ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
} else { // input type
return (int) ((LocalDate) beamValue).toEpochDay();
}
} else if (CharType.IDENTIFIER.equals(logicalId)) {
} else if (logicalType instanceof PassThroughLogicalType) {
return beamValue;
} else {
throw new UnsupportedOperationException("Unknown DateTime type " + logicalId);
@@ -57,15 +57,6 @@ public TimeWithLocalTzType() {
}
}

/** A LogicalType corresponding to CHAR. */
public static class CharType extends PassThroughLogicalType<String> {
public static final String IDENTIFIER = "SqlCharType";

public CharType() {
super(IDENTIFIER, FieldType.STRING, "", FieldType.STRING);
}
}

/** Returns true if the type is any of the various date time types. */
public static boolean isDateTimeType(FieldType fieldType) {
if (fieldType.getTypeName() == TypeName.DATETIME) {
@@ -90,10 +81,9 @@ public static boolean isStringType(FieldType fieldType) {
}

if (fieldType.getTypeName().isLogicalType()) {
Schema.LogicalType logicalType = fieldType.getLogicalType();
Preconditions.checkArgumentNotNull(logicalType);
String logicalId = logicalType.getIdentifier();
return logicalId.equals(CharType.IDENTIFIER);
Schema.LogicalType<?, ?> logicalType = fieldType.getLogicalType();
return logicalType instanceof PassThroughLogicalType
&& logicalType.getBaseType().getTypeName() == TypeName.STRING;
}
return false;
}
@@ -107,9 +97,10 @@ public static boolean isStringType(FieldType fieldType) {
public static final FieldType DOUBLE = FieldType.DOUBLE;
public static final FieldType DECIMAL = FieldType.DECIMAL;
public static final FieldType BOOLEAN = FieldType.BOOLEAN;
// TODO(https://github.com/apache/beam/issues/24019) Support sql types with arguments
public static final FieldType VARBINARY = FieldType.BYTES;
public static final FieldType VARCHAR = FieldType.STRING;
public static final FieldType CHAR = FieldType.logicalType(new CharType());
public static final FieldType CHAR = FieldType.STRING;
public static final FieldType DATE = FieldType.logicalType(SqlTypes.DATE);
public static final FieldType NULLABLE_DATE =
FieldType.logicalType(SqlTypes.DATE).withNullable(true);
@@ -136,7 +127,6 @@ public static boolean isStringType(FieldType fieldType) {
.put(BOOLEAN, SqlTypeName.BOOLEAN)
.put(VARBINARY, SqlTypeName.VARBINARY)
.put(VARCHAR, SqlTypeName.VARCHAR)
.put(CHAR, SqlTypeName.CHAR)
.put(DATE, SqlTypeName.DATE)
.put(TIME, SqlTypeName.TIME)
.put(TIME_WITH_LOCAL_TZ, SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE)
@@ -154,6 +144,9 @@ public static boolean isStringType(FieldType fieldType) {
.put(SqlTypeName.DOUBLE, DOUBLE)
.put(SqlTypeName.DECIMAL, DECIMAL)
.put(SqlTypeName.BOOLEAN, BOOLEAN)
// TODO(https://github.com/apache/beam/issues/24019) Support sql types with arguments
// Handle Calcite VARBINARY/BINARY/VARCHAR/CHAR with
// VariableBinary/FixedBinary/VariableString/FixedString logical types.
.put(SqlTypeName.VARBINARY, VARBINARY)
.put(SqlTypeName.BINARY, VARBINARY)
.put(SqlTypeName.VARCHAR, VARCHAR)
@@ -61,8 +61,10 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
@@ -262,7 +264,6 @@ public abstract static class Builder {
.put(SqlTypes.TIME.getIdentifier(), StandardSQLTypeName.TIME)
.put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME)
.put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
.put("SqlCharType", StandardSQLTypeName.STRING)
.put("Enum", StandardSQLTypeName.STRING)
.build();

@@ -280,6 +281,9 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
Preconditions.checkArgumentNotNull(fieldType.getLogicalType());
ret = BEAM_TO_BIGQUERY_LOGICAL_MAPPING.get(logicalType.getIdentifier());
if (ret == null) {
if (logicalType instanceof PassThroughLogicalType) {
return toStandardSQLTypeName(logicalType.getBaseType());
}
throw new IllegalArgumentException(
"Cannot convert Beam logical type: "
+ logicalType.getIdentifier()
@@ -718,7 +722,6 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso

// TODO: BigQuery shouldn't know about SQL internal logical types.
private static final Set<String> SQL_DATE_TIME_TYPES = ImmutableSet.of("SqlTimeWithLocalTzType");
private static final Set<String> SQL_STRING_TYPES = ImmutableSet.of("SqlCharType");

/**
* Tries to convert an Avro decoded value to a Beam field value based on the target type of the
@@ -766,7 +769,9 @@ public static Object convertAvroFormat(
case ARRAY:
return convertAvroArray(beamFieldType, avroValue, options);
case LOGICAL_TYPE:
String identifier = beamFieldType.getLogicalType().getIdentifier();
LogicalType<?, ?> logicalType = beamFieldType.getLogicalType();
assert logicalType != null;
String identifier = logicalType.getIdentifier();
if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
return convertAvroDate(avroValue);
} else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
Expand All @@ -784,8 +789,8 @@ public static Object convertAvroFormat(
String.format(
"Unknown timestamp truncation option: %s", options.getTruncateTimestamps()));
}
} else if (SQL_STRING_TYPES.contains(identifier)) {
return convertAvroPrimitiveTypes(TypeName.STRING, avroValue);
} else if (logicalType instanceof PassThroughLogicalType) {
return convertAvroFormat(logicalType.getBaseType(), avroValue, options);
} else {
throw new RuntimeException("Unknown logical type " + identifier);
}
@@ -25,6 +25,8 @@
import com.google.protobuf.ByteString;
import java.io.Serializable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Longs;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Shorts;
@@ -95,11 +97,12 @@ ByteString valueToByteString(Object value, Schema.FieldType type) {
case DATETIME:
return byteString(value.toString().getBytes(UTF_8));
case LOGICAL_TYPE:
String identifier = checkArgumentNotNull(type.getLogicalType()).getIdentifier();
if ("SqlCharType".equals(identifier)) {
return byteString(((String) value).getBytes(UTF_8));
LogicalType<?, ?> logicalType = checkArgumentNotNull(type.getLogicalType());
if (logicalType instanceof PassThroughLogicalType) {
return valueToByteString(value, logicalType.getBaseType());
} else {
throw new IllegalStateException("Unsupported logical type: " + identifier);
throw new IllegalStateException(
"Unsupported logical type: " + logicalType.getIdentifier());
}
default:
throw new IllegalStateException("Unsupported type: " + type.getTypeName());
