Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add fully managed schema support on json writer #1794

Merged
merged 4 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,21 @@ public static Builder newBuilder(
return new Builder(streamOrTableName, tableSchema, client);
}

/**
 * Creates a {@link Builder} for a JsonStreamWriter without an explicit TableSchema.
 *
 * <p>A {@code null} schema is handed to the Builder; the Builder constructor then fetches the
 * schema of the write stream through {@code client}.
 *
 * @param streamOrTableName name of the write stream, in the form
 *     "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+". A table name may be passed
 *     instead (NOTE(review): the accepted table-name format is decided by the Builder
 *     constructor - confirm there).
 * @param client BigQueryWriteClient handed to the Builder
 * @return Builder
 * @throws NullPointerException if {@code streamOrTableName} or {@code client} is null
 */
public static Builder newBuilder(String streamOrTableName, BigQueryWriteClient client) {
  // checkNotNull returns its argument, so the validated references are used directly.
  // The name is checked before the client, which decides the message seen when both are null.
  final String checkedName =
      Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
  final BigQueryWriteClient checkedClient =
      Preconditions.checkNotNull(client, "BigQuery client is null.");
  return new Builder(checkedName, /* tableSchema= */ null, checkedClient);
}

/** Closes the underlying StreamWriter. */
@Override
public void close() {
Expand Down Expand Up @@ -330,8 +345,20 @@ private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWrite
} else {
this.streamName = streamOrTableName;
}
this.tableSchema = tableSchema;
this.client = client;
if (tableSchema == null) {
GetWriteStreamRequest writeStreamRequest =
GetWriteStreamRequest.newBuilder()
.setName(this.getStreamName())
.setView(WriteStreamView.FULL)
.build();

WriteStream writeStream = this.client.getWriteStream(writeStreamRequest);
TableSchema writeStreamTableSchema = writeStream.getTableSchema();
this.tableSchema = writeStreamTableSchema;
} else {
this.tableSchema = tableSchema;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,161 @@ public void testJsonStreamWriterCommittedStream()
}
}

/**
 * Integration test: a JsonStreamWriter built from only a table name and a client (no explicit
 * TableSchema) can append rows, and those rows can be read back from the table.
 *
 * <p>The test creates the table, appends four batches (five rows in total) and then checks the
 * listed table data.
 */
@Test
public void testJsonStreamWriterWithDefaultSchema()
    throws IOException, InterruptedException, ExecutionException,
        Descriptors.DescriptorValidationException {
  String tableName = "JsonTableDefaultSchema";
  // Only the BigQuery table definition is built here: the writer is created without a
  // TableSchema, so the test does not construct one.
  // Column order (used by the index-based assertions below):
  //   0 test_str, 1 test_numerics, 2 test_datetime, 3 test_bytestring_repeated, 4 test_timestamp
  TableInfo tableInfo =
      TableInfo.newBuilder(
              TableId.of(DATASET, tableName),
              StandardTableDefinition.of(
                  Schema.of(
                      Field.newBuilder("test_str", StandardSQLTypeName.STRING).build(),
                      Field.newBuilder("test_numerics", StandardSQLTypeName.NUMERIC)
                          .setMode(Field.Mode.REPEATED)
                          .build(),
                      Field.newBuilder("test_datetime", StandardSQLTypeName.DATETIME).build(),
                      Field.newBuilder("test_bytestring_repeated", StandardSQLTypeName.BYTES)
                          .setMode(Field.Mode.REPEATED)
                          .build(),
                      Field.newBuilder("test_timestamp", StandardSQLTypeName.TIMESTAMP)
                          .build())))
          .build();

  bigquery.create(tableInfo);
  TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
  try (JsonStreamWriter jsonStreamWriter =
      JsonStreamWriter.newBuilder(parent.toString(), client)
          .setIgnoreUnknownFields(true)
          .build()) {
    LOG.info("Sending one message");
    JSONObject row1 = new JSONObject();
    row1.put("test_str", "aaa");
    row1.put(
        "test_numerics",
        new JSONArray(
            new byte[][] {
              BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("123.4"))
                  .toByteArray(),
              BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("-9000000"))
                  .toByteArray()
            }));
    // Not a column of the table; the writer is built with setIgnoreUnknownFields(true).
    row1.put("unknown_field", "a");
    row1.put(
        "test_datetime",
        CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.of(2020, 10, 1, 12, 0)));
    row1.put(
        "test_bytestring_repeated",
        new JSONArray(
            new byte[][] {
              ByteString.copyFromUtf8("a").toByteArray(),
              ByteString.copyFromUtf8("b").toByteArray()
            }));
    row1.put("test_timestamp", "2022-02-06 07:24:47.84");
    JSONArray jsonArr1 = new JSONArray(new JSONObject[] {row1});

    ApiFuture<AppendRowsResponse> response1 = jsonStreamWriter.append(jsonArr1, -1);

    assertEquals(0, response1.get().getAppendResult().getOffset().getValue());

    // Each row gets its own JSONObject, so the already-appended row1 is not mutated or re-sent.
    JSONObject row2 = new JSONObject();
    row2.put("test_str", "bbb");
    JSONObject row3 = new JSONObject();
    row3.put("test_str", "ccc");
    JSONArray jsonArr2 = new JSONArray();
    jsonArr2.put(row2);
    jsonArr2.put(row3);

    JSONObject row4 = new JSONObject();
    row4.put("test_str", "ddd");
    JSONArray jsonArr3 = new JSONArray();
    jsonArr3.put(row4);

    JSONObject row5 = new JSONObject();
    // Add another ARRAY<BYTES> using a more idiomatic way
    JSONArray testArr = new JSONArray(); // create empty JSONArray
    testArr.put(0, ByteString.copyFromUtf8("a").toByteArray()); // insert 1st bytes array
    testArr.put(1, ByteString.copyFromUtf8("b").toByteArray()); // insert 2nd bytes array
    row5.put("test_bytestring_repeated", testArr);
    JSONArray jsonArr4 = new JSONArray();
    jsonArr4.put(row5);

    LOG.info("Sending two more messages");
    ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, -1);
    LOG.info("Sending one more message");
    ApiFuture<AppendRowsResponse> response3 = jsonStreamWriter.append(jsonArr3, -1);
    LOG.info("Sending one more message");
    ApiFuture<AppendRowsResponse> response4 = jsonStreamWriter.append(jsonArr4, -1);
    Assert.assertFalse(response2.get().getAppendResult().hasOffset());
    Assert.assertFalse(response3.get().getAppendResult().hasOffset());
    Assert.assertFalse(response4.get().getAppendResult().hasOffset());

    TableResult result =
        bigquery.listTableData(
            tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
    Iterator<FieldValueList> iter = result.getValues().iterator();
    FieldValueList currentRow = iter.next();
    assertEquals("aaa", currentRow.get(0).getStringValue());
    assertEquals("-9000000", currentRow.get(1).getRepeatedValue().get(1).getStringValue());
    assertEquals("2020-10-01T12:00:00", currentRow.get(2).getStringValue());
    assertEquals(2, currentRow.get(3).getRepeatedValue().size());
    // BYTES values are compared in base64 form: "YQ==" is "a", "Yg==" is "b".
    assertEquals("Yg==", currentRow.get(3).getRepeatedValue().get(1).getStringValue());
    assertEquals("bbb", iter.next().get(0).getStringValue());
    assertEquals("ccc", iter.next().get(0).getStringValue());
    assertEquals("ddd", iter.next().get(0).getStringValue());
    FieldValueList currentRow2 = iter.next();
    assertEquals("YQ==", currentRow2.get(3).getRepeatedValue().get(0).getStringValue());
    assertEquals("Yg==", currentRow2.get(3).getRepeatedValue().get(1).getStringValue());
    // Exactly five rows were appended, so nothing may follow.
    Assert.assertFalse(iter.hasNext());
  }
}

@Test
public void testJsonStreamWriterWithDefaultStream()
throws IOException, InterruptedException, ExecutionException,
Expand Down