Skip to content

Commit

Permalink
Created new grpc wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
surbhigarg92 committed Nov 18, 2024
1 parent 6ef36cb commit 88cdbfb
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public class BuiltInMetricsConstant {
DIRECT_PATH_ENABLED_KEY,
DIRECT_PATH_USED_KEY);

public static boolean DIRECT_PATH_ENABLED;
static Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM =
Aggregation.explicitBucketHistogram(
ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_HASH_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_NAME_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_UID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.LOCATION_ID_KEY;
import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY;
Expand Down Expand Up @@ -83,8 +82,6 @@ Map<String, String> createClientAttributes(String projectId, String client_name)
Map<String, String> clientAttributes = new HashMap<>();
clientAttributes.put(LOCATION_ID_KEY.getKey(), detectClientLocation());
clientAttributes.put(PROJECT_ID_KEY.getKey(), projectId);
// TODO: Replace this with real value.
clientAttributes.put(DIRECT_PATH_ENABLED_KEY.getKey(), "false");
clientAttributes.put(INSTANCE_CONFIG_ID_KEY.getKey(), "unknown");
clientAttributes.put(CLIENT_NAME_KEY.getKey(), client_name);
String clientUid = getDefaultTaskValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.AdminRequestsPerMinuteExceededException;
import com.google.cloud.spanner.BackupId;
import com.google.cloud.spanner.BuiltInMetricsConstant;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Restore;
import com.google.cloud.spanner.SpannerException;
Expand All @@ -80,13 +79,14 @@
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
import com.google.cloud.spanner.encryption.EncryptionConfigProtoMapper;
import com.google.cloud.spanner.v1.stub.GrpcSpannerStubWrapper;
import com.google.cloud.spanner.v1.stub.SpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Resources;
Expand Down Expand Up @@ -278,6 +278,11 @@ public class GapicSpannerRpc implements SpannerRpc {
private final int numChannels;
private final boolean isGrpcGcpExtensionEnabled;

private Supplier<Boolean> directPathEnabledSupplier =
() -> {
return false;
};;

public static GapicSpannerRpc create(SpannerOptions options) {
return new GapicSpannerRpc(options);
}
Expand Down Expand Up @@ -353,7 +358,9 @@ public GapicSpannerRpc(final SpannerOptions options) {
SpannerInterceptorProvider.create(
MoreObjects.firstNonNull(
options.getInterceptorProvider(),
SpannerInterceptorProvider.createDefault(options.getOpenTelemetry())))
SpannerInterceptorProvider.createDefault(
options.getOpenTelemetry(),
(() -> directPathEnabledSupplier.get()))))
// This sets the trace context headers.
.withTraceContext(endToEndTracingEnabled, options.getOpenTelemetry())
// This sets the response compressor (Server -> Client).
Expand Down Expand Up @@ -410,10 +417,15 @@ public GapicSpannerRpc(final SpannerOptions options) {
/* isAdminClient = */ false, isEmulatorEnabled(options, emulatorHost)))
.build();
ClientContext clientContext = ClientContext.create(spannerStubSettings);
this.spannerStub = GrpcSpannerStubWrapper.create(spannerStubSettings, clientContext);
BuiltInMetricsConstant.DIRECT_PATH_ENABLED =
((GrpcTransportChannel) clientContext.getTransportChannel()).isDirectPath()
&& isAttemptDirectPathXds;
this.spannerStub =
GrpcSpannerStubWithStubSettingsAndClientContext.create(
spannerStubSettings, clientContext);
this.directPathEnabledSupplier =
Suppliers.memoize(
() -> {
return ((GrpcTransportChannel) clientContext.getTransportChannel()).isDirectPath()
&& isAttemptDirectPathXds;
});
this.readRetrySettings =
options.getSpannerStubSettings().streamingReadSettings().getRetrySettings();
this.readRetryableCodes =
Expand Down Expand Up @@ -461,7 +473,8 @@ public GapicSpannerRpc(final SpannerOptions options) {
.getStreamWatchdogProvider()
.withCheckInterval(pdmlSettings.getStreamWatchdogCheckInterval()));
}
this.partitionedDmlStub = GrpcSpannerStubWrapper.create(pdmlSettings.build());
this.partitionedDmlStub =
GrpcSpannerStubWithStubSettingsAndClientContext.create(pdmlSettings.build());
this.instanceAdminStubSettings =
options
.getInstanceAdminStubSettings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,27 @@
* limitations under the License.
*/

package com.google.cloud.spanner.v1.stub;
package com.google.cloud.spanner.spi.v1;

import com.google.api.gax.rpc.ClientContext;
import com.google.cloud.spanner.v1.stub.GrpcSpannerStub;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import java.io.IOException;

public class GrpcSpannerStubWrapper extends GrpcSpannerStub {
/**
* Wrapper around {@link GrpcSpannerStub} to make the constructor available inside this package.
* This makes it possible to create a {@link GrpcSpannerStub} with a {@link SpannerStubSettings} and
* a {@link ClientContext}.
*/
class GrpcSpannerStubWithStubSettingsAndClientContext extends GrpcSpannerStub {

public static final GrpcSpannerStubWrapper create(
public static final GrpcSpannerStubWithStubSettingsAndClientContext create(
SpannerStubSettings settings, ClientContext clientContext) throws IOException {
return new GrpcSpannerStubWrapper(settings, clientContext);
return new GrpcSpannerStubWithStubSettingsAndClientContext(settings, clientContext);
}

protected GrpcSpannerStubWrapper(SpannerStubSettings settings, ClientContext clientContext)
throws IOException {
protected GrpcSpannerStubWithStubSettingsAndClientContext(
SpannerStubSettings settings, ClientContext clientContext) throws IOException {
super(settings, clientContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.spanner.CompositeTracer;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerRpcMetrics;
import com.google.common.base.Supplier;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.spanner.admin.database.v1.DatabaseName;
Expand Down Expand Up @@ -93,8 +94,12 @@ class HeaderInterceptor implements ClientInterceptor {
private static final Level LEVEL = Level.INFO;
private final SpannerRpcMetrics spannerRpcMetrics;

HeaderInterceptor(SpannerRpcMetrics spannerRpcMetrics) {
private final Supplier<Boolean> directPathEnabledSupplier;

HeaderInterceptor(
SpannerRpcMetrics spannerRpcMetrics, Supplier<Boolean> directPathEnabledSupplier) {
this.spannerRpcMetrics = spannerRpcMetrics;
this.directPathEnabledSupplier = directPathEnabledSupplier;
}

@Override
Expand Down Expand Up @@ -230,7 +235,7 @@ private Map<String, String> getBuiltInMetricAttributes(String key, DatabaseName
BuiltInMetricsConstant.INSTANCE_ID_KEY.getKey(), databaseName.getInstance());
attributes.put(
BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY.getKey(),
String.valueOf(BuiltInMetricsConstant.DIRECT_PATH_ENABLED));
String.valueOf(this.directPathEnabledSupplier.get()));
return attributes;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.google.api.core.ObsoleteApi;
import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.cloud.spanner.SpannerRpcMetrics;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import io.grpc.ClientInterceptor;
import io.opentelemetry.api.GlobalOpenTelemetry;
Expand Down Expand Up @@ -46,11 +48,22 @@ public static SpannerInterceptorProvider createDefault() {
}

public static SpannerInterceptorProvider createDefault(OpenTelemetry openTelemetry) {
return createDefault(
openTelemetry,
Suppliers.memoize(
() -> {
return false;
}));
}

public static SpannerInterceptorProvider createDefault(
OpenTelemetry openTelemetry, Supplier<Boolean> directPathEnabledSupplier) {
List<ClientInterceptor> defaultInterceptorList = new ArrayList<>();
defaultInterceptorList.add(new SpannerErrorInterceptor());
defaultInterceptorList.add(
new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER));
defaultInterceptorList.add(new HeaderInterceptor(new SpannerRpcMetrics(openTelemetry)));
defaultInterceptorList.add(
new HeaderInterceptor(new SpannerRpcMetrics(openTelemetry), directPathEnabledSupplier));
return new SpannerInterceptorProvider(ImmutableList.copyOf(defaultInterceptorList));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public static void setup() {
Attributes.builder()
.put(BuiltInMetricsConstant.PROJECT_ID_KEY, "test-project")
.put(BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY, "unknown")
.put(BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY, "false")
.put(
BuiltInMetricsConstant.LOCATION_ID_KEY,
BuiltInOpenTelemetryMetricsProvider.detectClientLocation())
Expand Down

0 comments on commit 88cdbfb

Please sign in to comment.