-
Notifications
You must be signed in to change notification settings - Fork 123
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support partitioned queries + data boost in Connection API #2540
Changes from 20 commits
052688a
9817586
dc91c9a
bca9b12
bc5b691
56dfb75
f0e9f5b
af211d3
090868c
31ba134
6960e11
549e870
0b5269e
d5248cc
a461c39
13541b1
3ae95e3
ed6a34e
1b39e8c
9e6dc6e
89889ee
1658a51
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,13 +21,22 @@ | |
import com.google.api.gax.grpc.GrpcCallContext; | ||
import com.google.api.gax.longrunning.OperationFuture; | ||
import com.google.api.gax.rpc.ApiCallContext; | ||
import com.google.cloud.spanner.BatchReadOnlyTransaction; | ||
import com.google.cloud.spanner.BatchTransactionId; | ||
import com.google.cloud.spanner.Dialect; | ||
import com.google.cloud.spanner.ErrorCode; | ||
import com.google.cloud.spanner.Options.QueryOption; | ||
import com.google.cloud.spanner.Options.RpcPriority; | ||
import com.google.cloud.spanner.Partition; | ||
import com.google.cloud.spanner.PartitionOptions; | ||
import com.google.cloud.spanner.ResultSet; | ||
import com.google.cloud.spanner.ResultSets; | ||
import com.google.cloud.spanner.SpannerException; | ||
import com.google.cloud.spanner.SpannerExceptionFactory; | ||
import com.google.cloud.spanner.SpannerOptions; | ||
import com.google.cloud.spanner.Statement; | ||
import com.google.cloud.spanner.Struct; | ||
import com.google.cloud.spanner.Type.StructField; | ||
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement; | ||
import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout; | ||
import com.google.common.base.Preconditions; | ||
|
@@ -39,13 +48,15 @@ | |
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.CancellationException; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.stream.Collectors; | ||
import javax.annotation.Nonnull; | ||
import javax.annotation.Nullable; | ||
import javax.annotation.concurrent.GuardedBy; | ||
|
@@ -157,6 +168,38 @@ public void rollbackToSavepoint( | |
"Rollback to savepoint is not supported for " + getUnitOfWorkName()); | ||
} | ||
|
||
@Override | ||
public ApiFuture<ResultSet> partitionQueryAsync( | ||
CallType callType, | ||
ParsedStatement query, | ||
PartitionOptions partitionOptions, | ||
QueryOption... options) { | ||
throw SpannerExceptionFactory.newSpannerException( | ||
ErrorCode.FAILED_PRECONDITION, | ||
"Partition query is not supported for " + getUnitOfWorkName()); | ||
} | ||
|
||
ResultSet partitionQuery( | ||
BatchReadOnlyTransaction transaction, | ||
PartitionOptions partitionOptions, | ||
ParsedStatement query, | ||
QueryOption... options) { | ||
BatchTransactionId transactionId = transaction.getBatchTransactionId(); | ||
List<Partition> partitions = | ||
transaction.partitionQuery(partitionOptions, query.getStatement(), options); | ||
return ResultSets.forRows( | ||
com.google.cloud.spanner.Type.struct( | ||
StructField.of("PARTITION", com.google.cloud.spanner.Type.string())), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we define a private static constant for "PARTITION". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've introduced a local final variable for it. It is not used anywhere else, and it is more readable to have it in the vicinity of where it is being used, than in a constant defined at the top of the file. |
||
partitions.stream() | ||
.map( | ||
partition -> | ||
Struct.newBuilder() | ||
.set("PARTITION") | ||
.to(PartitionId.encodeToString(transactionId, partition)) | ||
.build()) | ||
.collect(Collectors.toList())); | ||
} | ||
|
||
StatementExecutor getStatementExecutor() { | ||
return statementExecutor; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Copyright 2023 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.spanner.connection; | ||
|
||
import com.google.cloud.spanner.ErrorCode; | ||
import com.google.cloud.spanner.SpannerExceptionFactory; | ||
import com.google.cloud.spanner.Statement; | ||
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement; | ||
import com.google.cloud.spanner.connection.ClientSideStatementImpl.CompileException; | ||
import java.lang.reflect.Method; | ||
import java.util.regex.Matcher; | ||
|
||
/** Executor for <code>PARTITION <sql></code> statements. */ | ||
class ClientSideStatementPartitionExecutor implements ClientSideStatementExecutor { | ||
private final ClientSideStatementImpl statement; | ||
private final Method method; | ||
|
||
ClientSideStatementPartitionExecutor(ClientSideStatementImpl statement) throws CompileException { | ||
try { | ||
this.statement = statement; | ||
this.method = | ||
ConnectionStatementExecutor.class.getDeclaredMethod( | ||
statement.getMethodName(), Statement.class); | ||
} catch (Exception e) { | ||
throw new CompileException(e, statement); | ||
} | ||
} | ||
|
||
@Override | ||
public StatementResult execute( | ||
ConnectionStatementExecutor connection, ParsedStatement parsedStatement) throws Exception { | ||
String sql = getParameterValue(parsedStatement); | ||
return (StatementResult) | ||
method.invoke(connection, parsedStatement.getStatement().toBuilder().withSql(sql).build()); | ||
} | ||
|
||
String getParameterValue(ParsedStatement parsedStatement) { | ||
Matcher matcher = statement.getPattern().matcher(parsedStatement.getSqlWithoutComments()); | ||
if (matcher.find() && matcher.groupCount() >= 2) { | ||
String space = matcher.group(1); | ||
String value = matcher.group(2); | ||
return (space + value).trim(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggestion - There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Java compiler automatically optimizes this internally, as it is not dynamic concatenation (e.g. in a loop or other hard-to-understand construct), so for these simple cases you should keep it as is. You will see that if you change this to using a StringBuilder, IntelliJ will even give you a warning. |
||
} | ||
throw SpannerExceptionFactory.newSpannerException( | ||
ErrorCode.INVALID_ARGUMENT, | ||
String.format( | ||
"Invalid argument for PARTITION: %s", parsedStatement.getStatement().getSql())); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
/* | ||
* Copyright 2023 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.spanner.connection; | ||
|
||
import com.google.cloud.spanner.ErrorCode; | ||
import com.google.cloud.spanner.SpannerExceptionFactory; | ||
import com.google.cloud.spanner.Value; | ||
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement; | ||
import com.google.cloud.spanner.connection.ClientSideStatementImpl.CompileException; | ||
import com.google.common.base.Strings; | ||
import java.lang.reflect.Method; | ||
import java.util.regex.Matcher; | ||
|
||
/** Executor for <code>RUN PARTITION <partition_id></code> statements. */ | ||
class ClientSideStatementRunPartitionExecutor implements ClientSideStatementExecutor { | ||
private final ClientSideStatementImpl statement; | ||
private final Method method; | ||
|
||
ClientSideStatementRunPartitionExecutor(ClientSideStatementImpl statement) | ||
throws CompileException { | ||
try { | ||
this.statement = statement; | ||
this.method = | ||
ConnectionStatementExecutor.class.getDeclaredMethod( | ||
statement.getMethodName(), String.class); | ||
} catch (Exception e) { | ||
throw new CompileException(e, statement); | ||
} | ||
} | ||
|
||
@Override | ||
public StatementResult execute( | ||
ConnectionStatementExecutor connection, ParsedStatement parsedStatement) throws Exception { | ||
String partitionId = getParameterValue(parsedStatement); | ||
if (partitionId == null) { | ||
throw SpannerExceptionFactory.newSpannerException( | ||
ErrorCode.INVALID_ARGUMENT, | ||
"No valid partition id found in statement: " + parsedStatement.getStatement().getSql()); | ||
} | ||
return (StatementResult) method.invoke(connection, partitionId); | ||
} | ||
|
||
String getParameterValue(ParsedStatement parsedStatement) { | ||
Matcher matcher = statement.getPattern().matcher(parsedStatement.getSqlWithoutComments()); | ||
if (matcher.find() && matcher.groupCount() >= 1) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added some commentary to explain what is going on with this regex. |
||
String value = matcher.group(1); | ||
if (!Strings.isNullOrEmpty(value)) { | ||
return value; | ||
} | ||
} | ||
if (parsedStatement.getStatement().getParameters().size() == 1) { | ||
Value value = parsedStatement.getStatement().getParameters().values().iterator().next(); | ||
return value.getAsString(); | ||
} | ||
return null; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
/* | ||
* Copyright 2023 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.cloud.spanner.connection; | ||
|
||
import com.google.cloud.spanner.ErrorCode; | ||
import com.google.cloud.spanner.SpannerExceptionFactory; | ||
import com.google.cloud.spanner.Statement; | ||
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement; | ||
import com.google.cloud.spanner.connection.ClientSideStatementImpl.CompileException; | ||
import java.lang.reflect.Method; | ||
import java.util.regex.Matcher; | ||
|
||
/** Executor for <code>RUN PARTITIONED QUERY <sql></code> statements. */ | ||
class ClientSideStatementRunPartitionedQueryExecutor implements ClientSideStatementExecutor { | ||
private final ClientSideStatementImpl statement; | ||
private final Method method; | ||
|
||
ClientSideStatementRunPartitionedQueryExecutor(ClientSideStatementImpl statement) | ||
throws CompileException { | ||
try { | ||
this.statement = statement; | ||
this.method = | ||
ConnectionStatementExecutor.class.getDeclaredMethod( | ||
statement.getMethodName(), Statement.class); | ||
} catch (Exception e) { | ||
throw new CompileException(e, statement); | ||
} | ||
} | ||
|
||
@Override | ||
public StatementResult execute( | ||
ConnectionStatementExecutor connection, ParsedStatement parsedStatement) throws Exception { | ||
String sql = getParameterValue(parsedStatement); | ||
return (StatementResult) | ||
method.invoke(connection, parsedStatement.getStatement().toBuilder().withSql(sql).build()); | ||
} | ||
|
||
String getParameterValue(ParsedStatement parsedStatement) { | ||
Matcher matcher = statement.getPattern().matcher(parsedStatement.getSqlWithoutComments()); | ||
if (matcher.find() && matcher.groupCount() >= 2) { | ||
// Include the spacing group in case the query is enclosed in parentheses like this: | ||
// `run partitioned query(select * from foo)` | ||
String space = matcher.group(1); | ||
String value = matcher.group(2); | ||
return (space + value).trim(); | ||
} | ||
throw SpannerExceptionFactory.newSpannerException( | ||
ErrorCode.INVALID_ARGUMENT, | ||
String.format( | ||
"Invalid argument for RUN PARTITIONED QUERY: %s", | ||
parsedStatement.getStatement().getSql())); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit - Should we name this
replace
orreplaceWithSql
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chose the
withSql
name to be consistent with the existingwithQueryOptions
method on this class, which also replaces any existing value. I'm happy to change it to something else if you feel strongly about this.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case the naming convention is more like existing
append()
method. So going by that we can name thisreplace()
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done