diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
index 0b351aa437..38fa510bf2 100644
--- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml
+++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -2,15 +2,13 @@
- 7005
+ 7004
com/google/cloud/bigquery/storage/v1/Exceptions$SchemaMismatchedException
- Exceptions$SchemaMismatchedException(java.lang.String, java.lang.String, java.lang.Throwable)
- Exceptions$SchemaMismatchedException(io.grpc.Status, io.grpc.Metadata, java.lang.String)
+ Exceptions$SchemaMismatchedException(io.grpc.Status, io.grpc.Metadata, java.lang.String)
- 7005
+ 7004
com/google/cloud/bigquery/storage/v1/Exceptions$StreamFinalizedException
- Exceptions$StreamFinalizedException(java.lang.String, java.lang.String, java.lang.Throwable)
- Exceptions$StreamFinalizedException(io.grpc.Status, io.grpc.Metadata, java.lang.String)
+ Exceptions$StreamFinalizedException(io.grpc.Status, io.grpc.Metadata, java.lang.String)
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java
index 31e3350b4c..5b02271a58 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java
@@ -19,10 +19,11 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
-import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import javax.annotation.Nullable;
/** Exceptions for Storage Client Libraries. */
@@ -37,18 +38,23 @@ public static class StorageException extends StatusRuntimeException {
private final ImmutableMap errors;
private final String streamName;
+ private final Long expectedOffset;
+ private final Long actualOffset;
private StorageException() {
- this(null, null, null, ImmutableMap.of());
+ this(null, null, null, null, ImmutableMap.of());
}
private StorageException(
@Nullable Status grpcStatus,
- @Nullable Metadata metadata,
@Nullable String streamName,
+ @Nullable Long expectedOffset,
+ @Nullable Long actualOffset,
ImmutableMap errors) {
- super(grpcStatus, metadata);
+ super(grpcStatus);
this.streamName = streamName;
+ this.expectedOffset = expectedOffset;
+ this.actualOffset = actualOffset;
this.errors = errors;
}
@@ -59,12 +65,20 @@ public ImmutableMap getErrors() {
public String getStreamName() {
return streamName;
}
+
+ public long getExpectedOffset() {
+ return expectedOffset;
+ }
+
+ public long getActualOffset() {
+ return actualOffset;
+ }
}
/** Stream has already been finalized. */
public static final class StreamFinalizedException extends StorageException {
- protected StreamFinalizedException(Status grpcStatus, Metadata metadata, String name) {
- super(grpcStatus, metadata, name, ImmutableMap.of());
+ protected StreamFinalizedException(Status grpcStatus, String name) {
+ super(grpcStatus, name, null, null, ImmutableMap.of());
}
}
@@ -73,8 +87,31 @@ protected StreamFinalizedException(Status grpcStatus, Metadata metadata, String
* This can be resolved by updating the table's schema with the message schema.
*/
public static final class SchemaMismatchedException extends StorageException {
- protected SchemaMismatchedException(Status grpcStatus, Metadata metadata, String name) {
- super(grpcStatus, metadata, name, ImmutableMap.of());
+ protected SchemaMismatchedException(Status grpcStatus, String name) {
+ super(grpcStatus, name, null, null, ImmutableMap.of());
+ }
+ }
+
+ /** Offset already exists. */
+ public static final class OffsetAlreadyExists extends StorageException {
+ protected OffsetAlreadyExists(
+ Status grpcStatus, String name, Long expectedOffset, Long actualOffset) {
+ super(grpcStatus, name, expectedOffset, actualOffset, ImmutableMap.of());
+ }
+ }
+
+ /** Offset out of range. */
+ public static final class OffsetOutOfRange extends StorageException {
+ protected OffsetOutOfRange(
+ Status grpcStatus, String name, Long expectedOffset, Long actualOffset) {
+ super(grpcStatus, name, expectedOffset, actualOffset, ImmutableMap.of());
+ }
+ }
+
+ /** Stream is not found. */
+ public static final class StreamNotFound extends StorageException {
+ protected StreamNotFound(Status grpcStatus, String name) {
+ super(grpcStatus, name, null, null, ImmutableMap.of());
}
}
@@ -106,12 +143,48 @@ public static StorageException toStorageException(
if (error == null) {
return null;
}
+ String streamName = error.getEntity();
+ // The error message should have Entity but it's missing from the message for
+ // OFFSET_ALREADY_EXISTS
+ // TODO: Simplify the logic below when backend fixes passing Entity for OFFSET_ALREADY_EXISTS
+ // error
+ String errorMessage =
+ error.getErrorMessage().indexOf("Entity") > 0
+ ? error.getErrorMessage().substring(0, error.getErrorMessage().indexOf("Entity")).trim()
+ : error.getErrorMessage().trim();
+
+ // Ensure that erro message has the desirable pattern for parsing
+ String errormessagePatternString = "expected offset [0-9]+, received [0-9]+";
+ Pattern errorMessagePattern = Pattern.compile(errormessagePatternString);
+ Matcher errorMessageMatcher = errorMessagePattern.matcher(errorMessage);
+
+ Long expectedOffet;
+ Long actualOffset;
+ if (!errorMessageMatcher.find()) {
+ expectedOffet = -1L;
+ actualOffset = -1L;
+ } else {
+ expectedOffet =
+ Long.parseLong(
+ errorMessage.substring(
+ errorMessage.lastIndexOf("offset") + 7, errorMessage.lastIndexOf(",")));
+ actualOffset = Long.parseLong(errorMessage.substring(errorMessage.lastIndexOf(" ") + 1));
+ }
switch (error.getCode()) {
case STREAM_FINALIZED:
- return new StreamFinalizedException(grpcStatus, null, error.getEntity());
+ return new StreamFinalizedException(grpcStatus, streamName);
+
+ case STREAM_NOT_FOUND:
+ return new StreamNotFound(grpcStatus, streamName);
case SCHEMA_MISMATCH_EXTRA_FIELDS:
- return new SchemaMismatchedException(grpcStatus, null, error.getEntity());
+ return new SchemaMismatchedException(grpcStatus, streamName);
+
+ case OFFSET_OUT_OF_RANGE:
+ return new OffsetOutOfRange(grpcStatus, streamName, expectedOffet, actualOffset);
+
+ case OFFSET_ALREADY_EXISTS:
+ return new OffsetAlreadyExists(grpcStatus, streamName, expectedOffet, actualOffset);
default:
return null;
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
index 3bde9598cb..623394ca65 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
@@ -28,6 +28,8 @@
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.cloud.bigquery.storage.v1.*;
+import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists;
+import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange;
import com.google.cloud.bigquery.storage.v1.Exceptions.SchemaMismatchedException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
@@ -901,6 +903,75 @@ public void testStreamFinalizedError()
}
}
+ @Test
+ public void testOffsetAlreadyExistsError()
+ throws IOException, ExecutionException, InterruptedException {
+ WriteStream writeStream =
+ client.createWriteStream(
+ CreateWriteStreamRequest.newBuilder()
+ .setParent(tableId)
+ .setWriteStream(
+ WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+ .build());
+ try (StreamWriter streamWriter =
+ StreamWriter.newBuilder(writeStream.getName())
+ .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
+ .build()) {
+ // Append once with correct offset
+ ApiFuture response =
+ streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0);
+ response.get();
+ // Append again with the same offset
+ ApiFuture response2 =
+ streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0);
+ try {
+ response2.get();
+ Assert.fail("Should fail");
+ } catch (ExecutionException e) {
+ assertEquals(Exceptions.OffsetAlreadyExists.class, e.getCause().getClass());
+ Exceptions.OffsetAlreadyExists actualError = (OffsetAlreadyExists) e.getCause();
+ assertNotNull(actualError.getStreamName());
+ assertEquals(1, actualError.getExpectedOffset());
+ assertEquals(0, actualError.getActualOffset());
+ assertEquals(Code.ALREADY_EXISTS, Status.fromThrowable(e.getCause()).getCode());
+ assertThat(e.getCause().getMessage())
+ .contains("The offset is within stream, expected offset 1, received 0");
+ }
+ }
+ }
+
+ @Test
+ public void testOffsetOutOfRangeError() throws IOException, InterruptedException {
+ WriteStream writeStream =
+ client.createWriteStream(
+ CreateWriteStreamRequest.newBuilder()
+ .setParent(tableId)
+ .setWriteStream(
+ WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+ .build());
+ try (StreamWriter streamWriter =
+ StreamWriter.newBuilder(writeStream.getName())
+ .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
+ .build()) {
+ // Append with an out of range offset
+ ApiFuture response =
+ streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 10);
+ try {
+ response.get();
+ Assert.fail("Should fail");
+ } catch (ExecutionException e) {
+ assertEquals(Exceptions.OffsetOutOfRange.class, e.getCause().getClass());
+ Exceptions.OffsetOutOfRange actualError = (OffsetOutOfRange) e.getCause();
+ assertNotNull(actualError.getStreamName());
+ assertEquals(0, actualError.getExpectedOffset());
+ assertEquals(10, actualError.getActualOffset());
+ assertEquals(Code.OUT_OF_RANGE, Status.fromThrowable(e.getCause()).getCode());
+ assertThat(e.getCause().getMessage())
+ .contains("The offset is beyond stream, expected offset 0, received 10");
+ }
+ }
+ }
+
@Test
public void testStreamReconnect() throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =