Skip to content

Commit

Permalink
Add support for transaction-level exclusion from change streams
Browse files Browse the repository at this point in the history
  • Loading branch information
dengwe1 committed Mar 19, 2024
1 parent 2392afe commit 46e0d3e
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public interface ReadOption {}
public interface ReadQueryUpdateTransactionOption
extends ReadOption, QueryOption, UpdateOption, TransactionOption {}

/** Marker interface to mark options applicable to Update and Write operations */
public interface UpdateTransactionOption extends UpdateOption, TransactionOption {}

/**
* Marker interface to mark options applicable to Create, Update and Delete operations in admin
* API.
Expand Down Expand Up @@ -108,6 +111,17 @@ public static TransactionOption commitStats() {
public static TransactionOption optimisticLock() {
return OPTIMISTIC_LOCK_OPTION;
}

/**
* Specifying this instructs the transaction to be excluded from being recorded in change streams
* with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
* being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
* unset.
*/
public static UpdateTransactionOption excludeTxnFromChangeStreams() {
return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION;
}

/**
* Specifying this will cause the read to yield at most this many rows. This should be greater
* than 0.
Expand Down Expand Up @@ -282,6 +296,18 @@ void appendToOptions(Options options) {

static final OptimisticLockOption OPTIMISTIC_LOCK_OPTION = new OptimisticLockOption();

/** Option to request the transaction to be excluded from change streams. */
static final class ExcludeTxnFromChangeStreamsOption extends InternalOption
implements UpdateTransactionOption {
@Override
void appendToOptions(Options options) {
options.withExcludeTxnFromChangeStreams = true;
}
}

static final ExcludeTxnFromChangeStreamsOption EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION =
new ExcludeTxnFromChangeStreamsOption();

/** Option pertaining to flow control. */
static final class FlowControlOption extends InternalOption implements ReadAndQueryOption {
final int prefetchChunks;
Expand Down Expand Up @@ -406,6 +432,7 @@ void appendToOptions(Options options) {
private String etag;
private Boolean validateOnly;
private Boolean withOptimisticLock;
private Boolean withExcludeTxnFromChangeStreams;
private Boolean dataBoostEnabled;
private DirectedReadOptions directedReadOptions;
private DecodeMode decodeMode;
Expand Down Expand Up @@ -509,6 +536,10 @@ Boolean withOptimisticLock() {
return withOptimisticLock;
}

Boolean withExcludeTxnFromChangeStreams() {
return withExcludeTxnFromChangeStreams;
}

boolean hasDataBoostEnabled() {
return dataBoostEnabled != null;
}
Expand Down Expand Up @@ -572,6 +603,11 @@ public String toString() {
if (withOptimisticLock != null) {
b.append("withOptimisticLock: ").append(withOptimisticLock).append(' ');
}
if (withExcludeTxnFromChangeStreams != null) {
b.append("withExcludeTxnFromChangeStreams: ")
.append(withExcludeTxnFromChangeStreams)
.append(' ');
}
if (dataBoostEnabled != null) {
b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' ');
}
Expand Down Expand Up @@ -617,6 +653,7 @@ public boolean equals(Object o) {
&& Objects.equals(etag(), that.etag())
&& Objects.equals(validateOnly(), that.validateOnly())
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
&& Objects.equals(withExcludeTxnFromChangeStreams(), that.withExcludeTxnFromChangeStreams())
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
&& Objects.equals(directedReadOptions(), that.directedReadOptions());
}
Expand Down Expand Up @@ -663,6 +700,9 @@ public int hashCode() {
if (withOptimisticLock != null) {
result = 31 * result + withOptimisticLock.hashCode();
}
if (withExcludeTxnFromChangeStreams != null) {
result = 31 * result + withExcludeTxnFromChangeStreams.hashCode();
}
if (dataBoostEnabled != null) {
result = 31 * result + dataBoostEnabled.hashCode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private ExecuteSqlRequest resumeOrRestartRequest(

@VisibleForTesting
ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Options options) {
ByteString transactionId = initTransaction();
ByteString transactionId = initTransaction(options);

final TransactionSelector transactionSelector =
TransactionSelector.newBuilder().setId(transactionId).build();
Expand Down Expand Up @@ -195,13 +195,15 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt
return builder.build();
}

private ByteString initTransaction() {
private ByteString initTransaction(final Options options) {
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(
TransactionOptions.newBuilder()
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
.build();
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
if (tx.getId().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,16 @@ static void throwIfTransactionsPending() {
}

static TransactionOptions createReadWriteTransactionOptions(Options options) {
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptions.setExcludeTxnFromChangeStreams(true);
}
TransactionOptions.ReadWrite.Builder readWrite = TransactionOptions.ReadWrite.newBuilder();
if (options.withOptimisticLock() == Boolean.TRUE) {
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
}
return TransactionOptions.newBuilder().setReadWrite(readWrite).build();
transactionOptions.setReadWrite(readWrite);
return transactionOptions.build();
}

/**
Expand Down Expand Up @@ -181,10 +186,16 @@ public CommitResponse writeAtLeastOnceWithOptions(
CommitRequest.newBuilder()
.setSession(name)
.setReturnCommitStats(options.withCommitStats())
.addAllMutations(mutationsProto)
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
.addAllMutations(mutationsProto);

TransactionOptions.Builder transactionOptionsBuilder =
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance());
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptionsBuilder.setExcludeTxnFromChangeStreams(true);
}
requestBuilder.setSingleUseTransaction(transactionOptionsBuilder);

if (options.hasMaxCommitDelay()) {
requestBuilder.setMaxCommitDelay(
Duration.newBuilder()
Expand Down Expand Up @@ -238,6 +249,10 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
if (batchWriteRequestOptions != null) {
requestBuilder.setRequestOptions(batchWriteRequestOptions);
}
if (Options.fromTransactionOptions(transactionOptions).withExcludeTxnFromChangeStreams()
== Boolean.TRUE) {
requestBuilder.setExcludeTxnFromChangeStreams(true);
}
ISpan span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE);
try (IScope s = tracer.withSpan(span)) {
return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), this.options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,9 @@ public void run() {
if (transactionId == null && transactionIdFuture == null) {
requestBuilder.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE));
} else {
requestBuilder.setTransactionId(
transactionId == null
Expand Down
Loading

0 comments on commit 46e0d3e

Please sign in to comment.