Skip to content

Commit

Permalink
feat: introduce InflightLimitExceededException (#1746)
Browse files Browse the repository at this point in the history
* feat: introduce InflightLimitExceededException

* .

* .

* .

* .

* .

* .

* .

* .

* .

* .

* .

* .
  • Loading branch information
yirutang authored Aug 15, 2022
1 parent 96a65b2 commit 449353b
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 30 deletions.
3 changes: 2 additions & 1 deletion google-cloud-bigquerystorage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
<difference>
<differenceType>5001</differenceType>
<className>com/google/cloud/bigquery/storage/v1/Exceptions$StreamWriterClosedException</className>
<to>com/google/cloud/bigquery/storage/v1/Exceptions$StorageException</to>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigquery/storage/v1/Exceptions$StreamWriterClosedException</className>
<method>protected Exceptions$StreamWriterClosedException(io.grpc.Status, java.lang.String, java.lang.String)</method>
<method>Exceptions$StreamWriterClosedException(io.grpc.Status, java.lang.String)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,6 @@ protected StreamFinalizedException(Status grpcStatus, String name) {
}
}

/**
* This writer instance has either been closed by the user explicitly, or has encountered
* non-retriable errors.
*
* <p>To continue to write to the same stream, you will need to create a new writer instance.
*/
public static final class StreamWriterClosedException extends StorageException {
protected StreamWriterClosedException(Status grpcStatus, String name) {
super(grpcStatus, name, null, null, ImmutableMap.of());
}
}

/**
* There was a schema mismatch due to bigquery table with fewer fields than the input message.
* This can be resolved by updating the table's schema with the message schema.
Expand Down Expand Up @@ -251,5 +239,80 @@ public String getStreamName() {
}
}

/**
* This writer instance has either been closed by the user explicitly, or has encountered
* non-retriable errors.
*
* <p>To continue to write to the same stream, you will need to create a new writer instance.
*/
public static final class StreamWriterClosedException extends StatusRuntimeException {
private final String streamName;
private final String writerId;

protected StreamWriterClosedException(Status grpcStatus, String streamName, String writerId) {
super(grpcStatus);
this.streamName = streamName;
this.writerId = writerId;
}

public String getStreamName() {
return streamName;
}

public String getWriterId() {
return writerId;
}
}

/**
* If FlowController.LimitExceededBehavior is set to Block and inflight limit is exceeded, this
* exception will be thrown. If it is just a spike, you may retry the request. Otherwise, you can
* increase the inflight limit or create more StreamWriter to handle your traffic.
*/
public static class InflightLimitExceededException extends StatusRuntimeException {
private final long currentLimit;
private final String writerId;

protected InflightLimitExceededException(
Status grpcStatus, String writerId, long currentLimit) {
super(grpcStatus);
this.currentLimit = currentLimit;
this.writerId = writerId;
}

public String getWriterId() {
return writerId;
}

public long getCurrentLimit() {
return currentLimit;
}
}

public static class InflightRequestsLimitExceededException
extends InflightLimitExceededException {
protected InflightRequestsLimitExceededException(String writerId, long currentLimit) {
super(
Status.fromCode(Status.Code.RESOURCE_EXHAUSTED)
.withDescription(
"Exceeds client side inflight buffer, consider add more buffer or open more connections. Current limit: "
+ currentLimit),
writerId,
currentLimit);
}
}

public static class InflightBytesLimitExceededException extends InflightLimitExceededException {
protected InflightBytesLimitExceededException(String writerId, long currentLimit) {
super(
Status.fromCode(Status.Code.RESOURCE_EXHAUSTED)
.withDescription(
"Exceeds client side inflight buffer, consider add more buffer or open more connections. Current limit: "
+ currentLimit),
writerId,
currentLimit);
}
}

private Exceptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,16 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
}
}

/**
* Gets streamName
*
* @return String
*/
/** @return The name of the write stream associated with this writer. */
public String getStreamName() {
return this.streamName;
}

/** @return A unique Id for this writer. */
public String getWriterId() {
return streamWriter.getWriterId();
}

/**
* Gets current descriptor
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -173,6 +174,11 @@ public class StreamWriter implements AutoCloseable {
*/
private final AtomicLong inflightWaitSec = new AtomicLong(0);

/*
* A String that uniquely identifies this writer.
*/
private final String writerId = UUID.randomUUID().toString();

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
Expand Down Expand Up @@ -307,17 +313,19 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)
new Exceptions.StreamWriterClosedException(
Status.fromCode(Status.Code.FAILED_PRECONDITION)
.withDescription("Connection is already closed"),
streamName));
streamName,
writerId));
return requestWrapper.appendResult;
}
// Check if queue is going to be full before adding the request.
if ((this.inflightRequests + 1 >= this.maxInflightRequests
|| this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes)
&& (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException)) {
throw new StatusRuntimeException(
Status.fromCode(Code.RESOURCE_EXHAUSTED)
.withDescription(
"Exceeds client side inflight buffer, consider add more buffer or open more connections."));
if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) {
if (this.inflightRequests + 1 >= this.maxInflightRequests) {
throw new Exceptions.InflightRequestsLimitExceededException(
writerId, this.maxInflightRequests);
}
if (this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) {
throw new Exceptions.InflightBytesLimitExceededException(writerId, this.maxInflightBytes);
}
}

if (connectionFinalStatus != null) {
Expand All @@ -326,7 +334,8 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)
Status.fromCode(Status.Code.FAILED_PRECONDITION)
.withDescription(
"Connection is closed due to " + connectionFinalStatus.toString()),
streamName));
streamName,
writerId));
return requestWrapper.appendResult;
}

Expand Down Expand Up @@ -375,6 +384,16 @@ public long getInflightWaitSeconds() {
return inflightWaitSec.longValue();
}

/** @return a unique Id for the writer. */
public String getWriterId() {
return writerId;
}

/** @return name of the Stream that this writer is working on. */
public String getStreamName() {
return streamName;
}

/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
Expand Down Expand Up @@ -566,7 +585,8 @@ private void cleanupInflightRequests() {
new Exceptions.StreamWriterClosedException(
Status.fromCode(Status.Code.FAILED_PRECONDITION)
.withDescription("Connection is already closed, cleanup inflight request"),
streamName);
streamName,
writerId);
Deque<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();
this.lock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,4 +643,14 @@ public void testMultipleAppendSerializtionErrors()
}
}
}

@Test
public void testWriterId()
throws DescriptorValidationException, IOException, InterruptedException {
JsonStreamWriter writer1 = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build();
Assert.assertFalse(writer1.getWriterId().isEmpty());
JsonStreamWriter writer2 = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build();
Assert.assertFalse(writer2.getWriterId().isEmpty());
Assert.assertNotEquals(writer1.getWriterId(), writer2.getWriterId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand All @@ -47,6 +48,7 @@
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
Expand Down Expand Up @@ -552,9 +554,9 @@ public void testAppendsWithTinyMaxInflightBytesThrow() throws Exception {
.setMaxInflightBytes(1)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
.build();
StatusRuntimeException ex =
Exceptions.InflightBytesLimitExceededException ex =
assertThrows(
StatusRuntimeException.class,
Exceptions.InflightBytesLimitExceededException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
Expand All @@ -568,6 +570,8 @@ public void run() throws Throwable {
.contains(
"Exceeds client side inflight buffer, consider add more buffer or open more connections"));

assertEquals(ex.getWriterId(), writer.getWriterId());
assertEquals(1, ex.getCurrentLimit());
writer.close();
}

Expand Down Expand Up @@ -665,6 +669,8 @@ public void testWriterAlreadyClosedException() throws Exception {
assertTrue(actualError instanceof StatusRuntimeException);
assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode());
assertTrue(actualError.getStatus().getDescription().contains("Connection is already closed"));
assertEquals(actualError.getWriterId(), writer.getWriterId());
assertEquals(actualError.getStreamName(), writer.getStreamName());
}

@Test
Expand All @@ -683,5 +689,17 @@ public void testWriterClosedException() throws Exception {
assertTrue(actualError instanceof StatusRuntimeException);
assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode());
assertTrue(actualError.getStatus().getDescription().contains("Connection is closed"));
assertEquals(actualError.getWriterId(), writer.getWriterId());
assertEquals(actualError.getStreamName(), writer.getStreamName());
}

@Test
public void testWriterId()
throws Descriptors.DescriptorValidationException, IOException, InterruptedException {
StreamWriter writer1 = getTestStreamWriter();
Assert.assertFalse(writer1.getWriterId().isEmpty());
StreamWriter writer2 = getTestStreamWriter();
Assert.assertFalse(writer2.getWriterId().isEmpty());
Assert.assertNotEquals(writer1.getWriterId(), writer2.getWriterId());
}
}

0 comments on commit 449353b

Please sign in to comment.