Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce InflightLimitExceededException #1746

Merged
merged 14 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,6 @@ protected StreamFinalizedException(Status grpcStatus, String name) {
}
}

/**
* This writer instance has either been closed by the user explicitly, or has encountered
* non-retriable errors.
*
* <p>To continue to write to the same stream, you will need to create a new writer instance.
*/
public static final class StreamWriterClosedException extends StorageException {
protected StreamWriterClosedException(Status grpcStatus, String name) {
super(grpcStatus, name, null, null, ImmutableMap.of());
}
}

/**
* There was a schema mismatch due to bigquery table with fewer fields than the input message.
* This can be resolved by updating the table's schema with the message schema.
Expand Down Expand Up @@ -251,5 +239,80 @@ public String getStreamName() {
}
}

/**
* This writer instance has either been closed by the user explicitly, or has encountered
* non-retriable errors.
*
* <p>To continue to write to the same stream, you will need to create a new writer instance.
*/
public static final class StreamWriterClosedException extends StatusRuntimeException {
private final String streamName;
private final String writerId;

protected StreamWriterClosedException(Status grpcStatus, String streamName, String writerId) {
super(grpcStatus);
this.streamName = streamName;
this.writerId = writerId;
}

public String getStreamName() {
return streamName;
}

public String getWriterId() {
return writerId;
}
}

/**
* If FlowController.LimitExceededBehavior is set to Block and inflight limit is exceeded, this
* exception will be thrown. If it is just a spike, you may retry the request. Otherwise, you can
* increase the inflight limit or create more StreamWriter to handle your traffic.
*/
public static class InflightLimitExceededException extends StatusRuntimeException {
private final long currentLimit;
private final String writerId;

protected InflightLimitExceededException(
Status grpcStatus, String writerId, long currentLimit) {
super(grpcStatus);
this.currentLimit = currentLimit;
this.writerId = writerId;
}

public String getWriterId() {
return writerId;
}

public long getCurrentLimit() {
return currentLimit;
}
}

public static class InflightRequestsLimitExceededException
extends InflightLimitExceededException {
protected InflightRequestsLimitExceededException(String writerId, long currentLimit) {
super(
Status.fromCode(Status.Code.RESOURCE_EXHAUSTED)
.withDescription(
"Exceeds client side inflight buffer in terms of number of requests, consider add more buffer or open more connections. Current limit: "
yirutang marked this conversation as resolved.
Show resolved Hide resolved
+ currentLimit),
writerId,
currentLimit);
}
}

public static class InflightBytesLimitExceededException extends InflightLimitExceededException {
protected InflightBytesLimitExceededException(String writerId, long currentLimit) {
super(
Status.fromCode(Status.Code.RESOURCE_EXHAUSTED)
.withDescription(
"Exceeds client side inflight buffer in terms of bytes, consider add more buffer or open more connections. Current limit: "
+ currentLimit),
writerId,
currentLimit);
}
}

private Exceptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,16 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
}
}

/**
* Gets streamName
*
* @return String
*/
/** @return Name of the Stream that this writer is working on. */
yirutang marked this conversation as resolved.
Show resolved Hide resolved
public String getStreamName() {
return this.streamName;
}

/** @return A unique Id for this writer. */
public String getWriterId() {
return streamWriter.getWriterId();
}

/**
* Gets current descriptor
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -173,6 +174,11 @@ public class StreamWriter implements AutoCloseable {
*/
private final AtomicLong inflightWaitSec = new AtomicLong(0);

/*
* A String that uniquely identifies this writer.
*/
private final String writerId = UUID.randomUUID().toString();
yirutang marked this conversation as resolved.
Show resolved Hide resolved

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
Expand Down Expand Up @@ -307,17 +313,19 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)
new Exceptions.StreamWriterClosedException(
Status.fromCode(Status.Code.FAILED_PRECONDITION)
.withDescription("Connection is already closed"),
streamName));
streamName,
writerId));
return requestWrapper.appendResult;
}
// Check if queue is going to be full before adding the request.
if ((this.inflightRequests + 1 >= this.maxInflightRequests
|| this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes)
&& (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException)) {
throw new StatusRuntimeException(
Status.fromCode(Code.RESOURCE_EXHAUSTED)
.withDescription(
"Exceeds client side inflight buffer, consider add more buffer or open more connections."));
if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) {
if (this.inflightRequests + 1 >= this.maxInflightRequests) {
throw new Exceptions.InflightRequestsLimitExceededException(
writerId, this.maxInflightRequests);
}
if (this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes) {
throw new Exceptions.InflightBytesLimitExceededException(writerId, this.maxInflightBytes);
}
}

if (connectionFinalStatus != null) {
Expand All @@ -326,7 +334,8 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)
Status.fromCode(Status.Code.FAILED_PRECONDITION)
.withDescription(
"Connection is closed due to " + connectionFinalStatus.toString()),
streamName));
streamName,
writerId));
return requestWrapper.appendResult;
}

Expand Down Expand Up @@ -375,6 +384,16 @@ public long getInflightWaitSeconds() {
return inflightWaitSec.longValue();
}

/** @return a unique Id for the writer. */
public String getWriterId() {
return writerId;
}

/** @return name of the Stream that this writer is working on. */
public String getStreamName() {
return streamName;
}

/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
Expand Down Expand Up @@ -566,7 +585,8 @@ private void cleanupInflightRequests() {
new Exceptions.StreamWriterClosedException(
Status.fromCode(Status.Code.FAILED_PRECONDITION)
.withDescription("Connection is already closed, cleanup inflight request"),
streamName);
streamName,
writerId);
Deque<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();
this.lock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ public void run() throws Throwable {
ex.getStatus()
.getDescription()
.contains(
"Exceeds client side inflight buffer, consider add more buffer or open more connections"));
"Exceeds client side inflight buffer in terms of bytes, consider add more buffer or open more connections"));
}
}

Expand Down Expand Up @@ -643,4 +643,14 @@ public void testMultipleAppendSerializtionErrors()
}
}
}

@Test
public void testWriterId()
throws DescriptorValidationException, IOException, InterruptedException {
JsonStreamWriter writer1 = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build();
Assert.assertFalse(writer1.getWriterId().isEmpty());
JsonStreamWriter writer2 = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build();
Assert.assertFalse(writer2.getWriterId().isEmpty());
Assert.assertNotEquals(writer1.getWriterId(), writer2.getWriterId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand All @@ -47,6 +48,7 @@
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
Expand Down Expand Up @@ -552,9 +554,9 @@ public void testAppendsWithTinyMaxInflightBytesThrow() throws Exception {
.setMaxInflightBytes(1)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
.build();
StatusRuntimeException ex =
Exceptions.InflightBytesLimitExceededException ex =
assertThrows(
StatusRuntimeException.class,
Exceptions.InflightBytesLimitExceededException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
Expand All @@ -566,8 +568,10 @@ public void run() throws Throwable {
ex.getStatus()
.getDescription()
.contains(
"Exceeds client side inflight buffer, consider add more buffer or open more connections"));
"Exceeds client side inflight buffer in terms of bytes, consider add more buffer or open more connections"));

assertEquals(ex.getWriterId(), writer.getWriterId());
assertEquals(1, ex.getCurrentLimit());
writer.close();
}

Expand Down Expand Up @@ -665,6 +669,8 @@ public void testWriterAlreadyClosedException() throws Exception {
assertTrue(actualError instanceof StatusRuntimeException);
assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode());
assertTrue(actualError.getStatus().getDescription().contains("Connection is already closed"));
assertEquals(actualError.getWriterId(), writer.getWriterId());
assertEquals(actualError.getStreamName(), writer.getStreamName());
}

@Test
Expand All @@ -683,5 +689,17 @@ public void testWriterClosedException() throws Exception {
assertTrue(actualError instanceof StatusRuntimeException);
assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode());
assertTrue(actualError.getStatus().getDescription().contains("Connection is closed"));
assertEquals(actualError.getWriterId(), writer.getWriterId());
assertEquals(actualError.getStreamName(), writer.getStreamName());
}

@Test
public void testWriterId()
throws Descriptors.DescriptorValidationException, IOException, InterruptedException {
StreamWriter writer1 = getTestStreamWriter();
Assert.assertFalse(writer1.getWriterId().isEmpty());
StreamWriter writer2 = getTestStreamWriter();
Assert.assertFalse(writer2.getWriterId().isEmpty());
Assert.assertNotEquals(writer1.getWriterId(), writer2.getWriterId());
}
}