From b58763833ad4bd51515c055bbcb8e29d9fea05a9 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Mon, 14 Mar 2022 15:34:28 -0700 Subject: [PATCH] fix: a possible race condition that we used table schema out of the lock. (#1575) Also clean up the code to reconnect after 10MB. --- .../bigquery/storage/v1/JsonStreamWriter.java | 46 ++++++------------- 1 file changed, 13 insertions(+), 33 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index fb10b4072e..8f09471762 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -134,41 +134,21 @@ public ApiFuture append(JSONArray jsonArr, long offset) // Create a new underlying StreamWriter with the updated TableSchema and Descriptor this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build(); } - } - ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); - // Any error in convertJsonToProtoMessage will throw an - // IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing - // of JSON data. - long currentRequestSize = 0; - for (int i = 0; i < jsonArr.length(); i++) { - JSONObject json = jsonArr.getJSONObject(i); - Message protoMessage = - JsonToProtoMessage.convertJsonToProtoMessage( - this.descriptor, this.tableSchema, json, ignoreUnknownFields); - rowsBuilder.addSerializedRows(protoMessage.toByteString()); - currentRequestSize += protoMessage.getSerializedSize(); - } - // Need to make sure refreshAppendAndSetDescriptor finish first before this can run - synchronized (this) { - this.totalMessageSize += currentRequestSize; - this.absTotal += currentRequestSize; - // Reconnect on every 9.5MB. - if (this.totalMessageSize > 9500000 && this.reconnectAfter10M) { - streamWriter.close(); - // Create a new underlying StreamWriter aka establish a new connection. - this.streamWriter = streamWriterBuilder.setWriterSchema(protoSchema).build(); - this.totalMessageSize = this.protoSchema.getSerializedSize() + currentRequestSize; - this.absTotal += currentRequestSize; - // Allow first request to pass. + ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); + // Any error in convertJsonToProtoMessage will throw an + // IllegalArgumentException/IllegalStateException/NullPointerException and will halt + // processing + // of JSON data. + long currentRequestSize = 0; + for (int i = 0; i < jsonArr.length(); i++) { + JSONObject json = jsonArr.getJSONObject(i); + Message protoMessage = + JsonToProtoMessage.convertJsonToProtoMessage( + this.descriptor, this.tableSchema, json, ignoreUnknownFields); + rowsBuilder.addSerializedRows(protoMessage.toByteString()); + currentRequestSize += protoMessage.getSerializedSize(); } - LOG.fine( - "Sending a total of:" - + this.totalMessageSize - + " " - + currentRequestSize - + " " - + this.absTotal); final ApiFuture appendResponseFuture = this.streamWriter.append(rowsBuilder.build(), offset); return appendResponseFuture;