diff --git a/spanner/client_test.go b/spanner/client_test.go index ccd25d8f49c9..f29b6dc72ac0 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -1036,6 +1036,88 @@ func TestClient_ReadWriteTransactionWithOptions(t *testing.T) { } } +func TestClient_ReadWriteTransactionWithOptimisticLockMode_ExecuteSqlRequest(t *testing.T) { + server, client, teardown := setupMockedTestServer(t) + defer teardown() + ctx := context.Background() + server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, + SimulatedExecutionTime{ + Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable"), status.Error(codes.Aborted, "Transaction aborted")}, + }) + _, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) + defer iter.Stop() + _, err := iter.Next() + return err + }, TransactionOptions{ReadLockMode: sppb.TransactionOptions_ReadWrite_OPTIMISTIC}) + if err != nil { + t.Fatalf("Failed to execute the transaction: %s", err) + } + requests := drainRequestsFromServer(server.TestSpanner) + if err := compareRequests([]interface{}{ + &sppb.BatchCreateSessionsRequest{}, + &sppb.ExecuteSqlRequest{}, + &sppb.ExecuteSqlRequest{}, + &sppb.BeginTransactionRequest{}, + &sppb.ExecuteSqlRequest{}, + &sppb.CommitRequest{}}, requests); err != nil { + t.Fatal(err) + } + if requests[1].(*sppb.ExecuteSqlRequest).GetTransaction().GetBegin().GetReadWrite().GetReadLockMode() != sppb.TransactionOptions_ReadWrite_OPTIMISTIC { + t.Fatal("Transaction is not set to optimistic") + } + if requests[2].(*sppb.ExecuteSqlRequest).GetTransaction().GetBegin().GetReadWrite().GetReadLockMode() != sppb.TransactionOptions_ReadWrite_OPTIMISTIC { + t.Fatal("Transaction is not set to optimistic") + } + if requests[3].(*sppb.BeginTransactionRequest).GetOptions().GetReadWrite().GetReadLockMode() != sppb.TransactionOptions_ReadWrite_OPTIMISTIC { + t.Fatal("Begin Transaction is not set to optimistic") + } + if _, ok := requests[4].(*sppb.ExecuteSqlRequest).Transaction.GetSelector().(*sppb.TransactionSelector_Id); !ok { + t.Fatal("expected streaming query to use transactionID from explicit begin transaction") + } +} + +func TestClient_ReadWriteTransactionWithOptimisticLockMode_ReadRequest(t *testing.T) { + server, client, teardown := setupMockedTestServer(t) + defer teardown() + ctx := context.Background() + server.TestSpanner.PutExecutionTime(MethodStreamingRead, + SimulatedExecutionTime{ + Errors: []error{status.Error(codes.Unavailable, "Temporary unavailable"), status.Error(codes.Aborted, "Transaction aborted")}, + }) + _, err := client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error { + iter := tx.Read(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}) + defer iter.Stop() + _, err := iter.Next() + return err + }, TransactionOptions{ReadLockMode: sppb.TransactionOptions_ReadWrite_OPTIMISTIC}) + if err != nil { + t.Fatalf("Failed to execute the transaction: %s", err) + } + requests := drainRequestsFromServer(server.TestSpanner) + if err := compareRequests([]interface{}{ + &sppb.BatchCreateSessionsRequest{}, + &sppb.ReadRequest{}, + &sppb.ReadRequest{}, + &sppb.BeginTransactionRequest{}, + &sppb.ReadRequest{}, + &sppb.CommitRequest{}}, requests); err != nil { + t.Fatal(err) + } + if requests[1].(*sppb.ReadRequest).GetTransaction().GetBegin().GetReadWrite().GetReadLockMode() != sppb.TransactionOptions_ReadWrite_OPTIMISTIC { + t.Fatal("Transaction is not set to optimistic") + } + if requests[2].(*sppb.ReadRequest).GetTransaction().GetBegin().GetReadWrite().GetReadLockMode() != sppb.TransactionOptions_ReadWrite_OPTIMISTIC { + t.Fatal("Transaction is not set to optimistic") + } + if requests[3].(*sppb.BeginTransactionRequest).GetOptions().GetReadWrite().GetReadLockMode() != sppb.TransactionOptions_ReadWrite_OPTIMISTIC { + t.Fatal("Begin Transaction is not set to optimistic") + } + if _, ok := requests[4].(*sppb.ReadRequest).Transaction.GetSelector().(*sppb.TransactionSelector_Id); !ok { + t.Fatal("expected streaming read to use transactionID from explicit begin transaction") + } +} + func TestClient_ReadWriteStmtBasedTransaction_TransactionOptions(t *testing.T) { for _, tt := range transactionOptionsTestCases() { t.Run(tt.name, func(t *testing.T) { @@ -2999,6 +3081,12 @@ func transactionOptionsTestCases() []TransactionOptionsTestCase { write: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM}, want: &TransactionOptions{CommitOptions: CommitOptions{ReturnCommitStats: true}, TransactionTag: "writeTransactionTag", CommitPriority: sppb.RequestOptions_PRIORITY_MEDIUM}, }, + { + name: "Read lock mode is optimistic", + client: &TransactionOptions{ReadLockMode: sppb.TransactionOptions_ReadWrite_OPTIMISTIC}, + write: &TransactionOptions{}, + want: &TransactionOptions{}, + }, } } diff --git a/spanner/transaction.go b/spanner/transaction.go index d28dc25ffc12..4179c8689bd0 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -103,6 +103,10 @@ type TransactionOptions struct { // CommitPriority is the priority to use for the Commit RPC for the // transaction. CommitPriority sppb.RequestOptions_Priority + + // the transaction lock mode is used to specify a concurrency mode for the + // read/query operations. It works for a read/write transaction only. + ReadLockMode sppb.TransactionOptions_ReadWrite_ReadLockMode } // merge combines two TransactionOptions that the input parameter will have higher @@ -119,6 +123,9 @@ func (to TransactionOptions) merge(opts TransactionOptions) TransactionOptions { if opts.CommitPriority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { merged.CommitPriority = opts.CommitPriority } + if opts.ReadLockMode != sppb.TransactionOptions_ReadWrite_READ_LOCK_MODE_UNSPECIFIED { + merged.ReadLockMode = opts.ReadLockMode + } return merged } @@ -1244,7 +1251,9 @@ func (t *ReadWriteTransaction) getTransactionSelector() *sppb.TransactionSelecto Selector: &sppb.TransactionSelector_Begin{ Begin: &sppb.TransactionOptions{ Mode: &sppb.TransactionOptions_ReadWrite_{ - ReadWrite: &sppb.TransactionOptions_ReadWrite{}, + ReadWrite: &sppb.TransactionOptions_ReadWrite{ + ReadLockMode: t.txOpts.ReadLockMode, + }, }, }, }, @@ -1278,12 +1287,14 @@ func (t *ReadWriteTransaction) release(err error) { } } -func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) { +func beginTransaction(ctx context.Context, sid string, client *vkit.Client, opts TransactionOptions) (transactionID, error) { res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{ Session: sid, Options: &sppb.TransactionOptions{ Mode: &sppb.TransactionOptions_ReadWrite_{ - ReadWrite: &sppb.TransactionOptions_ReadWrite{}, + ReadWrite: &sppb.TransactionOptions_ReadWrite{ + ReadLockMode: opts.ReadLockMode, + }, }, }, }) @@ -1344,7 +1355,7 @@ func (t *ReadWriteTransaction) begin(ctx context.Context) error { return err } } - tx, err = beginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), sh.getID(), sh.getClient()) + tx, err = beginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), sh.getID(), sh.getClient(), t.txOpts) if isSessionNotFoundError(err) { sh.destroy() continue