diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index 842a82cdc..48025267e 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -25,6 +25,7 @@ import { CLOUD_RESOURCE_HEADER, addLeaderAwareRoutingHeader, } from '../src/common'; +import {startTrace, setSpanError} from './instrument'; export interface TransactionIdentifier { session: string | Session; @@ -136,21 +137,30 @@ class BatchTransaction extends Snapshot { delete reqOpts.gaxOptions; delete reqOpts.types; - const headers: {[k: string]: string} = {}; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + return startTrace('BatchTransaction.createQueryPartitions', query, span => { + const headers: {[k: string]: string} = {}; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } + + this.createPartitions_( + { + client: 'SpannerClient', + method: 'partitionQuery', + reqOpts, + gaxOpts: query.gaxOptions, + headers: headers, + }, + (err, partitions, resp) => { + if (err) { + setSpanError(span, err); + } - this.createPartitions_( - { - client: 'SpannerClient', - method: 'partitionQuery', - reqOpts, - gaxOpts: query.gaxOptions, - headers: headers, - }, - callback - ); + span.end(); + callback(err, partitions, resp); + } + ); + }); } /** * Generic create partition method. Handles common parameters used in both @@ -163,37 +173,43 @@ class BatchTransaction extends Snapshot { * @param {function} callback Callback function. */ createPartitions_(config, callback) { - const query = extend({}, config.reqOpts, { - session: this.session.formattedName_, - transaction: {id: this.id}, - }); - config.reqOpts = extend({}, query); - config.headers = { - [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_, - }; - delete query.partitionOptions; - this.session.request(config, (err, resp) => { - if (err) { - callback(err, null, resp); - return; - } - - const partitions = resp.partitions.map(partition => { - return extend({}, query, partition); + return startTrace('BatchTransaction.createPartitions', {}, span => { + const query = extend({}, config.reqOpts, { + session: this.session.formattedName_, + transaction: {id: this.id}, }); + config.reqOpts = extend({}, query); + config.headers = { + [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database) + .formattedName_, + }; + delete query.partitionOptions; + this.session.request(config, (err, resp) => { + if (err) { + setSpanError(span, err); + span.end(); + callback(err, null, resp); + return; + } - if (resp.transaction) { - const {id, readTimestamp} = resp.transaction; + const partitions = resp.partitions.map(partition => { + return extend({}, query, partition); + }); - this.id = id; + if (resp.transaction) { + const {id, readTimestamp} = resp.transaction; - if (readTimestamp) { - this.readTimestampProto = readTimestamp; - this.readTimestamp = new PreciseDate(readTimestamp); + this.id = id; + + if (readTimestamp) { + this.readTimestampProto = readTimestamp; + this.readTimestamp = new PreciseDate(readTimestamp); + } } - } - callback(null, partitions, resp); + span.end(); + callback(null, partitions, resp); + }); }); } /** @@ -226,29 +242,38 @@ class BatchTransaction extends Snapshot { * @returns {Promise} */ createReadPartitions(options, callback) { - const reqOpts = Object.assign({}, options, { - keySet: Snapshot.encodeKeySet(options), - }); + return startTrace('BatchTransaction.createReadPartitions', {}, span => { + const reqOpts = Object.assign({}, options, { + keySet: Snapshot.encodeKeySet(options), + }); - delete reqOpts.gaxOptions; - delete reqOpts.keys; - delete reqOpts.ranges; + delete reqOpts.gaxOptions; + delete reqOpts.keys; + delete reqOpts.ranges; - const headers: {[k: string]: string} = {}; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + const headers: {[k: string]: string} = {}; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } + + this.createPartitions_( + { + client: 'SpannerClient', + method: 'partitionRead', + reqOpts, + gaxOpts: options.gaxOptions, + headers: headers, + }, + (err, partitions, resp) => { + if (err) { + setSpanError(span, err); + } - this.createPartitions_( - { - client: 'SpannerClient', - method: 'partitionRead', - reqOpts, - gaxOpts: options.gaxOptions, - headers: headers, - }, - callback - ); + span.end(); + callback(err, partitions, resp); + } + ); + }); } /** * Executes partition. @@ -322,6 +347,7 @@ class BatchTransaction extends Snapshot { * ``` */ executeStream(partition) { + // TODO: Instrument the streams with Otel. if (is.string(partition.table)) { return this.createReadStream(partition.table, partition); } diff --git a/src/table.ts b/src/table.ts index 343d4364b..822293245 100644 --- a/src/table.ts +++ b/src/table.ts @@ -31,6 +31,7 @@ import { import {google as databaseAdmin} from '../protos/protos'; import {Schema, LongRunningCallback} from './common'; import IRequestOptions = databaseAdmin.spanner.v1.IRequestOptions; +import {startTrace, setSpanError} from './instrument'; export type Key = string | string[]; @@ -1072,29 +1073,34 @@ class Table { options: MutateRowsOptions | CallOptions = {}, callback: CommitCallback ): void { - const requestOptions = - 'requestOptions' in options ? options.requestOptions : {}; + startTrace('Table.' + method, {}, span => { + const requestOptions = + 'requestOptions' in options ? options.requestOptions : {}; - const excludeTxnFromChangeStreams = - 'excludeTxnFromChangeStreams' in options - ? options.excludeTxnFromChangeStreams - : false; + const excludeTxnFromChangeStreams = + 'excludeTxnFromChangeStreams' in options + ? options.excludeTxnFromChangeStreams + : false; - this.database.runTransaction( - { - requestOptions: requestOptions, - excludeTxnFromChangeStreams: excludeTxnFromChangeStreams, - }, - (err, transaction) => { - if (err) { - callback(err); - return; - } + this.database.runTransaction( + { + requestOptions: requestOptions, + excludeTxnFromChangeStreams: excludeTxnFromChangeStreams, + }, + (err, transaction) => { + if (err) { + setSpanError(span, err); + span.end(); + callback(err); + return; + } - transaction![method](this.name, rows as Key[]); - transaction!.commit(options, callback); - } - ); + span.end(); + transaction![method](this.name, rows as Key[]); + transaction!.commit(options, callback); + } + ); + }); } }