Skip to content

Commit

Permalink
fix: change customer StorageException from RuntimeException to Status…
Browse files Browse the repository at this point in the history
…RuntimeException (#1559)

* fix: change customer StorageException from RuntimeException to StatusRuntimeException so that Beam connector can consume it.

* fix clirr

fix clirr

fix clirr

nit

* update unit test

* lint

* update based on comments

* add another unit test

* clean up

* clean up

* update clirr and test case
  • Loading branch information
stephaniewang526 authored Mar 3, 2022
1 parent 42b28e0 commit 523377e
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 17 deletions.
16 changes: 16 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/bigquery/storage/v1/Exceptions$SchemaMismatchedException</className>
<method>Exceptions$SchemaMismatchedException(java.lang.String, java.lang.String, java.lang.Throwable)</method>
<to>Exceptions$SchemaMismatchedException(io.grpc.Status, io.grpc.Metadata, java.lang.String)</to>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/bigquery/storage/v1/Exceptions$StreamFinalizedException</className>
<method>Exceptions$StreamFinalizedException(java.lang.String, java.lang.String, java.lang.Throwable)</method>
<to>Exceptions$StreamFinalizedException(io.grpc.Status, io.grpc.Metadata, java.lang.String)</to>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import javax.annotation.Nullable;

Expand All @@ -30,7 +33,7 @@ public WriterClosedException(String streamName) {
}
}
/** Main Storage Exception. Might contain map of streams to errors for that stream. */
public static class StorageException extends RuntimeException {
public static class StorageException extends StatusRuntimeException {

private final ImmutableMap<String, GrpcStatusCode> errors;
private final String streamName;
Expand All @@ -40,11 +43,11 @@ private StorageException() {
}

private StorageException(
@Nullable String message,
@Nullable Throwable cause,
@Nullable Status grpcStatus,
@Nullable Metadata metadata,
@Nullable String streamName,
ImmutableMap<String, GrpcStatusCode> errors) {
super(message, cause);
super(grpcStatus, metadata);
this.streamName = streamName;
this.errors = errors;
}
Expand All @@ -60,8 +63,8 @@ public String getStreamName() {

/** Stream has already been finalized. */
public static final class StreamFinalizedException extends StorageException {
protected StreamFinalizedException(String name, String message, Throwable cause) {
super(message, cause, name, ImmutableMap.of());
protected StreamFinalizedException(Status grpcStatus, Metadata metadata, String name) {
super(grpcStatus, metadata, name, ImmutableMap.of());
}
}

Expand All @@ -70,8 +73,8 @@ protected StreamFinalizedException(String name, String message, Throwable cause)
* This can be resolved by updating the table's schema with the message schema.
*/
public static final class SchemaMismatchedException extends StorageException {
protected SchemaMismatchedException(String name, String message, Throwable cause) {
super(message, cause, name, ImmutableMap.of());
protected SchemaMismatchedException(Status grpcStatus, Metadata metadata, String name) {
super(grpcStatus, metadata, name, ImmutableMap.of());
}
}

Expand All @@ -98,15 +101,17 @@ private static StorageError toStorageError(com.google.rpc.Status rpcStatus) {
public static StorageException toStorageException(
com.google.rpc.Status rpcStatus, Throwable exception) {
StorageError error = toStorageError(rpcStatus);
Status grpcStatus =
Status.fromCodeValue(rpcStatus.getCode()).withDescription(rpcStatus.getMessage());
if (error == null) {
return null;
}
switch (error.getCode()) {
case STREAM_FINALIZED:
return new StreamFinalizedException(error.getEntity(), error.getErrorMessage(), exception);
return new StreamFinalizedException(grpcStatus, null, error.getEntity());

case SCHEMA_MISMATCH_EXTRA_FIELDS:
return new SchemaMismatchedException(error.getEntity(), error.getErrorMessage(), exception);
return new SchemaMismatchedException(grpcStatus, null, error.getEntity());

default:
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnknownException;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -343,6 +344,21 @@ public void testAppendFailedSchemaError() throws Exception {
writer.close();
}

@Test
public void testAppendFailRandomException() throws Exception {
StreamWriter writer = getTestStreamWriter();
// Trigger a non-StatusRuntimeException for append operation (although grpc API should not
// return anything other than StatusRuntimeException)
IllegalArgumentException illegalArgumentException =
new IllegalArgumentException("Illegal argument");
testBigQueryWrite.addException(illegalArgumentException);
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
UnknownException actualError = assertFutureException(UnknownException.class, appendFuture1);
assertEquals(Code.UNKNOWN, actualError.getStatusCode().getCode());

writer.close();
}

@Test
public void longIdleBetweenAppends() throws Exception {
StreamWriter writer = getTestStreamWriter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import com.google.api.core.ApiFuture;
Expand All @@ -27,11 +28,15 @@
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.cloud.bigquery.storage.v1.Exceptions.SchemaMismatchedException;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
Expand Down Expand Up @@ -835,8 +840,10 @@ public void testStreamSchemaMisMatchError() throws IOException, InterruptedExcep
Assert.fail("Should fail");
} catch (ExecutionException e) {
assertEquals(Exceptions.SchemaMismatchedException.class, e.getCause().getClass());
assertThat(e.getCause().getMessage())
.contains("Schema mismatch due to extra fields in user schema");
Exceptions.SchemaMismatchedException actualError = (SchemaMismatchedException) e.getCause();
assertNotNull(actualError.getStreamName());
// This verifies that the Beam connector can consume this custom exception's grpc StatusCode
assertEquals(Code.INVALID_ARGUMENT, Status.fromThrowable(e.getCause()).getCode());
}
}
}
Expand All @@ -853,20 +860,28 @@ public void testStreamFinalizedError()
.build());
try (StreamWriter streamWriter =
StreamWriter.newBuilder(writeStream.getName())
.setWriterSchema(ProtoSchemaConverter.convert(UpdatedFooType.getDescriptor()))
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.build()) {
// Append once before finalizing the stream
ApiFuture<AppendRowsResponse> response =
streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0);
response.get();
// Finalize the stream in order to trigger STREAM_FINALIZED error
client.finalizeWriteStream(
FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
// Try to append to a finalized stream
ApiFuture<AppendRowsResponse> response =
streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0);
ApiFuture<AppendRowsResponse> response2 =
streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 1);
try {
response.get();
response2.get();
Assert.fail("Should fail");
} catch (ExecutionException e) {
assertEquals(Exceptions.StreamFinalizedException.class, e.getCause().getClass());
assertThat(e.getCause().getMessage()).contains("Stream is finalized");
Exceptions.StreamFinalizedException actualError = (StreamFinalizedException) e.getCause();
assertNotNull(actualError.getStreamName());
// This verifies that the Beam connector can consume this custom exception's grpc StatusCode
assertEquals(Code.INVALID_ARGUMENT, Status.fromThrowable(e.getCause()).getCode());
assertThat(e.getCause().getMessage()).contains("Stream has been finalized");
}
}
}
Expand Down

0 comments on commit 523377e

Please sign in to comment.