Skip to content

Commit

Permalink
feat: wire and expose profiler api to the StreamWirter/JsonStreamWrit…
Browse files Browse the repository at this point in the history
…er (#2561)

* Add profiler for request execution details. The usage of the new API
will be added in the next PR

* Add profiler for request execution details. The usage of the new API
will be added in the next PR

* add new code change

* feat: wire profiler to the actual codebase

* .
  • Loading branch information
GaoleMeng authored Jul 17, 2024
1 parent 5691bd5 commit 16f19dd
Show file tree
Hide file tree
Showing 12 changed files with 416 additions and 128 deletions.
4 changes: 4 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,9 @@
<className>com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter$Builder setTraceIdBase(java.lang.String)</method>
</difference>
<difference>
<differenceType>1001</differenceType>
<className>com/google/cloud/bigquery/storage/v1/StreamWriter$SingleConnectionOrConnectionPool</className>
</difference>
</differences>

Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,7 @@
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -511,6 +503,8 @@ private boolean shouldWaitForBackoff(AppendRequestAndResponse requestWrapper) {

private void waitForBackoffIfNecessary(AppendRequestAndResponse requestWrapper) {
lock.lock();
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
RequestProfiler.OperationName.RETRY_BACKOFF, requestWrapper.requestUniqueId);
try {
Condition condition = lock.newCondition();
while (shouldWaitForBackoff(requestWrapper)) {
Expand All @@ -519,6 +513,8 @@ private void waitForBackoffIfNecessary(AppendRequestAndResponse requestWrapper)
} catch (InterruptedException e) {
throw new IllegalStateException(e);
} finally {
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
RequestProfiler.OperationName.RETRY_BACKOFF, requestWrapper.requestUniqueId);
lock.unlock();
}
}
Expand All @@ -539,6 +535,8 @@ private void addMessageToWaitingQueue(
++this.inflightRequests;
this.inflightBytes += requestWrapper.messageSize;
hasMessageInWaitingQueue.signal();
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
if (addToFront) {
waitingRequestQueue.addFirst(requestWrapper);
} else {
Expand All @@ -547,7 +545,8 @@ private void addMessageToWaitingQueue(
}

/** Schedules the writing of rows at given offset. */
ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, long offset) {
ApiFuture<AppendRowsResponse> append(
StreamWriter streamWriter, ProtoRows rows, long offset, String requestUniqueId) {
if (this.location != null && !this.location.equals(streamWriter.getLocation())) {
throw new StatusRuntimeException(
Status.fromCode(Code.INVALID_ARGUMENT)
Expand Down Expand Up @@ -584,7 +583,7 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
requestBuilder.setDefaultMissingValueInterpretation(
streamWriter.getDefaultValueInterpretation());
}
return appendInternal(streamWriter, requestBuilder.build());
return appendInternal(streamWriter, requestBuilder.build(), requestUniqueId);
}

Boolean isUserClosed() {
Expand All @@ -601,9 +600,9 @@ String getWriteLocation() {
}

private ApiFuture<AppendRowsResponse> appendInternal(
StreamWriter streamWriter, AppendRowsRequest message) {
StreamWriter streamWriter, AppendRowsRequest message, String requestUniqueId) {
AppendRequestAndResponse requestWrapper =
new AppendRequestAndResponse(message, streamWriter, this.retrySettings);
new AppendRequestAndResponse(message, streamWriter, this.retrySettings, requestUniqueId);
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
requestWrapper.appendResult.setException(
new StatusRuntimeException(
Expand Down Expand Up @@ -650,11 +649,14 @@ private ApiFuture<AppendRowsResponse> appendInternal(
writerId));
return requestWrapper.appendResult;
}

RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId);
++this.inflightRequests;
this.inflightBytes += requestWrapper.messageSize;
waitingRequestQueue.addLast(requestWrapper);
hasMessageInWaitingQueue.signal();
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, requestUniqueId);
try {
maybeWaitForInflightQuota();
} catch (StatusRuntimeException ex) {
Expand All @@ -663,6 +665,8 @@ private ApiFuture<AppendRowsResponse> appendInternal(
this.inflightBytes -= requestWrapper.messageSize;
throw ex;
}
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, requestUniqueId);
return requestWrapper.appendResult;
} finally {
this.lock.unlock();
Expand Down Expand Up @@ -826,7 +830,12 @@ private void appendLoop() {
// prepended as they need to be sent before new requests.
while (!inflightRequestQueue.isEmpty()) {
AppendRequestAndResponse requestWrapper = inflightRequestQueue.pollLast();
// Consider the backend latency as completed for the current request.
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
requestWrapper.requestSendTimeStamp = null;
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
waitingRequestQueue.addFirst(requestWrapper);
}

Expand All @@ -836,6 +845,8 @@ private void appendLoop() {
}
while (!this.waitingRequestQueue.isEmpty()) {
AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
waitForBackoffIfNecessary(requestWrapper);
this.inflightRequestQueue.add(requestWrapper);
localQueue.addLast(requestWrapper);
Expand Down Expand Up @@ -876,7 +887,9 @@ private void appendLoop() {
}
while (!localQueue.isEmpty()) {
localQueue.peekFirst().setRequestSendQueueTime();
AppendRowsRequest originalRequest = localQueue.pollFirst().message;
AppendRequestAndResponse wrapper = localQueue.pollFirst();
AppendRowsRequest originalRequest = wrapper.message;
String requestUniqueId = wrapper.requestUniqueId;
AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();
// Always respect the first writer schema seen by the loop.
if (writerSchema == null) {
Expand Down Expand Up @@ -918,6 +931,9 @@ private void appendLoop() {
}
firstRequestForTableOrSchemaSwitch = false;

RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
RequestProfiler.OperationName.RESPONSE_LATENCY, requestUniqueId);

// Send should only throw an exception if there is a problem with the request. The catch
// block will handle this case, and return the exception with the result.
// Otherwise send will return:
Expand Down Expand Up @@ -1196,6 +1212,8 @@ private void requestCallback(AppendRowsResponse response) {
}
if (!this.inflightRequestQueue.isEmpty()) {
requestWrapper = pollFirstInflightRequestQueue();
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
} else if (inflightCleanuped) {
// It is possible when requestCallback is called, the inflight queue is already drained
// because we timed out waiting for done.
Expand Down Expand Up @@ -1227,35 +1245,40 @@ private void requestCallback(AppendRowsResponse response) {
// the current request callback finishes.
threadPool.submit(
() -> {
if (response.hasError()) {
Exceptions.StorageException storageException =
Exceptions.toStorageException(response.getError(), null);
log.fine(String.format("Got error message: %s", response.toString()));
if (storageException != null) {
requestWrapper.appendResult.setException(storageException);
} else if (response.getRowErrorsCount() > 0) {
Map<Integer, String> rowIndexToErrorMessage = new HashMap<>();
for (int i = 0; i < response.getRowErrorsCount(); i++) {
RowError rowError = response.getRowErrors(i);
rowIndexToErrorMessage.put(
Math.toIntExact(rowError.getIndex()), rowError.getMessage());
try {
if (response.hasError()) {
Exceptions.StorageException storageException =
Exceptions.toStorageException(response.getError(), null);
log.fine(String.format("Got error message: %s", response.toString()));
if (storageException != null) {
requestWrapper.appendResult.setException(storageException);
} else if (response.getRowErrorsCount() > 0) {
Map<Integer, String> rowIndexToErrorMessage = new HashMap<>();
for (int i = 0; i < response.getRowErrorsCount(); i++) {
RowError rowError = response.getRowErrors(i);
rowIndexToErrorMessage.put(
Math.toIntExact(rowError.getIndex()), rowError.getMessage());
}
AppendSerializationError exception =
new AppendSerializationError(
response.getError().getCode(),
response.getError().getMessage(),
streamName,
rowIndexToErrorMessage);
requestWrapper.appendResult.setException(exception);
} else {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
requestWrapper.appendResult.setException(exception);
}
AppendSerializationError exception =
new AppendSerializationError(
response.getError().getCode(),
response.getError().getMessage(),
streamName,
rowIndexToErrorMessage);
requestWrapper.appendResult.setException(exception);
} else {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
requestWrapper.appendResult.setException(exception);
requestWrapper.appendResult.set(response);
}
} else {
requestWrapper.appendResult.set(response);
} finally {
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
RequestProfiler.OperationName.TOTAL_LATENCY, requestWrapper.requestUniqueId);
}
});
}
Expand Down Expand Up @@ -1367,6 +1390,9 @@ static final class AppendRequestAndResponse {
Instant blockMessageSendDeadline;

Integer retryCount;

// Unique identifier for the request.
String requestUniqueId;
ExponentialRetryAlgorithm retryAlgorithm;

// The writer that issues the call of the request.
Expand All @@ -1379,11 +1405,15 @@ static final class AppendRequestAndResponse {
Instant requestSendTimeStamp;

AppendRequestAndResponse(
AppendRowsRequest message, StreamWriter streamWriter, RetrySettings retrySettings) {
AppendRowsRequest message,
StreamWriter streamWriter,
RetrySettings retrySettings,
String requestUniqueId) {
this.appendResult = SettableApiFuture.create();
this.message = message;
this.messageSize = message.getProtoRows().getSerializedSize();
this.streamWriter = streamWriter;
this.requestUniqueId = requestUniqueId;
this.blockMessageSendDeadline = Instant.now();
this.retryCount = 0;
// To be set after first retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,6 @@ public static void setOptions(Settings settings) {
ConnectionWorkerPool.settings = settings;
}

/** Distributes the writing of a message to an underlying connection. */
ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows) {
return append(streamWriter, rows, -1);
}

ConnectionWorker getConnectionWorker(StreamWriter streamWriter) {
ConnectionWorker connectionWorker;
lock.lock();
Expand Down Expand Up @@ -280,12 +275,13 @@ ConnectionWorker getConnectionWorker(StreamWriter streamWriter) {
}

/** Distributes the writing of a message to an underlying connection. */
ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, long offset) {
ApiFuture<AppendRowsResponse> append(
StreamWriter streamWriter, ProtoRows rows, long offset, String uniqueRequestId) {
// We are in multiplexing mode after entering the following logic.
ConnectionWorker connectionWorker = getConnectionWorker(streamWriter);
Stopwatch stopwatch = Stopwatch.createStarted();
ApiFuture<AppendRowsResponse> responseFuture =
connectionWorker.append(streamWriter, rows, offset);
connectionWorker.append(streamWriter, rows, offset, uniqueRequestId);
return ApiFutures.transform(
responseFuture,
// Add callback for update schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,15 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
return this;
}

/**
* Enable a latency profiler that would periodically generate a detailed latency report for the
* top latency requests. This is currently an experimental API.
*/
public Builder setEnableLatencyProfiler(boolean enableLatencyProfiler) {
this.schemaAwareStreamWriterBuilder.setEnableLatencyProfiler(enableLatencyProfiler);
return this;
}

/**
* Sets the default missing value interpretation value if the column is not presented in the
* missing_value_interpretations map.
Expand Down
Loading

0 comments on commit 16f19dd

Please sign in to comment.