Skip to content

Commit

Permalink
fix: Persist missingValueInterpretationMap in StreamWriter's Builder (#…
Browse files Browse the repository at this point in the history
…2587)

* Persist missingValueInterpretationMap in StreamWriter's Builder. In case the StreamWriter is recreated, the map will be used in the new StreamWriter too.

* Fix format

* Addressed review comments: restored the getMissingValueInterpretationMap method and added an integration test.
  • Loading branch information
yifatgortler authored Sep 25, 2024
1 parent 95a9977 commit 036d2e6
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 74 deletions.
15 changes: 15 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -197,5 +197,20 @@
<differenceType>1001</differenceType>
<className>com/google/cloud/bigquery/storage/v1/StreamWriter$SingleConnectionOrConnectionPool</className>
</difference>
<!--
  The three entries below suppress Clirr differenceType 7002 (method removed) for the
  instance-level setMissingValueInterpretationMap(java.util.Map) setters on JsonStreamWriter,
  SchemaAwareStreamWriter and StreamWriter.
-->
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/JsonStreamWriter</className>
<method>void setMissingValueInterpretationMap(java.util.Map)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter</className>
<method>void setMissingValueInterpretationMap(java.util.Map)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/StreamWriter</className>
<method>void setMissingValueInterpretationMap(java.util.Map)</method>
</difference>
</differences>

Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,6 @@ public long getInflightWaitSeconds() {
return this.schemaAwareStreamWriter.getInflightWaitSeconds();
}

/**
 * Replaces the per-field missing value interpretation map of this JsonStreamWriter by forwarding
 * it to the underlying schema-aware writer. The supplied map applies to all append requests
 * until it is changed again.
 *
 * @param missingValueInterpretationMap field name to interpretation mapping that the
 *     JsonStreamWriter should use.
 */
public void setMissingValueInterpretationMap(
    Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
  schemaAwareStreamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap);
}

/** @return the missing value interpretation map used for the writer. */
public Map<String, AppendRowsRequest.MissingValueInterpretation>
getMissingValueInterpretationMap() {
Expand Down Expand Up @@ -414,6 +402,21 @@ public Builder setDefaultMissingValueInterpretation(
return this;
}

/**
 * Configures the per-field missing value interpretation map for the JsonStreamWriter being
 * built. The supplied map applies to all append requests unless otherwise changed.
 *
 * <p>The value is forwarded to the nested schema-aware writer builder.
 *
 * @param missingValueInterpretationMap field name to interpretation mapping that the
 *     JsonStreamWriter should use.
 * @return Builder
 */
public Builder setMissingValueInterpretationMap(
    Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
  schemaAwareStreamWriterBuilder.setMissingValueInterpretationMap(missingValueInterpretationMap);
  return this;
}

/**
* Builds JsonStreamWriter
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ private SchemaAwareStreamWriter(Builder<T> builder)
streamWriterBuilder.setLocation(builder.location);
streamWriterBuilder.setDefaultMissingValueInterpretation(
builder.defaultMissingValueInterpretation);
streamWriterBuilder.setMissingValueInterpretationMap(builder.missingValueInterpretationMap);
streamWriterBuilder.setClientId(builder.clientId);
streamWriterBuilder.setEnableLatencyProfiler(builder.enableRequestProfiler);
requestProfilerHook = new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler);
Expand Down Expand Up @@ -298,18 +299,6 @@ public long getInflightWaitSeconds() {
return streamWriter.getInflightWaitSeconds();
}

/**
 * Replaces the per-field missing value interpretation map of this SchemaAwareStreamWriter by
 * forwarding it to the underlying stream writer. The supplied map applies to all append requests
 * until it is changed again.
 *
 * @param missingValueInterpretationMap field name to interpretation mapping that the
 *     SchemaAwareStreamWriter should use.
 */
public void setMissingValueInterpretationMap(
    Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
  this.streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap);
}

/** @return the missing value interpretation map used for the writer. */
public Map<String, AppendRowsRequest.MissingValueInterpretation>
getMissingValueInterpretationMap() {
Expand Down Expand Up @@ -475,6 +464,8 @@ public static final class Builder<T> {

private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;
// Per-field missing value interpretations configured on this builder; starts out empty.
private Map<String, AppendRowsRequest.MissingValueInterpretation>
    missingValueInterpretationMap = new HashMap<>();
private String clientId;

private boolean enableRequestProfiler = false;
Expand Down Expand Up @@ -684,6 +675,20 @@ public Builder setDefaultMissingValueInterpretation(
return this;
}

/**
 * Sets the missing value interpretation map for the SchemaAwareStreamWriter. The input
 * missingValueInterpretationMap is used for all append requests unless otherwise changed.
 *
 * <p>The entries are copied into a map owned by this builder, so mutating the caller's map after
 * this call does not change the builder's configuration.
 *
 * @param missingValueInterpretationMap the missing value interpretation map used by the
 *     SchemaAwareStreamWriter; {@code null} is treated as an empty map.
 * @return Builder
 */
public Builder setMissingValueInterpretationMap(
    Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
  // Defensive copy: never alias the caller's mutable map, and never store null.
  Map<String, AppendRowsRequest.MissingValueInterpretation> copy = new HashMap<>();
  if (missingValueInterpretationMap != null) {
    copy.putAll(missingValueInterpretationMap);
  }
  this.missingValueInterpretationMap = copy;
  return this;
}

/**
* Sets the RetrySettings to use for in-stream error retry.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ public class StreamWriter implements AutoCloseable {
// Cache of location info for a given dataset.
private static Map<String, String> projectAndDatasetToLocation = new ConcurrentHashMap<>();

// Map of fields to their MissingValueInterpretation, which dictates how a field should be
// populated when it is missing from an input user row.
private Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap =
    new HashMap<>();

/*
* The identifier of stream to write to.
*/
Expand Down Expand Up @@ -103,6 +98,11 @@ public class StreamWriter implements AutoCloseable {
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;

// Map of fields to their MissingValueInterpretation, which dictates how a field should be
// populated when it is missing from an input user row.
private Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap =
    new HashMap<>();

/**
* Stream can access a single connection or a pool of connection depending on whether multiplexing
* is enabled.
Expand Down Expand Up @@ -229,6 +229,7 @@ private StreamWriter(Builder builder) throws IOException {
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
this.defaultMissingValueInterpretation = builder.defaultMissingValueInterpretation;
this.missingValueInterpretationMap = builder.missingValueInterpretationMap;
BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder);
this.requestProfilerHook =
new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler);
Expand Down Expand Up @@ -420,18 +421,6 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
}
}

/**
 * Sets the missing value interpretation map for the stream writer. The input
 * missingValueInterpretationMap is used for all write requests unless otherwise changed.
 *
 * <p>The entries are copied into a map owned by this writer, so mutating the caller's map after
 * this call does not change the writer's configuration.
 *
 * @param missingValueInterpretationMap the missing value interpretation map used by stream
 *     writer; {@code null} is treated as an empty map.
 */
public void setMissingValueInterpretationMap(
    Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
  // Defensive copy: never alias the caller's mutable map, and never store null.
  Map<String, AppendRowsRequest.MissingValueInterpretation> copy = new HashMap<>();
  if (missingValueInterpretationMap != null) {
    copy.putAll(missingValueInterpretationMap);
  }
  this.missingValueInterpretationMap = copy;
}

/**
* Schedules the writing of rows at the end of current stream.
*
Expand Down Expand Up @@ -700,6 +689,9 @@ public static final class Builder {
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;

// Per-field missing value interpretations configured on this builder; starts out empty.
private Map<String, AppendRowsRequest.MissingValueInterpretation>
    missingValueInterpretationMap = new HashMap<>();

private boolean enableRequestProfiler = false;
private boolean enableOpenTelemetry = false;

Expand Down Expand Up @@ -851,6 +843,20 @@ public Builder setDefaultMissingValueInterpretation(
return this;
}

/**
 * Sets the missing value interpretation map for the stream writer. The input
 * missingValueInterpretationMap is used for all write requests unless otherwise changed.
 *
 * <p>The entries are copied into a map owned by this builder, so mutating the caller's map after
 * this call does not change the builder's configuration.
 *
 * @param missingValueInterpretationMap the missing value interpretation map used by stream
 *     writer; {@code null} is treated as an empty map.
 * @return Builder
 */
public Builder setMissingValueInterpretationMap(
    Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
  // Defensive copy: never alias the caller's mutable map, and never store null.
  Map<String, AppendRowsRequest.MissingValueInterpretation> copy = new HashMap<>();
  if (missingValueInterpretationMap != null) {
    copy.putAll(missingValueInterpretationMap);
  }
  this.missingValueInterpretationMap = copy;
  return this;
}

/**
* Enable a latency profiler that would periodically generate a detailed latency report for the
* top latency requests. This is currently an experimental API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,95 @@ public void testSimpleSchemaUpdate_skipRefreshWriterIfSchemaProvided() throws Ex
}
}

/**
 * Verifies that the missing value interpretation settings supplied through the builder are still
 * reported by the writer, and are present on the append request, after an append response
 * carrying an updated schema has been processed.
 */
@Test
public void testSimpleSchemaUpdate_withInterpretationMap() throws Exception {
  testBigQueryWrite.addResponse(
      WriteStream.newBuilder()
          .setName(TEST_STREAM)
          .setTableSchema(TABLE_SCHEMA)
          .setLocation("us")
          .build());
  Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap<>();
  missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
  missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);

  try (JsonStreamWriter writer =
      getTestJsonStreamWriterBuilder(TEST_STREAM)
          .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
          .setMissingValueInterpretationMap(missingValueMap)
          .build()) {

    testBigQueryWrite.addResponse(
        AppendRowsResponse.newBuilder()
            .setAppendResult(
                AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
            .setUpdatedSchema(UPDATED_TABLE_SCHEMA)
            .build());
    testBigQueryWrite.addResponse(createAppendResponse(1));
    // Verify the map before the writer is refreshed
    assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
    testBigQueryWrite.addResponse(createAppendResponse(2));
    testBigQueryWrite.addResponse(createAppendResponse(3));

    // First batch of appends. The first append request will return an updated schema, but the
    // second and maybe the third append will be processed before the first response refreshes
    // the StreamWriter.
    JSONObject foo = new JSONObject();
    foo.put("foo", "aaa");
    JSONArray jsonArr = new JSONArray();
    jsonArr.put(foo);

    ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(jsonArr);
    ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(jsonArr);
    ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(jsonArr);

    assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
    assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
    assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue());

    // Another append, this time with columns to match the updated schema.
    JSONObject updatedFoo = new JSONObject();
    updatedFoo.put("foo", "aaa");
    updatedFoo.put("bar", "bbb");
    JSONArray updatedJsonArr = new JSONArray();
    updatedJsonArr.put(updatedFoo);
    ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(updatedJsonArr);

    assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue());
    assertEquals(4, testBigQueryWrite.getAppendRequests().size());

    // All four appends have completed, so the fourth request can be looked up once and reused.
    AppendRowsRequest lastRequest = testBigQueryWrite.getAppendRequests().get(3);
    assertEquals(1, lastRequest.getProtoRows().getRows().getSerializedRowsCount());
    // assertEquals takes (expected, actual); keep that order so failure messages read correctly.
    assertEquals(
        UpdatedFooType.newBuilder().setFoo("aaa").setBar("bbb").build().toByteString(),
        lastRequest.getProtoRows().getRows().getSerializedRows(0));

    assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
    assertTrue(
        testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()
            || lastRequest.getProtoRows().hasWriterSchema());

    // Verify the map after the writer is refreshed
    assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
    assertEquals(
        MissingValueInterpretation.DEFAULT_VALUE,
        lastRequest.getDefaultMissingValueInterpretation());
    assertEquals(missingValueMap, lastRequest.getMissingValueInterpretations());
  }
}

@Test
public void testWithoutIgnoreUnknownFieldsUpdateImmeidateSuccess() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
Expand Down Expand Up @@ -1523,13 +1612,16 @@ public void testAppendWithMissingValueMap() throws Exception {
JSONArray jsonArr = new JSONArray();
jsonArr.put(flexible);

Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap<>();
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) {
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema)
.setMissingValueInterpretationMap(missingValueMap)
.setTraceId("test:empty")
.build()) {

Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap<>();
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
writer.setMissingValueInterpretationMap(missingValueMap);
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());

testBigQueryWrite.addResponse(
Expand Down
Loading

0 comments on commit 036d2e6

Please sign in to comment.