diff --git a/spanner/client.go b/spanner/client.go index fe87fafb1714..81e62db75411 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -684,6 +684,10 @@ type applyOption struct { transactionTag string // priority is the RPC priority that is used for the commit operation. priority sppb.RequestOptions_Priority + // If excludeTxnFromChangeStreams == true, mutations from this Client.Apply + // will not be recorded in allowed tracking change treams with DDL option + // allow_txn_exclusion=true. + excludeTxnFromChangeStreams bool } // An ApplyOption is an optional argument to Apply. @@ -722,6 +726,13 @@ func Priority(priority sppb.RequestOptions_Priority) ApplyOption { } } +// ExcludeTxnFromChangeStreams returns an ApplyOptions that sets whether to exclude recording this commit operation from allowed tracking change streams. +func ExcludeTxnFromChangeStreams() ApplyOption { + return func(ao *applyOption) { + ao.excludeTxnFromChangeStreams = true + } +} + // Apply applies a list of mutations atomically to the database. func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) { ao := &applyOption{} @@ -740,10 +751,10 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) if !ao.atLeastOnce { resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error { return t.BufferWrite(ms) - }, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag}) + }, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag, ExcludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams}) return resp.CommitTs, err } - t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader} + t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams} return t.applyAtLeastOnce(ctx, ms...) } @@ -754,14 +765,20 @@ type BatchWriteOptions struct { // The transaction tag to use for this request. TransactionTag string + + // If excludeTxnFromChangeStreams == true, modifications from all transactions + // in this batch write request will not be recorded in allowed tracking + // change treams with DDL option allow_txn_exclusion=true. + ExcludeTxnFromChangeStreams bool } // merge combines two BatchWriteOptions such that the input parameter will have higher // order of precedence. func (bwo BatchWriteOptions) merge(opts BatchWriteOptions) BatchWriteOptions { merged := BatchWriteOptions{ - TransactionTag: bwo.TransactionTag, - Priority: bwo.Priority, + TransactionTag: bwo.TransactionTag, + Priority: bwo.Priority, + ExcludeTxnFromChangeStreams: bwo.ExcludeTxnFromChangeStreams, } if opts.TransactionTag != "" { merged.TransactionTag = opts.TransactionTag @@ -769,6 +786,9 @@ func (bwo BatchWriteOptions) merge(opts BatchWriteOptions) BatchWriteOptions { if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { merged.Priority = opts.Priority } + if opts.ExcludeTxnFromChangeStreams { + merged.ExcludeTxnFromChangeStreams = opts.ExcludeTxnFromChangeStreams + } return merged } @@ -916,9 +936,10 @@ func (c *Client) BatchWriteWithOptions(ctx context.Context, mgs []*MutationGroup var md metadata.MD sh.updateLastUseTime() stream, rpcErr := sh.getClient().BatchWrite(contextWithOutgoingMetadata(ct, sh.getMetadata(), c.disableRouteToLeader), &sppb.BatchWriteRequest{ - Session: sh.getID(), - MutationGroups: mgsPb, - RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag), + Session: sh.getID(), + MutationGroups: mgsPb, + RequestOptions: createRequestOptions(opts.Priority, "", opts.TransactionTag), + ExcludeTxnFromChangeStreams: opts.ExcludeTxnFromChangeStreams, }, gax.WithGRPCOptions(grpc.Header(&md))) if getGFELatencyMetricsFlag() && md != nil && c.ct != nil { diff --git a/spanner/client_test.go b/spanner/client_test.go index df31417a1182..5d511717748b 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -5338,3 +5338,203 @@ func TestClient_NestedReadWriteTransactionWithTag_InnerBlindWrite(t *testing.T) t.Fatalf("transaction tag mismatch\nGot: %s\nWant: %s", g, w) } } + +func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_ExecuteSqlRequest(t *testing.T) { + server, client, teardown := setupMockedTestServer(t) + defer teardown() + ctx := context.Background() + + _, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + _, err := tx.Update(ctx, Statement{SQL: UpdateBarSetFoo}) + if err != nil { + return err + } + return nil + }, TransactionOptions{ExcludeTxnFromChangeStreams: true}) + if err != nil { + t.Fatalf("Failed to execute the transaction: %s", err) + } + requests := drainRequestsFromServer(server.TestSpanner) + if err := compareRequests([]interface{}{ + &sppb.BatchCreateSessionsRequest{}, + &sppb.ExecuteSqlRequest{}, + &sppb.CommitRequest{}}, requests); err != nil { + t.Fatal(err) + } + if !requests[1].(*sppb.ExecuteSqlRequest).GetTransaction().GetBegin().GetExcludeTxnFromChangeStreams() { + t.Fatal("Transaction is not set to be excluded from change streams") + } +} + +func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_BufferWrite(t *testing.T) { + server, client, teardown := setupMockedTestServer(t) + defer teardown() + ctx := context.Background() + + _, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + if err := tx.BufferWrite([]*Mutation{ + Insert("foo", []string{"col1"}, []interface{}{"key1"}), + }); err != nil { + return err + } + return nil + }, TransactionOptions{ExcludeTxnFromChangeStreams: true}) + if err != nil { + t.Fatalf("Failed to execute the transaction: %s", err) + } + requests := drainRequestsFromServer(server.TestSpanner) + if err := compareRequests([]interface{}{ + &sppb.BatchCreateSessionsRequest{}, + &sppb.BeginTransactionRequest{}, + &sppb.CommitRequest{}}, requests); err != nil { + t.Fatal(err) + } + if !requests[1].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() { + t.Fatal("Transaction is not set to be excluded from change streams") + } +} + +func TestClient_ReadWriteTransactionWithExcludeTxnFromChangeStreams_BatchUpdate(t *testing.T) { + server, client, teardown := setupMockedTestServer(t) + defer teardown() + ctx := context.Background() + + _, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + _, err := tx.BatchUpdate(ctx, []Statement{NewStatement(UpdateBarSetFoo)}) + if err != nil { + return err + } + return nil + }, TransactionOptions{ExcludeTxnFromChangeStreams: true}) + if err != nil { + t.Fatalf("Failed to execute the transaction: %s", err) + } + requests := drainRequestsFromServer(server.TestSpanner) + if err := compareRequests([]interface{}{ + &sppb.BatchCreateSessionsRequest{}, + &sppb.ExecuteBatchDmlRequest{}, + &sppb.CommitRequest{}}, requests); err != nil { + t.Fatal(err) + } + if !requests[1].(*sppb.ExecuteBatchDmlRequest).GetTransaction().GetBegin().GetExcludeTxnFromChangeStreams() { + t.Fatal("Transaction is not set to be excluded from change streams") + } +} + +func TestClient_RequestLevelDMLWithExcludeTxnFromChangeStreams_Failed(t *testing.T) { + _, client, teardown := setupMockedTestServer(t) + defer teardown() + ctx := context.Background() + + // Test normal DML + _, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + _, err := tx.UpdateWithOptions(ctx, Statement{SQL: UpdateBarSetFoo}, QueryOptions{ExcludeTxnFromChangeStreams: true}) + if err != nil { + return err + } + return nil + }, TransactionOptions{ExcludeTxnFromChangeStreams: true}) + if err == nil { + t.Fatalf("Missing expected exception") + } + msg := "cannot set exclude transaction from change streams for a request-level DML statement." + if status.Code(err) != codes.InvalidArgument || !strings.Contains(err.Error(), msg) { + t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg) + } + + // Test batch DML + _, err = client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + _, err := tx.UpdateWithOptions(ctx, Statement{SQL: UpdateBarSetFoo}, QueryOptions{ExcludeTxnFromChangeStreams: true}) + if err != nil { + return err + } + return nil + }, TransactionOptions{ExcludeTxnFromChangeStreams: true}) + if err == nil { + t.Fatalf("Missing expected exception") + } + if status.Code(err) != codes.InvalidArgument || !strings.Contains(err.Error(), msg) { + t.Fatalf("error mismatch\nGot: %v\nWant: %v", err, msg) + } +} + +func TestClient_ApplyExcludeTxnFromChangeStreams(t *testing.T) { + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + ms := []*Mutation{ + Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), + Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), + } + + _, err := client.Apply(context.Background(), ms, ExcludeTxnFromChangeStreams()) + if err != nil { + t.Fatal(err) + } + requests := drainRequestsFromServer(server.TestSpanner) + if err := compareRequests([]interface{}{ + &sppb.BatchCreateSessionsRequest{}, + &sppb.BeginTransactionRequest{}, + &sppb.CommitRequest{}}, requests); err != nil { + t.Fatal(err) + } + if !requests[1].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() { + t.Fatal("Transaction is not set to be excluded from change streams") + } +} + +func TestClient_ApplyAtLeastOnceExcludeTxnFromChangeStreams(t *testing.T) { + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + ms := []*Mutation{ + Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), + Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), + } + + _, err := client.Apply(context.Background(), ms, []ApplyOption{ExcludeTxnFromChangeStreams(), ApplyAtLeastOnce()}...) + if err != nil { + t.Fatal(err) + } + requests := drainRequestsFromServer(server.TestSpanner) + if err := compareRequests([]interface{}{ + &sppb.BatchCreateSessionsRequest{}, + &sppb.CommitRequest{}}, requests); err != nil { + t.Fatal(err) + } + if !requests[1].(*sppb.CommitRequest).GetTransaction().(*sppb.CommitRequest_SingleUseTransaction).SingleUseTransaction.ExcludeTxnFromChangeStreams { + t.Fatal("Transaction is not set to be excluded from change streams") + } +} + +func TestClient_BatchWriteExcludeTxnFromChangeStreams(t *testing.T) { + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + mutationGroups := []*MutationGroup{ + {[]*Mutation{ + {opInsertOrUpdate, "t_test", nil, []string{"key", "val"}, []interface{}{"foo1", 1}}, + }}, + } + iter := client.BatchWriteWithOptions(context.Background(), mutationGroups, BatchWriteOptions{ExcludeTxnFromChangeStreams: true}) + responseCount := 0 + doFunc := func(r *sppb.BatchWriteResponse) error { + responseCount++ + return nil + } + if err := iter.Do(doFunc); err != nil { + t.Fatal(err) + } + if responseCount != len(mutationGroups) { + t.Fatalf("Response count mismatch.\nGot: %v\nWant:%v", responseCount, len(mutationGroups)) + } + requests := drainRequestsFromServer(server.TestSpanner) + if err := compareRequests([]interface{}{ + &sppb.BatchCreateSessionsRequest{}, + &sppb.BatchWriteRequest{}}, requests); err != nil { + t.Fatal(err) + } + if !requests[1].(*sppb.BatchWriteRequest).GetExcludeTxnFromChangeStreams() { + t.Fatal("Transaction is not set to be excluded from change streams") + } +} diff --git a/spanner/pdml.go b/spanner/pdml.go index ac3b5b0fe3cd..8b452c828724 100644 --- a/spanner/pdml.go +++ b/spanner/pdml.go @@ -84,7 +84,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt // Execute the PDML and retry if the transaction is aborted. executePdmlWithRetry := func(ctx context.Context) (int64, error) { for { - count, err := executePdml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), c.disableRouteToLeader), sh, req) + count, err := executePdml(contextWithOutgoingMetadata(ctx, sh.getMetadata(), c.disableRouteToLeader), sh, req, options) if err == nil { return count, nil } @@ -106,14 +106,15 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt // 3. Execute the update statement on the PDML transaction // // Note that PDML transactions cannot be committed or rolled back. -func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) { +func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest, options QueryOptions) (count int64, err error) { var md metadata.MD sh.updateLastUseTime() - // Begin transaction. + // Begin transaction res, err := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{ Session: sh.getID(), Options: &sppb.TransactionOptions{ - Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}}, + Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}}, + ExcludeTxnFromChangeStreams: options.ExcludeTxnFromChangeStreams, }, }) if err != nil { diff --git a/spanner/pdml_test.go b/spanner/pdml_test.go index 09f50b9d53e6..1cda47fc9577 100644 --- a/spanner/pdml_test.go +++ b/spanner/pdml_test.go @@ -179,3 +179,25 @@ func TestPartitionedUpdate_Tagging(t *testing.T) { } checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: "pdml-tag"}) } + +func TestPartitionedUpdate_ExcludeTxnFromChangeStreams(t *testing.T) { + ctx := context.Background() + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + _, err := client.PartitionedUpdateWithOptions(ctx, NewStatement(UpdateBarSetFoo), QueryOptions{ExcludeTxnFromChangeStreams: true}) + if err != nil { + t.Fatalf("expect no errors, but got %v", err) + } + requests := drainRequestsFromServer(server.TestSpanner) + if err := compareRequests([]interface{}{ + &sppb.BatchCreateSessionsRequest{}, + &sppb.BeginTransactionRequest{}, + &sppb.ExecuteSqlRequest{}}, requests); err != nil { + t.Fatal(err) + } + + if !requests[1].(*sppb.BeginTransactionRequest).GetOptions().GetExcludeTxnFromChangeStreams() { + t.Fatal("Transaction is not set to be excluded from change streams") + } +} diff --git a/spanner/transaction.go b/spanner/transaction.go index 21f39fb8c756..d1201e1d0e18 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -115,15 +115,20 @@ type TransactionOptions struct { // the transaction lock mode is used to specify a concurrency mode for the // read/query operations. It works for a read/write transaction only. ReadLockMode sppb.TransactionOptions_ReadWrite_ReadLockMode + + // Controls whether to exclude recording modifications in current transaction + // from the allowed tracking change streams(with DDL option allow_txn_exclusion=true). + ExcludeTxnFromChangeStreams bool } // merge combines two TransactionOptions that the input parameter will have higher // order of precedence. func (to TransactionOptions) merge(opts TransactionOptions) TransactionOptions { merged := TransactionOptions{ - CommitOptions: to.CommitOptions.merge(opts.CommitOptions), - TransactionTag: to.TransactionTag, - CommitPriority: to.CommitPriority, + CommitOptions: to.CommitOptions.merge(opts.CommitOptions), + TransactionTag: to.TransactionTag, + CommitPriority: to.CommitPriority, + ExcludeTxnFromChangeStreams: to.ExcludeTxnFromChangeStreams, } if opts.TransactionTag != "" { merged.TransactionTag = opts.TransactionTag @@ -134,6 +139,9 @@ func (to TransactionOptions) merge(opts TransactionOptions) TransactionOptions { if opts.ReadLockMode != sppb.TransactionOptions_ReadWrite_READ_LOCK_MODE_UNSPECIFIED { merged.ReadLockMode = opts.ReadLockMode } + if opts.ExcludeTxnFromChangeStreams { + merged.ExcludeTxnFromChangeStreams = opts.ExcludeTxnFromChangeStreams + } return merged } @@ -401,18 +409,24 @@ type QueryOptions struct { // QueryOptions option used to set the DirectedReadOptions for all ExecuteSqlRequests which indicate // which replicas or regions should be used for executing queries. DirectedReadOptions *sppb.DirectedReadOptions + + // Controls whether to exclude recording modifications in current partitioned update operation + // from the allowed tracking change streams(with DDL option allow_txn_exclusion=true). Setting + // this value for any sql/dml requests other than partitioned udpate will receive an error. + ExcludeTxnFromChangeStreams bool } // merge combines two QueryOptions that the input parameter will have higher // order of precedence. func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { merged := QueryOptions{ - Mode: qo.Mode, - Options: &sppb.ExecuteSqlRequest_QueryOptions{}, - RequestTag: qo.RequestTag, - Priority: qo.Priority, - DataBoostEnabled: qo.DataBoostEnabled, - DirectedReadOptions: qo.DirectedReadOptions, + Mode: qo.Mode, + Options: &sppb.ExecuteSqlRequest_QueryOptions{}, + RequestTag: qo.RequestTag, + Priority: qo.Priority, + DataBoostEnabled: qo.DataBoostEnabled, + DirectedReadOptions: qo.DirectedReadOptions, + ExcludeTxnFromChangeStreams: qo.ExcludeTxnFromChangeStreams, } if opts.Mode != nil { merged.Mode = opts.Mode @@ -429,6 +443,9 @@ func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { if opts.DirectedReadOptions != nil { merged.DirectedReadOptions = opts.DirectedReadOptions } + if opts.ExcludeTxnFromChangeStreams { + merged.ExcludeTxnFromChangeStreams = opts.ExcludeTxnFromChangeStreams + } proto.Merge(merged.Options, qo.Options) proto.Merge(merged.Options, opts.Options) return merged @@ -622,6 +639,13 @@ func errUnexpectedTxState(ts txState) error { return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts) } +// errExcludeRequestLevelDmlFromChangeStreams returns error for passing +// QueryOptions.ExcludeTxnFromChangeStreams to request-level DML functions. This +// options should only be used for partitioned update. +func errExcludeRequestLevelDmlFromChangeStreams() error { + return spannerErrorf(codes.InvalidArgument, "cannot set exclude transaction from change streams for a request-level DML statement.") +} + // ReadOnlyTransaction provides a snapshot transaction with guaranteed // consistency across reads, but does not allow writes. Read-only transactions // can be configured to read at timestamps in the past. @@ -1104,6 +1128,10 @@ func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowC // the number of affected rows. The given QueryOptions will be used for the // execution of this statement. func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { + if opts.ExcludeTxnFromChangeStreams { + return 0, errExcludeRequestLevelDmlFromChangeStreams() + } + return t.update(ctx, stmt, t.qo.merge(opts)) } @@ -1176,6 +1204,9 @@ func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statemen // The request tag and priority given in the QueryOptions are included with the // RPC. Any other options that are set in the QueryOptions struct are ignored. func (t *ReadWriteTransaction) BatchUpdateWithOptions(ctx context.Context, stmts []Statement, opts QueryOptions) (_ []int64, err error) { + if opts.ExcludeTxnFromChangeStreams { + return nil, errExcludeRequestLevelDmlFromChangeStreams() + } return t.batchUpdateWithOptions(ctx, stmts, t.qo.merge(opts)) } @@ -1296,6 +1327,7 @@ func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sp Mode: &sppb.TransactionOptions_ReadWrite_{ ReadWrite: &sppb.TransactionOptions_ReadWrite{}, }, + ExcludeTxnFromChangeStreams: t.txOpts.ExcludeTxnFromChangeStreams, }, }, } @@ -1355,6 +1387,7 @@ func (t *ReadWriteTransaction) getTransactionSelector() *sppb.TransactionSelecto ReadLockMode: t.txOpts.ReadLockMode, }, }, + ExcludeTxnFromChangeStreams: t.txOpts.ExcludeTxnFromChangeStreams, }, }, } @@ -1411,6 +1444,7 @@ func beginTransaction(ctx context.Context, sid string, client *vkit.Client, opts ReadLockMode: opts.ReadLockMode, }, }, + ExcludeTxnFromChangeStreams: opts.ExcludeTxnFromChangeStreams, }, }) if err != nil { @@ -1778,6 +1812,10 @@ type writeOnlyTransaction struct { commitPriority sppb.RequestOptions_Priority // disableRouteToLeader specifies if we want to disable RW/PDML requests to be routed to leader. disableRouteToLeader bool + // ExcludeTxnFromChangeStreams controls whether to exclude recording modifications in + // current transaction from the allowed tracking change streams with DDL option + // allow_txn_exclusion=true. + excludeTxnFromChangeStreams bool } // applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, @@ -1824,6 +1862,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta Mode: &sppb.TransactionOptions_ReadWrite_{ ReadWrite: &sppb.TransactionOptions_ReadWrite{}, }, + ExcludeTxnFromChangeStreams: t.excludeTxnFromChangeStreams, }, }, Mutations: mPb,