diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 730bee7e2a..42ddc9be6a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -21,6 +21,7 @@ import com.google.auto.value.AutoValue; import com.google.cloud.bigquery.storage.util.Errors; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; +import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1.StreamConnection.RequestCallback; import com.google.common.annotations.VisibleForTesting; @@ -32,7 +33,9 @@ import java.io.IOException; import java.util.Comparator; import java.util.Deque; +import java.util.HashMap; import java.util.LinkedList; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -633,6 +636,15 @@ private void requestCallback(AppendRowsResponse response) { Exceptions.toStorageException(response.getError(), null); if (storageException != null) { requestWrapper.appendResult.setException(storageException); + } else if (response.getRowErrorsCount() > 0) { + Map rowIndexToErrorMessage = new HashMap<>(); + for (int i = 0; i < response.getRowErrorsCount(); i++) { + RowError rowError = response.getRowErrors(i); + rowIndexToErrorMessage.put(Math.toIntExact(rowError.getIndex()), rowError.getMessage()); + } + AppendSerializtionError exception = + new AppendSerializtionError(streamName, rowIndexToErrorMessage); + requestWrapper.appendResult.setException(exception); } else { StatusRuntimeException exception = new StatusRuntimeException( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 115de5ea38..cdade16be7 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -28,6 +28,7 @@ import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.storage.test.Test.*; import com.google.cloud.bigquery.storage.v1.*; +import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError; import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists; import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange; import com.google.cloud.bigquery.storage.v1.Exceptions.SchemaMismatchedException; @@ -313,6 +314,73 @@ public void testJsonStreamWriterCommittedStream() } } + @Test + public void testRowErrors() + throws IOException, InterruptedException, ExecutionException, + Descriptors.DescriptorValidationException { + String tableName = "_default"; + TableInfo tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, tableName), + StandardTableDefinition.of( + Schema.of( + com.google.cloud.bigquery.Field.newBuilder( + "foo", StandardSQLTypeName.STRING) + .setMaxLength(10L) + .build()))) + .build(); + bigquery.create(tableInfo); + TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); + WriteStream writeStream = + client.createWriteStream( + CreateWriteStreamRequest.newBuilder() + .setParent(parent.toString()) + .setWriteStream( + WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) + .build()); + StreamWriter streamWriter = + StreamWriter.newBuilder(writeStream.getName()) + .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())) + .build(); + LOG.info("Sending three messages"); + ApiFuture futureResponse = + streamWriter.append( + CreateProtoRows(new String[] {"aaabbbcccddd", "bbb", "cccdddeeefffggg"}), -1); + AppendRowsResponse actualResponse = null; + try { + actualResponse = futureResponse.get(); + } catch (Throwable t) { + assertTrue(t instanceof ExecutionException); + t = t.getCause(); + assertTrue(t instanceof AppendSerializtionError); + AppendSerializtionError e = (AppendSerializtionError) t; + LOG.info("Found row errors on stream: " + e.getStreamName()); + assertEquals( + "Field foo: STRING(10) has maximum length 10 but got a value with length 12 on field foo.", + e.getRowIndexToErrorMessage().get(0)); + assertEquals( + "Field foo: STRING(10) has maximum length 10 but got a value with length 15 on field foo.", + e.getRowIndexToErrorMessage().get(2)); + for (Map.Entry entry : e.getRowIndexToErrorMessage().entrySet()) { + LOG.info("Bad row index: " + entry.getKey() + ", has problem: " + entry.getValue()); + } + } + assertEquals(null, actualResponse); + LOG.info("Resending with three good messages"); + ApiFuture futureResponse1 = + streamWriter.append(CreateProtoRows(new String[] {"aaa", "bbb", "ccc"}), -1); + assertEquals(3, futureResponse1.get().getAppendResult().getOffset().getValue()); + + TableResult result = + bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); + Iterator iter = result.getValues().iterator(); + FieldValueList currentRow = iter.next(); + assertEquals("aaa", currentRow.get(0).getStringValue()); + assertEquals("bbb", iter.next().get(0).getStringValue()); + assertEquals("ccc", iter.next().get(0).getStringValue()); + assertEquals(false, iter.hasNext()); + } + @Test public void testJsonStreamWriterWithDefaultSchema() throws IOException, InterruptedException, ExecutionException,