Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding gfe_latencies metric to built-in metrics #3490

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
public class BuiltInMetricsConstant {

public static final String METER_NAME = "spanner.googleapis.com/internal/client";

public static final String GAX_METER_NAME = OpenTelemetryMetricsRecorder.GAX_METER_NAME;

static final String SPANNER_METER_NAME = "spanner-java";
static final String GFE_LATENCIES_NAME = "gfe_latencies";
static final String OPERATION_LATENCIES_NAME = "operation_latencies";
static final String ATTEMPT_LATENCIES_NAME = "attempt_latencies";
static final String OPERATION_LATENCY_NAME = "operation_latency";
Expand Down Expand Up @@ -114,27 +114,39 @@ static Map<InstrumentSelector, View> getAllViews() {
ImmutableMap.Builder<InstrumentSelector, View> views = ImmutableMap.builder();
defineView(
views,
BuiltInMetricsConstant.GAX_METER_NAME,
BuiltInMetricsConstant.OPERATION_LATENCY_NAME,
BuiltInMetricsConstant.OPERATION_LATENCIES_NAME,
BuiltInMetricsConstant.AGGREGATION_WITH_MILLIS_HISTOGRAM,
InstrumentType.HISTOGRAM,
"ms");
defineView(
views,
BuiltInMetricsConstant.GAX_METER_NAME,
BuiltInMetricsConstant.ATTEMPT_LATENCY_NAME,
BuiltInMetricsConstant.ATTEMPT_LATENCIES_NAME,
BuiltInMetricsConstant.AGGREGATION_WITH_MILLIS_HISTOGRAM,
InstrumentType.HISTOGRAM,
"ms");
defineView(
views,
BuiltInMetricsConstant.SPANNER_METER_NAME,
BuiltInMetricsConstant.GFE_LATENCIES_NAME,
BuiltInMetricsConstant.GFE_LATENCIES_NAME,
BuiltInMetricsConstant.AGGREGATION_WITH_MILLIS_HISTOGRAM,
InstrumentType.HISTOGRAM,
"ms");
defineView(
views,
BuiltInMetricsConstant.GAX_METER_NAME,
BuiltInMetricsConstant.OPERATION_COUNT_NAME,
BuiltInMetricsConstant.OPERATION_COUNT_NAME,
Aggregation.sum(),
InstrumentType.COUNTER,
"1");
defineView(
views,
BuiltInMetricsConstant.GAX_METER_NAME,
BuiltInMetricsConstant.ATTEMPT_COUNT_NAME,
BuiltInMetricsConstant.ATTEMPT_COUNT_NAME,
Aggregation.sum(),
Expand All @@ -145,6 +157,7 @@ static Map<InstrumentSelector, View> getAllViews() {

private static void defineView(
ImmutableMap.Builder<InstrumentSelector, View> viewMap,
String meterName,
String metricName,
String metricViewName,
Aggregation aggregation,
Expand All @@ -153,7 +166,7 @@ private static void defineView(
InstrumentSelector selector =
InstrumentSelector.builder()
.setName(BuiltInMetricsConstant.METER_NAME + '/' + metricName)
.setMeterName(BuiltInMetricsConstant.GAX_METER_NAME)
.setMeterName(meterName)
.setType(type)
.setUnit(unit)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.google.cloud.opentelemetry.detection.AttributeKeys;
import com.google.cloud.opentelemetry.detection.DetectedPlatform;
import com.google.cloud.opentelemetry.detection.GCPPlatformDetector;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.OpenTelemetry;
Expand All @@ -42,6 +44,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand All @@ -57,6 +60,9 @@ final class BuiltInOpenTelemetryMetricsProvider {

private OpenTelemetry openTelemetry;

private final Cache<String, Map<String, String>> clientAttributesCache =
CacheBuilder.newBuilder().maximumSize(1000).build();

private BuiltInOpenTelemetryMetricsProvider() {}

OpenTelemetry getOrCreateOpenTelemetry(String projectId, @Nullable Credentials credentials) {
Expand All @@ -78,16 +84,29 @@ OpenTelemetry getOrCreateOpenTelemetry(String projectId, @Nullable Credentials c
}
}

Map<String, String> createClientAttributes(String projectId, String client_name) {
Map<String, String> clientAttributes = new HashMap<>();
clientAttributes.put(LOCATION_ID_KEY.getKey(), detectClientLocation());
clientAttributes.put(PROJECT_ID_KEY.getKey(), projectId);
clientAttributes.put(INSTANCE_CONFIG_ID_KEY.getKey(), "unknown");
clientAttributes.put(CLIENT_NAME_KEY.getKey(), client_name);
String clientUid = getDefaultTaskValue();
clientAttributes.put(CLIENT_UID_KEY.getKey(), clientUid);
clientAttributes.put(CLIENT_HASH_KEY.getKey(), generateClientHash(clientUid));
return clientAttributes;
Map<String, String> createOrGetClientAttributes(String projectId, String client_name) {
try {
String key = projectId + client_name;
return clientAttributesCache.get(
key,
() -> {
Map<String, String> clientAttributes = new HashMap<>();
clientAttributes.put(LOCATION_ID_KEY.getKey(), detectClientLocation());
clientAttributes.put(PROJECT_ID_KEY.getKey(), projectId);
clientAttributes.put(INSTANCE_CONFIG_ID_KEY.getKey(), "unknown");
clientAttributes.put(CLIENT_NAME_KEY.getKey(), client_name);
String clientUid = getDefaultTaskValue();
clientAttributes.put(CLIENT_UID_KEY.getKey(), clientUid);
clientAttributes.put(CLIENT_HASH_KEY.getKey(), generateClientHash(clientUid));
return clientAttributes;
});
} catch (ExecutionException executionException) {
logger.log(
Level.WARNING,
"Unable to get Client Attributes for client side metrics, will skip exporting client side metrics",
executionException);
return null;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.gax.core.GaxProperties;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import java.util.HashMap;
import java.util.Map;

/** OpenTelemetry implementation of recording built in metrics. */
public class BuiltInOpenTelemetryMetricsRecorder {

private final DoubleHistogram gfeLatencyRecorder;
private final Map<String, String> attributes = new HashMap<>();

/**
* Creates the following instruments for the following metrics:
*
* <ul>
* <li>GFE Latency: Histogram
* </ul>
*
* @param openTelemetry OpenTelemetry instance
*/
public BuiltInOpenTelemetryMetricsRecorder(
OpenTelemetry openTelemetry, Map<String, String> clientAttributes) {
if (openTelemetry != null && clientAttributes != null) {
gfeLatencyRecorder = null;
return;
}
Meter meter =
openTelemetry
.meterBuilder(BuiltInMetricsConstant.SPANNER_METER_NAME)
.setInstrumentationVersion(GaxProperties.getLibraryVersion(getClass()))
.build();
this.gfeLatencyRecorder =
meter
.histogramBuilder(
BuiltInMetricsConstant.METER_NAME + '/' + BuiltInMetricsConstant.GFE_LATENCIES_NAME)
.setDescription(
"Latency between Google's network receiving an RPC and reading back the first byte of the response")
.setUnit("ms")
.build();
this.attributes.putAll(clientAttributes);
}

/**
* Record the latency between Google's network receiving an RPC and reading back the first byte of
* the response. Data is stored in a Histogram.
*
* @param gfeLatency Attempt Latency in ms
* @param attributes Map of the attributes to store
*/
public void recordGFELatency(double gfeLatency, Map<String, String> attributes) {
if (gfeLatencyRecorder != null) {
this.attributes.putAll(attributes);
gfeLatencyRecorder.record(gfeLatency, toOtelAttributes(this.attributes));
}
}

@VisibleForTesting
Attributes toOtelAttributes(Map<String, String> attributes) {
Preconditions.checkNotNull(attributes, "Attributes map cannot be null");
AttributesBuilder attributesBuilder = Attributes.builder();
attributes.forEach(attributesBuilder::put);
return attributesBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1680,6 +1680,18 @@ public OpenTelemetry getOpenTelemetry() {
}
}

/** Returns an instance of OpenTelemetry object for Built-in Client metrics. */
public OpenTelemetry getBuiltInMetricsOpenTelemetry() {
return this.builtInOpenTelemetryMetricsProvider.getOrCreateOpenTelemetry(
this.getProjectId(), getCredentials());
}

/** Returns attributes for an instance of Built-in Client metrics. */
public Map<String, String> getBuiltInMetricsClientAttributes() {
return builtInOpenTelemetryMetricsProvider.createOrGetClientAttributes(
this.getProjectId(), "spanner-java/" + GaxProperties.getLibraryVersion(getClass()));
}

@Override
public ApiTracerFactory getApiTracerFactory() {
return createApiTracerFactory(false, false);
Expand Down Expand Up @@ -1729,11 +1741,13 @@ private ApiTracerFactory createMetricsApiTracerFactory() {
this.builtInOpenTelemetryMetricsProvider.getOrCreateOpenTelemetry(
this.getProjectId(), getCredentials());

return openTelemetry != null
Map<String, String> clientAttributes =
builtInOpenTelemetryMetricsProvider.createOrGetClientAttributes(
this.getProjectId(), "spanner-java/" + GaxProperties.getLibraryVersion(getClass()));
return openTelemetry != null && clientAttributes != null
? new MetricsTracerFactory(
new OpenTelemetryMetricsRecorder(openTelemetry, BuiltInMetricsConstant.METER_NAME),
builtInOpenTelemetryMetricsProvider.createClientAttributes(
this.getProjectId(), "spanner-java/" + GaxProperties.getLibraryVersion(getClass())))
clientAttributes)
: null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ public GapicSpannerRpc(final SpannerOptions options) {
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault(
options.getOpenTelemetry(),
options.getBuiltInMetricsOpenTelemetry(),
options.getBuiltInMetricsClientAttributes(),
(() -> directPathEnabledSupplier.get()))))
// This sets the trace context headers.
.withTraceContext(endToEndTracingEnabled, options.getOpenTelemetry())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.google.api.gax.tracing.ApiTracer;
import com.google.cloud.spanner.BuiltInMetricsConstant;
import com.google.cloud.spanner.BuiltInOpenTelemetryMetricsRecorder;
import com.google.cloud.spanner.CompositeTracer;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerRpcMetrics;
Expand Down Expand Up @@ -94,12 +95,17 @@ class HeaderInterceptor implements ClientInterceptor {
private static final Level LEVEL = Level.INFO;
private final SpannerRpcMetrics spannerRpcMetrics;

private final BuiltInOpenTelemetryMetricsRecorder builtInOpenTelemetryMetricsRecorder;

private final Supplier<Boolean> directPathEnabledSupplier;

HeaderInterceptor(
SpannerRpcMetrics spannerRpcMetrics, Supplier<Boolean> directPathEnabledSupplier) {
SpannerRpcMetrics spannerRpcMetrics,
BuiltInOpenTelemetryMetricsRecorder builtInOpenTelemetryMetricsRecorder,
Supplier<Boolean> directPathEnabledSupplier) {
this.spannerRpcMetrics = spannerRpcMetrics;
this.directPathEnabledSupplier = directPathEnabledSupplier;
this.builtInOpenTelemetryMetricsRecorder = builtInOpenTelemetryMetricsRecorder;
}

@Override
Expand All @@ -118,17 +124,22 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
TagContext tagContext = getTagContext(key, method.getFullMethodName(), databaseName);
Attributes attributes =
getMetricAttributes(key, method.getFullMethodName(), databaseName);
Map<String, String> builtInMetricsAttributes =
getBuiltInMetricAttributes(key, databaseName);
Map<String, String> commonBuiltInMetricAttributes =
getCommonBuiltInMetricAttributes(key, databaseName);
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata metadata) {
Boolean isDirectPathUsed =
isDirectPathUsed(getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
addBuiltInMetricAttributes(
compositeTracer, builtInMetricsAttributes, isDirectPathUsed);
processHeader(metadata, tagContext, attributes, span);
compositeTracer, commonBuiltInMetricAttributes, isDirectPathUsed);
processHeader(
metadata,
tagContext,
attributes,
span,
getBuiltInMetricAttributes(commonBuiltInMetricAttributes, isDirectPathUsed));
super.onHeaders(metadata);
}
},
Expand All @@ -142,7 +153,11 @@ public void onHeaders(Metadata metadata) {
}

private void processHeader(
Metadata metadata, TagContext tagContext, Attributes attributes, Span span) {
Metadata metadata,
TagContext tagContext,
Attributes attributes,
Span span,
Map<String, String> builtInMetricsAttributes) {
MeasureMap measureMap = STATS_RECORDER.newMeasureMap();
String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY);
if (serverTiming != null && serverTiming.startsWith(SERVER_TIMING_HEADER_PREFIX)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this compatible with other server timing headers?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More specifically, a server timing header is in the format of [component]; dur=[latency], as stated in the w3c standard. gfet4t7 is just one of the possible component and it is not guaranteed to be the first one, so startsWith shall be changed to something like contains.

Expand All @@ -154,6 +169,7 @@ private void processHeader(

spannerRpcMetrics.recordGfeLatency(latency, attributes);
spannerRpcMetrics.recordGfeHeaderMissingCount(0L, attributes);
builtInOpenTelemetryMetricsRecorder.recordGFELatency(latency, builtInMetricsAttributes);

if (span != null) {
span.setAttribute("gfe_latency", String.valueOf(latency));
Expand Down Expand Up @@ -224,8 +240,8 @@ private Attributes getMetricAttributes(String key, String method, DatabaseName d
});
}

private Map<String, String> getBuiltInMetricAttributes(String key, DatabaseName databaseName)
throws ExecutionException {
private Map<String, String> getCommonBuiltInMetricAttributes(
String key, DatabaseName databaseName) throws ExecutionException {
return builtInAttributesCache.get(
key,
() -> {
Expand All @@ -240,17 +256,21 @@ private Map<String, String> getBuiltInMetricAttributes(String key, DatabaseName
});
}

private Map<String, String> getBuiltInMetricAttributes(
Map<String, String> commonBuiltInMetricsAttributes, Boolean isDirectPathUsed) {
Map<String, String> builtInMetricAttributes = new HashMap<>(commonBuiltInMetricsAttributes);
builtInMetricAttributes.put(
BuiltInMetricsConstant.DIRECT_PATH_USED_KEY.getKey(), Boolean.toString(isDirectPathUsed));
return builtInMetricAttributes;
}

private void addBuiltInMetricAttributes(
CompositeTracer compositeTracer,
Map<String, String> builtInMetricsAttributes,
Map<String, String> commonBuiltInMetricsAttributes,
Boolean isDirectPathUsed) {
if (compositeTracer != null) {
// Direct Path used attribute
Map<String, String> attributes = new HashMap<>(builtInMetricsAttributes);
attributes.put(
BuiltInMetricsConstant.DIRECT_PATH_USED_KEY.getKey(), Boolean.toString(isDirectPathUsed));

compositeTracer.addAttributes(attributes);
compositeTracer.addAttributes(
getBuiltInMetricAttributes(commonBuiltInMetricsAttributes, isDirectPathUsed));
}
}

Expand Down
Loading
Loading