Skip to content

Commit

Permalink
chore: return row-level error information via an AppendSerializtionEr…
Browse files Browse the repository at this point in the history
…ror (#1813)

exception.
  • Loading branch information
agrawal-siddharth authored Oct 3, 2022
1 parent 505f318 commit df7c6e9
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.util.Errors;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback;
import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -32,7 +33,9 @@
import java.io.IOException;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -633,6 +636,15 @@ private void requestCallback(AppendRowsResponse response) {
Exceptions.toStorageException(response.getError(), null);
if (storageException != null) {
requestWrapper.appendResult.setException(storageException);
} else if (response.getRowErrorsCount() > 0) {
Map<Integer, String> rowIndexToErrorMessage = new HashMap<>();
for (int i = 0; i < response.getRowErrorsCount(); i++) {
RowError rowError = response.getRowErrors(i);
rowIndexToErrorMessage.put(Math.toIntExact(rowError.getIndex()), rowError.getMessage());
}
AppendSerializtionError exception =
new AppendSerializtionError(streamName, rowIndexToErrorMessage);
requestWrapper.appendResult.setException(exception);
} else {
StatusRuntimeException exception =
new StatusRuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists;
import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange;
import com.google.cloud.bigquery.storage.v1.Exceptions.SchemaMismatchedException;
Expand Down Expand Up @@ -313,6 +314,73 @@ public void testJsonStreamWriterCommittedStream()
}
}

@Test
public void testRowErrors()
throws IOException, InterruptedException, ExecutionException,
Descriptors.DescriptorValidationException {
String tableName = "_default";
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, tableName),
StandardTableDefinition.of(
Schema.of(
com.google.cloud.bigquery.Field.newBuilder(
"foo", StandardSQLTypeName.STRING)
.setMaxLength(10L)
.build())))
.build();
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
StreamWriter streamWriter =
StreamWriter.newBuilder(writeStream.getName())
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.build();
LOG.info("Sending three messages");
ApiFuture<AppendRowsResponse> futureResponse =
streamWriter.append(
CreateProtoRows(new String[] {"aaabbbcccddd", "bbb", "cccdddeeefffggg"}), -1);
AppendRowsResponse actualResponse = null;
try {
actualResponse = futureResponse.get();
} catch (Throwable t) {
assertTrue(t instanceof ExecutionException);
t = t.getCause();
assertTrue(t instanceof AppendSerializtionError);
AppendSerializtionError e = (AppendSerializtionError) t;
LOG.info("Found row errors on stream: " + e.getStreamName());
assertEquals(
"Field foo: STRING(10) has maximum length 10 but got a value with length 12 on field foo.",
e.getRowIndexToErrorMessage().get(0));
assertEquals(
"Field foo: STRING(10) has maximum length 10 but got a value with length 15 on field foo.",
e.getRowIndexToErrorMessage().get(2));
for (Map.Entry<Integer, String> entry : e.getRowIndexToErrorMessage().entrySet()) {
LOG.info("Bad row index: " + entry.getKey() + ", has problem: " + entry.getValue());
}
}
assertEquals(null, actualResponse);
LOG.info("Resending with three good messages");
ApiFuture<AppendRowsResponse> futureResponse1 =
streamWriter.append(CreateProtoRows(new String[] {"aaa", "bbb", "ccc"}), -1);
assertEquals(3, futureResponse1.get().getAppendResult().getOffset().getValue());

TableResult result =
bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
Iterator<FieldValueList> iter = result.getValues().iterator();
FieldValueList currentRow = iter.next();
assertEquals("aaa", currentRow.get(0).getStringValue());
assertEquals("bbb", iter.next().get(0).getStringValue());
assertEquals("ccc", iter.next().get(0).getStringValue());
assertEquals(false, iter.hasNext());
}

@Test
public void testJsonStreamWriterWithDefaultSchema()
throws IOException, InterruptedException, ExecutionException,
Expand Down

0 comments on commit df7c6e9

Please sign in to comment.