Skip to content

Commit

Permalink
add profiler for request execution details.
Browse files Browse the repository at this point in the history
  • Loading branch information
GaoleMeng committed Jul 15, 2024
2 parents 0f16091 + 2178bed commit de95efd
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import java.time.Duration;
Expand All @@ -13,35 +28,40 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

/**
* A profiler that would periodically generate a report for the past period with the latency report
* for the slowest requests. This is used for debugging only.
*
* <p>The report will contain the execution details of the TOP_K slowest requests, one example: ```
* INFO: At system time 1720566109971, in total 2 finished during the last 60000 milliseconds, the
* top 10 long latency requests details report: ----------------------------- Request uuid:
* request_1 with total time 1000 milliseconds Operation name json_to_proto_conversion starts at:
* 1720566109971, ends at: 1720566109971, total time: 200 milliseconds Operation name
* backend_latency starts at: 1720566109971, ends at: 1720566109971, total time: 800 milliseconds
* ----------------------------- Request uuid: request_2 with total time 500 milliseconds Operation
* name json_to_proto_conversion starts at: 1720566109971, ends at: 1720566109971, total time: 250
* milliseconds Operation name backend_latency starts at: 1720566109971, ends at: 1720566109971,
* total time: 250 milliseconds ```
* <pre>
* The report will contain the execution details of the TOP_K slowest requests, one example:
*
* INFO: During the last 60000 milliseconds at system time 1720825020138, in total 2 requests finished. Total dropped request is 0. The top 10 long latency requests details report:
* -----------------------------
* Request uuid: request_1 with total time 1000 milliseconds
* Operation name json_to_proto_conversion starts at: 1720566109971, ends at: 1720566109971, total time: 200 milliseconds
* Operation name backend_latency starts at: 1720566109971, ends at: 1720566109971, total time: 800 milliseconds
* -----------------------------
* Request uuid: request_2 with total time 500 milliseconds
* Operation name json_to_proto_conversion starts at: 1720566109971, ends at: 1720566109971, total time: 250 milliseconds
* Operation name backend_latency starts at: 1720566109971, ends at: 1720566109971, total time: 250 milliseconds
* ...
* </pre>
*/
public class RequestProfiler {
enum OperationName {
// The total end to end latency for a request.
TOTAL_REQUEST("total_request_time"),
TOTAL_LATENCY("append_request_total_latency"),
// Json to proto conversion time.
JSON_TO_PROTO_CONVERSION("json_to_proto_conversion"),
// Time spent to fetch the table schema when user didn't provide it.
SCHEMA_FECTCHING("schema_fetching"),
SCHEMA_FETCHING("schema_fetching"),
// Time spent within wait queue before it get picked up.
WAIT_QUEUE("wait_queue"),
// Time spent within backend to process the request.
BACKEND_LATENCY("backend_latency");
// Time spent within backend + the time spent over network.
RESPONSE_LATENCY("response_latency");
private final String operationName;

OperationName(String operationName) {
Expand All @@ -51,6 +71,9 @@ enum OperationName {

private static final Logger log = Logger.getLogger(RequestProfiler.class.getName());

// Discard the requests if we are caching too many requests.
private static final int MAX_CACHED_REQUEST = 100000;

// Singleton for easier access.
public static final RequestProfiler REQUEST_PROFILER_SINGLETON = new RequestProfiler();

Expand All @@ -66,22 +89,46 @@ enum OperationName {

private Thread flushThread;

// Count the total number of dropped operations.
AtomicLong droppedOperationCount = new AtomicLong(0);

// Mark an operation for a given request id to be start.
void startOperation(OperationName operationName, String requestUniqueId) {
idToIndividualOperation.putIfAbsent(
requestUniqueId, new IndividualRequestProfiler(requestUniqueId));
if (!idToIndividualOperation.containsKey(requestUniqueId)) {
if (idToIndividualOperation.size() > MAX_CACHED_REQUEST) {
log.warning(
String.format(
"startOperation is triggered for request_id: %s that's hasn't "
+ "seen before, this is possible when "
+ "we are recording too much ongoing requests. So far we has dropped %s operations.",
requestUniqueId, droppedOperationCount));
droppedOperationCount.incrementAndGet();
return;
}
idToIndividualOperation.put(requestUniqueId, new IndividualRequestProfiler(requestUniqueId));
}
idToIndividualOperation.get(requestUniqueId).startOperation(operationName);
}

// Mark an operation for a given request id to be end.
void endOperation(OperationName operationName, String requestUniqueId) {
if (!idToIndividualOperation.containsKey(requestUniqueId)) {
log.warning(
String.format(
"endOperation is triggered for request_id: %s that's hasn't "
+ "seen before, this is possible when "
+ "we are recording too much ongoing requests. So far we has dropped %s operations.",
requestUniqueId, droppedOperationCount));
return;
}
idToIndividualOperation.get(requestUniqueId).endOperation(operationName);
}

void flushReport() {
log.info(flushAndGenerateReportText());
}

// Periodically trigger the report generation.
void startPeriodicalReportFlushing() {
this.flushThread =
new Thread(
Expand Down Expand Up @@ -111,6 +158,9 @@ String flushAndGenerateReportText() {
Iterator<Entry<String, IndividualRequestProfiler>> iterator =
idToIndividualOperation.entrySet().iterator();
int finishedRequestCount = 0;
// Iterate through all the requests stats, add to min heap if that's a finished request and has
// longer total
// latency than the least amount of latency in the min heap.
while (iterator.hasNext()) {
Entry<String, IndividualRequestProfiler> individualRequestProfiler = iterator.next();
if (!individualRequestProfiler.getValue().finalized) {
Expand All @@ -128,11 +178,16 @@ String flushAndGenerateReportText() {
iterator.remove();
}

// Generate report for the TOP_K longest requests.
String reportText =
String.format(
"At system time %s, in total %s finished during the "
+ "last %s milliseconds, the top %s long latency requests details report:\n",
System.currentTimeMillis(), finishedRequestCount, FLUSH_PERIOD.toMillis(), TOP_K);
"During the last %s milliseconds at system time %s, in total %s requests finished. Total dropped "
+ "request is %s. The top %s long latency requests details report:\n",
FLUSH_PERIOD.toMillis(),
System.currentTimeMillis(),
finishedRequestCount,
droppedOperationCount.getAndSet(0),
TOP_K);
if (minHeap.isEmpty()) {
reportText += "-----------------------------\n";
reportText += "\t0 requests finished during the last period.";
Expand Down Expand Up @@ -208,7 +263,7 @@ void endOperation(OperationName operationName) {
long endTime = System.currentTimeMillis();
long totalTime = endTime - startTime;
finishedOperations.add(new IndividualOperation(operationName, startTime, endTime, totalTime));
if (operationName == OperationName.TOTAL_REQUEST) {
if (operationName == OperationName.TOTAL_LATENCY) {
finalized = true;
this.totalTime = totalTime;
}
Expand All @@ -222,7 +277,7 @@ String generateReport() {
+ this.totalTime
+ " milliseconds\n";
for (int i = 0; i < finishedOperations.size(); i++) {
if (finishedOperations.get(i).operationName == OperationName.TOTAL_REQUEST) {
if (finishedOperations.get(i).operationName == OperationName.TOTAL_LATENCY) {
continue;
}
message += "\t\t";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import static org.junit.Assert.assertTrue;
Expand All @@ -24,43 +39,43 @@ public class RequestProfilerTest {
@Test
public void testNormalCase() throws Exception {
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.TOTAL_REQUEST, "request_1");
OperationName.TOTAL_LATENCY, "request_1");
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.JSON_TO_PROTO_CONVERSION, "request_1");
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
OperationName.JSON_TO_PROTO_CONVERSION, "request_1");
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.BACKEND_LATENCY, "request_1");
OperationName.RESPONSE_LATENCY, "request_1");

// Another request starts in the middle
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.TOTAL_REQUEST, "request_2");
OperationName.TOTAL_LATENCY, "request_2");
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.JSON_TO_PROTO_CONVERSION, "request_2");
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
OperationName.JSON_TO_PROTO_CONVERSION, "request_2");

// Continue request 1
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
OperationName.BACKEND_LATENCY, "request_1");
OperationName.RESPONSE_LATENCY, "request_1");

// Continue request 2
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.BACKEND_LATENCY, "request_2");
OperationName.RESPONSE_LATENCY, "request_2");
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
OperationName.BACKEND_LATENCY, "request_2");
OperationName.RESPONSE_LATENCY, "request_2");
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
OperationName.TOTAL_REQUEST, "request_2");
OperationName.TOTAL_LATENCY, "request_2");

// Continue request 1
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
OperationName.TOTAL_REQUEST, "request_1");
OperationName.TOTAL_LATENCY, "request_1");

// Test the report generated.
String reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
assertTrue(reportText.contains("Request uuid: request_1 with total time"));
assertTrue(reportText.contains("Operation name json_to_proto_conversion starts at"));
assertTrue(reportText.contains("Operation name backend_latency starts at"));
assertTrue(reportText.contains("Operation name response_latency starts at"));
assertTrue(reportText.contains("Request uuid: request_2 with total time"));

// Second time flush is called, it should generate empty report.
Expand All @@ -72,17 +87,17 @@ public void testNormalCase() throws Exception {
public void mixFinishedAndUnfinishedRequest() throws Exception {
// Start request 1.
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.TOTAL_REQUEST, "request_1");
OperationName.TOTAL_LATENCY, "request_1");
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.JSON_TO_PROTO_CONVERSION, "request_1");
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
OperationName.JSON_TO_PROTO_CONVERSION, "request_1");
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.BACKEND_LATENCY, "request_1");
OperationName.RESPONSE_LATENCY, "request_1");

// Another request starts in the middle
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.TOTAL_REQUEST, "request_2");
OperationName.TOTAL_LATENCY, "request_2");
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.JSON_TO_PROTO_CONVERSION, "request_2");

Expand All @@ -92,13 +107,13 @@ public void mixFinishedAndUnfinishedRequest() throws Exception {

// End one of them.
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
OperationName.TOTAL_REQUEST, "request_1");
OperationName.TOTAL_LATENCY, "request_1");
reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
assertTrue(reportText.contains("Request uuid: request_1 with total time"));

// End another, expect the first request's log not showing up.
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
OperationName.TOTAL_REQUEST, "request_2");
OperationName.TOTAL_LATENCY, "request_2");
reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
assertTrue(!reportText.contains("Request uuid: request_1 with total time"));
assertTrue(reportText.contains("Request uuid: request_2 with total time"));
Expand Down Expand Up @@ -126,7 +141,7 @@ public void concurrentProfilingTest_1000ReqsRunTogether() throws Exception {
() -> {
String uuid = String.format("request_%s", finalI);
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.TOTAL_REQUEST, uuid);
OperationName.TOTAL_LATENCY, uuid);
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.JSON_TO_PROTO_CONVERSION, uuid);
if (slowRequestIndex.contains(finalI)) {
Expand All @@ -147,7 +162,7 @@ public void concurrentProfilingTest_1000ReqsRunTogether() throws Exception {
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
OperationName.WAIT_QUEUE, uuid);
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
OperationName.TOTAL_REQUEST, uuid);
OperationName.TOTAL_LATENCY, uuid);
}));
}

Expand All @@ -156,7 +171,8 @@ public void concurrentProfilingTest_1000ReqsRunTogether() throws Exception {
futures.get(i).get();
}
String reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
assertTrue(reportText.contains("in total 1000 finished during the last 60000 milliseconds"));
assertTrue(reportText.contains("During the last 60000 milliseconds at system time"));
assertTrue(reportText.contains("in total 1000 requests finished"));
assertTrue(reportText.contains("Request uuid: request_50 with total time"));
assertTrue(reportText.contains("Request uuid: request_40 with total time"));
assertTrue(reportText.contains("Request uuid: request_30 with total time"));
Expand All @@ -183,7 +199,7 @@ public void concurrentProfilingTest_RunWhileFlushing() throws Exception {
try {
String uuid = String.format("request_%s", finalI);
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.TOTAL_REQUEST, uuid);
OperationName.TOTAL_LATENCY, uuid);
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
OperationName.JSON_TO_PROTO_CONVERSION, uuid);
if (slowRequestIndex.contains(finalI)) {
Expand All @@ -200,7 +216,7 @@ public void concurrentProfilingTest_RunWhileFlushing() throws Exception {
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
OperationName.WAIT_QUEUE, uuid);
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
OperationName.TOTAL_REQUEST, uuid);
OperationName.TOTAL_LATENCY, uuid);
String unused =
RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
} catch (InterruptedException e) {
Expand Down

0 comments on commit de95efd

Please sign in to comment.