From 5d4c7e18b82ab85f7498e34a29920e9af765f918 Mon Sep 17 00:00:00 2001 From: gnanda Date: Fri, 22 Apr 2022 10:00:44 -0700 Subject: [PATCH] docs(samples): update WriteCommittedStream sample code to match best practices (#1628) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Update WriteCommittedStream sample code to match best practices * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot --- .../bigquerystorage/WriteCommittedStream.java | 160 ++++++++++++++---- 1 file changed, 128 insertions(+), 32 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java index 3b088f8cb0..e3d95de66b 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteCommittedStream.java @@ -18,16 +18,23 @@ // [START bigquerystorage_jsonstreamwriter_committed] import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; -import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException; +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.common.util.concurrent.MoreExecutors; import 
com.google.protobuf.Descriptors.DescriptorValidationException; import java.io.IOException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Phaser; +import javax.annotation.concurrent.GuardedBy; import org.json.JSONArray; import org.json.JSONObject; @@ -45,13 +52,60 @@ public static void runWriteCommittedStream() public static void writeCommittedStream(String projectId, String datasetName, String tableName) throws DescriptorValidationException, InterruptedException, IOException { + BigQueryWriteClient client = BigQueryWriteClient.create(); + TableName parentTable = TableName.of(projectId, datasetName, tableName); - try (BigQueryWriteClient client = BigQueryWriteClient.create()) { + DataWriter writer = new DataWriter(); + // One time initialization. + writer.initialize(parentTable, client); + + try { + // Write two batches of fake data to the stream, each with 10 JSON records. Data may be + // batched up to the maximum request size: + // https://cloud.google.com/bigquery/quotas#write-api-limits + long offset = 0; + for (int i = 0; i < 2; i++) { + // Create a JSON object that is compatible with the table schema. + JSONArray jsonArr = new JSONArray(); + for (int j = 0; j < 10; j++) { + JSONObject record = new JSONObject(); + record.put("col1", String.format("batch-record %03d-%03d", i, j)); + jsonArr.put(record); + } + writer.append(jsonArr, offset); + offset += jsonArr.length(); + } + } catch (ExecutionException e) { + // If the wrapped exception is a StatusRuntimeException, check the state of the operation. + // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see: + // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html + System.out.println("Failed to append records. \n" + e); + } + + // Final cleanup for the stream. 
+ writer.cleanup(client); + System.out.println("Appended records successfully."); + } + + // A simple wrapper object showing how the stateful stream writer should be used. + private static class DataWriter { + + private JsonStreamWriter streamWriter; + // Track the number of in-flight requests to wait for all responses before shutting down. + private final Phaser inflightRequestCount = new Phaser(1); + + private final Object lock = new Object(); + + @GuardedBy("lock") + private RuntimeException error = null; + + void initialize(TableName parentTable, BigQueryWriteClient client) + throws IOException, DescriptorValidationException, InterruptedException { // Initialize a write stream for the specified table. // For more information on WriteStream.Type, see: - // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/WriteStream.Type.html + // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build(); - TableName parentTable = TableName.of(projectId, datasetName, tableName); + CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder() .setParent(parentTable.toString()) @@ -62,37 +116,79 @@ public static void writeCommittedStream(String projectId, String datasetName, St // Use the JSON stream writer to send records in JSON format. // For more information about JsonStreamWriter, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html - try (JsonStreamWriter writer = - JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()) - .build()) { - // Write two batches to the stream, each with 10 JSON records. A writer should be - // used for as much writes as possible. Creating a writer for just one write is an - // antipattern. 
- for (int i = 0; i < 2; i++) { - // Create a JSON object that is compatible with the table schema. - JSONArray jsonArr = new JSONArray(); - for (int j = 0; j < 10; j++) { - JSONObject record = new JSONObject(); - record.put("col1", String.format("record %03d-%03d", i, j)); - jsonArr.put(record); - } + streamWriter = + JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build(); + } - // To detect duplicate records, pass the index as the record offset. - // To disable deduplication, omit the offset or use WriteStream.Type.DEFAULT. - ApiFuture future = writer.append(jsonArr, /*offset=*/ i * 10); - AppendRowsResponse response = future.get(); + public void append(JSONArray data, long offset) + throws DescriptorValidationException, IOException, ExecutionException { + synchronized (this.lock) { + // If earlier appends have failed, we need to reset before continuing. + if (this.error != null) { + throw this.error; } - // Finalize the stream after use. - FinalizeWriteStreamRequest finalizeWriteStreamRequest = - FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build(); - client.finalizeWriteStream(finalizeWriteStreamRequest); } - System.out.println("Appended records successfully."); - } catch (ExecutionException e) { - // If the wrapped exception is a StatusRuntimeException, check the state of the operation. - // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see: - // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html - System.out.println("Failed to append records. \n" + e.toString()); + // Append asynchronously for increased throughput. + ApiFuture future = streamWriter.append(data, offset); + ApiFutures.addCallback( + future, new DataWriter.AppendCompleteCallback(this), MoreExecutors.directExecutor()); + // Increase the count of in-flight requests. 
+ inflightRequestCount.register(); + } + + public void cleanup(BigQueryWriteClient client) { + // Wait for all in-flight requests to complete. + inflightRequestCount.arriveAndAwaitAdvance(); + + // Close the connection to the server. + streamWriter.close(); + + // Verify that no error occurred in the stream. + synchronized (this.lock) { + if (this.error != null) { + throw this.error; + } + } + + // Finalize the stream. + FinalizeWriteStreamResponse finalizeResponse = + client.finalizeWriteStream(streamWriter.getStreamName()); + System.out.println("Rows written: " + finalizeResponse.getRowCount()); + } + + public String getStreamName() { + return streamWriter.getStreamName(); + } + + static class AppendCompleteCallback implements ApiFutureCallback { + + private final DataWriter parent; + + public AppendCompleteCallback(DataWriter parent) { + this.parent = parent; + } + + public void onSuccess(AppendRowsResponse response) { + System.out.format("Append %d success\n", response.getAppendResult().getOffset().getValue()); + done(); + } + + public void onFailure(Throwable throwable) { + synchronized (this.parent.lock) { + if (this.parent.error == null) { + StorageException storageException = Exceptions.toStorageException(throwable); + this.parent.error = + (storageException != null) ? storageException : new RuntimeException(throwable); + } + } + System.out.format("Error: %s\n", throwable.toString()); + done(); + } + + private void done() { + // Reduce the count of in-flight requests. + this.parent.inflightRequestCount.arriveAndDeregister(); + } } } }