From c99bd106056b860b8fbeb497a6639a8803e5f5d0 Mon Sep 17 00:00:00 2001 From: Siddharth Agrawal Date: Fri, 12 Apr 2024 13:21:49 -0700 Subject: [PATCH] feat: add instrumentation for a couple OpenTelemetry metrics --- google-cloud-bigquerystorage/pom.xml | 4 + .../bigquery/storage/v1/ConnectionWorker.java | 107 ++++++++++++++++++ .../cloud/bigquery/storage/v1/Singletons.java | 36 ++++++ .../storage/v1/ConnectionWorkerTest.java | 58 ++++++++++ pom.xml | 8 +- 5 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Singletons.java diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index e9b4594dc4..81d8be5a86 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -156,6 +156,10 @@ google-auth-library-credentials 1.23.0 + + io.opentelemetry + opentelemetry-api + diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 4608dc942a..0d86f5e77f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -36,13 +36,21 @@ import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.MeterProvider; import java.io.IOException; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; import java.util.HashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -250,6 +258,23 @@ class ConnectionWorker implements AutoCloseable { private static String projectMatching = "projects/[^/]+/"; private static Pattern streamPatternProject = Pattern.compile(projectMatching); + private Meter writeMeter; + static AttributeKey telemetryKeyStreamId = AttributeKey.stringKey("streamId"); + static AttributeKey telemetryKeyWriterId = AttributeKey.stringKey("writerId"); + static List> telemetryKeysTraceId = + new ArrayList>() { + { + add(AttributeKey.stringKey("traceField0")); + add(AttributeKey.stringKey("traceField1")); + add(AttributeKey.stringKey("traceField2")); + add(AttributeKey.stringKey("traceField3")); + add(AttributeKey.stringKey("traceField4")); + } + }; + private Attributes telemetryAttributes; + private LongCounter instrumentIncomingRequestCount; + private LongCounter instrumentIncomingRequestSize; + static final Pattern DEFAULT_STREAM_PATTERN = Pattern.compile("projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)/(streams/)?_default$"); @@ -278,6 +303,83 @@ static String getRoutingHeader(String streamName, String location) { return project + "locations/" + location; } + private Attributes buildOpenTelemetryAttributes() { + AttributesBuilder builder = Attributes.builder().put(telemetryKeyStreamId, this.streamName); + builder.put(telemetryKeyWriterId, this.writerId); + if ((this.traceId != null) && !this.traceId.isEmpty()) { + String[] traceIdParts = this.traceId.split(":", 5); + for (int i = 0; i < traceIdParts.length; i++) { + builder.put(telemetryKeysTraceId.get(i), traceIdParts[i]); + } + } + return builder.build(); + } + + @VisibleForTesting + Attributes getTelemetryAttributes() { + // streamName can change due to multiplexing. If it has changed, update the attributes. + String originalStreamNameAttribute = telemetryAttributes.get(telemetryKeyStreamId); + if ((originalStreamNameAttribute != null) + && !originalStreamNameAttribute.equals(this.streamName)) { + AttributesBuilder builder = telemetryAttributes.toBuilder(); + builder.put("streamId", this.streamName); + telemetryAttributes = builder.build(); + } + return telemetryAttributes; + } + + private void registerOpenTelemetryMetrics() { + MeterProvider meterProvider = Singletons.getOpenTelemetry().getMeterProvider(); + writeMeter = + meterProvider + .meterBuilder("com.google.cloud.bigquery.storage.v1.write") + .setInstrumentationVersion( + ConnectionWorker.class.getPackage().getImplementationVersion()) + .build(); + instrumentIncomingRequestCount = + writeMeter + .counterBuilder("incoming-request-count") + .setDescription("Counts number of incoming requests") + .build(); + instrumentIncomingRequestSize = + writeMeter + .counterBuilder("incoming-request-size") + .setDescription("Counts byte size of incoming requests") + .build(); + writeMeter + .gaugeBuilder("waiting-queue-length") + .ofLongs() + .setDescription( + "Reports length of waiting queue. This queue contains requests buffered in the client and not yet sent to the server.") + .buildWithCallback( + result -> { + long waitQueueSize = 0; + this.lock.lock(); + try { + waitQueueSize = this.waitingRequestQueue.size(); + } finally { + this.lock.unlock(); + } + result.record(waitQueueSize, getTelemetryAttributes()); + }); + writeMeter + .gaugeBuilder("inflight-queue-length") + .ofLongs() + .setDescription( + "Reports length of inflight queue. This queue contains sent append requests waiting for response from server.") + .buildWithCallback( + result -> { + long inflightQueueSize = 0; + this.lock.lock(); + try { + inflightQueueSize = this.inflightRequestQueue.size(); + } finally { + this.lock.unlock(); + } + result.record(inflightQueueSize, getTelemetryAttributes()); + }); + } + public ConnectionWorker( String streamName, String location, @@ -312,6 +414,9 @@ public ConnectionWorker( this.inflightRequestQueue = new LinkedList(); this.compressorName = compressorName; this.retrySettings = retrySettings; + this.telemetryAttributes = buildOpenTelemetryAttributes(); + registerOpenTelemetryMetrics(); + // Always recreate a client for connection worker. HashMap newHeaders = new HashMap<>(); newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders()); @@ -506,6 +611,8 @@ private ApiFuture appendInternal( + requestWrapper.messageSize))); return requestWrapper.appendResult; } + instrumentIncomingRequestCount.add(1, getTelemetryAttributes()); + instrumentIncomingRequestSize.add(requestWrapper.messageSize, getTelemetryAttributes()); this.lock.lock(); try { if (userClosed) { diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Singletons.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Singletons.java new file mode 100644 index 0000000000..aae8cd99dd --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Singletons.java @@ -0,0 +1,36 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery.storage.v1; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import java.util.logging.Logger; + +/** Container for global singleton objects. */ +class Singletons { + + private static final Logger log = Logger.getLogger(Singletons.class.getName()); + + // Global OpenTelemetry instance + private static OpenTelemetry openTelemetry = null; + + static OpenTelemetry getOpenTelemetry() { + if (openTelemetry == null) { + openTelemetry = GlobalOpenTelemetry.get(); + } + return openTelemetry; + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 71e4d47673..1404e571be 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -33,6 +33,7 @@ import com.google.protobuf.DescriptorProtos; import com.google.protobuf.Int64Value; import io.grpc.StatusRuntimeException; +import io.opentelemetry.api.common.Attributes; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -741,6 +742,63 @@ public void testLongTimeIdleWontFail() throws Exception { } } + @Test + public void testOpenTelemetryAttributesWithTraceId() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + null, + createProtoSchema("foo"), + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + "A:B:C", + null, + client.getSettings(), + retrySettings); + + Attributes attributes = connectionWorker.getTelemetryAttributes(); + String attributeStreamId = attributes.get(ConnectionWorker.telemetryKeyStreamId); + assertEquals(attributeStreamId, TEST_STREAM_1); + String attributesWriterId = attributes.get(ConnectionWorker.telemetryKeyWriterId); + assertEquals(attributesWriterId, connectionWorker.getWriterId()); + String attributesTraceId0 = attributes.get(ConnectionWorker.telemetryKeysTraceId.get(0)); + assertEquals(attributesTraceId0, "A"); + String attributesTraceId1 = attributes.get(ConnectionWorker.telemetryKeysTraceId.get(1)); + assertEquals(attributesTraceId1, "B"); + String attributesTraceId2 = attributes.get(ConnectionWorker.telemetryKeysTraceId.get(2)); + assertEquals(attributesTraceId2, "C"); + String attributesTraceId3 = attributes.get(ConnectionWorker.telemetryKeysTraceId.get(3)); + assertEquals(attributesTraceId3, null); + } + + @Test + public void testOpenTelemetryAttributesWithoutTraceId() throws Exception { + ProtoSchema schema1 = createProtoSchema("foo"); + StreamWriter sw1 = + StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build(); + ConnectionWorker connectionWorker = + new ConnectionWorker( + TEST_STREAM_1, + null, + createProtoSchema("foo"), + 100000, + 100000, + Duration.ofSeconds(100), + FlowController.LimitExceededBehavior.Block, + null, + null, + client.getSettings(), + retrySettings); + Attributes attributes = connectionWorker.getTelemetryAttributes(); + String attributesTraceId0 = attributes.get(ConnectionWorker.telemetryKeysTraceId.get(0)); + assertEquals(attributesTraceId0, null); + } + @Test public void testLocationName() throws Exception { assertEquals( diff --git a/pom.xml b/pom.xml index 0729a54e66..466707d326 100644 --- a/pom.xml +++ b/pom.xml @@ -120,7 +120,13 @@ json 20240303 - + + io.opentelemetry + opentelemetry-bom + 1.38.0 + pom + import +