Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: keep track of any BeginTransaction option for a Read #1485

Merged
merged 1 commit into from
Oct 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
SpannerRpc.StreamingCall call =
rpc.read(builder.build(), stream.consumer(), session.getOptions());
call.request(prefetchChunks);
stream.setCall(call, selector != null && selector.hasBegin());
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
return stream;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ public void testInlinedBeginFirstReadReturnsUnavailable() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
long value =
Long value =
client
.readWriteTransaction()
.run(
Expand All @@ -625,6 +625,381 @@ public void testInlinedBeginFirstReadReturnsUnavailable() {
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstReadReturnsUnavailableRetryReturnsAborted() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
// The second attempt will return ABORTED and should cause the transaction to
// retry.
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstQueryReturnsUnavailableRetryReturnsAborted() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
// The second attempt will return ABORTED and should cause the transaction to
// retry.
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
if (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstDmlReturnsUnavailableRetryReturnsAborted() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
// The second attempt will return ABORTED and should cause the transaction to
// retry.
return transaction.executeUpdate(UPDATE_STATEMENT);
});
assertThat(value).isEqualTo(UPDATE_COUNT);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstReadReturnsUnavailableRetryReturnsAborted_WithCatchAll() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
// The second attempt will return ABORTED and should cause the transaction to
// retry.
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
return rs.getLong(0);
}
} catch (AbortedException e) {
// Ignore the AbortedException and let the commit handle it.
}
return 0L;
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstQueryReturnsUnavailableRetryReturnsAborted_WithCatchAll() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
// The second attempt will return ABORTED and should cause the transaction to
// retry.
try {
return transaction.executeUpdate(UPDATE_STATEMENT);
} catch (AbortedException e) {
// Ignore the AbortedException and let the commit handle it.
}
return 0L;
});
assertThat(value).isEqualTo(UPDATE_COUNT);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstDmlReturnsUnavailableRetryReturnsAborted_WithCatchAll() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
// The second attempt will return ABORTED and should cause the transaction to
// retry.
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
if (rs.next()) {
return rs.getLong(0);
}
} catch (AbortedException e) {
// Ignore the AbortedException and let the commit handle it.
}
return 0L;
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstReadCancelledSecondReadAborted_WithCatch() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofException(Status.CANCELLED.asRuntimeException()));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
return rs.getLong(0);
}
} catch (SpannerException e) {
if (e.getErrorCode() == ErrorCode.CANCELLED) {
// Ignore and let the transaction continue.
// Also make sure that the next read operation will return Aborted.
mockSpanner.abortNextTransaction();
} else if (e.getErrorCode() == ErrorCode.ABORTED) {
// Ignore Aborted errors. This will cause the transaction to try to commit.
} else {
// Propagate any other errors (there should not be any in this test case).
throw e;
}
}
return 0L;
});

assertThat(value).isEqualTo(1L);
// 1. The initial attempt will inline the BeginTransaction option.
// 2. The CANCELLED error during the first attempt will cause a retry with a BeginTransaction
// RPC.
// 3. The ABORTED error during the second attempt will NOT cause the next retry to use an
// explicit BeginTransaction RPC, because the previous attempt did return a transaction ID
// (the ID that was returned by the BeginTransaction RPC of that attempt).
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
// There will be 3 attempts to read:
// 1. The first will return CANCELLED.
// 2. The second will return ABORTED.
// 3. The third will return the results.
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
// There are two attempts to commit:
// 1. The initial attempt will NOT try to commit, because the initial Read operation did not
// return a transaction ID.
// 2. The second attempt will try to commit, because the BeginTransaction RPC did return a
// transaction ID, and the Aborted error that was returned by the Read operation was caught
// by the application. This means that the TransactionRunner does not know that the
// transaction was aborted. The Commit RPC will return an Aborted error.
// 3. The third attempt will commit, as the Read operation succeeded and returned a
// transaction ID.
assertThat(countRequests(CommitRequest.class)).isEqualTo(2);
}

@Test
public void testInlinedBeginFirstReadCancelledSecondReadAborted_WithoutCatch()
throws InterruptedException, ExecutionException {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofException(Status.CANCELLED.asRuntimeException()));
// The CANCELLED error is not caught by the application, so it will bubble up and cause the
// transaction to fail.
assertThrows(
SpannerException.class,
() ->
client
.readWriteTransaction()
.run(
transaction -> {
try (ResultSet rs =
transaction.read(
"FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
return rs.getLong(0);
}
} catch (SpannerException e) {
if (e.getErrorCode() == ErrorCode.CANCELLED) {
// Make sure that the next read operation will return Aborted.
mockSpanner.abortNextTransaction();
}
// Always propagate the error to the TransactionRunner.
throw e;
}
return 0L;
}));

// The initial attempt will inline the BeginTransaction option.
// There is no second attempt as the CANCELLED error is not caught.
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ReadRequest.class)).isEqualTo(1);
assertThat(countRequests(CommitRequest.class)).isEqualTo(0);
// The CANCELLED error means that there is no transaction ID returned by the Read operation.
// So there is also no transaction to rollback.
assertThat(countRequests(RollbackRequest.class)).isEqualTo(0);
}

@Test
public void testInlinedBeginFirstReadCancelledSecondReadAborted_WithCatchForCancelled()
throws InterruptedException, ExecutionException {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofException(Status.CANCELLED.asRuntimeException()));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
return rs.getLong(0);
}
} catch (SpannerException e) {
if (e.getErrorCode() == ErrorCode.CANCELLED) {
// Do not propagate the CANCELLED error.
// Make sure that the next read operation will return Aborted.
mockSpanner.abortNextTransaction();
} else {
// Propagate all other errors to the TransactionRunner.
throw e;
}
}
return 0L;
});

assertThat(value).isEqualTo(1L);
// 1. The initial attempt will inline the BeginTransaction option.
// 2. The CANCELLED error during the first attempt will cause a retry with a BeginTransaction
// RPC, because the error was returned by the first statement in the transaction.
// 3. The ABORTED error during the second attempt will NOT cause the next retry to use an
// explicit BeginTransaction RPC, because the previous attempt did return a transaction ID
// (the ID that was returned by the BeginTransaction RPC of that attempt).
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
// There will be 3 attempts to read:
// 1. The first will return CANCELLED.
// 2. The second will return ABORTED.
// 3. The third will return the results.
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
// There is only one attempt to commit:
// 1. The initial attempt will NOT try to commit, because the initial Read operation did not
// return a transaction ID.
// 2. The second attempt will NOT try to commit, because the Aborted error from the Read
// operation is propagated to the TransactionRunner. This means that the TransactionRunner
// knows that the transaction was aborted, and will automatically initiate a retry without
// first trying to commit the transaction.
// 3. The third attempt will commit, as the Read operation succeeded and returned a
// transaction ID.
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginCommitAfterReadReturnsUnavailable() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setCommitExecutionTime(
SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException()));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ReadRequest.class)).isEqualTo(1);
assertThat(countRequests(CommitRequest.class)).isEqualTo(2);
}

@Test
public void testInlinedBeginFirstReadReturnsUnavailableAndCommitAborts() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
final AtomicBoolean firstAttempt = new AtomicBoolean(true);
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
long res = 0L;
// The first attempt will return UNAVAILABLE and retry internally.
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
res = rs.getLong(0);
}
}
if (firstAttempt.compareAndSet(true, false)) {
mockSpanner.abortTransaction(transaction);
}
return res;
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(2);
}

@Test
public void testInlinedBeginTxWithQuery() {
DatabaseClient client =
Expand Down