Skip to content

Commit

Permalink
feat: Add fully managed schema support on json writer (#1794)
Browse files Browse the repository at this point in the history
b/247249766
  • Loading branch information
Neenu1995 authored Sep 22, 2022
1 parent 745ceb4 commit b6b515f
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,21 @@ public static Builder newBuilder(
return new Builder(streamOrTableName, tableSchema, client);
}

/**
 * Creates a {@link Builder} for a JsonStreamWriter without a caller-supplied TableSchema.
 *
 * <p>A {@code null} schema is handed to the Builder constructor, which is then responsible for
 * obtaining the schema itself.
 *
 * @param streamOrTableName name of the write stream, in the form
 *     "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+". NOTE(review): the parameter
 *     name suggests a table name is accepted as well - confirm against the Builder constructor.
 * @param client BigQueryWriteClient handed to the Builder
 * @return a new Builder
 * @throws NullPointerException if {@code streamOrTableName} or {@code client} is null
 */
public static Builder newBuilder(String streamOrTableName, BigQueryWriteClient client) {
  // checkNotNull returns its argument, so validation and hand-off happen in one expression;
  // arguments are evaluated left to right, so the name is still checked before the client.
  return new Builder(
      Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null."),
      null,
      Preconditions.checkNotNull(client, "BigQuery client is null."));
}

/** Closes the underlying StreamWriter. */
@Override
public void close() {
Expand Down Expand Up @@ -330,8 +345,20 @@ private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWrite
} else {
this.streamName = streamOrTableName;
}
this.tableSchema = tableSchema;
this.client = client;
if (tableSchema == null) {
GetWriteStreamRequest writeStreamRequest =
GetWriteStreamRequest.newBuilder()
.setName(this.getStreamName())
.setView(WriteStreamView.FULL)
.build();

WriteStream writeStream = this.client.getWriteStream(writeStreamRequest);
TableSchema writeStreamTableSchema = writeStream.getTableSchema();
this.tableSchema = writeStreamTableSchema;
} else {
this.tableSchema = tableSchema;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,161 @@ public void testJsonStreamWriterCommittedStream()
}
}

/**
 * Creates a table, builds a JsonStreamWriter from only the table name and the client (no
 * TableSchema is passed to the builder), appends five rows in four requests and checks the
 * values read back from the table.
 */
@Test
public void testJsonStreamWriterWithDefaultSchema()
    throws IOException, InterruptedException, ExecutionException,
        Descriptors.DescriptorValidationException {
  String tableName = "JsonTableDefaultSchema";
  // Column order here fixes the indexes used by the assertions at the end of the test:
  // 0 = test_str, 1 = test_numerics, 2 = test_datetime, 3 = test_bytestring_repeated,
  // 4 = test_timestamp.
  TableInfo tableInfo =
      TableInfo.newBuilder(
              TableId.of(DATASET, tableName),
              StandardTableDefinition.of(
                  Schema.of(
                      com.google.cloud.bigquery.Field.newBuilder(
                              "test_str", StandardSQLTypeName.STRING)
                          .build(),
                      com.google.cloud.bigquery.Field.newBuilder(
                              "test_numerics", StandardSQLTypeName.NUMERIC)
                          .setMode(Field.Mode.REPEATED)
                          .build(),
                      com.google.cloud.bigquery.Field.newBuilder(
                              "test_datetime", StandardSQLTypeName.DATETIME)
                          .build(),
                      com.google.cloud.bigquery.Field.newBuilder(
                              "test_bytestring_repeated", StandardSQLTypeName.BYTES)
                          .setMode(Field.Mode.REPEATED)
                          .build(),
                      com.google.cloud.bigquery.Field.newBuilder(
                              "test_timestamp", StandardSQLTypeName.TIMESTAMP)
                          .build())))
          .build();

  bigquery.create(tableInfo);
  TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
  // No TableSchema is supplied: this exercises the two-argument newBuilder overload.
  try (JsonStreamWriter jsonStreamWriter =
      JsonStreamWriter.newBuilder(parent.toString(), client)
          .setIgnoreUnknownFields(true)
          .build()) {
    LOG.info("Sending one message");
    JSONObject row1 = new JSONObject();
    row1.put("test_str", "aaa");
    row1.put(
        "test_numerics",
        new JSONArray(
            new byte[][] {
              BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("123.4"))
                  .toByteArray(),
              BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("-9000000"))
                  .toByteArray()
            }));
    // "unknown_field" is not a column of the table created above; the writer was built with
    // setIgnoreUnknownFields(true).
    row1.put("unknown_field", "a");
    row1.put(
        "test_datetime",
        CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.of(2020, 10, 1, 12, 0)));
    row1.put(
        "test_bytestring_repeated",
        new JSONArray(
            new byte[][] {
              ByteString.copyFromUtf8("a").toByteArray(),
              ByteString.copyFromUtf8("b").toByteArray()
            }));
    row1.put("test_timestamp", "2022-02-06 07:24:47.84");
    JSONArray jsonArr1 = new JSONArray(new JSONObject[] {row1});

    ApiFuture<AppendRowsResponse> response1 = jsonStreamWriter.append(jsonArr1, -1);

    assertEquals(0, response1.get().getAppendResult().getOffset().getValue());

    // row1 has already been appended (response1.get() returned above). It is re-sent with a new
    // test_str value; all of its other fields are left as they were.
    row1.put("test_str", "bbb");
    JSONObject row2 = new JSONObject();
    row2.put("test_str", "ccc");
    JSONArray jsonArr2 = new JSONArray();
    jsonArr2.put(row1);
    jsonArr2.put(row2);

    JSONObject row4 = new JSONObject();
    row4.put("test_str", "ddd");
    JSONArray jsonArr3 = new JSONArray();
    jsonArr3.put(row4);

    JSONObject row5 = new JSONObject();
    // Add another ARRAY<BYTES> using a more idiomatic way
    JSONArray testArr = new JSONArray(); // create empty JSONArray
    testArr.put(0, ByteString.copyFromUtf8("a").toByteArray()); // insert 1st bytes array
    testArr.put(1, ByteString.copyFromUtf8("b").toByteArray()); // insert 2nd bytes array
    row5.put("test_bytestring_repeated", testArr);
    JSONArray jsonArr4 = new JSONArray();
    jsonArr4.put(row5);

    LOG.info("Sending three more messages");
    ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, -1);
    LOG.info("Sending two more messages");
    ApiFuture<AppendRowsResponse> response3 = jsonStreamWriter.append(jsonArr3, -1);
    LOG.info("Sending one more message");
    ApiFuture<AppendRowsResponse> response4 = jsonStreamWriter.append(jsonArr4, -1);
    Assert.assertFalse(response2.get().getAppendResult().hasOffset());
    Assert.assertFalse(response3.get().getAppendResult().hasOffset());
    Assert.assertFalse(response4.get().getAppendResult().hasOffset());

    TableResult result =
        bigquery.listTableData(
            tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
    Iterator<FieldValueList> iter = result.getValues().iterator();
    FieldValueList currentRow = iter.next();
    assertEquals("aaa", currentRow.get(0).getStringValue());
    assertEquals("-9000000", currentRow.get(1).getRepeatedValue().get(1).getStringValue());
    assertEquals("2020-10-01T12:00:00", currentRow.get(2).getStringValue());
    assertEquals(2, currentRow.get(3).getRepeatedValue().size());
    // BYTES values are compared in base64 form: "Yg==" is "b", "YQ==" is "a".
    assertEquals("Yg==", currentRow.get(3).getRepeatedValue().get(1).getStringValue());
    assertEquals("bbb", iter.next().get(0).getStringValue());
    assertEquals("ccc", iter.next().get(0).getStringValue());
    assertEquals("ddd", iter.next().get(0).getStringValue());
    FieldValueList currentRow2 = iter.next();
    assertEquals("YQ==", currentRow2.get(3).getRepeatedValue().get(0).getStringValue());
    assertEquals("Yg==", currentRow2.get(3).getRepeatedValue().get(1).getStringValue());
    Assert.assertFalse(iter.hasNext());
  }
}

@Test
public void testJsonStreamWriterWithDefaultStream()
throws IOException, InterruptedException, ExecutionException,
Expand Down

0 comments on commit b6b515f

Please sign in to comment.