Skip to content

Commit

Permalink
feat: add instrumentation for a couple of OpenTelemetry metrics (#2501)
Browse files Browse the repository at this point in the history
* feat: add instrumentation for a couple OpenTelemetry metrics

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
agrawal-siddharth and gcf-owl-bot[bot] authored May 28, 2024
1 parent 9708ef4 commit 195ea96
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 7 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:3.5.1'
implementation 'com.google.cloud:google-cloud-bigquerystorage:3.5.2'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "3.5.1"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "3.5.2"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -221,7 +221,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigquerystorage/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigquerystorage.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/3.5.1
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/3.5.2
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
4 changes: 4 additions & 0 deletions google-cloud-bigquerystorage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@
<artifactId>google-auth-library-credentials</artifactId>
<version>1.23.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,21 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -253,6 +261,24 @@ class ConnectionWorker implements AutoCloseable {
static final Pattern DEFAULT_STREAM_PATTERN =
Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/(streams/)?_default$");

private static String tableMatching = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/";
private static Pattern streamPatternTable = Pattern.compile(tableMatching);
private Meter writeMeter;
static AttributeKey<String> telemetryKeyTableId = AttributeKey.stringKey("table_id");
private static String dataflowPrefix = "dataflow:";
static List<AttributeKey<String>> telemetryKeysTraceId =
new ArrayList<AttributeKey<String>>() {
{
add(AttributeKey.stringKey("trace_field_1"));
add(AttributeKey.stringKey("trace_field_2"));
add(AttributeKey.stringKey("trace_field_3"));
}
};
private Attributes telemetryAttributes;
private LongCounter instrumentIncomingRequestCount;
private LongCounter instrumentIncomingRequestSize;
private LongCounter instrumentIncomingRequestRows;

public static Boolean isDefaultStreamName(String streamName) {
Matcher matcher = DEFAULT_STREAM_PATTERN.matcher(streamName);
return matcher.matches();
Expand All @@ -278,6 +304,85 @@ static String getRoutingHeader(String streamName, String location) {
return project + "locations/" + location;
}

private String getTableName() {
Matcher tableMatcher = streamPatternTable.matcher(this.streamName);
return tableMatcher.find() ? tableMatcher.group(1) : "";
}

private void setTraceIdAttributesPart(
AttributesBuilder builder,
String[] traceIdParts,
int indexPartsToCheck,
int indexTelemetryKeysToUse) {
if ((indexPartsToCheck < traceIdParts.length) && !traceIdParts[indexPartsToCheck].isEmpty()) {
builder.put(
telemetryKeysTraceId.get(indexTelemetryKeysToUse), traceIdParts[indexPartsToCheck]);
}
}

private void setTraceIdAttributes(AttributesBuilder builder) {
if ((this.traceId != null) && !this.traceId.isEmpty()) {
int indexDataflow = this.traceId.toLowerCase().indexOf(dataflowPrefix);
if (indexDataflow >= 0) {
String[] traceIdParts =
this.traceId.substring(indexDataflow + dataflowPrefix.length()).split(":", 8);
setTraceIdAttributesPart(builder, traceIdParts, 0, 0);
setTraceIdAttributesPart(builder, traceIdParts, 1, 1);
setTraceIdAttributesPart(builder, traceIdParts, 2, 2);
}
}
}

private Attributes buildOpenTelemetryAttributes() {
AttributesBuilder builder = Attributes.builder();
String tableName = getTableName();
if (!tableName.isEmpty()) {
builder.put(telemetryKeyTableId, tableName);
}
setTraceIdAttributes(builder);
return builder.build();
}

private void refreshOpenTelemetryTableNameAttributes() {
String tableName = getTableName();
if (!tableName.isEmpty()
&& !tableName.equals(getTelemetryAttributes().get(telemetryKeyTableId))) {
AttributesBuilder builder = getTelemetryAttributes().toBuilder();
builder.put(telemetryKeyTableId, tableName);
this.telemetryAttributes = builder.build();
}
}

@VisibleForTesting
Attributes getTelemetryAttributes() {
return telemetryAttributes;
}

private void registerOpenTelemetryMetrics() {
MeterProvider meterProvider = Singletons.getOpenTelemetry().getMeterProvider();
writeMeter =
meterProvider
.meterBuilder("com.google.cloud.bigquery.storage.v1.write")
.setInstrumentationVersion(
ConnectionWorker.class.getPackage().getImplementationVersion())
.build();
instrumentIncomingRequestCount =
writeMeter
.counterBuilder("append_requests")
.setDescription("Counts number of incoming requests")
.build();
instrumentIncomingRequestSize =
writeMeter
.counterBuilder("append_request_bytes")
.setDescription("Counts byte size of incoming requests")
.build();
instrumentIncomingRequestRows =
writeMeter
.counterBuilder("append_rows")
.setDescription("Counts number of incoming request rows")
.build();
}

public ConnectionWorker(
String streamName,
String location,
Expand Down Expand Up @@ -312,6 +417,9 @@ public ConnectionWorker(
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.compressorName = compressorName;
this.retrySettings = retrySettings;
this.telemetryAttributes = buildOpenTelemetryAttributes();
registerOpenTelemetryMetrics();

// Always recreate a client for connection worker.
HashMap<String, String> newHeaders = new HashMap<>();
newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
Expand Down Expand Up @@ -507,6 +615,9 @@ private ApiFuture<AppendRowsResponse> appendInternal(
+ requestWrapper.messageSize)));
return requestWrapper.appendResult;
}
instrumentIncomingRequestCount.add(1, getTelemetryAttributes());
instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes());
instrumentIncomingRequestRows.add(message.getProtoRows().getRows().getSerializedRowsCount());
this.lock.lock();
try {
if (userClosed) {
Expand Down Expand Up @@ -783,6 +894,7 @@ private void appendLoop() {
|| (originalRequest.getProtoRows().hasWriterSchema()
&& !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema))) {
streamName = originalRequest.getWriteStream();
refreshOpenTelemetryTableNameAttributes();
writerSchema = originalRequest.getProtoRows().getWriterSchema();
isMultiplexing = true;
firstRequestForTableOrSchemaSwitch = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -238,9 +239,7 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows)
return append(streamWriter, rows, -1);
}

/** Distributes the writing of a message to an underlying connection. */
ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, long offset) {
// We are in multiplexing mode after entering the following logic.
ConnectionWorker getConnectionWorker(StreamWriter streamWriter) {
ConnectionWorker connectionWorker;
lock.lock();
try {
Expand Down Expand Up @@ -277,6 +276,13 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
} finally {
lock.unlock();
}
return connectionWorker;
}

/** Distributes the writing of a message to an underlying connection. */
ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows, long offset) {
// We are in multiplexing mode after entering the following logic.
ConnectionWorker connectionWorker = getConnectionWorker(streamWriter);
Stopwatch stopwatch = Stopwatch.createStarted();
ApiFuture<AppendRowsResponse> responseFuture =
connectionWorker.append(streamWriter, rows, offset);
Expand All @@ -294,6 +300,12 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
MoreExecutors.directExecutor());
}

@VisibleForTesting
Attributes getTelemetryAttributes(StreamWriter streamWriter) {
ConnectionWorker connectionWorker = getConnectionWorker(streamWriter);
return connectionWorker.getTelemetryAttributes();
}

/**
* Create a new connection if we haven't reached current maximum, or reuse an existing connection
* with least load.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import java.util.logging.Logger;

/** Container for global singleton objects. */
class Singletons {

private static final Logger log = Logger.getLogger(Singletons.class.getName());

// Global OpenTelemetry instance
private static OpenTelemetry openTelemetry = null;

static OpenTelemetry getOpenTelemetry() {
if (openTelemetry == null) {
openTelemetry = GlobalOpenTelemetry.get();
}
return openTelemetry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
Expand Down Expand Up @@ -172,6 +173,15 @@ public ApiFuture<AppendRowsResponse> append(
}
}

@VisibleForTesting
Attributes getTelemetryAttributes(StreamWriter streamWriter) {
if (getKind() == Kind.CONNECTION_WORKER) {
return connectionWorker().getTelemetryAttributes();
} else {
return connectionWorkerPool().getTelemetryAttributes(streamWriter);
}
}

public void close(StreamWriter streamWriter) {
if (getKind() == Kind.CONNECTION_WORKER) {
connectionWorker().close();
Expand Down Expand Up @@ -459,6 +469,11 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
return this.singleConnectionOrConnectionPool.append(this, rows, offset);
}

@VisibleForTesting
Attributes getTelemetryAttributes() {
return this.singleConnectionOrConnectionPool.getTelemetryAttributes(this);
}

/**
* Returns the wait of a request in Client side before sending to the Server. Request could wait
* in Client because it reached the client side inflight request limit (adjustable when
Expand Down
Loading

0 comments on commit 195ea96

Please sign in to comment.