From fc49c78c9503c6dd4cbcba8c15e887415a744136 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Date: Tue, 20 Jun 2023 19:09:28 +0530 Subject: [PATCH] feat(spanner): add databoost property for batch transactions (#8152) * feat(spanner): add databoost property for batch transactions * feat(spanner): add databoost property * feat(spanner): disable databoost property in tests * feat(spanner): comment refactor --- spanner/batch.go | 68 ++++++++++++++++++---------------- spanner/integration_test.go | 4 +- spanner/transaction.go | 74 ++++++++++++++++++++++++------------- 3 files changed, 86 insertions(+), 60 deletions(-) diff --git a/spanner/batch.go b/spanner/batch.go index 51a2530ce55b..e1ed21f23cbc 100644 --- a/spanner/batch.go +++ b/spanner/batch.go @@ -150,13 +150,14 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex } // Prepare ReadRequest. req := &sppb.ReadRequest{ - Session: sid, - Transaction: ts, - Table: table, - Index: index, - Columns: columns, - KeySet: kset, - RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""), + Session: sid, + Transaction: ts, + Table: table, + Index: index, + Columns: columns, + KeySet: kset, + RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""), + DataBoostEnabled: readOptions.DataBoostEnabled, } // Generate partitions. for _, p := range resp.GetPartitions() { @@ -212,13 +213,14 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement // prepare ExecuteSqlRequest r := &sppb.ExecuteSqlRequest{ - Session: sid, - Transaction: ts, - Sql: statement.SQL, - Params: params, - ParamTypes: paramTypes, - QueryOptions: qOpts.Options, - RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""), + Session: sid, + Transaction: ts, + Sql: statement.SQL, + Params: params, + ParamTypes: paramTypes, + QueryOptions: qOpts.Options, + RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""), + DataBoostEnabled: qOpts.DataBoostEnabled, } // generate Partitions @@ -308,15 +310,16 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R if p.rreq != nil { rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { client, err := client.StreamingRead(ctx, &sppb.ReadRequest{ - Session: p.rreq.Session, - Transaction: p.rreq.Transaction, - Table: p.rreq.Table, - Index: p.rreq.Index, - Columns: p.rreq.Columns, - KeySet: p.rreq.KeySet, - PartitionToken: p.pt, - RequestOptions: p.rreq.RequestOptions, - ResumeToken: resumeToken, + Session: p.rreq.Session, + Transaction: p.rreq.Transaction, + Table: p.rreq.Table, + Index: p.rreq.Index, + Columns: p.rreq.Columns, + KeySet: p.rreq.KeySet, + PartitionToken: p.pt, + RequestOptions: p.rreq.RequestOptions, + ResumeToken: resumeToken, + DataBoostEnabled: p.rreq.DataBoostEnabled, }) if err != nil { return client, err @@ -332,15 +335,16 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R } else { rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{ - Session: p.qreq.Session, - Transaction: p.qreq.Transaction, - Sql: p.qreq.Sql, - Params: p.qreq.Params, - ParamTypes: p.qreq.ParamTypes, - QueryOptions: p.qreq.QueryOptions, - PartitionToken: p.pt, - RequestOptions: p.qreq.RequestOptions, - ResumeToken: resumeToken, + Session: p.qreq.Session, + Transaction: p.qreq.Transaction, + Sql: p.qreq.Sql, + Params: p.qreq.Params, + ParamTypes: p.qreq.ParamTypes, + QueryOptions: p.qreq.QueryOptions, + PartitionToken: p.pt, + RequestOptions: p.qreq.RequestOptions, + ResumeToken: resumeToken, + DataBoostEnabled: p.qreq.DataBoostEnabled, }) if err != nil { return client, err diff --git a/spanner/integration_test.go b/spanner/integration_test.go index ae53dd6ebc6e..8d9a448daba4 100644 --- a/spanner/integration_test.go +++ b/spanner/integration_test.go @@ -3141,7 +3141,7 @@ func TestIntegration_BatchQuery(t *testing.T) { t.Fatal(err) } defer txn.Cleanup(ctx) - if partitions, err = txn.PartitionQuery(ctx, stmt, PartitionOptions{0, 3}); err != nil { + if partitions, err = txn.PartitionQueryWithOptions(ctx, stmt, PartitionOptions{0, 3}, QueryOptions{DataBoostEnabled: false}); err != nil { t.Fatal(err) } @@ -3224,7 +3224,7 @@ func TestIntegration_BatchRead(t *testing.T) { t.Fatal(err) } defer txn.Cleanup(ctx) - if partitions, err = txn.PartitionRead(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}); err != nil { + if partitions, err = txn.PartitionReadWithOptions(ctx, "test", AllKeys(), simpleDBTableColumns, PartitionOptions{0, 3}, ReadOptions{DataBoostEnabled: false}); err != nil { t.Fatal(err) } diff --git a/spanner/transaction.go b/spanner/transaction.go index 79fb05813a62..85de18327f89 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -165,16 +165,21 @@ type ReadOptions struct { // The request tag to use for this request. RequestTag string + + // If this is for a partitioned read and DataBoostEnabled field is set to true, the request will be executed + // via Spanner independent compute resources. Setting this option for regular read operations has no effect. + DataBoostEnabled bool } // merge combines two ReadOptions that the input parameter will have higher // order of precedence. func (ro ReadOptions) merge(opts ReadOptions) ReadOptions { merged := ReadOptions{ - Index: ro.Index, - Limit: ro.Limit, - Priority: ro.Priority, - RequestTag: ro.RequestTag, + Index: ro.Index, + Limit: ro.Limit, + Priority: ro.Priority, + RequestTag: ro.RequestTag, + DataBoostEnabled: ro.DataBoostEnabled, } if opts.Index != "" { merged.Index = opts.Index @@ -188,6 +193,9 @@ func (ro ReadOptions) merge(opts ReadOptions) ReadOptions { if opts.RequestTag != "" { merged.RequestTag = opts.RequestTag } + if opts.DataBoostEnabled { + merged.DataBoostEnabled = opts.DataBoostEnabled + } return merged } @@ -218,6 +226,7 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key limit := t.ro.Limit prio := t.ro.Priority requestTag := t.ro.RequestTag + dataBoostEnabled := t.ro.DataBoostEnabled if opts != nil { index = opts.Index if opts.Limit > 0 { @@ -225,6 +234,9 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key } prio = opts.Priority requestTag = opts.RequestTag + if opts.DataBoostEnabled { + dataBoostEnabled = opts.DataBoostEnabled + } } var setTransactionID func(transactionID) if _, ok := ts.Selector.(*sppb.TransactionSelector_Begin); ok { @@ -238,15 +250,16 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { client, err := client.StreamingRead(ctx, &sppb.ReadRequest{ - Session: t.sh.getID(), - Transaction: t.getTransactionSelector(), - Table: table, - Index: index, - Columns: columns, - KeySet: kset, - ResumeToken: resumeToken, - Limit: int64(limit), - RequestOptions: createRequestOptions(prio, requestTag, t.txOpts.TransactionTag), + Session: t.sh.getID(), + Transaction: t.getTransactionSelector(), + Table: table, + Index: index, + Columns: columns, + KeySet: kset, + ResumeToken: resumeToken, + Limit: int64(limit), + RequestOptions: createRequestOptions(prio, requestTag, t.txOpts.TransactionTag), + DataBoostEnabled: dataBoostEnabled, }) if err != nil { if _, ok := t.getTransactionSelector().GetSelector().(*sppb.TransactionSelector_Begin); ok { @@ -357,16 +370,21 @@ type QueryOptions struct { // The request tag to use for this request. RequestTag string + + // If this is for a partitioned query and DataBoostEnabled field is set to true, the request will be executed + // via Spanner independent compute resources. Setting this option for regular query operations has no effect. + DataBoostEnabled bool } // merge combines two QueryOptions that the input parameter will have higher // order of precedence. func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { merged := QueryOptions{ - Mode: qo.Mode, - Options: &sppb.ExecuteSqlRequest_QueryOptions{}, - RequestTag: qo.RequestTag, - Priority: qo.Priority, + Mode: qo.Mode, + Options: &sppb.ExecuteSqlRequest_QueryOptions{}, + RequestTag: qo.RequestTag, + Priority: qo.Priority, + DataBoostEnabled: qo.DataBoostEnabled, } if opts.Mode != nil { merged.Mode = opts.Mode @@ -377,6 +395,9 @@ func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { merged.Priority = opts.Priority } + if opts.DataBoostEnabled { + merged.DataBoostEnabled = opts.DataBoostEnabled + } proto.Merge(merged.Options, qo.Options) proto.Merge(merged.Options, opts.Options) return merged @@ -517,15 +538,16 @@ func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, opti mode = *options.Mode } req := &sppb.ExecuteSqlRequest{ - Session: sid, - Transaction: ts, - Sql: stmt.SQL, - QueryMode: mode, - Seqno: atomic.AddInt64(&t.sequenceNumber, 1), - Params: params, - ParamTypes: paramTypes, - QueryOptions: options.Options, - RequestOptions: createRequestOptions(options.Priority, options.RequestTag, t.txOpts.TransactionTag), + Session: sid, + Transaction: ts, + Sql: stmt.SQL, + QueryMode: mode, + Seqno: atomic.AddInt64(&t.sequenceNumber, 1), + Params: params, + ParamTypes: paramTypes, + QueryOptions: options.Options, + RequestOptions: createRequestOptions(options.Priority, options.RequestTag, t.txOpts.TransactionTag), + DataBoostEnabled: options.DataBoostEnabled, } return req, sh, nil }