diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
index e2a56066e3..ca9d4778e6 100644
--- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml
+++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -18,10 +18,11 @@
5001
com/google/cloud/bigquery/storage/v1/Exceptions$StreamWriterClosedException
+ com/google/cloud/bigquery/storage/v1/Exceptions$StorageException
7004
com/google/cloud/bigquery/storage/v1/Exceptions$StreamWriterClosedException
- protected Exceptions$StreamWriterClosedException(io.grpc.Status, java.lang.String, java.lang.String)
+ Exceptions$StreamWriterClosedException(io.grpc.Status, java.lang.String)
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java
index 1bfd241f6a..409f097697 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java
@@ -81,18 +81,6 @@ protected StreamFinalizedException(Status grpcStatus, String name) {
}
}
- /**
- * This writer instance has either been closed by the user explicitly, or has encountered
- * non-retriable errors.
- *
- *
To continue to write to the same stream, you will need to create a new writer instance.
- */
- public static final class StreamWriterClosedException extends StorageException {
- protected StreamWriterClosedException(Status grpcStatus, String name) {
- super(grpcStatus, name, null, null, ImmutableMap.of());
- }
- }
-
/**
* There was a schema mismatch due to bigquery table with fewer fields than the input message.
* This can be resolved by updating the table's schema with the message schema.
@@ -251,5 +239,80 @@ public String getStreamName() {
}
}
+ /**
+ * This writer instance has either been closed by the user explicitly, or has encountered
+ * non-retriable errors.
+ *
+ *
To continue to write to the same stream, you will need to create a new writer instance.
+ */
+ public static final class StreamWriterClosedException extends StatusRuntimeException {
+ private final String streamName;
+ private final String writerId;
+
+ protected StreamWriterClosedException(Status grpcStatus, String streamName, String writerId) {
+ super(grpcStatus);
+ this.streamName = streamName;
+ this.writerId = writerId;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public String getWriterId() {
+ return writerId;
+ }
+ }
+
+ /**
+ * If FlowController.LimitExceededBehavior is set to Block and inflight limit is exceeded, this
+ * exception will be thrown. If it is just a spike, you may retry the request. Otherwise, you can
+ * increase the inflight limit or create more StreamWriter to handle your traffic.
+ */
+ public static class InflightLimitExceededException extends StatusRuntimeException {
+ private final long currentLimit;
+ private final String writerId;
+
+ protected InflightLimitExceededException(
+ Status grpcStatus, String writerId, long currentLimit) {
+ super(grpcStatus);
+ this.currentLimit = currentLimit;
+ this.writerId = writerId;
+ }
+
+ public String getWriterId() {
+ return writerId;
+ }
+
+ public long getCurrentLimit() {
+ return currentLimit;
+ }
+ }
+
+ public static class InflightRequestsLimitExceededException
+ extends InflightLimitExceededException {
+ protected InflightRequestsLimitExceededException(String writerId, long currentLimit) {
+ super(
+ Status.fromCode(Status.Code.RESOURCE_EXHAUSTED)
+ .withDescription(
+ "Exceeds client side inflight buffer, consider add more buffer or open more connections. Current limit: "
+ + currentLimit),
+ writerId,
+ currentLimit);
+ }
+ }
+
+ public static class InflightBytesLimitExceededException extends InflightLimitExceededException {
+ protected InflightBytesLimitExceededException(String writerId, long currentLimit) {
+ super(
+ Status.fromCode(Status.Code.RESOURCE_EXHAUSTED)
+ .withDescription(
+ "Exceeds client side inflight buffer, consider add more buffer or open more connections. Current limit: "
+ + currentLimit),
+ writerId,
+ currentLimit);
+ }
+ }
+
private Exceptions() {}
}
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
index a7d2a0a589..b6f1d26a42 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
@@ -168,15 +168,16 @@ public ApiFuture append(JSONArray jsonArr, long offset)
}
}
- /**
- * Gets streamName
- *
- * @return String
- */
+ /** @return The name of the write stream associated with this writer. */
public String getStreamName() {
return this.streamName;
}
+ /** @return A unique Id for this writer. */
+ public String getWriterId() {
+ return streamWriter.getWriterId();
+ }
+
/**
* Gets current descriptor
*
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
index a51b5af3bd..c2ad5ba5a3 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
@@ -34,6 +34,7 @@
import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -173,6 +174,11 @@ public class StreamWriter implements AutoCloseable {
*/
private final AtomicLong inflightWaitSec = new AtomicLong(0);
+ /*
+ * A String that uniquely identifies this writer.
+ */
+ private final String writerId = UUID.randomUUID().toString();
+
/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
@@ -307,17 +313,19 @@ private ApiFuture appendInternal(AppendRowsRequest message)
new Exceptions.StreamWriterClosedException(
Status.fromCode(Status.Code.FAILED_PRECONDITION)
.withDescription("Connection is already closed"),
- streamName));
+ streamName,
+ writerId));
return requestWrapper.appendResult;
}
// Check if queue is going to be full before adding the request.
- if ((this.inflightRequests + 1 >= this.maxInflightRequests
- || this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes)
- && (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException)) {
- throw new StatusRuntimeException(
- Status.fromCode(Code.RESOURCE_EXHAUSTED)
- .withDescription(
- "Exceeds client side inflight buffer, consider add more buffer or open more connections."));
+ if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) {
+ if (this.inflightRequests + 1 >= this.maxInflightRequests) {
+ throw new Exceptions.InflightRequestsLimitExceededException(
+ writerId, this.maxInflightRequests);
+ }
+ if (this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) {
+ throw new Exceptions.InflightBytesLimitExceededException(writerId, this.maxInflightBytes);
+ }
}
if (connectionFinalStatus != null) {
@@ -326,7 +334,8 @@ private ApiFuture appendInternal(AppendRowsRequest message)
Status.fromCode(Status.Code.FAILED_PRECONDITION)
.withDescription(
"Connection is closed due to " + connectionFinalStatus.toString()),
- streamName));
+ streamName,
+ writerId));
return requestWrapper.appendResult;
}
@@ -375,6 +384,16 @@ public long getInflightWaitSeconds() {
return inflightWaitSec.longValue();
}
+ /** @return a unique Id for the writer. */
+ public String getWriterId() {
+ return writerId;
+ }
+
+ /** @return name of the Stream that this writer is working on. */
+ public String getStreamName() {
+ return streamName;
+ }
+
/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
@@ -566,7 +585,8 @@ private void cleanupInflightRequests() {
new Exceptions.StreamWriterClosedException(
Status.fromCode(Status.Code.FAILED_PRECONDITION)
.withDescription("Connection is already closed, cleanup inflight request"),
- streamName);
+ streamName,
+ writerId);
Deque localQueue = new LinkedList();
this.lock.lock();
try {
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
index 434659dee7..eaae1ca329 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
@@ -643,4 +643,14 @@ public void testMultipleAppendSerializtionErrors()
}
}
}
+
+ @Test
+ public void testWriterId()
+ throws DescriptorValidationException, IOException, InterruptedException {
+ JsonStreamWriter writer1 = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build();
+ Assert.assertFalse(writer1.getWriterId().isEmpty());
+ JsonStreamWriter writer2 = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build();
+ Assert.assertFalse(writer2.getWriterId().isEmpty());
+ Assert.assertNotEquals(writer1.getWriterId(), writer2.getWriterId());
+ }
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
index 1ff0c65c69..ed206e82ec 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
@@ -33,6 +33,7 @@
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -47,6 +48,7 @@
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
@@ -552,9 +554,9 @@ public void testAppendsWithTinyMaxInflightBytesThrow() throws Exception {
.setMaxInflightBytes(1)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
.build();
- StatusRuntimeException ex =
+ Exceptions.InflightBytesLimitExceededException ex =
assertThrows(
- StatusRuntimeException.class,
+ Exceptions.InflightBytesLimitExceededException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
@@ -568,6 +570,8 @@ public void run() throws Throwable {
.contains(
"Exceeds client side inflight buffer, consider add more buffer or open more connections"));
+ assertEquals(ex.getWriterId(), writer.getWriterId());
+ assertEquals(1, ex.getCurrentLimit());
writer.close();
}
@@ -665,6 +669,8 @@ public void testWriterAlreadyClosedException() throws Exception {
assertTrue(actualError instanceof StatusRuntimeException);
assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode());
assertTrue(actualError.getStatus().getDescription().contains("Connection is already closed"));
+ assertEquals(actualError.getWriterId(), writer.getWriterId());
+ assertEquals(actualError.getStreamName(), writer.getStreamName());
}
@Test
@@ -683,5 +689,17 @@ public void testWriterClosedException() throws Exception {
assertTrue(actualError instanceof StatusRuntimeException);
assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode());
assertTrue(actualError.getStatus().getDescription().contains("Connection is closed"));
+ assertEquals(actualError.getWriterId(), writer.getWriterId());
+ assertEquals(actualError.getStreamName(), writer.getStreamName());
+ }
+
+ @Test
+ public void testWriterId()
+ throws Descriptors.DescriptorValidationException, IOException, InterruptedException {
+ StreamWriter writer1 = getTestStreamWriter();
+ Assert.assertFalse(writer1.getWriterId().isEmpty());
+ StreamWriter writer2 = getTestStreamWriter();
+ Assert.assertFalse(writer2.getWriterId().isEmpty());
+ Assert.assertNotEquals(writer1.getWriterId(), writer2.getWriterId());
}
}