Skip to content

Commit

Permalink
feat: support partitioned queries + data boost in Connection API (#2540)
Browse files Browse the repository at this point in the history
* feat: support partitioned queries + data boost in Connection API

Adds support for Partitioned Queries and Data Boost in the Connection API.
This enables the use of these features in the JDBC driver and PGAdapter.

* fix: match the correct group in regex

* feat: add more SQL statements for partitioned queries

* chore: refactor client side statements to accept the entire parsed statement

Refactor the internal interface of client-side statements so these receive the
entire parsed statement, including any query parameters in the statement. This
allows us to create client-side statements that actually use the query parameters
that have been specified by the user.

* chore: simplify test

* chore: cleanup differences

* chore: cleanup unrelated changes

* fix: update converter name

* test: add more tests

* chore: add missing license header

* fix: handle empty partitioned queries correctly

* fix: do not use any random staleness for partitioned queries

* fix: only return false for next() if all have finished

* chore: rename to autoPartitionMode

* chore: rename sql statements + add tests for empty results

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* chore: address review comments

* Batch read connection api native adjustments (#2569)

* chore: add ClientSideStatementPartitionExecutor to SpannerFeature

* chore: wrap AbstractStatementParser static initialization in try/catch

* chore: add ClientSideStatementRunPartitionExecutor to SpannerFeature

* chore: add ClientSideStatementRunPartitionedQueryExecutor to SpannerFeature

* chore: lint formatting

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Burke Davison <[email protected]>
  • Loading branch information
3 people authored Aug 4, 2023
1 parent b59940c commit 4e31d04
Show file tree
Hide file tree
Showing 42 changed files with 39,843 additions and 25,929 deletions.
56 changes: 56 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,62 @@
<method>boolean isDelayTransactionStartUntilFirstWrite()</method>
</difference>

<!-- Partitioned queries in Connection API -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>int getMaxPartitionedParallelism()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>int getMaxPartitions()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isAutoPartitionMode()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isDataBoostEnabled()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.ResultSet partitionQuery(com.google.cloud.spanner.Statement, com.google.cloud.spanner.PartitionOptions, com.google.cloud.spanner.Options$QueryOption[])</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.ResultSet runPartition(java.lang.String)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.connection.PartitionedQueryResultSet runPartitionedQuery(com.google.cloud.spanner.Statement, com.google.cloud.spanner.PartitionOptions, com.google.cloud.spanner.Options$QueryOption[])</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setAutoPartitionMode(boolean)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setDataBoostEnabled(boolean)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setMaxPartitionedParallelism(int)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setMaxPartitions(int)</method>
</difference>
<!-- (Internal change, use stream timeout) -->
<difference>
<differenceType>7012</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ private Builder(Statement statement) {
statement.queryOptions == null ? null : statement.queryOptions.toBuilder().build();
}

/** Replaces the current SQL of this builder with the given string. */
public Builder replace(String sql) {
sqlBuffer.replace(0, sqlBuffer.length(), sql);
return this;
}

/** Appends {@code sqlFragment} to the statement. */
public Builder append(String sqlFragment) {
sqlBuffer.append(checkNotNull(sqlFragment));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,22 @@
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.BatchTransactionId;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.ResultSets;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type.StructField;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout;
import com.google.common.base.Preconditions;
Expand All @@ -39,13 +48,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -157,6 +168,39 @@ public void rollbackToSavepoint(
"Rollback to savepoint is not supported for " + getUnitOfWorkName());
}

@Override
public ApiFuture<ResultSet> partitionQueryAsync(
CallType callType,
ParsedStatement query,
PartitionOptions partitionOptions,
QueryOption... options) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"Partition query is not supported for " + getUnitOfWorkName());
}

ResultSet partitionQuery(
BatchReadOnlyTransaction transaction,
PartitionOptions partitionOptions,
ParsedStatement query,
QueryOption... options) {
final String partitionColumnName = "PARTITION";
BatchTransactionId transactionId = transaction.getBatchTransactionId();
List<Partition> partitions =
transaction.partitionQuery(partitionOptions, query.getStatement(), options);
return ResultSets.forRows(
com.google.cloud.spanner.Type.struct(
StructField.of(partitionColumnName, com.google.cloud.spanner.Type.string())),
partitions.stream()
.map(
partition ->
Struct.newBuilder()
.set(partitionColumnName)
.to(PartitionId.encodeToString(transactionId, partition))
.build())
.collect(Collectors.toList()));
}

StatementExecutor getStatementExecutor() {
return statementExecutor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Internal class for the Spanner Connection API.
Expand Down Expand Up @@ -91,8 +93,7 @@ public static AbstractStatementParser getInstance(Dialect dialect) {
*/

/** Begins a transaction. */
static final ParsedStatement BEGIN_STATEMENT =
AbstractStatementParser.getInstance(Dialect.GOOGLE_STANDARD_SQL).parse(Statement.of("BEGIN"));
static final ParsedStatement BEGIN_STATEMENT;

/**
* Create a COMMIT statement to use with the {@link #commit()} method to allow it to be cancelled,
Expand All @@ -104,14 +105,10 @@ public static AbstractStatementParser getInstance(Dialect dialect) {
* #commit()} method is called directly, we do not have a {@link ParsedStatement}, and the method
* uses this statement instead in order to use the same logic as the other statements.
*/
static final ParsedStatement COMMIT_STATEMENT =
AbstractStatementParser.getInstance(Dialect.GOOGLE_STANDARD_SQL)
.parse(Statement.of("COMMIT"));
static final ParsedStatement COMMIT_STATEMENT;

/** The {@link Statement} and {@link Callable} for rollbacks */
static final ParsedStatement ROLLBACK_STATEMENT =
AbstractStatementParser.getInstance(Dialect.GOOGLE_STANDARD_SQL)
.parse(Statement.of("ROLLBACK"));
static final ParsedStatement ROLLBACK_STATEMENT;

/**
* Create a RUN BATCH statement to use with the {@link #executeBatchUpdate(Iterable)} method to
Expand All @@ -124,9 +121,22 @@ public static AbstractStatementParser getInstance(Dialect dialect) {
* and the method uses this statement instead in order to use the same logic as the other
* statements.
*/
static final ParsedStatement RUN_BATCH_STATEMENT =
AbstractStatementParser.getInstance(Dialect.GOOGLE_STANDARD_SQL)
.parse(Statement.of("RUN BATCH"));
static final ParsedStatement RUN_BATCH_STATEMENT;

static {
try {
BEGIN_STATEMENT = getInstance(Dialect.GOOGLE_STANDARD_SQL).parse(Statement.of("BEGIN"));
COMMIT_STATEMENT = getInstance(Dialect.GOOGLE_STANDARD_SQL).parse(Statement.of("COMMIT"));
ROLLBACK_STATEMENT = getInstance(Dialect.GOOGLE_STANDARD_SQL).parse(Statement.of("ROLLBACK"));
RUN_BATCH_STATEMENT =
getInstance(Dialect.GOOGLE_STANDARD_SQL).parse(Statement.of("RUN BATCH"));

} catch (Throwable ex) {
Logger logger = Logger.getLogger(AbstractStatementParser.class.getName());
logger.log(Level.SEVERE, "Static initialization failure.", ex);
throw ex;
}
}

/** The type of statement that has been recognized by the parser. */
@InternalApi
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.ClientSideStatementImpl.CompileException;
import java.lang.reflect.Method;
import java.util.regex.Matcher;

/** Executor for <code>PARTITION &lt;sql&gt;</code> statements. */
class ClientSideStatementPartitionExecutor implements ClientSideStatementExecutor {
private final ClientSideStatementImpl statement;
private final Method method;

ClientSideStatementPartitionExecutor(ClientSideStatementImpl statement) throws CompileException {
try {
this.statement = statement;
this.method =
ConnectionStatementExecutor.class.getDeclaredMethod(
statement.getMethodName(), Statement.class);
} catch (Exception e) {
throw new CompileException(e, statement);
}
}

@Override
public StatementResult execute(
ConnectionStatementExecutor connection, ParsedStatement parsedStatement) throws Exception {
String sql = getParameterValue(parsedStatement);
return (StatementResult)
method.invoke(connection, parsedStatement.getStatement().toBuilder().replace(sql).build());
}

String getParameterValue(ParsedStatement parsedStatement) {
Matcher matcher = statement.getPattern().matcher(parsedStatement.getSqlWithoutComments());
if (matcher.find() && matcher.groupCount() >= 2) {
String space = matcher.group(1);
String value = matcher.group(2);
return (space + value).trim();
}
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
String.format(
"Invalid argument for PARTITION: %s", parsedStatement.getStatement().getSql()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.ClientSideStatementImpl.CompileException;
import com.google.common.base.Strings;
import java.lang.reflect.Method;
import java.util.regex.Matcher;

/** Executor for <code>RUN PARTITION &lt;partition_id&gt;</code> statements. */
class ClientSideStatementRunPartitionExecutor implements ClientSideStatementExecutor {
private final ClientSideStatementImpl statement;
private final Method method;

ClientSideStatementRunPartitionExecutor(ClientSideStatementImpl statement)
throws CompileException {
try {
this.statement = statement;
this.method =
ConnectionStatementExecutor.class.getDeclaredMethod(
statement.getMethodName(), String.class);
} catch (Exception e) {
throw new CompileException(e, statement);
}
}

@Override
public StatementResult execute(
ConnectionStatementExecutor connection, ParsedStatement parsedStatement) throws Exception {
String partitionId = getParameterValue(parsedStatement);
if (partitionId == null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
"No valid partition id found in statement: " + parsedStatement.getStatement().getSql());
}
return (StatementResult) method.invoke(connection, partitionId);
}

String getParameterValue(ParsedStatement parsedStatement) {
// The statement has the form `RUN PARTITION ['partition-id']`.
// The regex that is defined for this statement is (simplified) `run\s+partition(?:\s*'(.*)')?`
// This regex has one capturing group, which captures the partition-id inside the single quotes.
// That capturing group is however inside a non-capturing optional group.
// That means that:
// 1. If the matcher matches and returns one or more groups, we know that we have a partition-id
// in the SQL statement itself, as that is the only thing that can be in a capturing group.
// 2. If the matcher matches and returns zero groups, we know that the statement is valid, but
// that it does not contain a partition-id in the SQL statement. The partition-id must then
// be included in the statement as a query parameter.
Matcher matcher = statement.getPattern().matcher(parsedStatement.getSqlWithoutComments());
if (matcher.find() && matcher.groupCount() >= 1) {
String value = matcher.group(1);
if (!Strings.isNullOrEmpty(value)) {
return value;
}
}
if (parsedStatement.getStatement().getParameters().size() == 1) {
Value value = parsedStatement.getStatement().getParameters().values().iterator().next();
return value.getAsString();
}
return null;
}
}
Loading

0 comments on commit 4e31d04

Please sign in to comment.