Skip to content

Commit

Permalink
fix: add extra JsonWriterTest to show that the LimitBehavior addition…
Browse files Browse the repository at this point in the history
… is not breaking (#1643)

* fix[1539]

* .

* .

* fix: Add an extra jsonWriterTest for Limit Behavior

* .

* .

* .

* .

* fix an issue that we should reject request before it is added to the queue.

* .

* .
  • Loading branch information
yirutang authored May 8, 2022
1 parent 77f44d8 commit 320f5fc
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ private JsonStreamWriter(Builder builder)
this.protoSchema = ProtoSchemaConverter.convert(this.descriptor);
this.totalMessageSize = protoSchema.getSerializedSize();
streamWriterBuilder.setWriterSchema(protoSchema);
if (builder.flowControlSettings != null) {
streamWriterBuilder.setLimitExceededBehavior(
builder.flowControlSettings.getLimitExceededBehavior());
}
setStreamWriterSettings(
builder.channelProvider,
builder.credentialsProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,16 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)
.withDescription("Connection is already closed")));
return requestWrapper.appendResult;
}
// Check if queue is going to be full before adding the request.
if ((this.inflightRequests + 1 >= this.maxInflightRequests
|| this.inflightBytes + requestWrapper.messageSize >= this.maxInflightBytes)
&& (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException)) {
throw new StatusRuntimeException(
Status.fromCode(Code.RESOURCE_EXHAUSTED)
.withDescription(
"Exceeds client side inflight buffer, consider add more buffer or open more connections."));
}

if (connectionFinalStatus != null) {
requestWrapper.appendResult.setException(
new StatusRuntimeException(
Expand All @@ -339,29 +349,18 @@ private void maybeWaitForInflightQuota() {
long start_time = System.currentTimeMillis();
while (this.inflightRequests >= this.maxInflightRequests
|| this.inflightBytes >= this.maxInflightBytes) {
if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException) {
throw new StatusRuntimeException(
Status.fromCode(Code.RESOURCE_EXHAUSTED)
.withDescription(
"Exceeds client side inflight buffer, consider add more buffer or open more connections."));
} else if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.Ignore) {
try {
inflightReduced.await(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warning(
"Interrupted while waiting for inflight quota. Stream: "
+ streamName
+ " Error: "
+ e.toString());
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription("LimitExceededBehavior.Ignore is not supported on StreamWriter."));
} else {
try {
inflightReduced.await(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warning(
"Interrupted while waiting for inflight quota. Stream: "
+ streamName
+ " Error: "
+ e.toString());
throw new StatusRuntimeException(
Status.fromCode(Code.CANCELLED)
.withCause(e)
.withDescription("Interrupted while waiting for quota."));
}
Status.fromCode(Code.CANCELLED)
.withCause(e)
.withDescription("Interrupted while waiting for quota."));
}
}
inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000);
Expand Down Expand Up @@ -812,7 +811,12 @@ public Builder setTraceId(String traceId) {
* @return
*/
public Builder setLimitExceededBehavior(
FlowController.LimitExceededBehavior limitExceededBehavior) {
FlowController.LimitExceededBehavior limitExceededBehavior) throws StatusRuntimeException {
if (limitExceededBehavior == FlowController.LimitExceededBehavior.Ignore) {
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
.withDescription("LimitExceededBehavior.Ignore is not supported on StreamWriter."));
}
this.limitExceededBehavior = limitExceededBehavior;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,4 +581,26 @@ public void run() throws Throwable {
"Exceeds client side inflight buffer, consider add more buffer or open more connections"));
}
}

// This is to test the new addition didn't break previous settings, i.e., sets the inflight limit
// without limit beahvior.
@Test
public void testFlowControlSettingNoLimitBehavior() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.setFlowControlSettings(
FlowControlSettings.newBuilder().setMaxOutstandingRequestBytes(1L).build())
.build()) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
JSONObject foo = new JSONObject();
foo.put("test_int", 10);
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
appendFuture.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -552,12 +552,6 @@ public void testAppendsWithTinyMaxInflightBytesThrow() throws Exception {
.setMaxInflightBytes(1)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
.build();
// Server will sleep 100ms before every response.
testBigQueryWrite.setResponseSleep(Duration.ofMillis(100));
long appendCount = 10;
for (int i = 0; i < appendCount; i++) {
testBigQueryWrite.addResponse(createAppendResponse(i));
}
StatusRuntimeException ex =
assertThrows(
StatusRuntimeException.class,
Expand All @@ -577,6 +571,29 @@ public void run() throws Throwable {
writer.close();
}

@Test
public void testLimitBehaviorIgnoreNotAccepted() throws Exception {
StatusRuntimeException ex =
assertThrows(
StatusRuntimeException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM, client)
.setWriterSchema(createProtoSchema())
.setMaxInflightBytes(1)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore)
.build();
}
});
assertEquals(ex.getStatus().getCode(), Status.INVALID_ARGUMENT.getCode());
assertTrue(
ex.getStatus()
.getDescription()
.contains("LimitExceededBehavior.Ignore is not supported on StreamWriter."));
}

@Test
public void testMessageTooLarge() throws Exception {
StreamWriter writer = getTestStreamWriter();
Expand Down

0 comments on commit 320f5fc

Please sign in to comment.