From a5e62bedf35238f440f7b52dc3e1d3c972e4acdd Mon Sep 17 00:00:00 2001
From: Phong Chuong <147636638+PhongChuong@users.noreply.github.com>
Date: Fri, 31 May 2024 16:06:13 -0400
Subject: [PATCH] feat: add ability to write Range values with JSONStreamWriter (#2498)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* feat: add ability to write Range values with JSONStreamWriter

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Update tests to include mixed case and string values

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Add Arrow processor to validate reading range values

* Add test scope to Arrow dependencies

* Fix unit test

* Fix maven dependencies check failure for arrow

* Add arrow-memory-netty to fix sample exception issue with no DefaultAllocationManager

* Temp remove arrow test dependencies to test sample failure

* Further testing

* Test by hard coding test arrow version

* Revert changes for testing

* Pass dep. check

* Updated test for case sensitivity

* Update integration test for case sensitivity for all cases

* Update JsonToProtoMessageTest to include mixed case fields

---------

Co-authored-by: Owl Bot
---
 google-cloud-bigquerystorage/pom.xml          |  13 +
 .../v1/BQTableSchemaToProtoDescriptor.java    | 105 +++-
 .../storage/v1/JsonToProtoMessage.java        |  34 ++
 .../BQTableSchemaToProtoDescriptorTest.java   |  70 +++
 .../storage/v1/JsonToProtoMessageTest.java    | 122 +++++
 .../storage/v1/it/ITBigQueryStorageTest.java  | 462 +++++++++++++++++-
 .../storage/v1/it/SimpleRowReaderArrow.java   | 191 ++++++++
 ...owReader.java => SimpleRowReaderAvro.java} |   4 +-
 .../src/test/proto/jsonTest.proto             |  24 +
 9 files changed, 975 insertions(+), 50 deletions(-)
 create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java
 rename google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/{SimpleRowReader.java => SimpleRowReaderAvro.java} (96%)

diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml
index 393abe7f20..3719a7494a 100644
--- a/google-cloud-bigquerystorage/pom.xml
+++ b/google-cloud-bigquerystorage/pom.xml
@@ -15,6 +15,7 @@
   <artifactId>google-cloud-bigquerystorage</artifactId>
+    <arrow.version>15.0.2</arrow.version>
@@ -197,6 +198,18 @@
       <version>1.11.3</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>${arrow.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+      <version>${arrow.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>io.grpc</groupId>
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptor.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptor.java
index 15ed5afe6a..19febdcd73 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptor.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptor.java
@@ -15,6 +15,7 @@
  */
 package com.google.cloud.bigquery.storage.v1;
 
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -61,6 +62,7 @@ public class BQTableSchemaToProtoDescriptor {
           .put(TableFieldSchema.Type.TIMESTAMP, FieldDescriptorProto.Type.TYPE_INT64)
          .put(TableFieldSchema.Type.JSON, FieldDescriptorProto.Type.TYPE_STRING)
          .put(TableFieldSchema.Type.INTERVAL, FieldDescriptorProto.Type.TYPE_STRING)
+          .put(TableFieldSchema.Type.RANGE, FieldDescriptorProto.Type.TYPE_MESSAGE)
          .build();

  /**
@@ -89,7 +91,7 @@ private static Descriptor convertBQTableSchemaToProtoDescriptorImpl(
       TableSchema BQTableSchema,
       String scope,
       HashMap<ImmutableList<TableFieldSchema>, Descriptor> dependencyMap)
-      throws Descriptors.DescriptorValidationException {
+      throws Descriptors.DescriptorValidationException, IllegalArgumentException {
     List<FileDescriptor> dependenciesList = new ArrayList<FileDescriptor>();
     List<FieldDescriptorProto> fields = new ArrayList<FieldDescriptorProto>();
     int index = 1;
@@ -99,25 +101,72 @@ private static Descriptor convertBQTableSchemaToProtoDescriptorImpl(
               ? BQTableField.getName()
               : BigQuerySchemaUtil.generatePlaceholderFieldName(BQTableField.getName());
       String currentScope = scope + "__" + scopeName;
-      if (BQTableField.getType() == TableFieldSchema.Type.STRUCT) {
-        ImmutableList<TableFieldSchema> fieldList =
-            ImmutableList.copyOf(BQTableField.getFieldsList());
-        if (dependencyMap.containsKey(fieldList)) {
-          Descriptor descriptor = dependencyMap.get(fieldList);
-          dependenciesList.add(descriptor.getFile());
-          fields.add(convertBQTableFieldToProtoField(BQTableField, index++, descriptor.getName()));
-        } else {
-          Descriptor descriptor =
-              convertBQTableSchemaToProtoDescriptorImpl(
-                  TableSchema.newBuilder().addAllFields(fieldList).build(),
-                  currentScope,
-                  dependencyMap);
-          dependenciesList.add(descriptor.getFile());
-          dependencyMap.put(fieldList, descriptor);
+      switch (BQTableField.getType()) {
+        case STRUCT:
+          ImmutableList<TableFieldSchema> fieldList =
+              ImmutableList.copyOf(BQTableField.getFieldsList());
+          if (dependencyMap.containsKey(fieldList)) {
+            Descriptor descriptor = dependencyMap.get(fieldList);
+            dependenciesList.add(descriptor.getFile());
+            fields.add(
+                convertBQTableFieldToProtoField(BQTableField, index++, descriptor.getName()));
+          } else {
+            Descriptor descriptor =
+                convertBQTableSchemaToProtoDescriptorImpl(
+                    TableSchema.newBuilder().addAllFields(fieldList).build(),
+                    currentScope,
+                    dependencyMap);
+            dependenciesList.add(descriptor.getFile());
+            dependencyMap.put(fieldList, descriptor);
+            fields.add(convertBQTableFieldToProtoField(BQTableField, index++, currentScope));
+          }
+          break;
+        case RANGE:
+          switch (BQTableField.getRangeElementType().getType()) {
+            case DATE:
+            case DATETIME:
+            case TIMESTAMP:
+              break;
+            default:
+              throw new IllegalArgumentException(
+                  String.format(
+                      "Error: %s of type RANGE requires range element type (DATE, DATETIME, TIMESTAMP)",
+                      currentScope));
+          }
+          // For RANGE type, explicitly add the fields start and end of the same FieldElementType
+          // as they are not explicitly defined in the TableSchema.
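+          // For example, a RANGE<DATE> column yields a nested message whose start and end
+          // fields map to int32 (days since the epoch); DATETIME and TIMESTAMP elements map
+          // to int64, matching TestRangeDate/TestRangeDatetime/TestRangeTimestamp in
+          // jsonTest.proto below.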
+          ImmutableList<TableFieldSchema> rangeFields =
+              ImmutableList.of(
+                  TableFieldSchema.newBuilder()
+                      .setType(BQTableField.getRangeElementType().getType())
+                      .setName("start")
+                      .setMode(Mode.NULLABLE)
+                      .build(),
+                  TableFieldSchema.newBuilder()
+                      .setType(BQTableField.getRangeElementType().getType())
+                      .setName("end")
+                      .setMode(Mode.NULLABLE)
+                      .build());
+
+          if (dependencyMap.containsKey(rangeFields)) {
+            Descriptor descriptor = dependencyMap.get(rangeFields);
+            dependenciesList.add(descriptor.getFile());
+            fields.add(
+                convertBQTableFieldToProtoField(BQTableField, index++, descriptor.getName()));
+          } else {
+            Descriptor descriptor =
+                convertBQTableSchemaToProtoDescriptorImpl(
+                    TableSchema.newBuilder().addAllFields(rangeFields).build(),
+                    currentScope,
+                    dependencyMap);
+            dependenciesList.add(descriptor.getFile());
+            dependencyMap.put(rangeFields, descriptor);
+            fields.add(convertBQTableFieldToProtoField(BQTableField, index++, currentScope));
+          }
+          break;
+        default:
           fields.add(convertBQTableFieldToProtoField(BQTableField, index++, currentScope));
-        }
-      } else {
-        fields.add(convertBQTableFieldToProtoField(BQTableField, index++, currentScope));
+          break;
       }
     }
     FileDescriptor[] dependenciesArray = new FileDescriptor[dependenciesList.size()];
@@ -150,11 +199,19 @@ private static FieldDescriptorProto convertBQTableFieldToProtoField(
             .setNumber(index)
             .setLabel((FieldDescriptorProto.Label) BQTableSchemaModeMap.get(mode));

-    if (BQTableField.getType() == TableFieldSchema.Type.STRUCT) {
-      fieldDescriptor.setTypeName(scope);
-    } else {
-      fieldDescriptor.setType(
-          (FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType()));
+    switch (BQTableField.getType()) {
+      case STRUCT:
+        fieldDescriptor.setTypeName(scope);
+        break;
+      case RANGE:
+        fieldDescriptor.setType(
+            (FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType()));
+        fieldDescriptor.setTypeName(scope);
+        break;
+      default:
+        fieldDescriptor.setType(
+            (FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType()));
+        break;
     }

     // Sets columnName annotation when field name is not proto compatible.
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java
index c145b4ed25..7aefe30626 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java
@@ -433,6 +433,40 @@ private FieldDescriptorAndFieldTableSchema computeDescriptorAndSchema(
     if (tableFieldSchemaList != null) {
       // protoSchema is generated from tableSchema so their field ordering should match.
       fieldSchema = tableFieldSchemaList.get(field.getIndex());
+      // For RANGE type, explicitly add the fields start and end of the same FieldElementType as
+      // they are not explicitly defined in the TableFieldSchema.
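+      // With this synthesized sub-schema, a JSON value such as {"start": 18262, "end": 18627}
+      // flows through the existing STRUCT conversion path; the name check below is
+      // case-insensitive, so keys like "STaRT" and "END" are also accepted (see the
+      // mixed-case cases in JsonToProtoMessageTest).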
+ if (fieldSchema.getType() == TableFieldSchema.Type.RANGE) { + switch (fieldSchema.getRangeElementType().getType()) { + case DATE: + case DATETIME: + case TIMESTAMP: + fieldSchema = + fieldSchema + .toBuilder() + .addFields( + TableFieldSchema.newBuilder() + .setName("start") + .setType(fieldSchema.getRangeElementType().getType()) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("end") + .setType(fieldSchema.getRangeElementType().getType()) + .build()) + .build(); + break; + default: + throw new ValidationException( + "Field at index " + + field.getIndex() + + " with name (" + + fieldSchema.getName() + + ") with type (RANGE) has an unsupported range element type (" + + fieldSchema.getRangeElementType() + + ")"); + } + } + if (!fieldSchema.getName().toLowerCase().equals(BigQuerySchemaUtil.getFieldName(field))) { throw new ValidationException( "Field at index " diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptorTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptorTest.java index 111e435533..ba845c1c12 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptorTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptorTest.java @@ -105,6 +105,76 @@ public void testSimpleTypes() throws Exception { } } + @Test + public void testRange() throws Exception { + final TableSchema tableSchema = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder() + .setName("range_date") + .setType(TableFieldSchema.Type.RANGE) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.DATE) + .build()) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("range_datetime") + .setType(TableFieldSchema.Type.RANGE) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.DATETIME) + .build()) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("range_timestamp") + .setType(TableFieldSchema.Type.RANGE) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.TIMESTAMP) + .build()) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("range_date_miXEd_caSE") + .setType(TableFieldSchema.Type.RANGE) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.DATE) + .build()) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("range_datetime_miXEd_caSE") + .setType(TableFieldSchema.Type.RANGE) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.DATETIME) + .build()) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("range_timestamp_miXEd_caSE") + .setType(TableFieldSchema.Type.RANGE) + .setMode(TableFieldSchema.Mode.NULLABLE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.TIMESTAMP) + .build()) + .build()) + .build(); + final Descriptor descriptor = + 
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema); + isDescriptorEqual(descriptor, TestRange.getDescriptor()); + } + @Test public void testStructSimple() throws Exception { final TableFieldSchema stringType = diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java index a347e65e79..b8094b7c12 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java @@ -1019,6 +1019,128 @@ public void testRequired() throws Exception { } } + @Test + public void testRange() throws Exception { + TableSchema tableSchema = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder() + .setName("range_date") + .setType(TableFieldSchema.Type.RANGE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.DATE) + .build()) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("range_datetime") + .setType(TableFieldSchema.Type.RANGE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.DATETIME) + .build()) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("range_timestamp") + .setType(TableFieldSchema.Type.RANGE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.TIMESTAMP) + .build()) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("raNGe_daTE_miXEd_caSE") + .setType(TableFieldSchema.Type.RANGE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.DATE) + .build()) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("ranGE_daTEtiME_miXEd_caSE") + .setType(TableFieldSchema.Type.RANGE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.DATETIME) + .build()) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("raNGe_tiMEstAMp_miXEd_caSE") + .setType(TableFieldSchema.Type.RANGE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.TIMESTAMP) + .build()) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .build(); + + TestRange expectedProto = + TestRange.newBuilder() + .setRangeDate(TestRangeDate.newBuilder().setStart(18262).setEnd(18627)) + .setRangeDatetime( + TestRangeDatetime.newBuilder().setStart(1715360343).setEnd(1715446743)) + .setRangeTimestamp( + TestRangeTimestamp.newBuilder().setStart(1715360343).setEnd(1715446743)) + .setRangeDateMixedCase(TestRangeDate.newBuilder().setStart(1).setEnd(2)) + .setRangeDatetimeMixedCase( + TestRangeDatetime.newBuilder() + .setStart(142258614586538368L) + .setEnd(142258525253402624L)) + .setRangeTimestampMixedCase( + TestRangeTimestamp.newBuilder().setStart(10L).setEnd(1649174771000000L)) + .build(); + + JSONArray data = new JSONArray(); + JSONObject row = new JSONObject(); + + JSONObject rangeDate = new JSONObject(); + rangeDate.put("start", 18262); + 
rangeDate.put("end", 18627); + row.put("range_date", rangeDate); + + JSONObject rangeDatetime = new JSONObject(); + rangeDatetime.put("start", 1715360343); + rangeDatetime.put("end", 1715446743); + row.put("range_datetime", rangeDatetime); + + JSONObject rangeTimestamp = new JSONObject(); + rangeTimestamp.put("start", 1715360343); + rangeTimestamp.put("end", 1715446743); + row.put("range_timestamp", rangeTimestamp); + + JSONObject rangeDateMixedCase = new JSONObject(); + rangeDateMixedCase.put("START", "1970-01-02"); + rangeDateMixedCase.put("eND", "1970-01-03"); + row.put("range_date_mixed_case", rangeDateMixedCase); + + JSONObject rangeDatetimeMixedCase = new JSONObject(); + rangeDatetimeMixedCase.put("STaRT", "2021-09-27T20:51:10.752"); + rangeDatetimeMixedCase.put("END", "2021-09-27T00:00:00"); + row.put("range_datetime_mixed_case", rangeDatetimeMixedCase); + + JSONObject rangeTimestampMixedCase = new JSONObject(); + rangeTimestampMixedCase.put("START", "1970-01-01 00:00:00.000010"); + rangeTimestampMixedCase.put("eND", "2022-04-05 09:06:11 PST"); + row.put("range_timestamp_mixed_case", rangeTimestampMixedCase); + + data.put(row); + List protoMsg = + JsonToProtoMessage.INSTANCE.convertToProtoMessage( + TestRange.getDescriptor(), tableSchema, data, false); + assertEquals(expectedProto, protoMsg.get(0)); + } + @Test public void testStructSimple() throws Exception { structSimple("test", "test"); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java index b1397bbd72..8078255904 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java @@ -25,8 +25,12 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.UnauthenticatedException; import com.google.auth.oauth2.GoogleCredentials; @@ -37,28 +41,40 @@ import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.Field.Mode; +import com.google.cloud.bigquery.FieldElementType; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.JobInfo.WriteDisposition; import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.Range; +import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; import com.google.cloud.bigquery.TimePartitioning; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryReadClient; import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; import com.google.cloud.bigquery.storage.v1.DataFormat; +import 
com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; import com.google.cloud.bigquery.storage.v1.ReadSession; import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers; import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions; import com.google.cloud.bigquery.storage.v1.ReadStream; -import com.google.cloud.bigquery.storage.v1.it.SimpleRowReader.AvroRowConsumer; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; +import com.google.cloud.bigquery.storage.v1.it.SimpleRowReaderArrow.ArrowRangeBatchConsumer; +import com.google.cloud.bigquery.storage.v1.it.SimpleRowReaderAvro.AvroRowConsumer; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Timestamp; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -77,6 +93,8 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.util.Utf8; +import org.json.JSONArray; +import org.json.JSONObject; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -95,6 +113,7 @@ public class ITBigQueryStorageTest { private static final String DESCRIPTION = "BigQuery Storage Java client test dataset"; private static BigQueryReadClient client; + private static String projectName; private static String parentProjectId; private static BigQuery bigquery; @@ -158,10 +177,269 @@ public class ITBigQueryStorageTest { + " \"universe_domain\": \"fake.domain\"\n" + "}"; + private static final com.google.cloud.bigquery.Schema RANGE_SCHEMA = + com.google.cloud.bigquery.Schema.of( + Field.newBuilder("name", StandardSQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .setDescription("Name of the row") + .build(), + Field.newBuilder("date", StandardSQLTypeName.RANGE) + .setMode(Field.Mode.NULLABLE) + .setDescription("Range field with DATE") + .setRangeElementType(FieldElementType.newBuilder().setType("DATE").build()) + .build(), + Field.newBuilder("datetime", StandardSQLTypeName.RANGE) + .setMode(Field.Mode.NULLABLE) + .setDescription("Range field with DATETIME") + .setRangeElementType(FieldElementType.newBuilder().setType("DATETIME").build()) + .build(), + Field.newBuilder("timestamp", StandardSQLTypeName.RANGE) + .setMode(Field.Mode.NULLABLE) + .setDescription("Range field with TIMESTAMP") + .setRangeElementType(FieldElementType.newBuilder().setType("TIMESTAMP").build()) + .build()); + + // storage.v1.TableSchema of RANGE_SCHEMA + private static final TableSchema RANGE_TABLE_SCHEMA = + TableSchema.newBuilder() + .addFields( + TableFieldSchema.newBuilder() + .setName("name") + .setType(TableFieldSchema.Type.STRING) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("date") + .setType(TableFieldSchema.Type.RANGE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.DATE) + .build()) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("datetime") + 
.setType(TableFieldSchema.Type.RANGE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.DATETIME) + .build()) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .addFields( + TableFieldSchema.newBuilder() + .setName("timestamp") + .setType(TableFieldSchema.Type.RANGE) + .setRangeElementType( + TableFieldSchema.FieldElementType.newBuilder() + .setType(TableFieldSchema.Type.TIMESTAMP) + .build()) + .setMode(TableFieldSchema.Mode.NULLABLE) + .build()) + .build(); + + private static final ImmutableMap RANGE_TEST_VALUES_DATES = + new ImmutableMap.Builder() + .put( + "bounded", + Range.newBuilder() + .setStart("2020-01-01") + .setEnd("2020-12-31") + .setType(FieldElementType.newBuilder().setType("DATE").build()) + .build()) + .put( + "unboundedStart", + Range.newBuilder() + .setStart(null) + .setEnd("2020-12-31") + .setType(FieldElementType.newBuilder().setType("DATE").build()) + .build()) + .put( + "unboundedEnd", + Range.newBuilder() + .setStart("2020-01-01") + .setEnd(null) + .setType(FieldElementType.newBuilder().setType("DATE").build()) + .build()) + .put( + "unbounded", + Range.newBuilder() + .setStart(null) + .setEnd(null) + .setType(FieldElementType.newBuilder().setType("DATE").build()) + .build()) + .build(); + + // dates are returned as days since epoch + private static final ImmutableMap RANGE_TEST_VALUES_EXPECTED_DATES = + new ImmutableMap.Builder() + .put( + "bounded", + Range.newBuilder() + .setStart("18262") + .setEnd("18627") + .setType(FieldElementType.newBuilder().setType("DATE").build()) + .build()) + .put( + "unboundedStart", + Range.newBuilder() + .setStart(null) + .setEnd("18627") + .setType(FieldElementType.newBuilder().setType("DATE").build()) + .build()) + .put( + "unboundedEnd", + Range.newBuilder() + .setStart("18262") + .setEnd(null) + .setType(FieldElementType.newBuilder().setType("DATE").build()) + .build()) + .put( + "unbounded", + Range.newBuilder() + .setStart(null) + .setEnd(null) + .setType(FieldElementType.newBuilder().setType("DATE").build()) + .build()) + .build(); + + private static final ImmutableMap RANGE_TEST_VALUES_DATETIME = + new ImmutableMap.Builder() + .put( + "bounded", + Range.newBuilder() + .setStart("2014-08-19T05:41:35.220000") + .setEnd("2015-09-20T06:41:35.220000") + .setType(FieldElementType.newBuilder().setType("DATETIME").build()) + .build()) + .put( + "unboundedStart", + Range.newBuilder() + .setStart(null) + .setEnd("2015-09-20T06:41:35.220000") + .setType(FieldElementType.newBuilder().setType("DATETIME").build()) + .build()) + .put( + "unboundedEnd", + Range.newBuilder() + .setStart("2014-08-19T05:41:35.220000") + .setEnd(null) + .setType(FieldElementType.newBuilder().setType("DATETIME").build()) + .build()) + .put( + "unbounded", + Range.newBuilder() + .setStart(null) + .setEnd(null) + .setType(FieldElementType.newBuilder().setType("DATETIME").build()) + .build()) + .build(); + + // datetime are returned as up to millisecond precision instead of microsecond input value + private static final ImmutableMap RANGE_TEST_VALUES_EXPECTED_DATETIME = + new ImmutableMap.Builder() + .put( + "bounded", + Range.newBuilder() + .setStart("2014-08-19T05:41:35.220") + .setEnd("2015-09-20T06:41:35.220") + .setType(FieldElementType.newBuilder().setType("DATETIME").build()) + .build()) + .put( + "unboundedStart", + Range.newBuilder() + .setStart(null) + .setEnd("2015-09-20T06:41:35.220") + .setType(FieldElementType.newBuilder().setType("DATETIME").build()) + .build()) + .put( + "unboundedEnd", 
+              Range.newBuilder()
+                  .setStart("2014-08-19T05:41:35.220")
+                  .setEnd(null)
+                  .setType(FieldElementType.newBuilder().setType("DATETIME").build())
+                  .build())
+          .put(
+              "unbounded",
+              Range.newBuilder()
+                  .setStart(null)
+                  .setEnd(null)
+                  .setType(FieldElementType.newBuilder().setType("DATETIME").build())
+                  .build())
+          .build();
+
+  private static final ImmutableMap<String, Range> RANGE_TEST_VALUES_TIMESTAMP =
+      new ImmutableMap.Builder<String, Range>()
+          .put(
+              "bounded",
+              Range.newBuilder()
+                  .setStart("2014-08-19 12:41:35.220000+00:00")
+                  .setEnd("2015-09-20 13:41:35.220000+01:00")
+                  .setType(FieldElementType.newBuilder().setType("TIMESTAMP").build())
+                  .build())
+          .put(
+              "unboundedStart",
+              Range.newBuilder()
+                  .setStart(null)
+                  .setEnd("2015-09-20 13:41:35.220000+01:00")
+                  .setType(FieldElementType.newBuilder().setType("TIMESTAMP").build())
+                  .build())
+          .put(
+              "unboundedEnd",
+              Range.newBuilder()
+                  .setStart("2014-08-19 12:41:35.220000+00:00")
+                  .setEnd(null)
+                  .setType(FieldElementType.newBuilder().setType("TIMESTAMP").build())
+                  .build())
+          .put(
+              "unbounded",
+              Range.newBuilder()
+                  .setStart(null)
+                  .setEnd(null)
+                  .setType(FieldElementType.newBuilder().setType("TIMESTAMP").build())
+                  .build())
+          .build();
+
+  // timestamps are returned as microseconds since epoch
+  private static final ImmutableMap<String, Range> RANGE_TEST_VALUES_EXPECTED_TIMESTAMP =
+      new ImmutableMap.Builder<String, Range>()
+          .put(
+              "bounded",
+              Range.newBuilder()
+                  .setStart("1408452095220000")
+                  .setEnd("1442752895220000")
+                  .setType(FieldElementType.newBuilder().setType("TIMESTAMP").build())
+                  .build())
+          .put(
+              "unboundedStart",
+              Range.newBuilder()
+                  .setStart(null)
+                  .setEnd("1442752895220000")
+                  .setType(FieldElementType.newBuilder().setType("TIMESTAMP").build())
+                  .build())
+          .put(
+              "unboundedEnd",
+              Range.newBuilder()
+                  .setStart("1408452095220000")
+                  .setEnd(null)
+                  .setType(FieldElementType.newBuilder().setType("TIMESTAMP").build())
+                  .build())
+          .put(
+              "unbounded",
+              Range.newBuilder()
+                  .setStart(null)
+                  .setEnd(null)
+                  .setType(FieldElementType.newBuilder().setType("TIMESTAMP").build())
+                  .build())
+          .build();
+
   @BeforeClass
   public static void beforeClass() throws IOException {
     client = BigQueryReadClient.create();
-    parentProjectId = String.format("projects/%s", ServiceOptions.getDefaultProjectId());
+    projectName = ServiceOptions.getDefaultProjectId();
+    parentProjectId = String.format("projects/%s", projectName);

     LOG.info(
         String.format(
@@ -271,9 +549,9 @@ public void testSimpleReadArrow() {
   }

   @Test
-  public void testRangeType() throws InterruptedException {
+  public void testRangeTypeSimple() throws InterruptedException {
     // Create table with Range values.
-    String tableName = "test_range_type";
+    String tableName = "test_range_type_read";
     TableId tableId = TableId.of(DATASET, tableName);
     QueryJobConfiguration createTable =
         QueryJobConfiguration.newBuilder(
@@ -329,6 +607,130 @@
     assertEquals(1, rowCount);
   }

+  @Test
+  public void testRangeTypeWrite()
+      throws InterruptedException, IOException, DescriptorValidationException {
+    // Create table with Range fields.
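+    // Each RANGE column is written as a JSON object with optional "start" and "end" keys;
+    // omitting a key leaves that side of the range unbounded, per the "unbounded*" fixtures
+    // above.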
+ String tableName = "test_range_type_write"; + TableId tableId = TableId.of(DATASET, tableName); + bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(RANGE_SCHEMA))); + + TableName parentTable = TableName.of(projectName, DATASET, tableName); + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + try (JsonStreamWriter writer = + JsonStreamWriter.newBuilder(parentTable.toString(), RANGE_TABLE_SCHEMA) + .setRetrySettings(retrySettings) + .build()) { + + // Write 4 rows of data to the table with and without unbounded values. + JSONArray data = new JSONArray(); + for (String name : RANGE_TEST_VALUES_DATES.keySet()) { + JSONObject row = new JSONObject(); + row.put("name", name); + + JSONObject dateColumn = new JSONObject(); + Range date = RANGE_TEST_VALUES_DATES.get(name); + if ((!date.getStart().isNull()) && (date.getStart().getStringValue() != null)) { + dateColumn.put("start", date.getStart().getStringValue()); + } + if ((!date.getEnd().isNull()) && (date.getEnd().getStringValue() != null)) { + dateColumn.put("end", date.getEnd().getStringValue()); + } + row.put("daTE", dateColumn); + + JSONObject datetimeColumn = new JSONObject(); + Range datetime = RANGE_TEST_VALUES_DATETIME.get(name); + if ((!datetime.getStart().isNull()) && (datetime.getStart().getStringValue() != null)) { + datetimeColumn.put("start", datetime.getStart().getStringValue()); + } + if ((!datetime.getEnd().isNull()) && (datetime.getEnd().getStringValue() != null)) { + datetimeColumn.put("end", datetime.getEnd().getStringValue()); + } + row.put("daTEtiME", datetimeColumn); + + JSONObject timestampColumn = new JSONObject(); + Range timestamp = RANGE_TEST_VALUES_TIMESTAMP.get(name); + if ((!timestamp.getStart().isNull()) && (timestamp.getStart().getStringValue() != null)) { + timestampColumn.put("start", timestamp.getStart().getStringValue()); + } + if ((!timestamp.getEnd().isNull()) && (timestamp.getEnd().getStringValue() != null)) { + timestampColumn.put("end", timestamp.getEnd().getStringValue()); + } + row.put("tiMEstAMp", timestampColumn); + + data.put(row); + } + + ApiFuture future = writer.append(data); + // The append method is asynchronous. Rather than waiting for the method to complete, + // which can hurt performance, register a completion callback and continue streaming. + ApiFutures.addCallback(future, new AppendCompleteCallback(), MoreExecutors.directExecutor()); + } + + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ projectName, + /* datasetId = */ DATASET, + /* tableId = */ tableId.getTable()); + ReadSession session = + client.createReadSession( + /* parent = */ parentProjectId, + /* readSession = */ ReadSession.newBuilder() + .setTable(table) + .setDataFormat(DataFormat.ARROW) + .build(), + /* maxStreamCount = */ 1); + assertEquals( + String.format( + "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s", + table, session.toString()), + 1, + session.getStreamsCount()); + + // Assert that there are streams available in the session. An empty table may not have + // data available. If no sessions are available for an anonymous (cached) table, consider + // writing results of a query to a named table rather than consuming cached results + // directly. + Preconditions.checkState(session.getStreamsCount() > 0); + + // Set up a simple reader and start a read session. 
+ try (SimpleRowReaderArrow reader = new SimpleRowReaderArrow(session.getArrowSchema())) { + + // Assert that there are streams available in the session. An empty table may not have + // data available. If no sessions are available for an anonymous (cached) table, consider + // writing results of a query to a named table rather than consuming cached results + // directly. + Preconditions.checkState(session.getStreamsCount() > 0); + + // Use the first stream to perform reading. + String streamName = session.getStreams(0).getName(); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(streamName).build(); + + long rowCount = 0; + // Process each block of rows as they arrive and decode using our simple row reader. + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + Preconditions.checkState(response.hasArrowRecordBatch()); + reader.processRows( + response.getArrowRecordBatch(), + new ArrowRangeBatchConsumer( + RANGE_TEST_VALUES_EXPECTED_DATES, + RANGE_TEST_VALUES_EXPECTED_DATETIME, + RANGE_TEST_VALUES_EXPECTED_TIMESTAMP)); + rowCount += response.getRowCount(); + } + assertEquals(RANGE_TEST_VALUES_DATES.size(), rowCount); + } + } + @Test public void testSimpleReadAndResume() { String table = @@ -407,8 +809,8 @@ public void testFilter() throws IOException { ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build(); - SimpleRowReader reader = - new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema())); + SimpleRowReaderAvro reader = + new SimpleRowReaderAvro(new Schema.Parser().parse(session.getAvroSchema().getSchema())); long rowCount = 0; @@ -485,7 +887,7 @@ public void testColumnSelection() throws IOException { Schema.Type.LONG, avroSchema.getField("word_count").schema().getType()); - SimpleRowReader reader = new SimpleRowReader(avroSchema); + SimpleRowReaderAvro reader = new SimpleRowReaderAvro(avroSchema); long rowCount = 0; ServerStream stream = client.readRowsCallable().call(readRowsRequest); @@ -536,7 +938,7 @@ public void testReadAtSnapshot() throws InterruptedException, IOException { String table = BigQueryResource.FormatTableResource( - /* projectId = */ ServiceOptions.getDefaultProjectId(), + /* projectId = */ projectName, /* datasetId = */ DATASET, /* tableId = */ testTableId.getTable()); @@ -590,7 +992,7 @@ public void testColumnPartitionedTableByDateField() throws InterruptedException, String table = BigQueryResource.FormatTableResource( - /* projectId = */ ServiceOptions.getDefaultProjectId(), + /* projectId = */ projectName, /* datasetId = */ DATASET, /* tableId = */ partitionedTableName); @@ -639,7 +1041,7 @@ public void testIngestionTimePartitionedTable() throws InterruptedException, IOE String table = BigQueryResource.FormatTableResource( - /* projectId = */ ServiceOptions.getDefaultProjectId(), + /* projectId = */ projectName, /* datasetId = */ testTableId.getDataset(), /* tableId = */ testTableId.getTable()); @@ -682,9 +1084,7 @@ public void testBasicSqlTypes() throws InterruptedException, IOException { String table = BigQueryResource.FormatTableResource( - /* projectId = */ ServiceOptions.getDefaultProjectId(), - /* datasetId = */ DATASET, - /* tableId = */ tableName); + /* projectId = */ projectName, /* datasetId = */ DATASET, /* tableId = */ tableName); List rows = ReadAllRows(/* table = */ table, /* filter = */ null); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); @@ 
-781,9 +1181,7 @@ public void testDateAndTimeSqlTypes() throws InterruptedException, IOException { String table = BigQueryResource.FormatTableResource( - /* projectId = */ ServiceOptions.getDefaultProjectId(), - /* datasetId = */ DATASET, - /* tableId = */ tableName); + /* projectId = */ projectName, /* datasetId = */ DATASET, /* tableId = */ tableName); List rows = ReadAllRows(/* table = */ table, /* filter = */ null); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); @@ -881,9 +1279,7 @@ public void testGeographySqlType() throws InterruptedException, IOException { String table = BigQueryResource.FormatTableResource( - /* projectId = */ ServiceOptions.getDefaultProjectId(), - /* datasetId = */ DATASET, - /* tableId = */ tableName); + /* projectId = */ projectName, /* datasetId = */ DATASET, /* tableId = */ tableName); List rows = ReadAllRows(/* table = */ table, /* filter = */ null); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); @@ -932,9 +1328,7 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio String table = BigQueryResource.FormatTableResource( - /* projectId = */ ServiceOptions.getDefaultProjectId(), - /* datasetId = */ DATASET, - /* tableId = */ tableName); + /* projectId = */ projectName, /* datasetId = */ DATASET, /* tableId = */ tableName); List rows = ReadAllRows(/* table = */ table, /* filter = */ null); assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); @@ -1254,8 +1648,8 @@ private void ProcessRowsAtSnapshot( ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build(); - SimpleRowReader reader = - new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema())); + SimpleRowReaderAvro reader = + new SimpleRowReaderAvro(new Schema.Parser().parse(session.getAvroSchema().getSchema())); ServerStream stream = client.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { @@ -1339,4 +1733,24 @@ static GoogleCredentials loadCredentials(String credentialFile) { } return null; } + + static class AppendCompleteCallback implements ApiFutureCallback { + private static final Object lock = new Object(); + private static int batchCount = 0; + + public void onSuccess(AppendRowsResponse response) { + synchronized (lock) { + if (response.hasError()) { + System.out.format("Error: %s\n", response.getError()); + } else { + ++batchCount; + System.out.format("Wrote batch %d\n", batchCount); + } + } + } + + public void onFailure(Throwable throwable) { + System.out.format("Error: %s\n", throwable.toString()); + } + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java new file mode 100644 index 0000000000..685f72fbc9 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderArrow.java @@ -0,0 +1,191 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.storage.v1.it;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.cloud.bigquery.FieldElementType;
+import com.google.cloud.bigquery.Range;
+import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
+import com.google.cloud.bigquery.storage.v1.ArrowSchema;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
+
+public class SimpleRowReaderArrow implements AutoCloseable {
+
+  public interface ArrowBatchConsumer {
+
+    /** Handler for every new Arrow batch. */
+    void accept(VectorSchemaRoot root);
+  }
+
+  /** ArrowRangeBatchConsumer accepts batch Arrow data and validates the range values.
*/ + public static class ArrowRangeBatchConsumer implements ArrowBatchConsumer { + + private final ImmutableMap expectedRangeDateValues; + private final ImmutableMap expectedRangeDatetimeValues; + private final ImmutableMap expectedRangeTimestampValues; + + public ArrowRangeBatchConsumer( + ImmutableMap expectedRangeDateValues, + ImmutableMap expectedRangeDatetimeValues, + ImmutableMap expectedRangeTimestampValues) { + this.expectedRangeDateValues = expectedRangeDateValues; + this.expectedRangeDatetimeValues = expectedRangeDatetimeValues; + this.expectedRangeTimestampValues = expectedRangeTimestampValues; + } + + @Override + public void accept(VectorSchemaRoot root) { + StructVector dateVector = (StructVector) root.getVector("date"); + for (int i = 0; i < dateVector.valueCount; i++) { + Field field = root.getSchema().findField(dateVector.getName()); + assertThat(field.getType().getTypeID()).isEqualTo(ArrowTypeID.Struct); + + Map value = dateVector.getObject(i); + Range.Builder rangeBuilder = Range.newBuilder(); + if (value.get("start") != null) { + rangeBuilder.setStart(((Integer) value.get("start")).toString()); + } + if (value.get("end") != null) { + rangeBuilder.setEnd(((Integer) value.get("end")).toString()); + } + rangeBuilder.setType(toFieldElementType(field.getChildren().get(0))); + assertThat(rangeBuilder.build()).isIn(this.expectedRangeDateValues.values()); + } + + StructVector datetimeVector = (StructVector) root.getVector("datetime"); + for (int i = 0; i < datetimeVector.valueCount; i++) { + Field field = root.getSchema().findField(datetimeVector.getName()); + assertThat(field.getType().getTypeID()).isEqualTo(ArrowTypeID.Struct); + + Map value = datetimeVector.getObject(i); + Range.Builder rangeBuilder = Range.newBuilder(); + if (value.get("start") != null) { + rangeBuilder.setStart(((LocalDateTime) value.get("start")).toString()); + } + if (value.get("end") != null) { + rangeBuilder.setEnd(((LocalDateTime) value.get("end")).toString()); + } + rangeBuilder.setType(toFieldElementType(field.getChildren().get(0))); + assertThat(rangeBuilder.build()).isIn(this.expectedRangeDatetimeValues.values()); + } + + StructVector timestampVector = (StructVector) root.getVector("timestamp"); + for (int i = 0; i < timestampVector.valueCount; i++) { + Field field = root.getSchema().findField(timestampVector.getName()); + assertThat(field.getType().getTypeID()).isEqualTo(ArrowTypeID.Struct); + + Map value = timestampVector.getObject(i); + Range.Builder rangeBuilder = Range.newBuilder(); + if (value.get("start") != null) { + rangeBuilder.setStart(((Long) value.get("start")).toString()); + } + if (value.get("end") != null) { + rangeBuilder.setEnd(((Long) value.get("end")).toString()); + } + rangeBuilder.setType(toFieldElementType(field.getChildren().get(0))); + assertThat(rangeBuilder.build()).isIn(this.expectedRangeTimestampValues.values()); + } + } + + private static FieldElementType toFieldElementType(Field field) { + switch (field.getType().getTypeID()) { + case Date: + return FieldElementType.newBuilder().setType("DATE").build(); + case Timestamp: + String timezone = ((ArrowType.Timestamp) field.getType()).getTimezone(); + if (timezone == null) { + // Datetime fields do not have timezone value. 
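+          // Both DATETIME and TIMESTAMP range endpoints arrive as Arrow Timestamp vectors;
+          // the presence of a timezone is what distinguishes TIMESTAMP from DATETIME here.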
+ return FieldElementType.newBuilder().setType("DATETIME").build(); + } else { + return FieldElementType.newBuilder().setType("TIMESTAMP").build(); + } + default: + return null; + } + } + } + + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + + // Decoder object will be reused to avoid re-allocation and too much garbage collection. + private final VectorSchemaRoot root; + private final VectorLoader loader; + + public SimpleRowReaderArrow(ArrowSchema arrowSchema) throws IOException { + org.apache.arrow.vector.types.pojo.Schema schema = + MessageSerializer.deserializeSchema( + new ReadChannel( + new ByteArrayReadableSeekableByteChannel( + arrowSchema.getSerializedSchema().toByteArray()))); + Preconditions.checkNotNull(schema); + List vectors = new ArrayList<>(); + for (org.apache.arrow.vector.types.pojo.Field field : schema.getFields()) { + vectors.add(field.createVector(allocator)); + } + root = new VectorSchemaRoot(vectors); + loader = new VectorLoader(root); + } + + /** + * Method for processing Arrow data which validates Range values. + * + * @param batch object returned from the ReadRowsResponse. + * @param batchConsumer consumer of the batch Arrow data. + */ + public void processRows(ArrowRecordBatch batch, ArrowBatchConsumer batchConsumer) + throws IOException { + org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch = + MessageSerializer.deserializeRecordBatch( + new ReadChannel( + new ByteArrayReadableSeekableByteChannel( + batch.getSerializedRecordBatch().toByteArray())), + allocator); + + loader.load(deserializedBatch); + // Release buffers from batch (they are still held in the vectors in root). + deserializedBatch.close(); + batchConsumer.accept(root); + + // Release buffers from vectors in root. + root.clear(); + } + + @Override + public void close() { + root.close(); + allocator.close(); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReader.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderAvro.java similarity index 96% rename from google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReader.java rename to google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderAvro.java index 8b72461b15..a23179c8c8 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReader.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/SimpleRowReaderAvro.java @@ -30,7 +30,7 @@ * SimpleRowReader handles deserialization of the Avro-encoded row blocks transmitted * from the storage API using a generic datum decoder. */ -public class SimpleRowReader { +public class SimpleRowReaderAvro { public interface AvroRowConsumer { @@ -51,7 +51,7 @@ public interface AvroRowConsumer { // Record object will be reused. 
private GenericData.Record row = null; - public SimpleRowReader(Schema schema) { + public SimpleRowReaderAvro(Schema schema) { Preconditions.checkNotNull(schema); datumReader = new GenericDatumReader<>(schema); } diff --git a/google-cloud-bigquerystorage/src/test/proto/jsonTest.proto b/google-cloud-bigquerystorage/src/test/proto/jsonTest.proto index d70d214be2..fc6b6ce48e 100644 --- a/google-cloud-bigquerystorage/src/test/proto/jsonTest.proto +++ b/google-cloud-bigquerystorage/src/test/proto/jsonTest.proto @@ -211,3 +211,27 @@ message TestBignumeric { message TestMixedCaseFieldNames { required string foobar = 1; } + +message TestRange { + optional TestRangeDate range_date = 1; + optional TestRangeDatetime range_datetime = 2; + optional TestRangeTimestamp range_timestamp = 3; + optional TestRangeDate range_date_mixed_case = 4; + optional TestRangeDatetime range_datetime_mixed_case = 5; + optional TestRangeTimestamp range_timestamp_mixed_case = 6; +} + +message TestRangeDate { + optional int32 start = 1; + optional int32 end = 2; +} + +message TestRangeDatetime { + optional int64 start = 1; + optional int64 end = 2; +} + +message TestRangeTimestamp { + optional int64 start = 1; + optional int64 end = 2; +} \ No newline at end of file
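
For reference, a minimal end-to-end sketch of the new write path, mirroring the
testRangeTypeWrite integration test above. The project, dataset, and table IDs are
placeholders; the destination table is assumed to already exist with a NULLABLE
RANGE<DATE> column named "date", and the blocking get() stands in for the
callback-based flow used in the test.

    import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
    import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
    import com.google.cloud.bigquery.storage.v1.TableName;
    import com.google.cloud.bigquery.storage.v1.TableSchema;
    import org.json.JSONArray;
    import org.json.JSONObject;

    public class RangeWriteExample {
      public static void main(String[] args) throws Exception {
        // Writer-side schema: a single NULLABLE RANGE<DATE> column.
        TableSchema schema =
            TableSchema.newBuilder()
                .addFields(
                    TableFieldSchema.newBuilder()
                        .setName("date")
                        .setType(TableFieldSchema.Type.RANGE)
                        .setRangeElementType(
                            TableFieldSchema.FieldElementType.newBuilder()
                                .setType(TableFieldSchema.Type.DATE)
                                .build())
                        .setMode(TableFieldSchema.Mode.NULLABLE)
                        .build())
                .build();
        // Placeholder IDs; the table must already exist with a matching RANGE column.
        TableName table = TableName.of("my-project", "my_dataset", "range_table");
        try (JsonStreamWriter writer =
            JsonStreamWriter.newBuilder(table.toString(), schema).build()) {
          // A RANGE value is a JSON object; omit "start" or "end" for an unbounded side.
          JSONObject range = new JSONObject();
          range.put("start", "2020-01-01");
          range.put("end", "2020-12-31");
          JSONObject row = new JSONObject();
          row.put("date", range);
          // Blocking only to keep the sketch short; production code should register a
          // completion callback as testRangeTypeWrite does.
          writer.append(new JSONArray().put(row)).get();
        }
      }
    }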