Skip to content

Commit

Permalink
feat(spanner):add support for change streams transaction exclusion op…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
ShuranZhang committed Apr 15, 2024
1 parent b952f41 commit 79a3207
Show file tree
Hide file tree
Showing 5 changed files with 303 additions and 20 deletions.
35 changes: 28 additions & 7 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,10 @@ type applyOption struct {
transactionTag string
// priority is the RPC priority that is used for the commit operation.
priority sppb.RequestOptions_Priority
// If excludeTxnFromChangeStreams == true, mutations from this Client.Apply
// will not be recorded in allowed tracking change treams with DDL option
// allow_txn_exclusion=true.
excludeTxnFromChangeStreams bool
}

// An ApplyOption is an optional argument to Apply.
Expand Down Expand Up @@ -722,6 +726,13 @@ func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
}
}

// ExcludeTxnFromChangeStreams returns an ApplyOptions that sets whether to exclude recording this commit operation from allowed tracking change streams.
func ExcludeTxnFromChangeStreams() ApplyOption {
return func(ao *applyOption) {
ao.excludeTxnFromChangeStreams = true
}
}

// Apply applies a list of mutations atomically to the database.
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
ao := &applyOption{}
Expand All @@ -740,10 +751,10 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
if !ao.atLeastOnce {
resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
return t.BufferWrite(ms)
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag})
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag, ExcludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams})
return resp.CommitTs, err
}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams}
return t.applyAtLeastOnce(ctx, ms...)
}

Expand All @@ -754,21 +765,30 @@ type BatchWriteOptions struct {

// The transaction tag to use for this request.
TransactionTag string

// If excludeTxnFromChangeStreams == true, modifications from all transactions
// in this batch write request will not be recorded in allowed tracking
// change treams with DDL option allow_txn_exclusion=true.
ExcludeTxnFromChangeStreams bool
}

// merge combines two BatchWriteOptions such that the input parameter will have higher
// order of precedence.
func (bwo BatchWriteOptions) merge(opts BatchWriteOptions) BatchWriteOptions {
merged := BatchWriteOptions{
TransactionTag: bwo.TransactionTag,
Priority: bwo.Priority,
TransactionTag: bwo.TransactionTag,
Priority: bwo.Priority,
ExcludeTxnFromChangeStreams: bwo.ExcludeTxnFromChangeStreams,
}
if opts.TransactionTag != "" {
merged.TransactionTag = opts.TransactionTag
}
if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED {
merged.Priority = opts.Priority
}
if opts.ExcludeTxnFromChangeStreams {
merged.ExcludeTxnFromChangeStreams = opts.ExcludeTxnFromChangeStreams
}
return merged
}

Expand Down Expand Up @@ -916,9 +936,10 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup
var md metadata.MD
sh.updateLastUseTime()
stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{
Session: sh.getID(),
MutationGroups: mgsPb,
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag),
Session: sh.getID(),
MutationGroups: mgsPb,
RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag),
ExcludeTxnFromChangeStreams: opts.ExcludeTxnFromChangeStreams,
}, gax.WithGRPCOptions(grpc.Header(&md)))

if getGFELatencyMetricsFlag() && md != nil && c.ct != nil {
Expand Down
200 changes: 200 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5338,3 +5338,203 @@ func TestClient_NestedReadWriteTransactionWithTag_InnerBlindWrite(t *testing.T)
t.Fatalf("transaction tag mismatch\nGot: %s\nWant: %s", g, w)
}
}

func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_ExecuteSqlRequest(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()

_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo})
if err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
t.Fatalf("Failed to execute the transaction: %s", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteSqlRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.ExecuteSqlRequest).GetTransaction().GetBegin().GetExcludeTxnFromChangeStreams() {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_BufferWrite(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()

_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
if err := tx.BufferWrite([]*Mutation{
Insert("foo", []string{"col1"}, []interface{}{"key1"}),
}); err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
t.Fatalf("Failed to execute the transaction: %s", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_BatchUpdate(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()

_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.BatchUpdate(ctx, []Statement{NewStatement(UpdateBarSetFoo)})
if err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
t.Fatalf("Failed to execute the transaction: %s", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.ExecuteBatchDmlRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.ExecuteBatchDmlRequest).GetTransaction().GetBegin().GetExcludeTxnFromChangeStreams() {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_RequestLevelDMLWithExcludeTxnFromChangeStreams_Failed(t *testing.T) {
_, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()

// Test normal DML
_, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.UpdateWithOptions(ctx, Statement{SQL: UpdateBarSetFoo}, QueryOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err == nil {
t.Fatalf("Missing expected exception")
}
msg := "cannot set exclude transaction from change streams for a request-level DML statement."
if status.Code(err) != codes.InvalidArgument || !strings.Contains(err.Error(), msg) {
t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg)
}

// Test batch DML
_, err = client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
_, err := tx.UpdateWithOptions(ctx, Statement{SQL: UpdateBarSetFoo}, QueryOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
return err
}
return nil
}, TransactionOptions{ExcludeTxnFromChangeStreams: true})
if err == nil {
t.Fatalf("Missing expected exception")
}
if status.Code(err) != codes.InvalidArgument || !strings.Contains(err.Error(), msg) {
t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg)
}
}

func TestClient_ApplyExcludeTxnFromChangeStreams(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()

ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}

_, err := client.Apply(context.Background(), ms, ExcludeTxnFromChangeStreams())
if err != nil {
t.Fatal(err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_ApplyAtLeastOnceExcludeTxnFromChangeStreams(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()

ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}

_, err := client.Apply(context.Background(), ms, []ApplyOption{ExcludeTxnFromChangeStreams(), ApplyAtLeastOnce()}...)
if err != nil {
t.Fatal(err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.CommitRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.CommitRequest).GetTransaction().(*sppb.CommitRequest_SingleUseTransaction).SingleUseTransaction.ExcludeTxnFromChangeStreams {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}

func TestClient_BatchWriteExcludeTxnFromChangeStreams(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()

mutationGroups := []*MutationGroup{
{[]*Mutation{
{opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []interface{}{"foo1", 1}},
}},
}
iter := client.BatchWriteWithOptions(context.Background(), mutationGroups, BatchWriteOptions{ExcludeTxnFromChangeStreams: true})
responseCount := 0
doFunc := func(r *sppb.BatchWriteResponse) error {
responseCount++
return nil
}
if err := iter.Do(doFunc); err != nil {
t.Fatal(err)
}
if responseCount != len(mutationGroups) {
t.Fatalf("Response count mismatch.\nGot: %v\nWant:%v", responseCount, len(mutationGroups))
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BatchWriteRequest{}}, requests); err != nil {
t.Fatal(err)
}
if !requests[1].(*sppb.BatchWriteRequest).GetExcludeTxnFromChangeStreams() {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}
9 changes: 5 additions & 4 deletions spanner/pdml.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
// Execute the PDML and retry if the transaction is aborted.
executePdmlWithRetry := func(ctx context.Context) (int64, error) {
for {
count, err := executePdml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), c.disableRouteToLeader), sh, req)
count, err := executePdml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), c.disableRouteToLeader), sh, req, options)
if err == nil {
return count, nil
}
Expand All @@ -106,14 +106,15 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
// 3. Execute the update statement on the PDML transaction
//
// Note that PDML transactions cannot be committed or rolled back.
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) {
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest, options QueryOptions) (count int64, err error) {
var md metadata.MD
sh.updateLastUseTime()
// Begin transaction.
// Begin transaction
res, err := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
ExcludeTxnFromChangeStreams: options.ExcludeTxnFromChangeStreams,
},
})
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions spanner/pdml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,25 @@ func TestPartitionedUpdate_Tagging(t *testing.T) {
}
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: "pdml-tag"})
}

func TestPartitionedUpdate_ExcludeTxnFromChangeStreams(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()

_, err := client.PartitionedUpdateWithOptions(ctx, NewStatement(UpdateBarSetFoo), QueryOptions{ExcludeTxnFromChangeStreams: true})
if err != nil {
t.Fatalf("expect no errors, but got %v", err)
}
requests := drainRequestsFromServer(server.TestSpanner)
if err := compareRequests([]interface{}{
&sppb.BatchCreateSessionsRequest{},
&sppb.BeginTransactionRequest{},
&sppb.ExecuteSqlRequest{}}, requests); err != nil {
t.Fatal(err)
}

if !requests[1].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() {
t.Fatal("Transaction is not set to be excluded from change streams")
}
}
Loading

0 comments on commit 79a3207

Please sign in to comment.