Skip to content

Commit

Permalink
Add map and fixed types conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Nov 26, 2024
1 parent fe0854d commit 6e81f65
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.avro.Conversions;
Expand Down Expand Up @@ -288,8 +289,7 @@ static TableRow convertGenericRecordToTableRow(GenericRecord record) {
case UNION:
return convertNullableField(name, schema, v);
case MAP:
throw new UnsupportedOperationException(
String.format("Unexpected Avro field schema type %s for field named %s", type, name));
return convertMapField(name, schema, v);
default:
return convertRequiredField(name, schema, v);
}
Expand All @@ -310,6 +310,26 @@ private static List<Object> convertRepeatedField(String name, Schema elementType
return values;
}

/**
 * Converts an Avro MAP value into a list of key/value rows.
 *
 * <p>Each map entry becomes one {@link TableRow} with a {@code "key"} cell and a {@code "value"}
 * cell. The value is converted with {@code convertRequiredField} using the map's value schema.
 *
 * @param name name of the field being converted, forwarded to the value conversion
 * @param map the Avro MAP schema; its value type drives the conversion of each entry value
 * @param v the runtime map value, or {@code null}
 * @return one row per map entry, in the iteration order of the map; an empty list when {@code v}
 *     is {@code null}
 */
private static List<TableRow> convertMapField(String name, Schema map, Object v) {
  // Avro maps are represented as key/value RECORD.
  if (v == null) {
    // Handle the case of an empty map.
    return new ArrayList<>();
  }

  Schema valueType = map.getValueType();
  // Use a wildcard view rather than an unchecked cast to Map<String, Object>: depending on how
  // the record was produced, Avro map keys may be CharSequence implementations (such as Utf8)
  // instead of java.lang.String, and erasure would let those leak through unnoticed.
  Map<?, ?> elements = (Map<?, ?>) v;
  List<TableRow> values = new ArrayList<>(elements.size());
  for (Map.Entry<?, ?> element : elements.entrySet()) {
    Object key = element.getKey();
    TableRow row =
        new TableRow()
            // Normalize the key to a plain String so the cell is always a string value.
            .set("key", key == null ? null : key.toString())
            .set("value", convertRequiredField(name, valueType, element.getValue()));
    values.add(row);
  }
  return values;
}

private static Object convertRequiredField(String name, Schema schema, Object v) {
// REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
// INTEGER type maps to an Avro LONG type.
Expand Down Expand Up @@ -387,7 +407,15 @@ private static Object convertRequiredField(String name, Schema schema, Object v)
// SQL types STRING, DATETIME, GEOGRAPHY, JSON
// when not using logical type DATE, TIME too
return v.toString();
case ENUM:
// SQL types STRING
return v.toString();
case FIXED:
// SQL type BYTES
// ideally byte[] but TableRowJsonCoder encodes as String
return BaseEncoding.base64().encode(((ByteBuffer) v).array());
case RECORD:
// SQL types RECORD
return convertGenericRecordToTableRow((GenericRecord) v);
default:
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.util.Utf8;
Expand Down Expand Up @@ -317,6 +320,40 @@ public void testConvertGenericRecordToTableRow() {
assertEquals(expected, row.clone());
}

{
// enum
Schema enumSchema = SchemaBuilder.enumeration("color").symbols("red", "green", "blue");
GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(enumSchema, "RED");
GenericRecord record =
new GenericRecordBuilder(avroSchema(f -> f.type(enumSchema).noDefault()))
.set("value", symbol)
.build();
TableRow expected = new TableRow().set("value", "RED");
TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);

assertEquals(expected, row);
assertEquals(expected, row.clone());
}

{
// fixed
UUID uuid = UUID.randomUUID();
ByteBuffer bb = ByteBuffer.allocate(16);
bb.putLong(uuid.getMostSignificantBits());
bb.putLong(uuid.getLeastSignificantBits());
bb.rewind();
byte[] bytes = bb.array();
GenericRecord record =
new GenericRecordBuilder(avroSchema(f -> f.type().fixed("uuid").size(16).noDefault()))
.set("value", bb)
.build();
TableRow expected = new TableRow().set("value", BaseEncoding.base64().encode(bytes));
TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);

assertEquals(expected, row);
assertEquals(expected, row.clone());
}

{
// null
GenericRecord record =
Expand All @@ -342,6 +379,28 @@ public void testConvertGenericRecordToTableRow() {
assertEquals(expected, row.clone());
}

{
// map
Map<String, Integer> map = new HashMap<>();
map.put("left", 1);
map.put("right", -1);
GenericRecord record =
new GenericRecordBuilder(avroSchema(f -> f.type().map().values().intType().noDefault()))
.set("value", map)
.build();
TableRow expected =
new TableRow()
.set(
"value",
Lists.newArrayList(
new TableRow().set("key", "left").set("value", "1"),
new TableRow().set("key", "right").set("value", "-1")));
TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);

assertEquals(expected, row);
assertEquals(expected, row.clone());
}

{
// record
Schema subSchema =
Expand Down

0 comments on commit 6e81f65

Please sign in to comment.