Skip to content

Commit

Permalink
feat: Add support for flexible column name in JsonStreamWriter (#1786)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [X] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigquerystorage/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [X] Ensure the tests and linter pass
- [X] Code coverage does not decrease (if any source code was changed)
- [X] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://github.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
tracyz-g authored Sep 28, 2022
1 parent c24c14f commit 694abbb
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldOptions;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
Expand Down Expand Up @@ -92,7 +93,11 @@ private static Descriptor convertBQTableSchemaToProtoDescriptorImpl(
List<FieldDescriptorProto> fields = new ArrayList<FieldDescriptorProto>();
int index = 1;
for (TableFieldSchema BQTableField : BQTableSchema.getFieldsList()) {
String currentScope = scope + "__" + BQTableField.getName();
String scopeName =
BigQuerySchemaUtil.isProtoCompatible(BQTableField.getName())
? BQTableField.getName()
: BigQuerySchemaUtil.generatePlaceholderFieldName(BQTableField.getName());
String currentScope = scope + "__" + scopeName;
if (BQTableField.getType() == TableFieldSchema.Type.STRUCT) {
ImmutableList<TableFieldSchema> fieldList =
ImmutableList.copyOf(BQTableField.getFieldsList());
Expand Down Expand Up @@ -137,19 +142,26 @@ private static FieldDescriptorProto convertBQTableFieldToProtoField(
TableFieldSchema BQTableField, int index, String scope) {
TableFieldSchema.Mode mode = BQTableField.getMode();
String fieldName = BQTableField.getName().toLowerCase();

FieldDescriptorProto.Builder fieldDescriptor =
FieldDescriptorProto.newBuilder()
.setName(fieldName)
.setNumber(index)
.setLabel((FieldDescriptorProto.Label) BQTableSchemaModeMap.get(mode));

if (BQTableField.getType() == TableFieldSchema.Type.STRUCT) {
return FieldDescriptorProto.newBuilder()
.setName(fieldName)
.setTypeName(scope)
.setLabel((FieldDescriptorProto.Label) BQTableSchemaModeMap.get(mode))
.setNumber(index)
.build();
fieldDescriptor.setTypeName(scope);
} else {
fieldDescriptor.setType(
(FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType()));
}

// Sets columnName annotation when field name is not proto comptaible.
if (!BigQuerySchemaUtil.isProtoCompatible(fieldName)) {
fieldDescriptor.setName(BigQuerySchemaUtil.generatePlaceholderFieldName(fieldName));
fieldDescriptor.setOptions(
FieldOptions.newBuilder().setExtension(AnnotationsProto.columnName, fieldName).build());
}
return FieldDescriptorProto.newBuilder()
.setName(fieldName)
.setType((FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType()))
.setLabel((FieldDescriptorProto.Label) BQTableSchemaModeMap.get(mode))
.setNumber(index)
.build();
return fieldDescriptor.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.protobuf.Descriptors.FieldDescriptor;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.regex.Pattern;

public class BigQuerySchemaUtil {

private static final String PROTO_COMPATIBLE_NAME_REGEXP = "[A-Za-z_][A-Za-z0-9_]*";
private static final String PLACEHOLDER_FILED_NAME_PREFIX = "col_";
private static final Pattern PROTO_COMPATIBLE_NAME_PATTERN =
Pattern.compile(PROTO_COMPATIBLE_NAME_REGEXP);

/**
* * Checks if the field name is compatible with proto field naming convention.
*
* @param fieldName name for the field
* @return true if the field name is comptaible with proto naming convention, otherwise, returns
* false.
*/
public static boolean isProtoCompatible(String fieldName) {
return PROTO_COMPATIBLE_NAME_PATTERN.matcher(fieldName).matches();
}

/**
* * Generates a placeholder name that consists of a prefix + base64 encoded field name. We
* replace all dashes with underscores as they are not allowed for proto field names.
*
* @param fieldName name for the field
* @return the generated placeholder field name
*/
public static String generatePlaceholderFieldName(String fieldName) {
return PLACEHOLDER_FILED_NAME_PREFIX
+ Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(fieldName.getBytes(StandardCharsets.UTF_8))
.replace('-', '_');
}

/**
* * Gets the user-facing field name from the descriptor
*
* @param fieldDescriptor
* @return columnName annotation if present, otherwise return the field name.
*/
public static String getFieldName(FieldDescriptor fieldDescriptor) {
return fieldDescriptor.getOptions().hasExtension(AnnotationsProto.columnName)
? fieldDescriptor.getOptions().getExtension(AnnotationsProto.columnName)
: fieldDescriptor.getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,15 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
String jsonName = jsonNames[i];
// We want lowercase here to support case-insensitive data writes.
// The protobuf descriptor that is used is assumed to have all lowercased fields
String jsonLowercaseName = jsonName.toLowerCase();
String jsonFieldLocator = jsonName.toLowerCase();

// If jsonName is not compatible with proto naming convention, we should look by its
// placeholder name.
if (!BigQuerySchemaUtil.isProtoCompatible(jsonFieldLocator)) {
jsonFieldLocator = BigQuerySchemaUtil.generatePlaceholderFieldName(jsonFieldLocator);
}
String currentScope = jsonScope + "." + jsonName;
FieldDescriptor field = protoSchema.findFieldByName(jsonLowercaseName);
FieldDescriptor field = protoSchema.findFieldByName(jsonFieldLocator);
if (field == null && !ignoreUnknownFields) {
throw new Exceptions.JsonDataHasUnknownFieldException(currentScope);
} else if (field == null) {
Expand All @@ -209,7 +215,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
if (tableSchema != null) {
// protoSchema is generated from tableSchema so their field ordering should match.
fieldSchema = tableSchema.get(field.getIndex());
if (!fieldSchema.getName().toLowerCase().equals(field.getName())) {
if (!fieldSchema.getName().toLowerCase().equals(BigQuerySchemaUtil.getFieldName(field))) {
throw new ValidationException(
"Field at index "
+ field.getIndex()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testSimpleTypes() throws Exception {

@Test
public void testStructSimple() throws Exception {
final TableFieldSchema StringType =
final TableFieldSchema stringType =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
Expand All @@ -118,7 +118,7 @@ public void testStructSimple() throws Exception {
.setType(TableFieldSchema.Type.STRUCT)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_field_type")
.addFields(0, StringType)
.addFields(0, stringType)
.build();
final TableSchema tableSchema = TableSchema.newBuilder().addFields(0, tableFieldSchema).build();
final Descriptor descriptor =
Expand Down Expand Up @@ -509,4 +509,32 @@ public void testDescriptorReuseDuringCreation() throws Exception {
assertEquals(descriptorToCount.get("root__reuse_lvl1__reuse_lvl2").intValue(), 3);
isDescriptorEqual(descriptor, ReuseRoot.getDescriptor());
}

@Test
public void testNestedFlexibleFieldName() throws Exception {
final TableFieldSchema stringField =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("str-列")
.build();
final TableFieldSchema intField =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.INT64)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("int-列")
.build();
final TableFieldSchema nestedField =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRUCT)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("nested-列")
.addFields(0, intField)
.build();
final TableSchema tableSchema =
TableSchema.newBuilder().addFields(0, stringField).addFields(1, nestedField).build();
final Descriptor descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema);
isDescriptorEqual(descriptor, TestNestedFlexibleFieldName.getDescriptor());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.cloud.bigquery.storage.test.SchemaTest.TestNestedFlexibleFieldName;
import com.google.protobuf.Descriptors.Descriptor;
import java.util.Arrays;
import java.util.List;
import junit.framework.TestCase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class BigQuerySchemaUtilTest extends TestCase {

@Test
public void testIsProtoCompatible() {
List<String> protoCompatibleNames = Arrays.asList("col_1", "name", "_0_");
List<String> protoIncompatibleNames = Arrays.asList("0_col", "()", "列", "a-1");
protoCompatibleNames.stream()
.forEach(
name -> {
assertTrue(BigQuerySchemaUtil.isProtoCompatible(name));
});
protoIncompatibleNames.stream()
.forEach(
name -> {
assertFalse(BigQuerySchemaUtil.isProtoCompatible(name));
});
}

public void testGeneratePlaceholderFieldName() {
assertEquals("col_c3RyLeWIlw", BigQuerySchemaUtil.generatePlaceholderFieldName("str-列"));
// Base64 url encodes "~/~/" to "fi9-Lw", we replaced - with _ to be proto compatible.
assertEquals("col_fi9_Lw", BigQuerySchemaUtil.generatePlaceholderFieldName("~/~/"));
}

public void testGetFieldName() {
// Test get name from annotations.
Descriptor flexibleDescriptor = TestNestedFlexibleFieldName.getDescriptor();
assertEquals("str-列", BigQuerySchemaUtil.getFieldName(flexibleDescriptor.getFields().get(0)));
assertEquals(
"nested-列", BigQuerySchemaUtil.getFieldName(flexibleDescriptor.getFields().get(1)));

// Test get name without annotations.
Descriptor descriptor = TestNestedFlexibleFieldName.getDescriptor();
assertEquals("int32_value", BigQuerySchemaUtil.getFieldName(descriptor.getFields().get(0)));
assertEquals("int64_value", BigQuerySchemaUtil.getFieldName(descriptor.getFields().get(1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.JsonTest;
import com.google.cloud.bigquery.storage.test.SchemaTest;
import com.google.cloud.bigquery.storage.test.Test.FlexibleType;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
Expand Down Expand Up @@ -212,6 +213,54 @@ public void testSingleAppendSimpleJson() throws Exception {
}
}

@Test
public void testFlexibleColumnAppend() throws Exception {
TableFieldSchema field =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test-列")
.build();
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, field).build();
FlexibleType expectedProto = FlexibleType.newBuilder().setColDGVzdC3LiJc("allen").build();
JSONObject flexible = new JSONObject();
flexible.put("test-列", "allen");
JSONArray jsonArr = new JSONArray();
jsonArr.put(flexible);

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) {

testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
assertEquals(
1,
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite
.getAppendRequests()
.get(0)
.getProtoRows()
.getRows()
.getSerializedRows(0),
expectedProto.toByteString());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter_test:empty");
}
}

@Test
public void testSpecialTypeAppend() throws Exception {
TableFieldSchema field =
Expand Down
14 changes: 14 additions & 0 deletions google-cloud-bigquerystorage/src/test/proto/schemaTest.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ syntax = "proto2";

package com.google.cloud.bigquery.storage.test;

import "google/cloud/bigquery/storage/v1/annotations.proto";

message SupportedTypes {
optional int32 int32_value = 1;
optional int64 int64_value = 2;
Expand Down Expand Up @@ -257,3 +259,15 @@ message AllowUnknownUnsupportedFields {
message FakeFooType {
optional int32 foo = 1;
}

message TestNestedFlexibleFieldName {
optional string col_c3RyLeWIlw = 1
[(.google.cloud.bigquery.storage.v1.column_name) = "str-列"];
optional FlexibleNameField col_bmVzdGVkLeWIlw = 2
[(.google.cloud.bigquery.storage.v1.column_name) = "nested-列"];
}

message FlexibleNameField {
optional int64 col_aW50LeWIlw = 1
[(.google.cloud.bigquery.storage.v1.column_name) = "int-列"];
}
7 changes: 7 additions & 0 deletions google-cloud-bigquerystorage/src/test/proto/test.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ syntax = "proto2";

package com.google.cloud.bigquery.storage.test;

import "google/cloud/bigquery/storage/v1/annotations.proto";

enum TestEnum {
TestEnum0 = 0;
TestEnum1 = 1;
Expand Down Expand Up @@ -80,3 +82,8 @@ message DuplicateType {
optional ComplicateType f3 = 3;
optional ComplicateType f4 = 4;
}

message FlexibleType {
optional string col_dGVzdC3liJc = 1
[(.google.cloud.bigquery.storage.v1.column_name) = "test-列"];
}

0 comments on commit 694abbb

Please sign in to comment.