Skip to content

Commit

Permalink
docs(tutorials): Call append asynchronously instead of blocking (#1542)
Browse files Browse the repository at this point in the history
  • Loading branch information
VeronicaWasson authored Feb 18, 2022
1 parent 433eb70 commit c777e23
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.example.bigquerystorage;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
Expand All @@ -31,11 +33,11 @@
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;

Expand Down Expand Up @@ -118,20 +120,32 @@ public static void writeToDefaultStream(
}
} // batch
ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
AppendRowsResponse response = future.get();
if (response.hasUpdatedSchema()) {
// The destination table schema has changed. The client library automatically
// reestablishes a connection to the backend using the new schema, so we can continue
// to send data without interruption.
System.out.println("Table schema changed.");
}
// The append method is asynchronous. Rather than waiting for the method to complete,
// which can hurt performance, register a completion callback and continue streaming.
ApiFutures.addCallback(
future, new AppendCompleteCallback(), MoreExecutors.directExecutor());
}
System.out.println("Appended records successfully.");
} catch (ExecutionException e) {
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
// https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
System.out.println("Failed to append records. \n" + e.toString());
}
}
}

class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

private static int batchCount = 0;
private static final Object lock = new Object();

public void onSuccess(AppendRowsResponse response) {
synchronized (lock) {
if (response.hasError()) {
System.out.format("Error: %s\n", response.getError().toString());
} else {
++batchCount;
System.out.format("Wrote batch %d\n", batchCount);
}
}
}

public void onFailure(Throwable throwable) {
System.out.format("Error: %s\n", throwable.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testJsonWriterDefaultStream() throws Exception {
System.out.println(dataFilePath.toString());
String[] args = {GOOGLE_CLOUD_PROJECT, datasetName, "github", dataFilePath.toString()};
JsonWriterDefaultStream.main(args);
assertThat(bout.toString()).contains("Appended records successfully.");
assertThat(bout.toString()).contains("Wrote batch");
}

@After
Expand Down

0 comments on commit c777e23

Please sign in to comment.