feat: support partitioned queries + data boost in Connection API #2540
Conversation
Adds support for Partitioned Queries and Data Boost in the Connection API. This enables the use of these features in the JDBC driver and PGAdapter.
…atement: Refactor the internal interface of client-side statements so that these receive the entire parsed statement, including any query parameters in the statement. This allows us to create client-side statements that actually use the query parameters specified by the user.
```java
@@ -86,6 +86,12 @@ private Builder(Statement statement) {
      statement.queryOptions == null ? null : statement.queryOptions.toBuilder().build();
    }

    /** Replaces the current SQL of this builder with the given string. */
    public Builder withSql(String sql) {
```
Nit - Should we name this `replace` or `replaceWithSql`?
I chose the `withSql` name to be consistent with the existing `withQueryOptions` method on this class, which also replaces any existing value. I'm happy to change it to something else if you feel strongly about this.
In this case the naming convention is more like the existing `append()` method. So going by that, we can name this `replace()`.
Done
```java
transaction.partitionQuery(partitionOptions, query.getStatement(), options);
return ResultSets.forRows(
    com.google.cloud.spanner.Type.struct(
        StructField.of("PARTITION", com.google.cloud.spanner.Type.string())),
```
Can we define a private static constant for `"PARTITION"`?
I've introduced a local final variable for it. It is not used anywhere else, and it is more readable to have it in the vicinity of where it is used than in a constant defined at the top of the file.
```java
if (matcher.find() && matcher.groupCount() >= 2) {
  String space = matcher.group(1);
  String value = matcher.group(2);
  return (space + value).trim();
```
Suggestion - `StringBuilder` would be a bit more optimal compared to the `+` operation.
The Java compiler automatically optimizes this internally, as it is not dynamic concatenation (e.g. in a loop or other hard-to-analyze construct), so for simple cases like this you should keep it as-is. You will see that if you change this to use a `StringBuilder`, IntelliJ will even give you a warning.
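As an illustration of the point above, the two forms below are equivalent for a simple expression like this; a minimal, self-contained sketch (class and method names hypothetical) of what the compiler effectively generates:

```java
public class ConcatDemo {
    // The form used in the PR:
    static String simple(String space, String value) {
        return (space + value).trim();
    }

    // Roughly what javac emits for the expression above (pre-Java 9);
    // since Java 9 it instead emits an invokedynamic call to
    // StringConcatFactory, which is at least as efficient.
    static String desugared(String space, String value) {
        return new StringBuilder().append(space).append(value).toString().trim();
    }

    public static void main(String[] args) {
        // Both produce the same result; neither allocates in a loop.
        System.out.println(simple("  ", "foo ").equals(desugared("  ", "foo ")));
    }
}
```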
```java
String getParameterValue(ParsedStatement parsedStatement) {
  Matcher matcher = statement.getPattern().matcher(parsedStatement.getSqlWithoutComments());
  if (matcher.find() && matcher.groupCount() >= 1) {
```
- It could be just me, but a first-time reader will have some difficulty understanding what we are doing here. My understanding is that we are parsing the partition ID out of the statement. In what cases will a statement have a groupCount >= 1?
- Should we beef up the documentation a bit by adding examples for future readers?
Added some commentary to explain what is going on with this regex.
```java
 * value to <code>0</code> to use the number of available processors as returned by {@link
 * Runtime#availableProcessors()}.
 */
void setMaxPartitionedParallelism(int maxThreads);
```
I had this query earlier as well, but thought of clarifying during code review. Isn't it more direct to use "threads" instead of "parallelism"? Or is this a widely used convention? I would think `setMaxPartitionedThreads` is more specific.
I chose "parallelism" over "threads" to indicate that it is not only the number of threads used to iterate over the returned results, but also the maximum number of queries that will be executed in parallel on Cloud Spanner. The latter matters, as it indicates the amount of resources that this query could potentially consume on Cloud Spanner (and not only on this specific client).
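The documented `0` sentinel from the Javadoc above can be sketched as follows (helper and class names are hypothetical; this mirrors the documented behavior, not the actual implementation):

```java
public class ParallelismDemo {
    /**
     * Resolves the effective parallelism for a partitioned query:
     * 0 means "use the number of available processors", per the Javadoc.
     */
    static int effectiveParallelism(int maxThreads) {
        if (maxThreads < 0) {
            throw new IllegalArgumentException("maxThreads must be >= 0");
        }
        return maxThreads == 0
            ? Runtime.getRuntime().availableProcessors()
            : maxThreads;
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism(4)); // 4
        System.out.println(effectiveParallelism(0) >= 1); // true on any JVM
    }
}
```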
```java
@@ -1234,6 +1372,19 @@ public ApiFuture<long[]> executeBatchUpdateAsync(Iterable<Statement> updates) {
    return internalExecuteBatchUpdateAsync(CallType.ASYNC, parsedStatements);
  }

  private QueryOption[] mergeDataBoost(QueryOption... options) {
```
- Nit: Should we break data boost out as a separate PR, so that it's easy to use as a reference later? This PR could then be just about adding support for partitioned reads.
- I am OK to review this together; breaking it up would just add a future reference PR on how data boost support was added. This suggestion only applies if it does not result in a lot of re-work for you.
Yeah, in hindsight we maybe should have done that. On the other hand, the only reason to add this to the Connection API is the Data Boost feature. Partitioned queries have been around for a long time (since 2018, I think), and there has never been a need for them in the Connection API so far. With the release of Data Boost there is a real use case for it in this API, as it means that users can send queries to a different type of server directly from the JDBC driver.
```java
private QueryOption[] mergeDataBoost(QueryOption... options) {
  if (this.dataBoostEnabled) {
```
Query - Just checking: is it safe to not generate a query option when `dataBoostEnabled` is false? Wouldn't we want to differentiate the case where the customer explicitly marked the option as `false` from the case where the customer did not pass the property in the connection string?
The reason that we are not explicitly including a `DataBoostOption` when `dataBoostEnabled` is false is that it is a typical configuration option that is off by default, unless it has been enabled in one way or another. In this case, it is possible that the user has passed in a `QueryOption` that already contains a `dataBoostEnabled` option, and we don't want to override that unless the user has explicitly turned it on for this connection.

(Unfortunately, the way that `QueryOption`s are implemented in the client library means we cannot check the actual values here, as all concrete implementations are package-private classes without any public interface, so we can't check what the user might have passed in.)
```java
public static PartitionId decodeFromString(String id) {
  try (ObjectInputStream objectInputStream =
      new ObjectInputStream(
          new GZIPInputStream(new ByteArrayInputStream(Base64.getUrlDecoder().decode(id))))) {
    return (PartitionId) objectInputStream.readObject();
  } catch (Exception exception) {
    throw SpannerExceptionFactory.newSpannerException(exception);
  }
}

/**
 * @return A string-encoded version of this {@link PartitionId}. This encoded version can be sent
 *     to any other {@link Connection} to be executed there, including connections on different
 *     hosts than the current host.
 */
public static String encodeToString(BatchTransactionId transactionId, Partition partition) {
  PartitionId id = new PartitionId(transactionId, partition);
  ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
  try (ObjectOutputStream objectOutputStream =
      new ObjectOutputStream(new GZIPOutputStream(byteArrayOutputStream))) {
    objectOutputStream.writeObject(id);
  } catch (Exception exception) {
    throw SpannerExceptionFactory.newSpannerException(exception);
  }
  return Base64.getUrlEncoder().encodeToString(byteArrayOutputStream.toByteArray());
}
```
Can we have a general utility to encode and decode (using gzip) that could be re-used in more than one place? We could use some generics to model more than one input/output type.
I'm not sure I completely understand what you mean in this case. There are plenty of generic gzip utils around. The `GZIPOutputStream` and `GZIPInputStream` used here, for example, are built on Java's `Deflater` and `Inflater` classes (https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/zip/Inflater.html), which already provide generic methods for compressing and decompressing. There are also no other places in this library where we are currently doing any gzipping. Pre-creating a library for gzipping any kind of object sounds like an example of premature optimization.
Not pre-creating a library, but basically breaking this class down into two classes: `Partition` and, say, `GzipEncodingUtility`. `GzipEncodingUtility` would use generics to take in dynamic input/output types. The `Partition` class could internally use `GzipEncodingUtility` to do what it's doing currently.
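For illustration, the encode/decode round trip above boils down to serialize, gzip, then URL-safe Base64, and the reverse; a self-contained sketch using only the JDK (class and method names hypothetical, and the real code throws `SpannerException` instead of `RuntimeException`):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipCodecDemo {
    /** Serialize, gzip, then URL-safe Base64-encode any serializable object. */
    static String encode(Serializable object) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(new GZIPOutputStream(bytes))) {
            out.writeObject(object);
        } catch (Exception exception) {
            throw new RuntimeException(exception);
        }
        // URL-safe Base64 so the token survives being pasted into SQL or a URL.
        return Base64.getUrlEncoder().encodeToString(bytes.toByteArray());
    }

    /** The reverse: Base64-decode, gunzip, then deserialize. */
    static Object decode(String id) {
        try (ObjectInputStream in = new ObjectInputStream(
                new GZIPInputStream(new ByteArrayInputStream(Base64.getUrlDecoder().decode(id))))) {
            return in.readObject();
        } catch (Exception exception) {
            throw new RuntimeException(exception);
        }
    }

    public static void main(String[] args) {
        String encoded = encode("partition-token-123");
        System.out.println(decode(encoded)); // partition-token-123
    }
}
```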
```json
"statementType": "PARTITION",
"regex": "(?is)\\A\\s*partition(\\s+|\\()(.*)\\z",
"method": "statementPartition",
"exampleStatements": []
```
Referring back to a previous comment of mine where I asked for a few examples: can we add a few examples here which can be used to understand the `ClientSideStatementPartitionExecutor` code better?
I've added some example statements here (and updated the test framework a bit, as it was not able to fully cope with these statements, which can have anything at the end).
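To illustrate, here is a small sketch that exercises the regex from the JSON definition above against a few example statements (class and method names hypothetical; the real parser also strips comments from the SQL first):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionRegexDemo {
    // The PARTITION statement regex from the JSON definition:
    // case-insensitive, DOTALL, anchored to the whole statement.
    static final Pattern PARTITION =
        Pattern.compile("(?is)\\A\\s*partition(\\s+|\\()(.*)\\z");

    /**
     * Extracts the query that should be partitioned: group(1) is the
     * separator after the PARTITION keyword (whitespace or an opening
     * parenthesis, which must be kept), group(2) is the rest of the statement.
     */
    static String getQuery(String sql) {
        Matcher matcher = PARTITION.matcher(sql);
        if (matcher.find() && matcher.groupCount() >= 2) {
            return (matcher.group(1) + matcher.group(2)).trim();
        }
        return null; // not a PARTITION statement
    }

    public static void main(String[] args) {
        System.out.println(getQuery("PARTITION SELECT * FROM my_table")); // SELECT * FROM my_table
        System.out.println(getQuery("partition (SELECT id FROM t)"));     // (SELECT id FROM t)
        System.out.println(getQuery("SELECT 1"));                         // null
    }
}
```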
```java
private final Type type;
private final ResultSetMetadata metadata;

static PartitionExecutorResult data(Struct data) {
```
Wouldn't a builder pattern be better suited for such object construction? Otherwise, for every member that we introduce, it will be difficult to multiplex such methods to partially build the object.
> Otherwise, for every member that we introduce, it will be difficult to multiplex such methods to partially build the object.

That is actually the reason that I think a `Builder` pattern is not suitable here. This (internal) object is not intended to allow every possible permutation. By using static factory methods for the specific permutations we do allow, we gain the advantages of:

1. Being able to check the specific arguments of that permutation. E.g. calling `data(Struct data)` with a `null` value should be disallowed (I've added a null check for that).
2. The call site being a lot easier to read: `PartitionExecutorResult.data(...)` is a clear indication that this result contains data, and that it logically does not also need to include, for example, metadata.
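A minimal sketch of the static-factory approach described above, with local stand-in types (illustrative only, not the actual class):

```java
import java.util.Objects;

public class PartitionExecutorResultDemo {
    // Simplified stand-ins for the Spanner types used by the real class.
    static final class Struct {}
    static final class ResultSetMetadata {}

    static final class PartitionExecutorResult {
        final Struct data;
        final ResultSetMetadata metadata;

        private PartitionExecutorResult(Struct data, ResultSetMetadata metadata) {
            this.data = data;
            this.metadata = metadata;
        }

        /** Only the permutations that make sense get a factory method, */
        static PartitionExecutorResult data(Struct data) {
            // and each factory can validate its own arguments.
            return new PartitionExecutorResult(Objects.requireNonNull(data), null);
        }

        static PartitionExecutorResult metadata(ResultSetMetadata metadata) {
            return new PartitionExecutorResult(null, Objects.requireNonNull(metadata));
        }
    }

    public static void main(String[] args) {
        PartitionExecutorResult result = PartitionExecutorResult.data(new Struct());
        System.out.println(result.data != null); // true: a data result always carries data
    }
}
```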
```java
 * multiple queries. Each query uses its own {@link RowProducer} that feeds rows into the {@link
 * MergedResultSet}. The order of the records in the {@link MergedResultSet} is not guaranteed.
 */
class MergedResultSet extends ForwardingStructReader implements PartitionedQueryResultSet {
```
- Have we explored re-using some elements present in `AsyncResultSetImpl`? Or, put another way, are there ways in which we can make `MergedResultSet` and `AsyncResultSetImpl` share some common code?
- Also, there is not much difference in the way we would like to implement batch reads in the client library and the Connection API, apart from the fact that the Connection API supports a bunch of other configurations in the connection string that the client library does not. Am I missing other major differences between these two implementations of `ForwardingStructReader`?
There are two big differences between `AsyncResultSetImpl` and `MergedResultSet`:

1. `AsyncResultSetImpl` uses a single-threaded row producer that reads from a single query result. `MergedResultSet` uses a multi-threaded row producer that reads from multiple query results in parallel.
2. `AsyncResultSetImpl` implements the additional `AsyncResultSet` interface methods that allow users to consume the results in an asynchronous way. That is not needed for `MergedResultSet`, as it is intended for APIs that are synchronous by definition (JDBC and PostgreSQL).

So sharing code between them won't make either of them any simpler.
The main difference between how we implement partitioned queries in the Java client and how we implement them in the Java Connection API is that the Connection API is aimed at supporting standard APIs (JDBC and PostgreSQL) that do not know anything about partitioned queries. That means that:

- Both need to be able to access the feature through SQL statements, as anyone who is using these APIs through a generic tool will not know how to call any custom methods.
- Both need all results to be returned as `ResultSet`s.
In addition, we decided to add a convenience method to the Connection API for executing a query directly as a partitioned query (the `executePartitionedQuery(...)` method and the `RUN PARTITIONED QUERY` SQL statement). This is also needed to use this feature in generic tools that have no idea how to send partition tokens to other hosts. This could also be of interest to the Java client, but for the time being I'm not pushing for that, as the direct request at the moment is to add this feature for JDBC. That means that:

- The `partitionQuery(..)` method in the Connection API is a simple wrapper around the `partitionQuery(..)` method in the `BatchClient`, but it returns the results as a `ResultSet` with encoded strings instead of a `List<Partition>`.
- The `runPartition(..)` method in the Connection API is a simple wrapper around the `runPartition(..)` method in the `BatchClient`, but it takes an encoded string instead of a `Partition` as an input argument, as a `ResultSet` cannot contain a `Partition`.
- The `MergedResultSet` and the corresponding `runPartitionedQuery(..)` methods could later easily be moved to the Java client and used there as well without causing a breaking change. But I would suggest that we wait until there is actual demand for it.
* chore: add ClientSideStatementPartitionExecutor to SpannerFeature
* chore: wrap AbstractStatementParser static initialization in try/catch
* chore: add ClientSideStatementRunPartitionExecutor to SpannerFeature
* chore: add ClientSideStatementRunPartitionedQueryExecutor to SpannerFeature
* chore: lint formatting
🤖 I have created a release *beep* *boop*

## [6.45.0](https://togithub.com/googleapis/java-spanner/compare/v6.44.0...v6.45.0) (2023-08-04)

### Features

* Enable leader aware routing by default in Connection API. This enables its use in the JDBC driver and PGAdapter. The update contains performance optimisations that will reduce the latency of read/write transactions that originate from a region other than the default leader region. ([2a85446](https://togithub.com/googleapis/java-spanner/commit/2a85446b162b006ce84a86285af1767c879b27ed))
* Enable leader aware routing by default. This update contains performance optimisations that will reduce the latency of read/write transactions that originate from a region other than the default leader region. ([441c1b0](https://togithub.com/googleapis/java-spanner/commit/441c1b03c3e976c6304a99fefd93b5c4291e5364))
* Long running transaction clean up background task. Adding configuration options for closing inactive transactions. ([#2419](https://togithub.com/googleapis/java-spanner/issues/2419)) ([423e1a4](https://togithub.com/googleapis/java-spanner/commit/423e1a4b483798d9683ff9bd232b53d76e09beb0))
* Support partitioned queries + data boost in Connection API ([#2540](https://togithub.com/googleapis/java-spanner/issues/2540)) ([4e31d04](https://togithub.com/googleapis/java-spanner/commit/4e31d046f5d80abe8876a729ddba045c70f3261d))

### Bug Fixes

* Apply stream wait timeout ([#2544](https://togithub.com/googleapis/java-spanner/issues/2544)) ([5a12cd2](https://togithub.com/googleapis/java-spanner/commit/5a12cd29601253423c5738be5471a036fd0334be))

### Dependencies

* Update dependency com.google.cloud:google-cloud-shared-dependencies to v3.14.0 ([#2562](https://togithub.com/googleapis/java-spanner/issues/2562)) ([dbd5c75](https://togithub.com/googleapis/java-spanner/commit/dbd5c75be39262003092ff4a925ed470cc45f8be))
* Update dependency org.openjdk.jmh:jmh-core to v1.37 ([#2565](https://togithub.com/googleapis/java-spanner/issues/2565)) ([d5c36bf](https://togithub.com/googleapis/java-spanner/commit/d5c36bfbb67ecb14854944779da6e4dbd93f3559))
* Update dependency org.openjdk.jmh:jmh-generator-annprocess to v1.37 ([#2566](https://togithub.com/googleapis/java-spanner/issues/2566)) ([73e92d4](https://togithub.com/googleapis/java-spanner/commit/73e92d42fe6d334b6efa6485246dc67858adb0a9))

---

This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).
This PR builds on #2556 which is a small refactoring that allows client-side statements to use statement parameters.