[java] BQ: add missing avro conversions to BQ TableRow #33221

Merged · 6 commits · Dec 10, 2024
@@ -34,6 +34,8 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
@@ -50,14 +52,14 @@
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
* A set of utilities for working with Avro files.
*
* <p>These utilities are based on the <a href="https://avro.apache.org/docs/1.8.1/spec.html">Avro
* 1.8.1</a> specification.
*/
/** A set of utilities for working with Avro files. */
class BigQueryAvroUtils {

private static final String VERSION_AVRO =
Optional.ofNullable(Schema.class.getPackage())
.map(Package::getImplementationVersion)
.orElse("");
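// Assumed behavior (not stated in the source): getImplementationVersion() yields the
// Avro runtime version, e.g. "1.11.3", which is checked below before using the
// local-timestamp-millis/micros logical types that Avro 1.8 and 1.9 do not provide.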

// org.apache.avro.LogicalType
static class DateTimeLogicalType extends LogicalType {
public DateTimeLogicalType() {
Expand All @@ -74,6 +76,8 @@ public DateTimeLogicalType() {
* export</a>
* @see <a href=https://cloud.google.com/bigquery/docs/reference/storage#avro_schema_details>BQ
* avro storage</a>
* @see <a href=https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro>BQ avro
* load</a>
*/
static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTypes) {
String bqType = schema.getType();
@@ -116,6 +120,9 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy
}
case "DATETIME":
if (useAvroLogicalTypes) {
// BQ export uses a custom logical type
// TODO for load/storage use
// LogicalTypes.localTimestampMicros().addToSchema(SchemaBuilder.builder().longType())
return DATETIME_LOGICAL_TYPE.addToSchema(SchemaBuilder.builder().stringType());
} else {
return SchemaBuilder.builder().stringBuilder().prop("sqlType", bqType).endString();
@@ -158,6 +165,12 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy

@VisibleForTesting
static String formatTimestamp(Long timestampMicro) {
String dateTime = formatDatetime(timestampMicro);
return dateTime + " UTC";
}

@VisibleForTesting
static String formatDatetime(Long timestampMicro) {
// timestampMicro is in "microseconds since epoch" format,
// e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
// Separate into seconds and microseconds.
@@ -168,11 +181,13 @@ static String formatTimestamp(Long timestampMicro) {
timestampSec -= 1;
}
String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);

if (micros == 0) {
return String.format("%s UTC", dayAndTime);
return dayAndTime;
} else if (micros % 1000 == 0) {
return String.format("%s.%03d", dayAndTime, micros / 1000);
} else {
return String.format("%s.%06d", dayAndTime, micros);
}
return String.format("%s.%06d UTC", dayAndTime, micros);
}
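// Illustrative examples, derived from the comment above (not present in the source):
//   formatDatetime(1452062291123456L) -> "2016-01-06 06:38:11.123456"
//   formatDatetime(1452062291000000L) -> "2016-01-06 06:38:11"
//   formatTimestamp(1452062291123456L) -> "2016-01-06 06:38:11.123456 UTC"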

/**
@@ -274,8 +289,7 @@ static TableRow convertGenericRecordToTableRow(GenericRecord record) {
case UNION:
return convertNullableField(name, schema, v);
case MAP:
throw new UnsupportedOperationException(
String.format("Unexpected Avro field schema type %s for field named %s", type, name));
return convertMapField(name, schema, v);
default:
return convertRequiredField(name, schema, v);
}
@@ -296,6 +310,26 @@ private static List<Object> convertRepeatedField(String name, Schema elementType
return values;
}

private static List<TableRow> convertMapField(String name, Schema map, Object v) {
// Avro maps are represented as key/value RECORD.
if (v == null) {
// Handle the case of an empty map.
return new ArrayList<>();
}

Schema type = map.getValueType();
Map<String, Object> elements = (Map<String, Object>) v;
ArrayList<TableRow> values = new ArrayList<>();
for (Map.Entry<String, Object> element : elements.entrySet()) {
TableRow row =
new TableRow()
.set("key", element.getKey())
.set("value", convertRequiredField(name, type, element.getValue()));
values.add(row);
}
return values;
}
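// Illustrative only (not in the source): a map field {"a": 1L} with LONG values
// converts to [{"key": "a", "value": "1"}], since convertRequiredField stringifies
// longs for parity with BQ JSON export.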

private static Object convertRequiredField(String name, Schema schema, Object v) {
// REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
// INTEGER type maps to an Avro LONG type.
@@ -305,45 +339,83 @@ private static Object convertRequiredField(String name, Schema schema, Object v)
LogicalType logicalType = schema.getLogicalType();
switch (type) {
case BOOLEAN:
// SQL types BOOL, BOOLEAN
// SQL type BOOL (BOOLEAN)
return v;
case INT:
if (logicalType instanceof LogicalTypes.Date) {
// SQL types DATE
// SQL type DATE
// ideally LocalDate but TableRowJsonCoder encodes as String
return formatDate((Integer) v);
} else if (logicalType instanceof LogicalTypes.TimeMillis) {
// Write only: SQL type TIME
// ideally LocalTime but TableRowJsonCoder encodes as String
return formatTime(((Integer) v) * 1000L);
} else {
throw new UnsupportedOperationException(
String.format("Unexpected Avro field schema type %s for field named %s", type, name));
// Write only: SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
// ideally Integer but keep consistency with BQ JSON export that uses String
return ((Integer) v).toString();
}
case LONG:
if (logicalType instanceof LogicalTypes.TimeMicros) {
// SQL types TIME
// SQL type TIME
// ideally LocalTime but TableRowJsonCoder encodes as String
return formatTime((Long) v);
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
// Write only: SQL type TIMESTAMP
// ideally Instant but TableRowJsonCoder encodes as String
return formatTimestamp((Long) v * 1000L);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
// SQL types TIMESTAMP
// SQL type TIMESTAMP
// ideally Instant but TableRowJsonCoder encodes as String
return formatTimestamp((Long) v);
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
&& logicalType instanceof LogicalTypes.LocalTimestampMillis) {
// Write only: SQL type DATETIME
// ideally LocalDateTime but TableRowJsonCoder encodes as String
return formatDatetime(((Long) v) * 1000);
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
&& logicalType instanceof LogicalTypes.LocalTimestampMicros) {
// Write only: SQL type DATETIME
// ideally LocalDateTime but TableRowJsonCoder encodes as String
return formatDatetime((Long) v);
} else {
// SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
// SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
// ideally Long if in [-2^53+1, 2^53-1] but keep consistency with BQ JSON export that uses
// String
return ((Long) v).toString();
}
case FLOAT:
// Write only: SQL type FLOAT64
// ideally Float but TableRowJsonCoder decodes as Double
return Double.valueOf(v.toString());
case DOUBLE:
// SQL types FLOAT64
// SQL type FLOAT64
return v;
case BYTES:
if (logicalType instanceof LogicalTypes.Decimal) {
// SQL types NUMERIC, BIGNUMERIC
// ideally BigDecimal but TableRowJsonCoder encodes as String
return new Conversions.DecimalConversion()
.fromBytes((ByteBuffer) v, schema, logicalType)
.toString();
} else {
// SQL types BYTES
// SQL type BYTES
// ideally byte[] but TableRowJsonCoder encodes as String
return BaseEncoding.base64().encode(((ByteBuffer) v).array());
}
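// Illustrative only (not in the source): NUMERIC bytes holding unscaled value 1234
// with a scale-2 decimal logical type decode to "12.34"; plain BYTES {0x01, 0x02}
// encode to the base64 string "AQI=".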
case STRING:
// SQL types STRING, DATETIME, GEOGRAPHY, JSON
// also DATE and TIME when logical types are not in use
return v.toString();
case ENUM:
// SQL type STRING
return v.toString();
case FIXED:
// SQL type BYTES
// ideally byte[] but TableRowJsonCoder encodes as String
return BaseEncoding.base64().encode(((GenericFixed) v).bytes());
case RECORD:
// SQL type RECORD
return convertGenericRecordToTableRow((GenericRecord) v);
default:
throw new UnsupportedOperationException(