Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a indicator of how much time a request is waiting for inflight limit #1514

Merged
merged 4 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,17 @@ public Descriptor getDescriptor() {
return this.descriptor;
}

/**
* Returns the wait of a request in Client side before sending to the Server. Request could wait
* in Client because it reached the client side inflight request limit (adjustable when
* constructing the Writer). The value is the wait time for the last sent request. A constant high
* wait value indicates a need for more throughput, you can create a new Stream for to increase
* the throughput in exclusive stream case, or create a new Writer in the default stream case.
*/
public long getInflightWaitSeconds() {
return streamWriter.getInflightWaitSeconds();
}

/** Sets all StreamWriter settings. */
private void setStreamWriterSettings(
@Nullable TransportChannelProvider channelProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -159,6 +160,11 @@ public class StreamWriter implements AutoCloseable {
*/
private Thread appendThread;

/*
* The inflight wait time for the previous sent request.
*/
private final AtomicLong inflightWaitSec = new AtomicLong(0);

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
Expand Down Expand Up @@ -316,6 +322,7 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message)

@GuardedBy("lock")
private void maybeWaitForInflightQuota() {
long start_time = System.currentTimeMillis();
while (this.inflightRequests >= this.maxInflightRequests
|| this.inflightBytes >= this.maxInflightBytes) {
try {
Expand All @@ -332,6 +339,19 @@ private void maybeWaitForInflightQuota() {
.withDescription("Interrupted while waiting for quota."));
}
}
inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000);
}

/**
* Returns the wait of a request in Client side before sending to the Server. Request could wait
* in Client because it reached the client side inflight request limit (adjustable when
* constructing the StreamWriter). The value is the wait time for the last sent request. A
* constant high wait value indicates a need for more throughput, you can create a new Stream for
* to increase the throughput in exclusive stream case, or create a new Writer in the default
* stream case.
*/
public long getInflightWaitSeconds() {
return inflightWaitSec.longValue();
}

/** Close the stream writer. Shut down all resources. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,10 +547,8 @@ public void testOneMaxInflightRequests() throws Exception {
testBigQueryWrite.setResponseSleep(Duration.ofSeconds(1));
testBigQueryWrite.addResponse(createAppendResponse(0));

long appendStartTimeMs = System.currentTimeMillis();
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
long appendElapsedMs = System.currentTimeMillis() - appendStartTimeMs;
assertTrue(appendElapsedMs >= 1000);
assertTrue(writer.getInflightWaitSeconds() >= 1);
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
writer.close();
}
Expand Down