Skip to content

Commit

Permalink
Created new grpc wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
surbhigarg92 committed Nov 18, 2024
1 parent 387fad5 commit 04c0b95
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public class BuiltInMetricsConstant {
DIRECT_PATH_ENABLED_KEY,
DIRECT_PATH_USED_KEY);

public static boolean DIRECT_PATH_ENABLED;
static Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM =
Aggregation.explicitBucketHistogram(
ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.AdminRequestsPerMinuteExceededException;
import com.google.cloud.spanner.BackupId;
import com.google.cloud.spanner.BuiltInMetricsConstant;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Restore;
import com.google.cloud.spanner.SpannerException;
Expand All @@ -87,6 +86,8 @@
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Resources;
Expand Down Expand Up @@ -278,6 +279,11 @@ public class GapicSpannerRpc implements SpannerRpc {
private final int numChannels;
private final boolean isGrpcGcpExtensionEnabled;

private Supplier<Boolean> directPathEnabledSupplier =
() -> {
return false;
};;

public static GapicSpannerRpc create(SpannerOptions options) {
return new GapicSpannerRpc(options);
}
Expand Down Expand Up @@ -353,7 +359,9 @@ public GapicSpannerRpc(final SpannerOptions options) {
SpannerInterceptorProvider.create(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault(options.getOpenTelemetry())))
SpannerInterceptorProvider.createDefault(
options.getOpenTelemetry(),
(() -> directPathEnabledSupplier.get()))))
// This sets the trace context headers.
.withTraceContext(endToEndTracingEnabled, options.getOpenTelemetry())
// This sets the response compressor (Server -> Client).
Expand Down Expand Up @@ -411,9 +419,12 @@ public GapicSpannerRpc(final SpannerOptions options) {
.build();
ClientContext clientContext = ClientContext.create(spannerStubSettings);
this.spannerStub = GrpcSpannerStubWrapper.create(spannerStubSettings, clientContext);
BuiltInMetricsConstant.DIRECT_PATH_ENABLED =
((GrpcTransportChannel) clientContext.getTransportChannel()).isDirectPath()
&& isAttemptDirectPathXds;
this.directPathEnabledSupplier =
Suppliers.memoize(
() -> {
return ((GrpcTransportChannel) clientContext.getTransportChannel()).isDirectPath()
&& isAttemptDirectPathXds;
});
this.readRetrySettings =
options.getSpannerStubSettings().streamingReadSettings().getRetrySettings();
this.readRetryableCodes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.spanner.CompositeTracer;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerRpcMetrics;
import com.google.common.base.Supplier;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.spanner.admin.database.v1.DatabaseName;
Expand Down Expand Up @@ -93,8 +94,12 @@ class HeaderInterceptor implements ClientInterceptor {
private static final Level LEVEL = Level.INFO;
private final SpannerRpcMetrics spannerRpcMetrics;

HeaderInterceptor(SpannerRpcMetrics spannerRpcMetrics) {
private final Supplier<Boolean> directPathEnabledSupplier;

HeaderInterceptor(
SpannerRpcMetrics spannerRpcMetrics, Supplier<Boolean> directPathEnabledSupplier) {
this.spannerRpcMetrics = spannerRpcMetrics;
this.directPathEnabledSupplier = directPathEnabledSupplier;
}

@Override
Expand Down Expand Up @@ -230,7 +235,7 @@ private Map<String, String> getBuiltInMetricAttributes(String key, DatabaseName
BuiltInMetricsConstant.INSTANCE_ID_KEY.getKey(), databaseName.getInstance());
attributes.put(
BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY.getKey(),
String.valueOf(BuiltInMetricsConstant.DIRECT_PATH_ENABLED));
String.valueOf(this.directPathEnabledSupplier.get()));
return attributes;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.google.api.core.ObsoleteApi;
import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.cloud.spanner.SpannerRpcMetrics;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import io.grpc.ClientInterceptor;
import io.opentelemetry.api.GlobalOpenTelemetry;
Expand Down Expand Up @@ -46,11 +48,22 @@ public static SpannerInterceptorProvider createDefault() {
}

public static SpannerInterceptorProvider createDefault(OpenTelemetry openTelemetry) {
return createDefault(
openTelemetry,
Suppliers.memoize(
() -> {
return false;
}));
}

public static SpannerInterceptorProvider createDefault(
OpenTelemetry openTelemetry, Supplier<Boolean> directPathEnabledSupplier) {
List<ClientInterceptor> defaultInterceptorList = new ArrayList<>();
defaultInterceptorList.add(new SpannerErrorInterceptor());
defaultInterceptorList.add(
new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER));
defaultInterceptorList.add(new HeaderInterceptor(new SpannerRpcMetrics(openTelemetry)));
defaultInterceptorList.add(
new HeaderInterceptor(new SpannerRpcMetrics(openTelemetry), directPathEnabledSupplier));
return new SpannerInterceptorProvider(ImmutableList.copyOf(defaultInterceptorList));
}

Expand Down

0 comments on commit 04c0b95

Please sign in to comment.