Skip to content

Commit

Permalink
chore(spanner): fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
harshachinta committed Dec 23, 2024
1 parent 82d9fa7 commit a1e72c4
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ public void setUp() {
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.setTrackTransactionStarter()
// The extra BeginTransaction RPC for multiplexed session read-write is causing
// unexpected behavior in tests having a mock on the BeginTransaction RPC. Therefore,
// this is being skipped.
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setSkipVerifyingBeginTransactionForMuxRW(true)
.build())
.build()
.getService();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public void testCommitAborted() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// verify that the there is no test record
try (ResultSet rs =
connection.executeQuery(Statement.of("SELECT COUNT(*) AS C FROM TEST WHERE ID=1"))) {
Expand Down Expand Up @@ -112,6 +114,8 @@ public void testCommitAbortedDuringUpdateWithReturning() {
AbortInterceptor interceptor = new AbortInterceptor(0);
try (ITConnection connection =
createConnection(interceptor, new CountTransactionRetryListener())) {
interceptor.setUsingMultiplexedSession(
isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
// verify that the there is no test record
try (ResultSet rs =
connection.executeQuery(Statement.of("SELECT COUNT(*) AS C FROM TEST WHERE ID=1"))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,4 +327,11 @@ boolean isMultiplexedSessionsEnabled(Spanner spanner) {
}
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession();
}

boolean isMultiplexedSessionsEnabledForRW(Spanner spanner) {
if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) {
return false;
}
return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ static ExecutionStep of(StatementExecutionStep step) {
private boolean onlyInjectOnce = false;
private final Random random = new Random();

private boolean usingMultiplexedsession = false;

public AbortInterceptor(double probability) {
Preconditions.checkArgument(probability >= 0.0D && probability <= 1.0D);
this.probability = probability;
Expand All @@ -110,6 +112,14 @@ public void setOnlyInjectOnce(boolean value) {
this.onlyInjectOnce = value;
}

/**
* Set this value to true if a multiplexed session is being used. Determining this directly from
* TransactionManagerImpl is challenging as it is a private class.
*/
public void setUsingMultiplexedSession(boolean value) {
this.usingMultiplexedsession = value;
}

protected boolean shouldAbort(String statement, ExecutionStep step) {
return probability > random.nextDouble();
}
Expand All @@ -133,27 +143,35 @@ public void intercept(
return;
}
Class<?> cls = Class.forName("com.google.cloud.spanner.TransactionManagerImpl");
Class<?> cls2 =
Class.forName("com.google.cloud.spanner.SessionPool$AutoClosingTransactionManager");
Field delegateField = cls2.getDeclaredField("delegate");
delegateField.setAccessible(true);
watch = watch.reset().start();
while (delegateField.get(tx) == null && watch.elapsed(TimeUnit.MILLISECONDS) < 100) {
Thread.sleep(1L);
}
TransactionManager delegate = (TransactionManager) delegateField.get(tx);
if (delegate == null) {
return;
if (usingMultiplexedsession) {
Field stateField = cls.getDeclaredField("txnState");
stateField.setAccessible(true);
tx.rollback();
stateField.set(tx, TransactionState.ABORTED);
} else {
Class<?> cls2 =
Class.forName(
"com.google.cloud.spanner.SessionPool$AutoClosingTransactionManager");
Field delegateField = cls2.getDeclaredField("delegate");
delegateField.setAccessible(true);
watch = watch.reset().start();
while (delegateField.get(tx) == null && watch.elapsed(TimeUnit.MILLISECONDS) < 100) {
Thread.sleep(1L);
}
TransactionManager delegate = (TransactionManager) delegateField.get(tx);
if (delegate == null) {
return;
}
Field stateField = cls.getDeclaredField("txnState");
stateField.setAccessible(true);

// First rollback the delegate, and then pretend it aborted.
// We should call rollback on the delegate and not the wrapping
// AutoClosingTransactionManager, as the latter would cause the session to be returned
// to the session pool.
delegate.rollback();
stateField.set(delegate, TransactionState.ABORTED);
}
Field stateField = cls.getDeclaredField("txnState");
stateField.setAccessible(true);

// First rollback the delegate, and then pretend it aborted.
// We should call rollback on the delegate and not the wrapping
// AutoClosingTransactionManager, as the latter would cause the session to be returned
// to the session pool.
delegate.rollback();
stateField.set(delegate, TransactionState.ABORTED);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit a1e72c4

Please sign in to comment.