feat: add ability to write Range values with JSONStreamWriter (#2498)
* feat: add ability to write Range values with JSONStreamWriter

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Update tests to include mixed case and string values

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add Arrow processor to validate reading range values

* Add test scope to Arrow dependencies

* Fix unit test

* Fix maven dependencies check failure for arrow

* Add arrow-memory-netty to fix sample exception issue with no DefaultAllocationManager

* Temp remove arrow test dependencies to test sample failure

* Further testing

* Test by hard coding test arrow version

* Revert changes for testing

* Pass dep. check

* Updated test for case sensitivity

* Update integration test for case sensitivity for all cases

* Update JsonToProtoMessageTest to include mixed case fields

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
PhongChuong and gcf-owl-bot[bot] authored May 31, 2024
1 parent 53bb216 commit a5e62be
Show file tree
Hide file tree
Showing 9 changed files with 975 additions and 50 deletions.
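
For orientation, a minimal usage sketch of the feature from the caller's side (not part of this diff): it assumes a default-stream JsonStreamWriter built from a BigQueryWriteClient, an illustrative table with a RANGE<DATE> column named range_date, and an assumed "start"/"end" JSON object encoding with ISO date strings.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import org.json.JSONArray;
import org.json.JSONObject;

public class WriteRangeRowSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative identifiers; replace with a real project/dataset/table.
    String parentTable = TableName.of("my-project", "my_dataset", "my_table").toString();
    try (BigQueryWriteClient client = BigQueryWriteClient.create();
        // Default-stream writer; the builder fetches the table schema, so the
        // generated proto descriptor includes the RANGE message with start/end.
        JsonStreamWriter writer = JsonStreamWriter.newBuilder(parentTable, client).build()) {
      // A RANGE column is written as a JSON object with "start" and "end" members.
      JSONObject row = new JSONObject();
      row.put("range_date", new JSONObject().put("start", "2024-01-01").put("end", "2024-12-31"));
      ApiFuture<AppendRowsResponse> response = writer.append(new JSONArray().put(row));
      response.get(); // wait for the append to be acknowledged
    }
  }
}
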
13 changes: 13 additions & 0 deletions google-cloud-bigquerystorage/pom.xml
@@ -15,6 +15,7 @@
</parent>
<properties>
<site.installationModule>google-cloud-bigquerystorage</site.installationModule>
<arrow.version>15.0.2</arrow.version>
</properties>
<build>
<extensions>
@@ -197,6 +198,18 @@
<version>1.11.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-core</artifactId>
<version>${arrow.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptor.java
@@ -15,6 +15,7 @@
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -61,6 +62,7 @@ public class BQTableSchemaToProtoDescriptor {
.put(TableFieldSchema.Type.TIMESTAMP, FieldDescriptorProto.Type.TYPE_INT64)
.put(TableFieldSchema.Type.JSON, FieldDescriptorProto.Type.TYPE_STRING)
.put(TableFieldSchema.Type.INTERVAL, FieldDescriptorProto.Type.TYPE_STRING)
.put(TableFieldSchema.Type.RANGE, FieldDescriptorProto.Type.TYPE_MESSAGE)
.build();

/**
@@ -89,7 +91,7 @@ private static Descriptor convertBQTableSchemaToProtoDescriptorImpl(
TableSchema BQTableSchema,
String scope,
HashMap<ImmutableList<TableFieldSchema>, Descriptor> dependencyMap)
throws Descriptors.DescriptorValidationException {
throws Descriptors.DescriptorValidationException, IllegalArgumentException {
List<FileDescriptor> dependenciesList = new ArrayList<FileDescriptor>();
List<FieldDescriptorProto> fields = new ArrayList<FieldDescriptorProto>();
int index = 1;
@@ -99,25 +101,72 @@ private static Descriptor convertBQTableSchemaToProtoDescriptorImpl(
? BQTableField.getName()
: BigQuerySchemaUtil.generatePlaceholderFieldName(BQTableField.getName());
String currentScope = scope + "__" + scopeName;
      if (BQTableField.getType() == TableFieldSchema.Type.STRUCT) {
        ImmutableList<TableFieldSchema> fieldList =
            ImmutableList.copyOf(BQTableField.getFieldsList());
        if (dependencyMap.containsKey(fieldList)) {
          Descriptor descriptor = dependencyMap.get(fieldList);
          dependenciesList.add(descriptor.getFile());
          fields.add(convertBQTableFieldToProtoField(BQTableField, index++, descriptor.getName()));
        } else {
          Descriptor descriptor =
              convertBQTableSchemaToProtoDescriptorImpl(
                  TableSchema.newBuilder().addAllFields(fieldList).build(),
                  currentScope,
                  dependencyMap);
          dependenciesList.add(descriptor.getFile());
          dependencyMap.put(fieldList, descriptor);
          fields.add(convertBQTableFieldToProtoField(BQTableField, index++, currentScope));
        }
      } else {
        fields.add(convertBQTableFieldToProtoField(BQTableField, index++, currentScope));
      }
      switch (BQTableField.getType()) {
        case STRUCT:
          ImmutableList<TableFieldSchema> fieldList =
              ImmutableList.copyOf(BQTableField.getFieldsList());
          if (dependencyMap.containsKey(fieldList)) {
            Descriptor descriptor = dependencyMap.get(fieldList);
            dependenciesList.add(descriptor.getFile());
            fields.add(
                convertBQTableFieldToProtoField(BQTableField, index++, descriptor.getName()));
          } else {
            Descriptor descriptor =
                convertBQTableSchemaToProtoDescriptorImpl(
                    TableSchema.newBuilder().addAllFields(fieldList).build(),
                    currentScope,
                    dependencyMap);
            dependenciesList.add(descriptor.getFile());
            dependencyMap.put(fieldList, descriptor);
            fields.add(convertBQTableFieldToProtoField(BQTableField, index++, currentScope));
          }
          break;
        case RANGE:
          switch (BQTableField.getRangeElementType().getType()) {
            case DATE:
            case DATETIME:
            case TIMESTAMP:
              break;
            default:
              throw new IllegalArgumentException(
                  String.format(
                      "Error: %s of type RANGE requires range element type (DATE, DATETIME, TIMESTAMP)",
                      currentScope));
          }
          // For RANGE type, explicitly add the fields start and end of the same FieldElementType
          // as it is not explicitly defined in the TableSchema.
          ImmutableList<TableFieldSchema> rangeFields =
              ImmutableList.of(
                  TableFieldSchema.newBuilder()
                      .setType(BQTableField.getRangeElementType().getType())
                      .setName("start")
                      .setMode(Mode.NULLABLE)
                      .build(),
                  TableFieldSchema.newBuilder()
                      .setType(BQTableField.getRangeElementType().getType())
                      .setName("end")
                      .setMode(Mode.NULLABLE)
                      .build());

          if (dependencyMap.containsKey(rangeFields)) {
            Descriptor descriptor = dependencyMap.get(rangeFields);
            dependenciesList.add(descriptor.getFile());
            fields.add(
                convertBQTableFieldToProtoField(BQTableField, index++, descriptor.getName()));
          } else {
            Descriptor descriptor =
                convertBQTableSchemaToProtoDescriptorImpl(
                    TableSchema.newBuilder().addAllFields(rangeFields).build(),
                    currentScope,
                    dependencyMap);
            dependenciesList.add(descriptor.getFile());
            dependencyMap.put(rangeFields, descriptor);
            fields.add(convertBQTableFieldToProtoField(BQTableField, index++, currentScope));
          }
          break;
        default:
          fields.add(convertBQTableFieldToProtoField(BQTableField, index++, currentScope));
          break;
      }
}
FileDescriptor[] dependenciesArray = new FileDescriptor[dependenciesList.size()];
@@ -150,11 +199,19 @@ private static FieldDescriptorProto convertBQTableFieldToProtoField(
.setNumber(index)
.setLabel((FieldDescriptorProto.Label) BQTableSchemaModeMap.get(mode));

if (BQTableField.getType() == TableFieldSchema.Type.STRUCT) {
fieldDescriptor.setTypeName(scope);
} else {
fieldDescriptor.setType(
(FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType()));
switch (BQTableField.getType()) {
case STRUCT:
fieldDescriptor.setTypeName(scope);
break;
case RANGE:
fieldDescriptor.setType(
(FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType()));
fieldDescriptor.setTypeName(scope);
break;
default:
fieldDescriptor.setType(
(FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType()));
break;
}

// Sets columnName annotation when field name is not proto compatible.
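
A small sketch (not part of the commit) of what the converter now produces for a RANGE<DATE> column; the column name is illustrative, and the expected INT32 sub-field type follows this class's existing DATE mapping.

import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.Descriptors.Descriptor;

public class RangeDescriptorSketch {
  public static void main(String[] args) throws Exception {
    TableSchema schema =
        TableSchema.newBuilder()
            .addFields(
                TableFieldSchema.newBuilder()
                    .setName("range_date")
                    .setType(TableFieldSchema.Type.RANGE)
                    .setMode(TableFieldSchema.Mode.NULLABLE)
                    .setRangeElementType(
                        TableFieldSchema.FieldElementType.newBuilder()
                            .setType(TableFieldSchema.Type.DATE)
                            .build())
                    .build())
            .build();
    Descriptor descriptor =
        BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(schema);
    // The RANGE column becomes a nested message field whose NULLABLE "start" and
    // "end" sub-fields carry the element type (DATE maps to INT32 here).
    Descriptor rangeMessage = descriptor.findFieldByName("range_date").getMessageType();
    System.out.println(rangeMessage.findFieldByName("start").getType()); // INT32
    System.out.println(rangeMessage.findFieldByName("end").getType()); // INT32
  }
}

Because the synthesized start/end field list is keyed into the same dependencyMap, two RANGE columns with the same element type end up sharing one generated message type.
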
@@ -433,6 +433,40 @@ private FieldDescriptorAndFieldTableSchema computeDescriptorAndSchema(
if (tableFieldSchemaList != null) {
// protoSchema is generated from tableSchema so their field ordering should match.
fieldSchema = tableFieldSchemaList.get(field.getIndex());
// For RANGE type, explicitly add the fields start and end of the same FieldElementType as it
// is not explicitly defined in the TableFieldSchema.
if (fieldSchema.getType() == TableFieldSchema.Type.RANGE) {
switch (fieldSchema.getRangeElementType().getType()) {
case DATE:
case DATETIME:
case TIMESTAMP:
fieldSchema =
fieldSchema
.toBuilder()
.addFields(
TableFieldSchema.newBuilder()
.setName("start")
.setType(fieldSchema.getRangeElementType().getType())
.build())
.addFields(
TableFieldSchema.newBuilder()
.setName("end")
.setType(fieldSchema.getRangeElementType().getType())
.build())
.build();
break;
default:
throw new ValidationException(
"Field at index "
+ field.getIndex()
+ " with name ("
+ fieldSchema.getName()
+ ") with type (RANGE) has an unsupported range element type ("
+ fieldSchema.getRangeElementType()
+ ")");
}
}

if (!fieldSchema.getName().toLowerCase().equals(BigQuerySchemaUtil.getFieldName(field))) {
throw new ValidationException(
"Field at index "
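
Because the write path expands a RANGE field into the same start/end sub-schema, a row's RANGE value is a JSON object keyed by start and end. An illustrative sketch of row shapes for the three supported element types follows; the literal string formats are assumptions (whatever scalar DATE/DATETIME/TIMESTAMP encodings JsonToProtoMessage already accepts apply to the sub-fields), and either side may be omitted since the sub-fields are NULLABLE.

import org.json.JSONObject;

public class RangeRowShapesSketch {
  public static void main(String[] args) {
    JSONObject row = new JSONObject();
    // Column names mirror the test schema in this commit; values are illustrative.
    row.put("range_date", new JSONObject().put("start", "2024-01-01").put("end", "2024-12-31"));
    row.put(
        "range_datetime",
        new JSONObject().put("start", "2024-01-01 00:00:00").put("end", "2024-06-30 23:59:59"));
    row.put(
        "range_timestamp",
        new JSONObject().put("start", "2024-01-01T00:00:00Z").put("end", "2024-06-30T23:59:59Z"));
    // Half-bounded range: the generated start/end sub-fields are NULLABLE,
    // so one side can simply be left out.
    JSONObject halfBounded =
        new JSONObject().put("range_date", new JSONObject().put("end", "2024-12-31"));
    System.out.println(row);
    System.out.println(halfBounded);
  }
}
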
google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptorTest.java
@@ -105,6 +105,76 @@ public void testSimpleTypes() throws Exception {
}
}

@Test
public void testRange() throws Exception {
final TableSchema tableSchema =
TableSchema.newBuilder()
.addFields(
TableFieldSchema.newBuilder()
.setName("range_date")
.setType(TableFieldSchema.Type.RANGE)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setRangeElementType(
TableFieldSchema.FieldElementType.newBuilder()
.setType(TableFieldSchema.Type.DATE)
.build())
.build())
.addFields(
TableFieldSchema.newBuilder()
.setName("range_datetime")
.setType(TableFieldSchema.Type.RANGE)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setRangeElementType(
TableFieldSchema.FieldElementType.newBuilder()
.setType(TableFieldSchema.Type.DATETIME)
.build())
.build())
.addFields(
TableFieldSchema.newBuilder()
.setName("range_timestamp")
.setType(TableFieldSchema.Type.RANGE)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setRangeElementType(
TableFieldSchema.FieldElementType.newBuilder()
.setType(TableFieldSchema.Type.TIMESTAMP)
.build())
.build())
.addFields(
TableFieldSchema.newBuilder()
.setName("range_date_miXEd_caSE")
.setType(TableFieldSchema.Type.RANGE)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setRangeElementType(
TableFieldSchema.FieldElementType.newBuilder()
.setType(TableFieldSchema.Type.DATE)
.build())
.build())
.addFields(
TableFieldSchema.newBuilder()
.setName("range_datetime_miXEd_caSE")
.setType(TableFieldSchema.Type.RANGE)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setRangeElementType(
TableFieldSchema.FieldElementType.newBuilder()
.setType(TableFieldSchema.Type.DATETIME)
.build())
.build())
.addFields(
TableFieldSchema.newBuilder()
.setName("range_timestamp_miXEd_caSE")
.setType(TableFieldSchema.Type.RANGE)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setRangeElementType(
TableFieldSchema.FieldElementType.newBuilder()
.setType(TableFieldSchema.Type.TIMESTAMP)
.build())
.build())
.build();
final Descriptor descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema);
isDescriptorEqual(descriptor, TestRange.getDescriptor());
}

@Test
public void testStructSimple() throws Exception {
final TableFieldSchema stringType =