Skip to content

Commit

Permalink
perf: minor optimizations to the standard query path (#3101)
Browse files Browse the repository at this point in the history
* perf: minor optimizations to the standard query path

Optimizes the standard query path slightly by adding some caching and
removing regex matching that happened on the critical path of each
query that was executed.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
olavloite and gcf-owl-bot[bot] authored May 21, 2024
1 parent cc3352b commit ec820a1
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.Builder;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.PartialResultSet;
Expand Down Expand Up @@ -457,7 +458,7 @@ void initTransaction() {

// A per-transaction sequence number used to identify this ExecuteSqlRequests. Required for DML,
// ignored for query by the server.
private AtomicLong seqNo = new AtomicLong();
private final AtomicLong seqNo = new AtomicLong();

// Allow up to 512MB to be buffered (assuming 1MB chunks). In practice, restart tokens are sent
// much more frequently.
Expand Down Expand Up @@ -488,6 +489,10 @@ long getSeqNo() {
return seqNo.incrementAndGet();
}

protected boolean isReadOnly() {
return true;
}

protected boolean isRouteToLeader() {
return false;
}
Expand Down Expand Up @@ -622,19 +627,18 @@ private ResultSet executeQueryInternal(
@VisibleForTesting
QueryOptions buildQueryOptions(QueryOptions requestOptions) {
// Shortcut for the most common return value.
if (defaultQueryOptions.equals(QueryOptions.getDefaultInstance()) && requestOptions == null) {
return QueryOptions.getDefaultInstance();
if (requestOptions == null) {
return defaultQueryOptions;
}
// Create a builder based on the default query options.
QueryOptions.Builder builder = defaultQueryOptions.toBuilder();
// Then overwrite with specific options for this query.
if (requestOptions != null) {
builder.mergeFrom(requestOptions);
}
return builder.build();
return defaultQueryOptions.toBuilder().mergeFrom(requestOptions).build();
}

RequestOptions buildRequestOptions(Options options) {
// Shortcut for the most common return value.
if (!(options.hasPriority() || options.hasTag() || getTransactionTag() != null)) {
return RequestOptions.getDefaultInstance();
}

RequestOptions.Builder builder = RequestOptions.newBuilder();
if (options.hasPriority()) {
builder.setPriority(options.priority());
Expand All @@ -655,16 +659,7 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
.setSql(statement.getSql())
.setQueryMode(queryMode)
.setSession(session.getName());
Map<String, Value> stmtParameters = statement.getParameters();
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
paramsBuilder.putFields(param.getKey(), Value.toProto(param.getValue()));
if (param.getValue() != null && param.getValue().getType() != null) {
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
}
addParameters(builder, statement.getParameters());
if (withTransactionSelector) {
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
Expand All @@ -679,12 +674,26 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
} else if (defaultDirectedReadOptions != null) {
builder.setDirectedReadOptions(defaultDirectedReadOptions);
}
builder.setSeqno(getSeqNo());
if (!isReadOnly()) {
builder.setSeqno(getSeqNo());
}
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
builder.setRequestOptions(buildRequestOptions(options));
return builder;
}

static void addParameters(ExecuteSqlRequest.Builder builder, Map<String, Value> stmtParameters) {
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
paramsBuilder.putFields(param.getKey(), Value.toProto(param.getValue()));
if (param.getValue() != null && param.getValue().getType() != null) {
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
}
}

ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(
Iterable<Statement> statements, Options options) {
ExecuteBatchDmlRequest.Builder builder =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.spanner.SpannerOptions.FixedCloseableExecutorProvider;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import org.threeten.bp.Duration;

public class LatencyTest {

public static void main(String[] args) throws Exception {
ThreadFactory threadFactory =
ThreadFactoryUtil.tryCreateVirtualThreadFactory("spanner-async-worker");
if (threadFactory == null) {
return;
}
ScheduledExecutorService service = Executors.newScheduledThreadPool(0, threadFactory);
Spanner spanner =
SpannerOptions.newBuilder()
.setCredentials(
GoogleCredentials.fromStream(
Files.newInputStream(
Paths.get("/Users/loite/Downloads/appdev-soda-spanner-staging.json"))))
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setWaitForMinSessions(Duration.ofSeconds(5L))
// .setUseMultiplexedSession(true)
.build())
.setUseVirtualThreads(true)
.setAsyncExecutorProvider(FixedCloseableExecutorProvider.create(service))
.build()
.getService();
DatabaseClient client =
spanner.getDatabaseClient(
DatabaseId.of("appdev-soda-spanner-staging", "knut-test-ycsb", "latencytest"));
for (int i = 0; i < 1000000; i++) {
try (AsyncResultSet resultSet =
client
.singleUse()
.executeQueryAsync(
Statement.newBuilder("select col_varchar from latency_test where col_bigint=$1")
.bind("p1")
.to(ThreadLocalRandom.current().nextLong(100000L))
.build())) {
while (resultSet.next()) {
for (int col = 0; col < resultSet.getColumnCount(); col++) {
if (resultSet.getValue(col) == null) {
throw new IllegalStateException();
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,6 @@ private ByteString initTransaction(final Options options) {
private void setParameters(
final ExecuteSqlRequest.Builder requestBuilder,
final Map<String, Value> statementParameters) {
if (!statementParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = requestBuilder.getParamsBuilder();
for (Map.Entry<String, Value> param : statementParameters.entrySet()) {
paramsBuilder.putFields(param.getKey(), Value.toProto(param.getValue()));
if (param.getValue() != null && param.getValue().getType() != null) {
requestBuilder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
}
AbstractReadContext.addParameters(requestBuilder, statementParameters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
private final RetrySettings streamingRetrySettings;
private final Set<Code> retryableCodes;
private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName());
private final BackOff backOff;
private BackOff backOff;
private final LinkedList<PartialResultSet> buffer = new LinkedList<>();
private final int maxBufferSize;
private final ISpan span;
Expand Down Expand Up @@ -106,7 +106,6 @@ protected ResumableStreamIterator(
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent, attributes);
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
this.backOff = newBackOff();
}

private ExponentialBackOff newBackOff() {
Expand Down Expand Up @@ -271,7 +270,10 @@ protected PartialResultSet computeNext() {
if (delay != -1) {
backoffSleep(context, delay);
} else {
backoffSleep(context, backOff);
if (this.backOff == null) {
this.backOff = newBackOff();
}
backoffSleep(context, this.backOff);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ interface SessionTransaction {
void close();
}

private static final Map<SpannerRpc.Option, ?>[] CHANNEL_HINT_OPTIONS =
new Map[SpannerOptions.MAX_CHANNELS];

static {
for (int i = 0; i < CHANNEL_HINT_OPTIONS.length; i++) {
CHANNEL_HINT_OPTIONS[i] = optionMap(SessionOption.channelHint(i));
}
}

static final int NO_CHANNEL_HINT = -1;

private final SpannerImpl spanner;
Expand All @@ -125,7 +134,7 @@ interface SessionTransaction {
if (channelHint == NO_CHANNEL_HINT) {
return sessionReference.getOptions();
}
return optionMap(SessionOption.channelHint(channelHint));
return CHANNEL_HINT_OPTIONS[channelHint % CHANNEL_HINT_OPTIONS.length];
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ Builder setPoolMaintainerClock(Clock poolMaintainerClock) {
* SessionPoolOptions#maxSessions} based on the traffic load. Failing to do so will result in
* higher latencies.
*/
Builder setUseMultiplexedSession(boolean useMultiplexedSession) {
public Builder setUseMultiplexedSession(boolean useMultiplexedSession) {
this.useMultiplexedSession = useMultiplexedSession;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
ImmutableSet.of(
"https://www.googleapis.com/auth/spanner.admin",
"https://www.googleapis.com/auth/spanner.data");
private static final int MAX_CHANNELS = 256;
static final int MAX_CHANNELS = 256;
@VisibleForTesting static final int DEFAULT_CHANNELS = 4;
// Set the default number of channels to GRPC_GCP_ENABLED_DEFAULT_CHANNELS when gRPC-GCP extension
// is enabled, to make sure there are sufficient channels available to move the sessions to a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ private TransactionContextImpl(Builder builder) {
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
}

@Override
protected boolean isReadOnly() {
return false;
}

@Override
protected boolean isRouteToLeader() {
return true;
Expand Down
Loading

0 comments on commit ec820a1

Please sign in to comment.