From d51aae9c9c3c0e6319d81c2809573ae54675acf3 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Thu, 19 Sep 2024 23:46:25 -1000 Subject: [PATCH] feat: (observability) add spans for BatchTransaction and Table (#2115) This change is part of a series of changes to add OpenTelemetry traces, focused on BatchTransaction and Table. While here, made the tests for sessionPool spans much more precise to avoid flakes. Updates #2079 Built from PR #2087 Updates #2114 --- observability-test/batch-transaction.ts | 255 +++++++++++++++++++ observability-test/table.ts | 316 ++++++++++++++++++++++++ src/batch-transaction.ts | 166 ++++++++----- src/instrument.ts | 4 +- src/table.ts | 61 +++-- src/transaction.ts | 2 + test/session-pool.ts | 61 +---- 7 files changed, 731 insertions(+), 134 deletions(-) create mode 100644 observability-test/batch-transaction.ts create mode 100644 observability-test/table.ts diff --git a/observability-test/batch-transaction.ts b/observability-test/batch-transaction.ts new file mode 100644 index 000000000..13ce59c5e --- /dev/null +++ b/observability-test/batch-transaction.ts @@ -0,0 +1,255 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* eslint-disable prefer-rest-params */ + +import {util} from '@google-cloud/common'; +import * as pfy from '@google-cloud/promisify'; +import * as assert from 'assert'; +import {before, beforeEach, afterEach, describe, it} from 'mocha'; +import * as extend from 'extend'; +import * as proxyquire from 'proxyquire'; +import * as sinon from 'sinon'; +const { + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +// eslint-disable-next-line n/no-extraneous-require +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); +import {Session, Spanner} from '../src'; +import * as bt from '../src/batch-transaction'; + +const fakePfy = extend({}, pfy, { + promisifyAll(klass, options) { + if (klass.name !== 'BatchTransaction') { + return; + } + assert.deepStrictEqual(options.exclude, ['identifier']); + }, +}); + +class FakeTimestamp { + calledWith_: IArguments; + constructor() { + this.calledWith_ = arguments; + } +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const fakeCodec: any = { + encode: util.noop, + Timestamp: FakeTimestamp, + Int() {}, + Float() {}, + SpannerDate() {}, + convertProtoTimestampToDate() {}, +}; + +const SPANNER = { + routeToLeaderEnabled: true, +}; + +const INSTANCE = { + parent: SPANNER, +}; + +const DATABASE = { + formattedName_: 'database', + parent: INSTANCE, +}; + +class FakeTransaction { + calledWith_: IArguments; + session; + constructor(session) { + this.calledWith_ = arguments; + this.session = session; + } + static encodeKeySet(): object { + return {}; + } + static encodeParams(): object { + return {}; + } + + _getSpanner(): Spanner { + return SPANNER as Spanner; + } + + run() {} + read() {} +} + +describe('BatchTransaction', () => { + const sandbox = sinon.createSandbox(); + + // tslint:disable-next-line variable-name + let BatchTransaction: typeof bt.BatchTransaction; + let batchTransaction: bt.BatchTransaction; + + before(() => { + BatchTransaction = proxyquire('../src/batch-transaction.js', { + '@google-cloud/precise-date': {PreciseDate: FakeTimestamp}, + '@google-cloud/promisify': fakePfy, + './codec.js': {codec: fakeCodec}, + './transaction.js': {Snapshot: FakeTransaction}, + }).BatchTransaction; + }); + + const traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + afterEach(() => { + traceExporter.reset(); + sandbox.restore(); + }); + + const REQUEST = sandbox.stub(); + const SESSION = { + parent: DATABASE, + formattedName_: 'abcdef', + request: REQUEST, + }; + const ID = '0xdeadbeef'; + + const PARTITIONS = [{partitionToken: 'a'}, {partitionToken: 'b'}]; + const RESPONSE = {partitions: PARTITIONS}; + + beforeEach(() => { + batchTransaction = new BatchTransaction(SESSION as {} as Session); + batchTransaction.session = SESSION as {} as Session; + batchTransaction.id = ID; + batchTransaction.observabilityOptions = {tracerProvider: provider}; + REQUEST.callsFake((_, callback) => callback(null, RESPONSE)); + }); + + const GAX_OPTS = {}; + + const QUERY = { + sql: 'SELECT * FROM Singers', + gaxOptions: GAX_OPTS, + params: {}, + types: {}, + }; + + it('createQueryPartitions', done => { + const REQUEST = sandbox.stub(); + + const res = batchTransaction.createQueryPartitions( + QUERY, + (err, part, resp) => { + assert.ifError(err); + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); + + // Sort the spans by duration. + spans.sort((spanA, spanB) => { + spanA.duration < spanB.duration; + }); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = [ + 'CloudSpanner.BatchTransaction.createPartitions_', + 'CloudSpanner.BatchTransaction.createQueryPartitions', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that createPartitions_ is a child span of createQueryPartitions. + const spanCreatePartitions_ = spans[0]; + const spanCreateQueryPartitions = spans[1]; + assert.ok( + spanCreateQueryPartitions.spanContext().traceId, + 'Expected that createQueryPartitions has a defined traceId' + ); + assert.ok( + spanCreatePartitions_.spanContext().traceId, + 'Expected that createPartitions_ has a defined traceId' + ); + assert.deepStrictEqual( + spanCreatePartitions_.spanContext().traceId, + spanCreateQueryPartitions.spanContext().traceId, + 'Expected that both spans share a traceId' + ); + assert.ok( + spanCreateQueryPartitions.spanContext().spanId, + 'Expected that createQueryPartitions has a defined spanId' + ); + assert.ok( + spanCreatePartitions_.spanContext().spanId, + 'Expected that createPartitions_ has a defined spanId' + ); + assert.deepStrictEqual( + spanCreatePartitions_.parentSpanId, + spanCreateQueryPartitions.spanContext().spanId, + 'Expected that createQueryPartitions is the parent to createPartitions_' + ); + done(); + } + ); + }); + + it('createReadPartitions', done => { + const REQUEST = sandbox.stub(); + const response = {}; + REQUEST.callsFake((_, callback) => callback(null, response)); + + const res = batchTransaction.createReadPartitions( + QUERY, + (err, part, resp) => { + assert.ifError(err); + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); + + // Sort the spans by duration. + spans.sort((spanA, spanB) => { + spanA.duration < spanB.duration; + }); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + const expectedSpanNames = [ + 'CloudSpanner.BatchTransaction.createPartitions_', + 'CloudSpanner.BatchTransaction.createReadPartitions', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + done(); + } + ); + }); +}); diff --git a/observability-test/table.ts b/observability-test/table.ts new file mode 100644 index 000000000..9b20780ae --- /dev/null +++ b/observability-test/table.ts @@ -0,0 +1,316 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as pfy from '@google-cloud/promisify'; +import * as assert from 'assert'; +import {before, beforeEach, afterEach, describe, it} from 'mocha'; +import * as extend from 'extend'; +import * as proxyquire from 'proxyquire'; +import * as sinon from 'sinon'; +import * as through from 'through2'; + +const { + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +import {SpanStatusCode} from '@opentelemetry/api'; + +// eslint-disable-next-line n/no-extraneous-require +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); + +const fakePfy = extend({}, pfy, { + promisifyAll(klass, options) { + if (klass.name !== 'Table') { + return; + } + assert.deepStrictEqual(options.exclude, ['delete', 'drop']); + }, +}); + +class FakeTransaction { + commit(gaxOptions, callback) { + callback(null, {}); + } + createReadStream() { + return through.obj(); + } + deleteRows() {} + end() {} + insert() {} + replace() {} + upsert() {} + update() {} +} + +describe('Table', () => { + const sandbox = sinon.createSandbox(); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let Table: any; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let TableCached: any; + let table; + let transaction: FakeTransaction; + + const DATABASE = { + runTransaction: (opts, callback) => callback(null, transaction), + getSnapshot: (options, callback) => callback(null, transaction), + }; + + const traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + const NAME = 'table-name'; + + before(() => { + Table = proxyquire('../src/table.js', { + '@google-cloud/promisify': fakePfy, + }).Table; + TableCached = extend({}, Table); + }); + + beforeEach(() => { + extend(Table, TableCached); + table = new Table(DATABASE, NAME); + transaction = new FakeTransaction(); + table.observabilityOptions = {tracerProvider: provider}; + }); + + afterEach(() => { + sandbox.restore(); + traceExporter.reset(); + }); + + function getExportedSpans(minCount: number) { + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual( + spans.length >= minCount, + true, + `at least ${minCount} spans expected` + ); + + // Sort the spans by duration. + spans.sort((spanA, spanB) => { + spanA.duration < spanB.duration; + }); + + return spans; + } + + function spanNames(spans) { + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + return actualSpanNames; + } + + it('deleteRows', done => { + const KEYS = ['key']; + const stub = ( + sandbox.stub(transaction, 'deleteRows') as sinon.SinonStub + ).withArgs(table.name, KEYS); + + sandbox.stub(transaction, 'commit').callsFake((opts, callback) => { + callback(); + }); + + table.deleteRows(KEYS, err => { + assert.ifError(err); + assert.strictEqual(stub.callCount, 1); + const actualSpanNames = spanNames(getExportedSpans(1)); + const expectedSpanNames = ['CloudSpanner.Table.deleteRows']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); + + const ROW = {}; + + it('insert', done => { + const stub = ( + sandbox.stub(transaction, 'insert') as sinon.SinonStub + ).withArgs(table.name, ROW); + + table.insert(ROW, err => { + assert.ifError(err); + assert.strictEqual(stub.callCount, 1); + const actualSpanNames = spanNames(getExportedSpans(1)); + const expectedSpanNames = ['CloudSpanner.Table.insert']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); + + it('insert with an error', done => { + const fakeError = new Error('err'); + sandbox + .stub(DATABASE, 'runTransaction') + .callsFake((opts, callback) => callback(fakeError)); + + table.insert(ROW, err => { + assert.strictEqual(err, fakeError); + + const gotSpans = getExportedSpans(1); + const gotSpanStatus = gotSpans[0].status; + const wantSpanStatus = { + code: SpanStatusCode.ERROR, + message: fakeError.toString(), + }; + assert.deepStrictEqual( + gotSpanStatus, + wantSpanStatus, + `mismatch in span status:\n\tGot: ${JSON.stringify(gotSpanStatus)}\n\tWant: ${JSON.stringify(wantSpanStatus)}` + ); + + const actualSpanNames = spanNames(gotSpans); + const expectedSpanNames = ['CloudSpanner.Table.insert']; + + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); + + it('upsert', done => { + const stub = ( + sandbox.stub(transaction, 'upsert') as sinon.SinonStub + ).withArgs(table.name, ROW); + + table.upsert(ROW, err => { + assert.ifError(err); + assert.strictEqual(stub.callCount, 1); + + const actualSpanNames = spanNames(getExportedSpans(1)); + const expectedSpanNames = ['CloudSpanner.Table.upsert']; + + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); + + it('upsert with an error', done => { + const fakeError = new Error('err'); + sandbox + .stub(DATABASE, 'runTransaction') + .callsFake((opts, callback) => callback(fakeError)); + + table.upsert(ROW, err => { + assert.strictEqual(err, fakeError); + + const gotSpans = getExportedSpans(1); + const gotSpanStatus = gotSpans[0].status; + const wantSpanStatus = { + code: SpanStatusCode.ERROR, + message: fakeError.toString(), + }; + assert.deepStrictEqual( + gotSpanStatus, + wantSpanStatus, + `mismatch in span status:\n\tGot: ${JSON.stringify(gotSpanStatus)}\n\tWant: ${JSON.stringify(wantSpanStatus)}` + ); + + const actualSpanNames = spanNames(gotSpans); + const expectedSpanNames = ['CloudSpanner.Table.upsert']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); + + it('replace', done => { + const stub = ( + sandbox.stub(transaction, 'replace') as sinon.SinonStub + ).withArgs(table.name, ROW); + + table.replace(ROW, err => { + assert.ifError(err); + assert.strictEqual(stub.callCount, 1); + + const actualSpanNames = spanNames(getExportedSpans(1)); + const expectedSpanNames = ['CloudSpanner.Table.replace']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); + + it('replace with an error', done => { + const fakeError = new Error('err'); + sandbox + .stub(DATABASE, 'runTransaction') + .callsFake((opts, callback) => callback(fakeError)); + + table.replace(ROW, err => { + assert.strictEqual(err, fakeError); + const gotSpans = getExportedSpans(1); + const gotSpanStatus = gotSpans[0].status; + const wantSpanStatus = { + code: SpanStatusCode.ERROR, + message: fakeError.toString(), + }; + assert.deepStrictEqual( + gotSpanStatus, + wantSpanStatus, + `mismatch in span status:\n\tGot: ${JSON.stringify(gotSpanStatus)}\n\tWant: ${JSON.stringify(wantSpanStatus)}` + ); + + const actualSpanNames = spanNames(gotSpans); + const expectedSpanNames = ['CloudSpanner.Table.replace']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); +}); diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index 842a82cdc..cea784b03 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -25,6 +25,7 @@ import { CLOUD_RESOURCE_HEADER, addLeaderAwareRoutingHeader, } from '../src/common'; +import {startTrace, setSpanError, traceConfig} from './instrument'; export interface TransactionIdentifier { session: string | Session; @@ -136,20 +137,37 @@ class BatchTransaction extends Snapshot { delete reqOpts.gaxOptions; delete reqOpts.types; - const headers: {[k: string]: string} = {}; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + const traceConfig: traceConfig = { + sql: query, + opts: this.observabilityOptions, + }; + return startTrace( + 'BatchTransaction.createQueryPartitions', + traceConfig, + span => { + const headers: {[k: string]: string} = {}; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } - this.createPartitions_( - { - client: 'SpannerClient', - method: 'partitionQuery', - reqOpts, - gaxOpts: query.gaxOptions, - headers: headers, - }, - callback + this.createPartitions_( + { + client: 'SpannerClient', + method: 'partitionQuery', + reqOpts, + gaxOpts: query.gaxOptions, + headers: headers, + }, + (err, partitions, resp) => { + if (err) { + setSpanError(span, err); + } + + span.end(); + callback(err, partitions, resp); + } + ); + } ); } /** @@ -163,38 +181,52 @@ class BatchTransaction extends Snapshot { * @param {function} callback Callback function. */ createPartitions_(config, callback) { - const query = extend({}, config.reqOpts, { - session: this.session.formattedName_, - transaction: {id: this.id}, - }); - config.reqOpts = extend({}, query); - config.headers = { - [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_, + const traceConfig: traceConfig = { + opts: this.observabilityOptions, }; - delete query.partitionOptions; - this.session.request(config, (err, resp) => { - if (err) { - callback(err, null, resp); - return; - } - const partitions = resp.partitions.map(partition => { - return extend({}, query, partition); - }); + return startTrace( + 'BatchTransaction.createPartitions_', + traceConfig, + span => { + const query = extend({}, config.reqOpts, { + session: this.session.formattedName_, + transaction: {id: this.id}, + }); + config.reqOpts = extend({}, query); + config.headers = { + [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database) + .formattedName_, + }; + delete query.partitionOptions; + this.session.request(config, (err, resp) => { + if (err) { + setSpanError(span, err); + span.end(); + callback(err, null, resp); + return; + } - if (resp.transaction) { - const {id, readTimestamp} = resp.transaction; + const partitions = resp.partitions.map(partition => { + return extend({}, query, partition); + }); - this.id = id; + if (resp.transaction) { + const {id, readTimestamp} = resp.transaction; - if (readTimestamp) { - this.readTimestampProto = readTimestamp; - this.readTimestamp = new PreciseDate(readTimestamp); - } - } + this.id = id; + + if (readTimestamp) { + this.readTimestampProto = readTimestamp; + this.readTimestamp = new PreciseDate(readTimestamp); + } + } - callback(null, partitions, resp); - }); + span.end(); + callback(null, partitions, resp); + }); + } + ); } /** * @typedef {object} ReadPartition @@ -226,28 +258,45 @@ class BatchTransaction extends Snapshot { * @returns {Promise} */ createReadPartitions(options, callback) { - const reqOpts = Object.assign({}, options, { - keySet: Snapshot.encodeKeySet(options), - }); + const traceConfig: traceConfig = { + opts: this.observabilityOptions, + }; - delete reqOpts.gaxOptions; - delete reqOpts.keys; - delete reqOpts.ranges; + return startTrace( + 'BatchTransaction.createReadPartitions', + traceConfig, + span => { + const reqOpts = Object.assign({}, options, { + keySet: Snapshot.encodeKeySet(options), + }); - const headers: {[k: string]: string} = {}; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + delete reqOpts.gaxOptions; + delete reqOpts.keys; + delete reqOpts.ranges; - this.createPartitions_( - { - client: 'SpannerClient', - method: 'partitionRead', - reqOpts, - gaxOpts: options.gaxOptions, - headers: headers, - }, - callback + const headers: {[k: string]: string} = {}; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } + + this.createPartitions_( + { + client: 'SpannerClient', + method: 'partitionRead', + reqOpts, + gaxOpts: options.gaxOptions, + headers: headers, + }, + (err, partitions, resp) => { + if (err) { + setSpanError(span, err); + } + + span.end(); + callback(err, partitions, resp); + } + ); + } ); } /** @@ -322,6 +371,7 @@ class BatchTransaction extends Snapshot { * ``` */ executeStream(partition) { + // TODO: Instrument the streams with Otel. if (is.string(partition.table)) { return this.createReadStream(partition.table, partition); } diff --git a/src/instrument.ts b/src/instrument.ts index 7420ab8a6..f6d0d0e5b 100644 --- a/src/instrument.ts +++ b/src/instrument.ts @@ -47,7 +47,7 @@ interface SQLStatement { interface observabilityOptions { tracerProvider: TracerProvider; - enableExtendedTracing: boolean; + enableExtendedTracing?: boolean; } export type {observabilityOptions as ObservabilityOptions}; @@ -81,7 +81,7 @@ interface traceConfig { } const SPAN_NAMESPACE_PREFIX = 'CloudSpanner'; // TODO: discuss & standardize this prefix. -export {SPAN_NAMESPACE_PREFIX}; +export {SPAN_NAMESPACE_PREFIX, traceConfig}; /** * startTrace begins an active span in the current active context diff --git a/src/table.ts b/src/table.ts index 343d4364b..87e07db89 100644 --- a/src/table.ts +++ b/src/table.ts @@ -31,6 +31,12 @@ import { import {google as databaseAdmin} from '../protos/protos'; import {Schema, LongRunningCallback} from './common'; import IRequestOptions = databaseAdmin.spanner.v1.IRequestOptions; +import { + ObservabilityOptions, + startTrace, + setSpanError, + traceConfig, +} from './instrument'; export type Key = string | string[]; @@ -93,6 +99,7 @@ const POSTGRESQL = 'POSTGRESQL'; class Table { database: Database; name: string; + observabilityOptions?: ObservabilityOptions; constructor(database: Database, name: string) { /** * The {@link Database} instance of this {@link Table} instance. @@ -1072,29 +1079,43 @@ class Table { options: MutateRowsOptions | CallOptions = {}, callback: CommitCallback ): void { - const requestOptions = - 'requestOptions' in options ? options.requestOptions : {}; + const traceConfig: traceConfig = { + opts: this.observabilityOptions, + }; - const excludeTxnFromChangeStreams = - 'excludeTxnFromChangeStreams' in options - ? options.excludeTxnFromChangeStreams - : false; + startTrace('Table.' + method, traceConfig, span => { + const requestOptions = + 'requestOptions' in options ? options.requestOptions : {}; - this.database.runTransaction( - { - requestOptions: requestOptions, - excludeTxnFromChangeStreams: excludeTxnFromChangeStreams, - }, - (err, transaction) => { - if (err) { - callback(err); - return; - } + const excludeTxnFromChangeStreams = + 'excludeTxnFromChangeStreams' in options + ? options.excludeTxnFromChangeStreams + : false; - transaction![method](this.name, rows as Key[]); - transaction!.commit(options, callback); - } - ); + this.database.runTransaction( + { + requestOptions: requestOptions, + excludeTxnFromChangeStreams: excludeTxnFromChangeStreams, + }, + (err, transaction) => { + if (err) { + setSpanError(span, err); + span.end(); + callback(err); + return; + } + + transaction![method](this.name, rows as Key[]); + transaction!.commit(options, (err, resp) => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback(err, resp); + }); + } + ); + }); } } diff --git a/src/transaction.ts b/src/transaction.ts index c8cd6d464..1a375ff0f 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -46,6 +46,7 @@ import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions; import IRequestOptions = google.spanner.v1.IRequestOptions; import {Database, Spanner} from '.'; import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; +import {ObservabilityOptions} from './instrument'; export type Rows = Array; const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo'; @@ -285,6 +286,7 @@ export class Snapshot extends EventEmitter { queryOptions?: IQueryOptions; resourceHeader_: {[k: string]: string}; requestOptions?: Pick; + observabilityOptions?: ObservabilityOptions; /** * The transaction ID. diff --git a/test/session-pool.ts b/test/session-pool.ts index 447b794f2..c0ea45e06 100644 --- a/test/session-pool.ts +++ b/test/session-pool.ts @@ -29,22 +29,7 @@ import {Session} from '../src/session'; import * as sp from '../src/session-pool'; import {Transaction} from '../src/transaction'; import {grpc} from 'google-gax'; - -const {ContextManager} = require('@opentelemetry/api'); -const { - AsyncHooksContextManager, -} = require('@opentelemetry/context-async-hooks'); -const { - AlwaysOnSampler, - NodeTracerProvider, - InMemorySpanExporter, -} = require('@opentelemetry/sdk-trace-node'); -const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); -const {SPAN_NAMESPACE_PREFIX, startTrace} = require('../src/instrument'); -const { - disableContextAndManager, - setGlobalContextManager, -} = require('../observability-test/helper'); +const {startTrace} = require('../src/instrument'); let pQueueOverride: typeof PQueue | null = null; @@ -1367,34 +1352,12 @@ describe('SessionPool', () => { }); describe('trace annotations on active span', () => { - const exporter = new InMemorySpanExporter(); - const globalProvider = new NodeTracerProvider({ - sampler: new AlwaysOnSampler(), - exporter: exporter, - }); - globalProvider.addSpanProcessor(new SimpleSpanProcessor(exporter)); - globalProvider.register(); - - const contextManager = new AsyncHooksContextManager(); - setGlobalContextManager(contextManager); - beforeEach(() => { sessionPool.isOpen = true; sessionPool._isValidSession = () => true; }); - afterEach(async () => { - exporter.forceFlush(); - exporter.reset(); - globalProvider.forceFlush(); - }); - - after(async () => { - disableContextAndManager(contextManager); - await globalProvider.shutdown(); - }); - - it('annotations when acquiring a session', () => { + it('annotations when acquiring a session', done => { const topLevelSpanName = 'testSessionPool.acquire'; startTrace(topLevelSpanName, {}, async span => { const fakeSession = createSession(); @@ -1412,25 +1375,13 @@ describe('SessionPool', () => { await sessionPool._release(session); span.end(); - const spans = exporter.getFinishedSpans(); - assert.strictEqual( - spans.length, - 1, - 'Exactly 1 span should have been exported' - ); + const events = span.events; + assert.strictEqual(!events, false, 'Events must be set'); assert.strictEqual( - spans[0].name, - `${SPAN_NAMESPACE_PREFIX}.${topLevelSpanName}`, - 'Expected only the top-level created span' - ); - const span0 = spans[0]; - assert.strictEqual(!span0.events, false, 'Events must be set'); - assert.strictEqual( - span0.events.length > 0, + events.length > 0, true, 'Expecting at least 1 event' ); - const events = span0.events; // Sort the events by earliest time of occurence. events.sort((evtA, evtB) => { @@ -1448,6 +1399,8 @@ describe('SessionPool', () => { wantEventNames, `Mismatched events\n\tGot: ${gotEventNames}\n\tWant: ${wantEventNames}` ); + + done(); }); }); });