Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [WIP] support read-committed isolation level #2173

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,24 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.ResultSet analyzeUpdateStatement(com.google.cloud.spanner.Statement, com.google.cloud.spanner.ReadContext$QueryAnalyzeMode, com.google.cloud.spanner.Options$UpdateOption[])</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.connection.IsolationLevel getDefaultIsolationLevel()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setDefaultIsolationLevel(com.google.cloud.spanner.connection.IsolationLevel)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.connection.IsolationLevel getTransactionIsolationLevel()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setTransactionIsolationLevel(com.google.cloud.spanner.connection.IsolationLevel)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public boolean isActive() {
* Check that the current transaction actually has a valid underlying transaction. If not, the
* method will throw a {@link SpannerException}.
*/
abstract void checkValidTransaction();
abstract void checkValidTransaction(ParsedStatement statement);

/** Returns the {@link ReadContext} that can be used for queries on this transaction. */
abstract ReadContext getReadContext();
Expand All @@ -63,7 +63,7 @@ public ApiFuture<ResultSet> executeQueryAsync(
final AnalyzeMode analyzeMode,
final QueryOption... options) {
Preconditions.checkArgument(statement.isQuery(), "Statement is not a query");
checkValidTransaction();
checkValidTransaction(statement);
return executeStatementAsync(
statement,
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ public PgTransactionMode convert(String value) {
.startsWith("isolation level serializable")) {
currentIndex += "isolation level serializable".length();
mode.setIsolationLevel(IsolationLevel.ISOLATION_LEVEL_SERIALIZABLE);
} else if (valueWithSingleSpaces
.substring(currentIndex)
.startsWith("isolation level read committed")) {
currentIndex += "isolation level read committed".length();
mode.setIsolationLevel(IsolationLevel.ISOLATION_LEVEL_READ_COMMITTED);
} else if (valueWithSingleSpaces
.substring(currentIndex)
.startsWith("isolation level default")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,33 @@ public interface Connection extends AutoCloseable {
/** @return <code>true</code> if this connection is in read-only mode */
boolean isReadOnly();

/** Sets the default isolation level to use for new read/write transactions. */
default void setDefaultIsolationLevel(IsolationLevel isolationLevel) {
throw new UnsupportedOperationException();
}

/** Returns the default isolation level that is used for new read/write transactions. */
default IsolationLevel getDefaultIsolationLevel() {
throw new UnsupportedOperationException();
}

/**
* Sets the isolation level to use for current transaction. This method may only be called when in
* a transaction, and before the transaction is actually started, i.e. before any statements have
* been executed in the transaction.
*/
default void setTransactionIsolationLevel(IsolationLevel isolationLevel) {
throw new UnsupportedOperationException();
}

/**
* @return the transaction isolation level of the current transaction. This method may only be
* called when the connection is in a transaction.
*/
default IsolationLevel getTransactionIsolationLevel() {
throw new UnsupportedOperationException();
}

/**
* Sets the duration the connection should wait before automatically aborting the execution of a
* statement. The default is no timeout. Statement timeouts are applied all types of statements,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
private DatabaseClient dbClient;
private boolean autocommit;
private boolean readOnly;
private IsolationLevel defaultIsolationLevel = IsolationLevel.SERIALIZABLE;
private IsolationLevel transactionIsolationLevel = IsolationLevel.SERIALIZABLE;
private boolean returnCommitStats;

private UnitOfWork currentUnitOfWork = null;
Expand Down Expand Up @@ -230,6 +232,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
this.dbClient = spanner.getDatabaseClient(options.getDatabaseId());
this.retryAbortsInternally = options.isRetryAbortsInternally();
this.readOnly = options.isReadOnly();
this.defaultIsolationLevel = options.getDefaultIsolationLevel();
this.autocommit = options.isAutocommit();
this.queryOptions = this.queryOptions.toBuilder().mergeFrom(options.getQueryOptions()).build();
this.rpcPriority = options.getRPCPriority();
Expand All @@ -256,6 +259,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
this.ddlClient = ddlClient;
this.dbClient = dbClient;
setReadOnly(options.isReadOnly());
setDefaultIsolationLevel(options.getDefaultIsolationLevel());
setAutocommit(options.isAutocommit());
setReturnCommitStats(options.isReturnCommitStats());
setDefaultTransactionOptions();
Expand Down Expand Up @@ -398,23 +402,69 @@ private boolean internalIsAutocommit() {

@Override
public void setReadOnly(boolean readOnly) {
checkValidStateForChangingDefaultTransactionOptions("read-only");
this.readOnly = readOnly;
clearLastTransactionAndSetDefaultTransactionOptions();
}

@Override
public boolean isReadOnly() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
return this.readOnly;
}

@Override
public void setDefaultIsolationLevel(IsolationLevel defaultIsolationLevel) {
checkValidStateForChangingDefaultTransactionOptions("default isolation level");
checkSupportedIsolationLevel(Preconditions.checkNotNull(defaultIsolationLevel));
this.defaultIsolationLevel = defaultIsolationLevel;
clearLastTransactionAndSetDefaultTransactionOptions();
}

@Override
public IsolationLevel getDefaultIsolationLevel() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
return this.defaultIsolationLevel;
}

private void checkSupportedIsolationLevel(IsolationLevel isolationLevel) {
if (!(isolationLevel == IsolationLevel.READ_COMMITTED
|| isolationLevel == IsolationLevel.SERIALIZABLE)) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
String.format(
"Only isolation levels %s and %s are supported",
IsolationLevel.READ_COMMITTED, IsolationLevel.SERIALIZABLE));
}
}

private void checkValidStateForChangingDefaultTransactionOptions(String property) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(!isBatchActive(), "Cannot set read-only while in a batch");
ConnectionPreconditions.checkState(
!isTransactionStarted(), "Cannot set read-only while a transaction is active");
!isBatchActive(), String.format("Cannot set %s while in a batch", property));
ConnectionPreconditions.checkState(
!isTransactionStarted(),
String.format("Cannot set %s while a transaction is active", property));
ConnectionPreconditions.checkState(
!(isAutocommit() && isInTransaction()),
"Cannot set read-only while in a temporary transaction");
String.format("Cannot set default %s in a temporary transaction", property));
ConnectionPreconditions.checkState(
!transactionBeginMarked, "Cannot set read-only when a transaction has begun");
this.readOnly = readOnly;
clearLastTransactionAndSetDefaultTransactionOptions();
!transactionBeginMarked,
String.format("Cannot set %s when a transaction has begun", property));
}

@Override
public boolean isReadOnly() {
public void setTransactionIsolationLevel(IsolationLevel isolationLevel) {
checkValidStateForSetTransactionOption("isolation level");
checkSupportedIsolationLevel(Preconditions.checkNotNull(defaultIsolationLevel));
this.transactionBeginMarked = true;
this.transactionIsolationLevel = isolationLevel;
}

@Override
public IsolationLevel getTransactionIsolationLevel() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
return this.readOnly;
return this.transactionIsolationLevel;
}

private void clearLastTransactionAndSetDefaultTransactionOptions() {
Expand Down Expand Up @@ -558,13 +608,7 @@ public TransactionMode getTransactionMode() {
@Override
public void setTransactionMode(TransactionMode transactionMode) {
Preconditions.checkNotNull(transactionMode);
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isBatchActive(), "Cannot set transaction mode while in a batch");
ConnectionPreconditions.checkState(isInTransaction(), "This connection has no transaction");
ConnectionPreconditions.checkState(
!isTransactionStarted(),
"The transaction mode cannot be set after the transaction has started");
checkValidStateForSetTransactionOption("transaction mode");
ConnectionPreconditions.checkState(
!isReadOnly() || transactionMode == TransactionMode.READ_ONLY_TRANSACTION,
"The transaction mode can only be READ_ONLY when the connection is in read_only mode");
Expand All @@ -582,13 +626,7 @@ public String getTransactionTag() {

@Override
public void setTransactionTag(String tag) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isBatchActive(), "Cannot set transaction tag while in a batch");
ConnectionPreconditions.checkState(isInTransaction(), "This connection has no transaction");
ConnectionPreconditions.checkState(
!isTransactionStarted(),
"The transaction tag cannot be set after the transaction has started");
checkValidStateForSetTransactionOption("transaction tag");
ConnectionPreconditions.checkState(
getTransactionMode() == TransactionMode.READ_WRITE_TRANSACTION,
"Transaction tag can only be set for a read/write transaction");
Expand All @@ -597,6 +635,16 @@ public void setTransactionTag(String tag) {
this.transactionTag = tag;
}

private void checkValidStateForSetTransactionOption(String property) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isBatchActive(), String.format("Cannot set %s while in a batch", property));
ConnectionPreconditions.checkState(isInTransaction(), "This connection has no transaction");
ConnectionPreconditions.checkState(
!isTransactionStarted(),
String.format("The %s cannot be set after the transaction has started", property));
}

@Override
public String getStatementTag() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
Expand Down Expand Up @@ -745,6 +793,7 @@ private void setDefaultTransactionOptions() {
? UnitOfWorkType.READ_ONLY_TRANSACTION
: UnitOfWorkType.READ_WRITE_TRANSACTION;
batchMode = BatchMode.NONE;
transactionIsolationLevel = defaultIsolationLevel;
transactionTag = null;
} else {
popUnitOfWorkFromTransactionStack();
Expand Down Expand Up @@ -1301,6 +1350,7 @@ UnitOfWork createNewUnitOfWork() {
case READ_WRITE_TRANSACTION:
return ReadWriteTransaction.newBuilder()
.setDatabaseClient(dbClient)
.setIsolationLevel(defaultIsolationLevel)
.setRetryAbortsInternally(retryAbortsInternally)
.setReturnCommitStats(returnCommitStats)
.setTransactionRetryListeners(transactionRetryListeners)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public String[] getValidValues() {
private static final boolean DEFAULT_USE_PLAIN_TEXT = false;
static final boolean DEFAULT_AUTOCOMMIT = true;
static final boolean DEFAULT_READONLY = false;
static final IsolationLevel DEFAULT_ISOLATION_LEVEL = IsolationLevel.SERIALIZABLE;
static final boolean DEFAULT_RETRY_ABORTS_INTERNALLY = true;
private static final String DEFAULT_CREDENTIALS = null;
private static final String DEFAULT_OAUTH_TOKEN = null;
Expand All @@ -183,6 +184,8 @@ public String[] getValidValues() {
public static final String AUTOCOMMIT_PROPERTY_NAME = "autocommit";
/** Name of the 'readonly' connection property. */
public static final String READONLY_PROPERTY_NAME = "readonly";
/** Name of the 'defaultIsolationLevel' connection property. */
public static final String DEFAULT_ISOLATION_LEVEL_PROPERTY_NAME = "defaultIsolationLevel";
/** Name of the 'retry aborts internally' connection property. */
public static final String RETRY_ABORTS_INTERNALLY_PROPERTY_NAME = "retryAbortsInternally";
/** Name of the 'credentials' connection property. */
Expand Down Expand Up @@ -231,6 +234,11 @@ public String[] getValidValues() {
READONLY_PROPERTY_NAME,
"Should the connection start in read-only mode (true/false)",
DEFAULT_READONLY),
ConnectionProperty.createStringProperty(
DEFAULT_ISOLATION_LEVEL_PROPERTY_NAME,
String.format(
"The default isolation level to use for new transactions on this connection. Must be one of %s",
IsolationLevel.getValidNames())),
ConnectionProperty.createBooleanProperty(
RETRY_ABORTS_INTERNALLY_PROPERTY_NAME,
"Should the connection automatically retry Aborted errors (true/false)",
Expand Down Expand Up @@ -547,6 +555,7 @@ public static Builder newBuilder() {

private final boolean autocommit;
private final boolean readOnly;
private final IsolationLevel defaultIsolationLevel;
private final boolean retryAbortsInternally;
private final List<StatementExecutionInterceptor> statementExecutionInterceptors;
private final SpannerOptionsConfigurator configurator;
Expand Down Expand Up @@ -636,6 +645,7 @@ private ConnectionOptions(Builder builder) {

this.autocommit = parseAutocommit(this.uri);
this.readOnly = parseReadOnly(this.uri);
this.defaultIsolationLevel = parseDefaultIsolationLevel(this.uri);
this.retryAbortsInternally = parseRetryAbortsInternally(this.uri);
this.statementExecutionInterceptors =
Collections.unmodifiableList(builder.statementExecutionInterceptors);
Expand Down Expand Up @@ -719,6 +729,20 @@ static boolean parseReadOnly(String uri) {
return value != null ? Boolean.parseBoolean(value) : DEFAULT_READONLY;
}

@VisibleForTesting
static @Nullable IsolationLevel parseDefaultIsolationLevel(String uri) {
String value = parseUriProperty(uri, DEFAULT_ISOLATION_LEVEL_PROPERTY_NAME);
try {
return value != null ? IsolationLevel.valueOf(value.toUpperCase()) : DEFAULT_ISOLATION_LEVEL;
} catch (IllegalArgumentException exception) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
String.format(
"Invalid value for %s: %s\nValue must be one of %s",
DEFAULT_ISOLATION_LEVEL_PROPERTY_NAME, value, IsolationLevel.getValidNames()));
}
}

@VisibleForTesting
static boolean parseRetryAbortsInternally(String uri) {
String value = parseUriProperty(uri, RETRY_ABORTS_INTERNALLY_PROPERTY_NAME);
Expand Down Expand Up @@ -1026,6 +1050,14 @@ public boolean isReadOnly() {
return readOnly;
}

/**
* The initial default {@link IsolationLevel} for connections created by this {@link
* ConnectionOptions}
*/
public IsolationLevel getDefaultIsolationLevel() {
return defaultIsolationLevel;
}

/**
* The initial retryAbortsInternally value for connections created by this {@link
* ConnectionOptions}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ interface ConnectionStatementExecutor {

StatementResult statementRollback();

StatementResult statementSetTransactionMode(TransactionMode mode);
StatementResult statementSetTransactionMode(PgTransactionMode mode);

StatementResult statementSetPgTransactionMode(PgTransactionMode transactionMode);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,10 @@ public StatementResult statementRollback() {
}

@Override
public StatementResult statementSetTransactionMode(TransactionMode mode) {
getConnection().setTransactionMode(mode);
return noResult(SET_TRANSACTION_MODE);
public StatementResult statementSetTransactionMode(PgTransactionMode mode) {
return statementSetPgTransactionMode(mode);
// getConnection().setTransactionMode(mode);
// return noResult(SET_TRANSACTION_MODE);
}

@Override
Expand All @@ -386,6 +387,11 @@ public StatementResult statementSetPgTransactionMode(PgTransactionMode transacti
// no-op
}
}
if (transactionMode.getIsolationLevel() != null) {
getConnection()
.setTransactionIsolationLevel(
transactionMode.getIsolationLevel().getSpannerIsolationLevel());
}
return noResult(SET_TRANSACTION_MODE);
}

Expand All @@ -404,12 +410,16 @@ public StatementResult statementSetPgSessionCharacteristicsTransactionMode(
// no-op
}
}
if (transactionMode.getIsolationLevel() != null) {
getConnection()
.setDefaultIsolationLevel(transactionMode.getIsolationLevel().getSpannerIsolationLevel());
}
return noResult(SET_TRANSACTION_MODE);
}

@Override
public StatementResult statementSetPgDefaultTransactionIsolation(IsolationLevel isolationLevel) {
// no-op
getConnection().setDefaultIsolationLevel(isolationLevel.getSpannerIsolationLevel());
return noResult(SET_DEFAULT_TRANSACTION_ISOLATION);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void retry(AbortedException aborted) throws AbortedException {
.invokeInterceptors(
RUN_BATCH_STATEMENT, StatementExecutionStep.RETRY_STATEMENT, transaction);
try {
transaction.getReadContext().batchUpdate(statements);
transaction.getTransactionContext().batchUpdate(statements);
} catch (AbortedException e) {
// Propagate abort to force a new retry.
throw e;
Expand Down
Loading