Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add backend query options #90

Merged
merged 5 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.Transaction;
Expand All @@ -53,20 +55,86 @@
*/
abstract class AbstractReadContext
implements ReadContext, AbstractResultSet.Listener, SessionTransaction {

abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadContext> {
private SessionImpl session;
private SpannerRpc rpc;
private Span span = Tracing.getTracer().getCurrentSpan();
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;

Builder() {}

@SuppressWarnings("unchecked")
B self() {
return (B) this;
}

B setSession(SessionImpl session) {
this.session = session;
return self();
}

B setRpc(SpannerRpc rpc) {
this.rpc = rpc;
return self();
}

B setSpan(Span span) {
this.span = span;
return self();
}

B setDefaultPrefetchChunks(int defaultPrefetchChunks) {
this.defaultPrefetchChunks = defaultPrefetchChunks;
return self();
}

B setDefaultQueryOptions(QueryOptions defaultQueryOptions) {
this.defaultQueryOptions = defaultQueryOptions;
return self();
}

abstract T build();
}

/**
* A {@code ReadContext} for standalone reads. This can only be used for a single operation, since
* each standalone read may see a different timestamp of Cloud Spanner data.
*/
static class SingleReadContext extends AbstractReadContext {
static class Builder extends AbstractReadContext.Builder<Builder, SingleReadContext> {
private TimestampBound bound;

private Builder() {}

Builder setTimestampBound(TimestampBound bound) {
this.bound = bound;
return self();
}

@Override
SingleReadContext build() {
return new SingleReadContext(this);
}

SingleUseReadOnlyTransaction buildSingleUseReadOnlyTransaction() {
return new SingleUseReadOnlyTransaction(this);
}
}

static Builder newBuilder() {
return new Builder();
}

final TimestampBound bound;

@GuardedBy("lock")
private boolean used;

SingleReadContext(
SessionImpl session, TimestampBound bound, SpannerRpc rpc, int defaultPrefetchChunks) {
super(session, rpc, defaultPrefetchChunks);
this.bound = bound;
private SingleReadContext(Builder builder) {
super(builder);
this.bound = builder.bound;
}

@GuardedBy("lock")
Expand Down Expand Up @@ -99,9 +167,8 @@ static class SingleUseReadOnlyTransaction extends SingleReadContext
@GuardedBy("lock")
private Timestamp timestamp;

SingleUseReadOnlyTransaction(
SessionImpl session, TimestampBound bound, SpannerRpc rpc, int defaultPrefetchChunks) {
super(session, bound, rpc, defaultPrefetchChunks);
private SingleUseReadOnlyTransaction(SingleReadContext.Builder builder) {
super(builder);
}

@Override
Expand Down Expand Up @@ -139,6 +206,38 @@ public void onTransactionMetadata(Transaction transaction) {

static class MultiUseReadOnlyTransaction extends AbstractReadContext
implements ReadOnlyTransaction {
static class Builder extends AbstractReadContext.Builder<Builder, MultiUseReadOnlyTransaction> {
private TimestampBound bound;
private Timestamp timestamp;
private ByteString transactionId;

private Builder() {}

Builder setTimestampBound(TimestampBound bound) {
this.bound = bound;
return this;
}

Builder setTimestamp(Timestamp timestamp) {
this.timestamp = timestamp;
return this;
}

Builder setTransactionId(ByteString transactionId) {
this.transactionId = transactionId;
return this;
}

@Override
MultiUseReadOnlyTransaction build() {
return new MultiUseReadOnlyTransaction(this);
}
}

static Builder newBuilder() {
return new Builder();
}

private TimestampBound bound;
private final Object txnLock = new Object();

Expand All @@ -148,27 +247,24 @@ static class MultiUseReadOnlyTransaction extends AbstractReadContext
@GuardedBy("txnLock")
private ByteString transactionId;

MultiUseReadOnlyTransaction(
SessionImpl session, TimestampBound bound, SpannerRpc rpc, int defaultPrefetchChunks) {
super(session, rpc, defaultPrefetchChunks);
MultiUseReadOnlyTransaction(Builder builder) {
super(builder);
checkArgument(
bound.getMode() != TimestampBound.Mode.MAX_STALENESS
&& bound.getMode() != TimestampBound.Mode.MIN_READ_TIMESTAMP,
"Bounded staleness mode %s is not supported for multi-use read-only transactions."
+ " Create a single-use read or read-only transaction instead.",
bound.getMode());
this.bound = bound;
}

MultiUseReadOnlyTransaction(
SessionImpl session,
ByteString transactionId,
Timestamp timestamp,
SpannerRpc rpc,
int defaultPrefetchChunks) {
super(session, rpc, defaultPrefetchChunks);
this.transactionId = transactionId;
this.timestamp = timestamp;
!(builder.bound != null && builder.transactionId != null)
&& !(builder.bound == null && builder.transactionId == null),
"Either TimestampBound or TransactionId must be specified");
if (builder.bound != null) {
checkArgument(
builder.bound.getMode() != TimestampBound.Mode.MAX_STALENESS
&& builder.bound.getMode() != TimestampBound.Mode.MIN_READ_TIMESTAMP,
"Bounded staleness mode %s is not supported for multi-use read-only transactions."
+ " Create a single-use read or read-only transaction instead.",
builder.bound.getMode());
this.bound = builder.bound;
} else {
this.timestamp = builder.timestamp;
this.transactionId = builder.transactionId;
}
Comment on lines +250 to +267
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since most of this refactoring and extra checks for arguments is not related to the query options work, would it be ok to move this to a separate PR?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer not to. The reason for introducing this builder pattern in this PR is that this PR requires an additional object to be passed in to the different transaction classes. This would add another parameter to the constructors of these classes, bringing the total number to 6 in this specific case. As a general rule of thumb, a method (or constructor) in Java should not take more than 4 arguments: https://rules.sonarsource.com/java/RSPEC-107

}

@Override
Expand Down Expand Up @@ -256,6 +352,7 @@ void initTransaction() {
final SpannerRpc rpc;
final Span span;
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

@GuardedBy("lock")
private boolean isValid = true;
Expand All @@ -271,16 +368,12 @@ void initTransaction() {
// much more frequently.
private static final int MAX_BUFFERED_CHUNKS = 512;

AbstractReadContext(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks) {
this(session, rpc, defaultPrefetchChunks, Tracing.getTracer().getCurrentSpan());
}

private AbstractReadContext(
SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks, Span span) {
this.session = session;
this.rpc = rpc;
this.defaultPrefetchChunks = defaultPrefetchChunks;
this.span = span;
AbstractReadContext(Builder<?, ?> builder) {
this.session = builder.session;
this.rpc = builder.rpc;
this.defaultPrefetchChunks = builder.defaultPrefetchChunks;
this.defaultQueryOptions = builder.defaultQueryOptions;
this.span = builder.span;
}

long getSeqNo() {
Expand Down Expand Up @@ -341,9 +434,36 @@ private ResultSet executeQueryInternal(
Statement statement,
com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode,
QueryOption... options) {
Options readOptions = Options.fromQueryOptions(options);
Options queryOptions = Options.fromQueryOptions(options);
return executeQueryInternalWithOptions(
statement, queryMode, readOptions, null /*partitionToken*/);
statement, queryMode, queryOptions, null /*partitionToken*/);
}

/**
* Determines the {@link QueryOptions} to use for a query. This is determined using the following
* precedence:
*
* <ol>
* <li>Specific {@link QueryOptions} passed in for this query.
* <li>Any value specified in a valid environment variable when the {@link SpannerOptions}
* instance was created.
* <li>The default {@link SpannerOptions#getDefaultQueryOptions()} specified for the database
* where the query is executed.
* </ol>
*/
@VisibleForTesting
QueryOptions buildQueryOptions(QueryOptions requestOptions) {
// Shortcut for the most common return value.
if (defaultQueryOptions.equals(QueryOptions.getDefaultInstance()) && requestOptions == null) {
return QueryOptions.getDefaultInstance();
}
// Create a builder based on the default query options.
QueryOptions.Builder builder = defaultQueryOptions.toBuilder();
// Then overwrite with specific options for this query.
if (requestOptions != null) {
builder.mergeFrom(requestOptions);
}
return builder.build();
}

ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, QueryMode queryMode) {
Expand All @@ -365,6 +485,7 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, Query
builder.setTransaction(selector);
}
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
return builder;
}

Expand Down Expand Up @@ -400,15 +521,15 @@ ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(Iterable<Stateme
ResultSet executeQueryInternalWithOptions(
Statement statement,
com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode,
Options readOptions,
Options options,
ByteString partitionToken) {
beforeReadOrQuery();
final ExecuteSqlRequest.Builder request = getExecuteSqlRequestBuilder(statement, queryMode);
if (partitionToken != null) {
request.setPartitionToken(partitionToken);
}
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
options.hasPrefetchChunks() ? options.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,41 +45,48 @@ public class BatchClientImpl implements BatchClient {
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
SessionImpl session = sessionClient.createSession();
return new BatchReadOnlyTransactionImpl(
sessionClient.getSpanner(), session, checkNotNull(bound));
MultiUseReadOnlyTransaction.newBuilder()
.setSession(session)
.setRpc(sessionClient.getSpanner().getRpc())
.setTimestampBound(bound)
.setDefaultQueryOptions(
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()),
checkNotNull(bound));
}

@Override
public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batchTransactionId) {
SessionImpl session =
sessionClient.sessionWithId(checkNotNull(batchTransactionId).getSessionId());
return new BatchReadOnlyTransactionImpl(
sessionClient.getSpanner(), session, batchTransactionId);
MultiUseReadOnlyTransaction.newBuilder()
.setSession(session)
.setRpc(sessionClient.getSpanner().getRpc())
.setTransactionId(batchTransactionId.getTransactionId())
.setTimestamp(batchTransactionId.getTimestamp())
.setDefaultQueryOptions(
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()),
batchTransactionId);
}

private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction
implements BatchReadOnlyTransaction {
private final String sessionName;
private final Map<SpannerRpc.Option, ?> options;

BatchReadOnlyTransactionImpl(SpannerImpl spanner, SessionImpl session, TimestampBound bound) {
super(
checkNotNull(session),
checkNotNull(bound),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
spanner.getOptions().getPrefetchChunks());
BatchReadOnlyTransactionImpl(
MultiUseReadOnlyTransaction.Builder builder, TimestampBound bound) {
super(builder.setTimestampBound(bound));
this.sessionName = session.getName();
this.options = session.getOptions();
initTransaction();
}

BatchReadOnlyTransactionImpl(
SpannerImpl spanner, SessionImpl session, BatchTransactionId batchTransactionId) {
super(
checkNotNull(session),
checkNotNull(batchTransactionId).getTransactionId(),
batchTransactionId.getTimestamp(),
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
spanner.getOptions().getPrefetchChunks());
MultiUseReadOnlyTransaction.Builder builder, BatchTransactionId batchTransactionId) {
super(builder.setTransactionId(batchTransactionId.getTransactionId()));
this.sessionName = session.getName();
this.options = session.getOptions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ SpannerImpl getSpanner() {
return spanner;
}

DatabaseId getDatabaseId() {
return db;
}

/** Create a single session. */
SessionImpl createSession() {
// The sessionChannelCounter could overflow, but that will just flip it to Integer.MIN_VALUE,
Expand Down
Loading