Skip to content

Commit

Permalink
feat: temp workaround for omg/48020 (#1521)
Browse files Browse the repository at this point in the history
This should only be used by customers who are affected by omg/48020 (connection stuck after 10MB) when using JsonStreamWriter directly, before the fix finishes rolling out on Friday 2/11. It can be enabled by calling JsonStreamWriter.newBuilder(...).setReconnectAfter10M(true).

The write performance will be impacted by this change, but it can work around the stuckness.
  • Loading branch information
yirutang authored Feb 8, 2022
1 parent 9fd7aca commit ff28f65
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public class JsonStreamWriter implements AutoCloseable {
private Descriptor descriptor;
private TableSchema tableSchema;
private boolean ignoreUnknownFields = false;
private boolean reconnectAfter10M = false;
private long totalMessageSize = 0;
private long absTotal = 0;
private ProtoSchema protoSchema;

/**
* Constructs the JsonStreamWriter
Expand All @@ -71,7 +75,9 @@ private JsonStreamWriter(Builder builder)
} else {
streamWriterBuilder = StreamWriter.newBuilder(builder.streamName, builder.client);
}
streamWriterBuilder.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor));
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
this.totalMessageSize = protoSchema.getSerializedSize();
streamWriterBuilder.setWriterSchema(protoSchema);
setStreamWriterSettings(
builder.channelProvider,
builder.credentialsProvider,
Expand All @@ -82,6 +88,7 @@ private JsonStreamWriter(Builder builder)
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
this.ignoreUnknownFields = builder.ignoreUnknownFields;
this.reconnectAfter10M = builder.reconnectAfter10M;
}

/**
Expand Down Expand Up @@ -122,27 +129,46 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
this.tableSchema = updatedSchema;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema);
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
this.totalMessageSize = protoSchema.getSerializedSize();
// Create a new underlying StreamWriter with the updated TableSchema and Descriptor
this.streamWriter =
streamWriterBuilder
.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor))
.build();
this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build();
}
}

ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
// Any error in convertJsonToProtoMessage will throw an
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing
// of JSON data.
long currentRequestSize = 0;
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
currentRequestSize += protoMessage.getSerializedSize();
}
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run
synchronized (this) {
this.totalMessageSize += currentRequestSize;
this.absTotal += currentRequestSize;
// Reconnect on every 9.5MB.
if (this.totalMessageSize > 9500000 && this.reconnectAfter10M) {
streamWriter.close();
// Create a new underlying StreamWriter aka establish a new connection.
this.streamWriter = streamWriterBuilder.setWriterSchema(protoSchema).build();
this.totalMessageSize = this.protoSchema.getSerializedSize() + currentRequestSize;
this.absTotal += currentRequestSize;
// Allow first request to pass.
}
LOG.fine(
"Sending a total of:"
+ this.totalMessageSize
+ " "
+ currentRequestSize
+ " "
+ this.absTotal);
final ApiFuture<AppendRowsResponse> appendResponseFuture =
this.streamWriter.append(rowsBuilder.build(), offset);
return appendResponseFuture;
Expand Down Expand Up @@ -264,6 +290,7 @@ public static final class Builder {
private boolean createDefaultStream = false;
private String traceId;
private boolean ignoreUnknownFields = false;
private boolean reconnectAfter10M = false;

private static String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
Expand Down Expand Up @@ -377,6 +404,19 @@ public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) {
return this;
}

/**
 * Setter for reconnectAfter10M, a temporary workaround for omg/48020 (connection stuck after
 * 10MB is written). The fix for the omg is supposed to roll out by Friday 2/11/2022. If you set
 * this to true, your writes will be slower (about 0.75MB/s per connection), but they will not
 * get stuck as a symptom of omg/48020.
 *
 * @param reconnectAfter10M whether to tear down and re-establish the underlying connection
 *     after roughly 10MB has been written on it
 * @return Builder
 */
public Builder setReconnectAfter10M(boolean reconnectAfter10M) {
this.reconnectAfter10M = reconnectAfter10M;
return this;
}

/**
* Builds JsonStreamWriter
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,58 @@ public void testJsonStreamWriterWithDefaultStream()
}
}

// Exercises the reconnectAfter10M workaround for omg/48020: appends ~30MB in total so the
// writer must reconnect (it reconnects every ~9.5MB). This test runs about 1 min.
@Test
public void testJsonStreamWriterWithMessagesOver10M()
    throws IOException, InterruptedException, ExecutionException,
        Descriptors.DescriptorValidationException {
  String tableName = "TableLarge";
  TableId tableId = TableId.of(DATASET, tableName);
  Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
  Schema schema = Schema.of(col1);
  TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build();
  bigquery.create(tableInfo);
  TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);

  WriteStream writeStream =
      client.createWriteStream(
          CreateWriteStreamRequest.newBuilder()
              .setParent(parent.toString())
              .setWriteStream(
                  WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
              .build());
  final int totalRequest = 10;
  final int rowBatch = 40000;
  ArrayList<ApiFuture<AppendRowsResponse>> allResponses =
      new ArrayList<ApiFuture<AppendRowsResponse>>(totalRequest);
  // Each request is a ~3MB batch, so ten requests send a total of ~30MB over the wire.
  try (JsonStreamWriter jsonStreamWriter =
      JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
          .setReconnectAfter10M(true)
          .build()) {
    for (int k = 0; k < totalRequest; k++) {
      JSONObject row = new JSONObject();
      row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
      JSONArray jsonArr = new JSONArray();
      // 3MB batch.
      for (int j = 0; j < rowBatch; j++) {
        jsonArr.put(row);
      }
      LOG.info("Appending: " + k + "/" + totalRequest);
      allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch));
    }
  }
  LOG.info("Waiting for all responses to come back");
  for (int i = 0; i < totalRequest; i++) {
    try {
      // JUnit's assertEquals contract is (expected, actual); the original call had the
      // arguments swapped, which yields a misleading failure message. Use long arithmetic
      // so the expected offset matches getValue()'s long type without surprises.
      Assert.assertEquals(
          (long) i * rowBatch, allResponses.get(i).get().getAppendResult().getOffset().getValue());
    } catch (ExecutionException ex) {
      Assert.fail("Unexpected error " + ex);
    }
  }
}

@Test
public void testJsonStreamWriterSchemaUpdate()
throws DescriptorValidationException, IOException, InterruptedException, ExecutionException {
Expand Down

0 comments on commit ff28f65

Please sign in to comment.