Skip to content

Commit

Permalink
feat: provide sample code for row-level error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
agrawal-siddharth committed Nov 6, 2022
1 parent 5699122 commit 43798cb
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 13 deletions.
2 changes: 1 addition & 1 deletion samples/snippets/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>25.4.0</version>
<version>26.1.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
Expand All @@ -37,6 +40,7 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
Expand Down Expand Up @@ -69,7 +73,11 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
JSONArray jsonArr = new JSONArray();
for (int j = 0; j < 10; j++) {
JSONObject record = new JSONObject();
record.put("test_string", String.format("record %03d-%03d", i, j));
StringBuilder sbSuffix = new StringBuilder();
for (int k = 0; k < j; k++) {
sbSuffix.append(k);
}
record.put("test_string", String.format("record %03d-%03d %s", i, j, sbSuffix.toString()));
jsonArr.put(record);
}

Expand All @@ -78,9 +86,31 @@ public static void writeToDefaultStream(String projectId, String datasetName, St

// Final cleanup for the stream during worker teardown.
writer.cleanup();
verifyExpectedRowCount(parentTable, 12);
System.out.println("Appended records successfully.");
}

private static void verifyExpectedRowCount(TableName parentTable, int expectedRowCount)
throws InterruptedException {
String queryRowCount = "SELECT COUNT(*) FROM `"
+ parentTable.getProject()
+ "."
+ parentTable.getDataset()
+ "."
+ parentTable.getTable()
+ "`";
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(queryRowCount).build();
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
TableResult results = bigquery.query(queryConfig);
int countRowsActual = Integer.parseInt(
results.getValues().iterator().next().get("f0_").getStringValue());
if (countRowsActual != expectedRowCount) {
throw new RuntimeException("Unexpected row count. Expected: " +
expectedRowCount + ". Actual: " + countRowsActual);
}
}

private static class AppendContext {

JSONArray data;
Expand Down Expand Up @@ -170,7 +200,7 @@ public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
}

public void onSuccess(AppendRowsResponse response) {
System.out.format("Append success\n");
System.out.format("Append success%n");
done();
}

Expand All @@ -182,16 +212,51 @@ public void onFailure(Throwable throwable) {
if (appendContext.retryCount < MAX_RETRY_COUNT
&& RETRIABLE_ERROR_CODES.contains(status.getCode())) {
appendContext.retryCount++;
try {
// Since default stream appends are not ordered, we can simply retry the appends.
// Retrying with exclusive streams requires more careful consideration.
this.parent.append(appendContext);
// Mark the existing attempt as done since it's being retried.
// Use a separate thread to avoid potentially blocking while we are in a callback.
new Thread(() -> {
try {
// Since default stream appends are not ordered, we can simply retry the appends.
// Retrying with exclusive streams requires more careful consideration.
this.parent.append(appendContext);
} catch (Exception e) {
// Fall through to return error.
System.out.format("Failed to retry append: %s%n", e);
}
}).start();
// Mark the existing attempt as done since it's being retried.
done();
return;
}

if (throwable instanceof AppendSerializtionError) {
AppendSerializtionError ase = (AppendSerializtionError) throwable;
Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
if (rowIndexToErrorMessage.size() > 0) {
// Omit the faulty rows
JSONArray dataNew = new JSONArray();
for (int i = 0; i < appendContext.data.length(); i++) {
if (!rowIndexToErrorMessage.containsKey(i)) {
dataNew.put(appendContext.data.get(i));
} else {
// process faulty rows by placing them on a dead-letter-queue, for instance
}
}

// Mark the existing attempt as done since we got a response for it
done();

// Retry the remaining valid rows, but using a separate thread to
// avoid potentially blocking while we are in a callback.
if (dataNew.length() > 0) {
new Thread(() -> {
try {
this.parent.append(new AppendContext(dataNew, 0));
} catch (Exception e2) {
System.out.format("Failed to retry append with filtered rows: %s%n", e2);
}
}).start();
}
return;
} catch (Exception e) {
// Fall through to return error.
System.out.format("Failed to retry append: %s\n", e);
}
}

Expand All @@ -202,7 +267,7 @@ public void onFailure(Throwable throwable) {
(storageException != null) ? storageException : new RuntimeException(throwable);
}
}
System.out.format("Error: %s\n", throwable);
System.out.format("Error that arrived: %s%n", throwable);
done();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ public void setUp() {
// Create a new dataset and table for each test.
datasetName = "WRITE_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
tableName = "DEFAULT_STREAM_TEST" + UUID.randomUUID().toString().substring(0, 8);
Schema schema = Schema.of(Field.of("test_string", StandardSQLTypeName.STRING));
Schema schema = Schema.of(
com.google.cloud.bigquery.Field.newBuilder(
"test_string", StandardSQLTypeName.STRING)
.setMaxLength(20L)
.build());
bigquery.create(DatasetInfo.newBuilder(datasetName).build());
TableInfo tableInfo =
TableInfo.newBuilder(TableId.of(datasetName, tableName), StandardTableDefinition.of(schema))
Expand Down

0 comments on commit 43798cb

Please sign in to comment.