diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index ae0f3f7902..cfbdf7f3e2 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -31,7 +31,7 @@
com.google.cloud
libraries-bom
- 25.4.0
+ 26.1.3
pom
import
diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
index 3e74e84694..096072ba3f 100644
--- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
+++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
@@ -23,10 +23,13 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
+import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
@@ -37,6 +40,7 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
@@ -69,7 +73,11 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
JSONArray jsonArr = new JSONArray();
for (int j = 0; j < 10; j++) {
JSONObject record = new JSONObject();
- record.put("test_string", String.format("record %03d-%03d", i, j));
+ StringBuilder sbSuffix = new StringBuilder();
+ for (int k = 0; k < j; k++) {
+ sbSuffix.append(k);
+ }
+ record.put("test_string", String.format("record %03d-%03d %s", i, j, sbSuffix.toString()));
jsonArr.put(record);
}
@@ -78,9 +86,31 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
// Final cleanup for the stream during worker teardown.
writer.cleanup();
+ verifyExpectedRowCount(parentTable, 12);
System.out.println("Appended records successfully.");
}
+ private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
+ throws InterruptedException {
+ String queryRowCount = "SELECT COUNT(*) FROM `"
+ + parentTable.getProject()
+ + "."
+ + parentTable.getDataset()
+ + "."
+ + parentTable.getTable()
+ + "`";
+ QueryJobConfiguration queryConfig =
+ QueryJobConfiguration.newBuilder(queryRowCount).build();
+ BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
+ TableResult results = bigquery.query(queryConfig);
+ int countRowsActual = Integer.parseInt(
+ results.getValues().iterator().next().get("f0_").getStringValue());
+ if (countRowsActual != expectedRowCount) {
+ throw new RuntimeException("Unexpected row count. Expected: " +
+ expectedRowCount + ". Actual: " + countRowsActual);
+ }
+ }
+
private static class AppendContext {
JSONArray data;
@@ -170,7 +200,7 @@ public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
}
public void onSuccess(AppendRowsResponse response) {
- System.out.format("Append success\n");
+ System.out.format("Append success%n");
done();
}
@@ -182,16 +212,51 @@ public void onFailure(Throwable throwable) {
if (appendContext.retryCount < MAX_RETRY_COUNT
&& RETRIABLE_ERROR_CODES.contains(status.getCode())) {
appendContext.retryCount++;
- try {
- // Since default stream appends are not ordered, we can simply retry the appends.
- // Retrying with exclusive streams requires more careful consideration.
- this.parent.append(appendContext);
- // Mark the existing attempt as done since it's being retried.
+ // Use a separate thread to avoid potentially blocking while we are in a callback.
+ new Thread(() -> {
+ try {
+ // Since default stream appends are not ordered, we can simply retry the appends.
+ // Retrying with exclusive streams requires more careful consideration.
+ this.parent.append(appendContext);
+ } catch (Exception e) {
+ // Fall through to return error.
+ System.out.format("Failed to retry append: %s%n", e);
+ }
+ }).start();
+ // Mark the existing attempt as done since it's being retried.
+ done();
+ return;
+ }
+
+ if (throwable instanceof AppendSerializtionError) {
+ AppendSerializtionError ase = (AppendSerializtionError) throwable;
+ Map rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
+ if (rowIndexToErrorMessage.size() > 0) {
+ // Omit the faulty rows
+ JSONArray dataNew = new JSONArray();
+ for (int i = 0; i < appendContext.data.length(); i++) {
+ if (!rowIndexToErrorMessage.containsKey(i)) {
+ dataNew.put(appendContext.data.get(i));
+ } else {
+ // process faulty rows by placing them on a dead-letter-queue, for instance
+ }
+ }
+
+ // Mark the existing attempt as done since we got a response for it
done();
+
+ // Retry the remaining valid rows, but using a separate thread to
+ // avoid potentially blocking while we are in a callback.
+ if (dataNew.length() > 0) {
+ new Thread(() -> {
+ try {
+ this.parent.append(new AppendContext(dataNew, 0));
+ } catch (Exception e2) {
+ System.out.format("Failed to retry append with filtered rows: %s%n", e2);
+ }
+ }).start();
+ }
return;
- } catch (Exception e) {
- // Fall through to return error.
- System.out.format("Failed to retry append: %s\n", e);
}
}
@@ -202,7 +267,7 @@ public void onFailure(Throwable throwable) {
(storageException != null) ? storageException : new RuntimeException(throwable);
}
}
- System.out.format("Error: %s\n", throwable);
+ System.out.format("Error that arrived: %s%n", throwable);
done();
}
diff --git a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java
index 871902e0ae..dffa83cc34 100644
--- a/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java
+++ b/samples/snippets/src/test/java/com/example/bigquerystorage/WriteToDefaultStreamIT.java
@@ -73,7 +73,11 @@ public void setUp() {
// Create a new dataset and table for each test.
datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
- Schema schema = Schema.of(Field.of("test_string", StandardSQLTypeName.STRING));
+ Schema schema = Schema.of(
+ com.google.cloud.bigquery.Field.newBuilder(
+ "test_string", StandardSQLTypeName.STRING)
+ .setMaxLength(20L)
+ .build());
bigquery.create(DatasetInfo.newBuilder(datasetName).build());
TableInfo tableInfo =
TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema))