diff --git a/observability-test/batch-transaction.ts b/observability-test/batch-transaction.ts index 13ce59c5e..50e25abc5 100644 --- a/observability-test/batch-transaction.ts +++ b/observability-test/batch-transaction.ts @@ -139,7 +139,7 @@ describe('BatchTransaction', () => { batchTransaction = new BatchTransaction(SESSION as {} as Session); batchTransaction.session = SESSION as {} as Session; batchTransaction.id = ID; - batchTransaction.observabilityOptions = {tracerProvider: provider}; + batchTransaction.observabilityOptions_ = {tracerProvider: provider}; REQUEST.callsFake((_, callback) => callback(null, RESPONSE)); }); diff --git a/observability-test/database.ts b/observability-test/database.ts index 8329e81eb..945b7afdc 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -241,7 +241,7 @@ describe('Database', () => { database = new Database(INSTANCE, NAME, POOL_OPTIONS); database.parent = INSTANCE; database.databaseRole = 'parent_role'; - database.observabilityConfig = { + database.observabilityOptions_ = { tracerProvider: provider, enableExtendedTracing: false, }; diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 933e9bf08..979f27c27 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -149,7 +149,7 @@ describe('EndToEnd', () => { const instance = spanner.instance('instance'); database = instance.database('database'); - database.observabilityConfig = { + database.observabilityOptions_ = { tracerProvider: provider, enableExtendedTracing: false, }; @@ -202,7 +202,7 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + assert.strictEqual(spans.length, 3, 'Exactly 1 span expected'); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -213,14 +213,18 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.getSnapshot']; + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Database.getSnapshot', + 'CloudSpanner.Snapshot.run', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = []; + const expectedEventNames = ['Begin Transaction']; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -310,7 +314,7 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); + assert.strictEqual(spans.length, 3, 'Exactly 2 spans expected'); // Sort the spans by duration. spans.sort((spanA, spanB) => { @@ -329,6 +333,7 @@ describe('EndToEnd', () => { const expectedSpanNames = [ 'CloudSpanner.Database.runStream', 'CloudSpanner.Database.run', + 'CloudSpanner.Snapshot.runStream', ]; assert.deepStrictEqual( actualSpanNames, @@ -372,7 +377,7 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + assert.strictEqual(spans.length, 2, 'Exactly 1 span expected'); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -383,7 +388,10 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.runTransaction']; + const expectedSpanNames = [ + 'CloudSpanner.Database.runTransaction', + 'CloudSpanner.Snapshot.run', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, @@ -410,7 +418,7 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + assert.strictEqual(spans.length, 2, 'Exactly 1 span expected'); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -421,14 +429,21 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.writeAtLeastOnce']; + const expectedSpanNames = [ + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Database.writeAtLeastOnce', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Starting Commit', + 'Commit Done', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, diff --git a/observability-test/transaction.ts b/observability-test/transaction.ts new file mode 100644 index 000000000..d56b7a35a --- /dev/null +++ b/observability-test/transaction.ts @@ -0,0 +1,203 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import {grpc} from 'google-gax'; +import {google} from '../protos/protos'; +import {Database, Spanner, Transaction} from '../src'; +import protobuf = google.spanner.v1; +import * as mock from '../test/mockserver/mockspanner'; +import * as mockInstanceAdmin from '../test/mockserver/mockinstanceadmin'; +import * as mockDatabaseAdmin from '../test/mockserver/mockdatabaseadmin'; +const { + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +// eslint-disable-next-line n/no-extraneous-require +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); + +/** A simple result set for SELECT 1. */ +function createSelect1ResultSet(): protobuf.ResultSet { + const fields = [ + protobuf.StructType.Field.create({ + name: 'NUM', + type: protobuf.Type.create({code: protobuf.TypeCode.INT64}), + }), + ]; + const metadata = new protobuf.ResultSetMetadata({ + rowType: new protobuf.StructType({ + fields, + }), + }); + return protobuf.ResultSet.create({ + metadata, + rows: [{values: [{stringValue: '1'}]}], + }); +} + +interface setupResults { + server: grpc.Server; + spanner: Spanner; + spannerMock: mock.MockSpanner; +} + +const selectSql = 'SELECT 1'; +const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; + +async function setup(): Promise { + const server = new grpc.Server(); + + const spannerMock = mock.createMockSpanner(server); + mockInstanceAdmin.createMockInstanceAdmin(server); + mockDatabaseAdmin.createMockDatabaseAdmin(server); + + const port: number = await new Promise((resolve, reject) => { + server.bindAsync( + '0.0.0.0:0', + grpc.ServerCredentials.createInsecure(), + (err, assignedPort) => { + if (err) { + reject(err); + } else { + resolve(assignedPort); + } + } + ); + }); + spannerMock.putStatementResult( + selectSql, + mock.StatementResult.resultSet(createSelect1ResultSet()) + ); + spannerMock.putStatementResult( + updateSql, + mock.StatementResult.updateCount(1) + ); + + const spanner = new Spanner({ + projectId: 'observability-project-id', + servicePath: 'localhost', + port, + sslCreds: grpc.credentials.createInsecure(), + }); + + return Promise.resolve({ + spanner: spanner, + server: server, + spannerMock: spannerMock, + }); +} + +describe('Transaction', () => { + let server: grpc.Server; + let spanner: Spanner; + let database: Database; + let spannerMock: mock.MockSpanner; + let traceExporter: typeof InMemorySpanExporter; + + after(() => { + spanner.close(); + server.tryShutdown(() => {}); + }); + + before(async () => { + const setupResult = await setup(); + spanner = setupResult.spanner; + server = setupResult.server; + spannerMock = setupResult.spannerMock; + + const selectSql = 'SELECT 1'; + const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; + const upsertSql = 'INSERTORUPDATE INTO FOO(BAR, BAZ) VALUES(@bar, @baz)'; + spannerMock.putStatementResult( + selectSql, + mock.StatementResult.resultSet(createSelect1ResultSet()) + ); + spannerMock.putStatementResult( + updateSql, + mock.StatementResult.updateCount(1) + ); + spannerMock.putStatementResult( + upsertSql, + mock.StatementResult.updateCount(1) + ); + + traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + const instance = spanner.instance('instance'); + database = instance.database('database'); + database.observabilityOptions_ = { + tracerProvider: provider, + enableExtendedTracing: false, + }; + }); + + beforeEach(() => { + spannerMock.resetRequests(); + }); + + afterEach(() => { + traceExporter.reset(); + }); + + it('run', done => { + database.getTransaction((err, tx) => { + assert.ifError(err); + + tx!.run('SELECT 1', (err, rows) => { + traceExporter.forceFlush(); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.getTransaction', + 'CloudSpanner.Snapshot.run', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Using Session']; + assert.strictEqual( + actualEventNames.every(value => expectedEventNames.includes(value)), + true, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); +}); diff --git a/src/batch-transaction.ts b/src/batch-transaction.ts index cea784b03..067539812 100644 --- a/src/batch-transaction.ts +++ b/src/batch-transaction.ts @@ -139,7 +139,7 @@ class BatchTransaction extends Snapshot { const traceConfig: traceConfig = { sql: query, - opts: this.observabilityOptions, + opts: this.observabilityOptions_, }; return startTrace( 'BatchTransaction.createQueryPartitions', @@ -182,7 +182,7 @@ class BatchTransaction extends Snapshot { */ createPartitions_(config, callback) { const traceConfig: traceConfig = { - opts: this.observabilityOptions, + opts: this.observabilityOptions_, }; return startTrace( @@ -259,7 +259,7 @@ class BatchTransaction extends Snapshot { */ createReadPartitions(options, callback) { const traceConfig: traceConfig = { - opts: this.observabilityOptions, + opts: this.observabilityOptions_, }; return startTrace( diff --git a/src/database.ts b/src/database.ts index a805d7a84..f353261fd 100644 --- a/src/database.ts +++ b/src/database.ts @@ -344,7 +344,7 @@ class Database extends common.GrpcServiceObject { databaseDialect?: EnumKey< typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect > | null; - observabilityConfig: ObservabilityOptions | undefined; + observabilityOptions_?: ObservabilityOptions; constructor( instance: Instance, name: string, @@ -467,7 +467,7 @@ class Database extends common.GrpcServiceObject { Object.assign({}, queryOptions), Database.getEnvironmentQueryOptions() ); - this.observabilityConfig = instance.observabilityConfig; + this.observabilityOptions_ = instance.observabilityOptions_; } /** * @typedef {array} SetDatabaseMetadataResponse @@ -693,7 +693,7 @@ class Database extends common.GrpcServiceObject { const sessions = (resp!.session || []).map(metadata => { const session = this.session(metadata.name!); - session.observabilityConfig = this.observabilityConfig; + session.observabilityOptions_ = this.observabilityOptions_; session.metadata = metadata; return session; }); @@ -739,6 +739,7 @@ class Database extends common.GrpcServiceObject { const transaction = new BatchTransaction(session, options); transaction.id = id; transaction.readTimestamp = identifier.timestamp as PreciseDate; + transaction.observabilityOptions_ = this.observabilityOptions_; return transaction; } /** @@ -827,7 +828,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const q = {opts: this.observabilityConfig}; + const q = {opts: this.observabilityOptions_}; return startTrace('Database.createBatchTransaction', q, span => { this.pool_.getSession((err, session) => { if (err) { @@ -1873,7 +1874,7 @@ class Database extends common.GrpcServiceObject { delete (gaxOpts as GetSessionsOptions).pageToken; } - const q = {opts: this.observabilityConfig}; + const q = {opts: this.observabilityOptions_}; return startTrace('Database.getSessions', q, span => { this.request< google.spanner.v1.ISession, @@ -1895,7 +1896,7 @@ class Database extends common.GrpcServiceObject { sessionInstances = sessions.map(metadata => { const session = self.session(metadata.name!); session.metadata = metadata; - session.observabilityConfig = this.observabilityConfig; + session.observabilityOptions_ = this.observabilityOptions_; return session; }); } @@ -2056,7 +2057,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const q = {opts: this.observabilityConfig}; + const q = {opts: this.observabilityOptions_}; return startTrace('Database.getSnapshot', q, span => { this.pool_.getSession((err, session) => { if (err) { @@ -2157,7 +2158,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as GetTransactionOptions) : {}; - const q = {opts: this.observabilityConfig}; + const q = {opts: this.observabilityOptions_}; return startTrace('Database.getTransaction', q, span => { this.pool_.getSession((err, session, transaction) => { if (options.requestOptions) { @@ -2175,6 +2176,7 @@ class Database extends common.GrpcServiceObject { if (!err) { span.addEvent('Using Session', {'session.id': session?.id}); + transaction!.observabilityOptions_ = this.observabilityOptions_; this._releaseOnEnd(session!, transaction!, span); } else if (isSessionNotFoundError(err as grpc.ServiceError)) { span.addEvent('No session available', { @@ -2784,7 +2786,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const q = {sql: query, opts: this.observabilityConfig}; + const q = {sql: query, opts: this.observabilityOptions_}; return startTrace('Database.run', q, span => { this.runStream(query, options) .on('error', err => { @@ -3005,7 +3007,7 @@ class Database extends common.GrpcServiceObject { options?: TimestampBounds ): PartialResultStream { const proxyStream: Transform = through.obj(); - const q = {sql: query, opts: this.observabilityConfig}; + const q = {sql: query, opts: this.observabilityOptions_}; return startTrace('Database.runStream', q, span => { this.pool_.getSession((err, session) => { if (err) { @@ -3183,7 +3185,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrRunFn as RunTransactionOptions) : {}; - const q = {opts: this.observabilityConfig}; + const q = {opts: this.observabilityOptions_}; startTrace('Database.runTransaction', q, span => { this.pool_.getSession((err, session?, transaction?) => { if (err) { @@ -3204,6 +3206,8 @@ class Database extends common.GrpcServiceObject { runFn!(err as grpc.ServiceError); return; } + + transaction!.observabilityOptions_ = this.observabilityOptions_; if (options.optimisticLock) { transaction!.useOptimisticLock(); } @@ -3576,7 +3580,7 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as CallOptions) : {}; - const q = {opts: this.observabilityConfig}; + const q = {opts: this.observabilityOptions_}; return startTrace('Database.writeAtLeastOnce', q, span => { this.pool_.getSession((err, session?, transaction?) => { if (err && isSessionNotFoundError(err as grpc.ServiceError)) { diff --git a/src/transaction.ts b/src/transaction.ts index 1a375ff0f..b8296d92d 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -22,7 +22,7 @@ import {EventEmitter} from 'events'; import {grpc, CallOptions, ServiceError, Status, GoogleError} from 'google-gax'; import * as is from 'is'; import {common as p} from 'protobufjs'; -import {Readable, PassThrough} from 'stream'; +import {finished, Readable, PassThrough, Stream} from 'stream'; import {codec, Json, JSONOptions, Type, Value} from './codec'; import { @@ -46,7 +46,12 @@ import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions; import IRequestOptions = google.spanner.v1.IRequestOptions; import {Database, Spanner} from '.'; import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; -import {ObservabilityOptions} from './instrument'; +import { + ObservabilityOptions, + startTrace, + setSpanError, + setSpanErrorAndException, +} from './instrument'; export type Rows = Array; const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo'; @@ -286,7 +291,7 @@ export class Snapshot extends EventEmitter { queryOptions?: IQueryOptions; resourceHeader_: {[k: string]: string}; requestOptions?: Pick; - observabilityOptions?: ObservabilityOptions; + observabilityOptions_?: ObservabilityOptions; /** * The transaction ID. @@ -351,6 +356,7 @@ export class Snapshot extends EventEmitter { }; this._waitingRequests = []; this._inlineBeginStarted = false; + this.observabilityOptions_ = session.observabilityOptions_; } /** @@ -416,9 +422,6 @@ export class Snapshot extends EventEmitter { options, }; - const span = getActiveOrNoopSpan(); - span.addEvent('Begin Transaction'); - // Only hand crafted read-write transactions will be able to set a // transaction tag for the BeginTransaction RPC. Also, this.requestOptions // is only set in the constructor of Transaction, which is the constructor @@ -436,26 +439,34 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - this.request( - { - client: 'SpannerClient', - method: 'beginTransaction', - reqOpts, - gaxOpts, - headers: headers, - }, - ( - err: null | grpc.ServiceError, - resp: spannerClient.spanner.v1.ITransaction - ) => { - if (err) { - callback!(err, resp); - return; + const q = {opts: this.observabilityOptions_}; + return startTrace('Snapshot.begin', q, span => { + span.addEvent('Begin Transaction'); + + this.request( + { + client: 'SpannerClient', + method: 'beginTransaction', + reqOpts, + gaxOpts, + headers: headers, + }, + ( + err: null | grpc.ServiceError, + resp: spannerClient.spanner.v1.ITransaction + ) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, resp); + return; + } + this._update(resp); + span.end(); + callback!(null, resp); } - this._update(resp); - callback!(null, resp); - } - ); + ); + }); } /** @@ -692,45 +703,71 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - const makeRequest = (resumeToken?: ResumeToken): Readable => { - if (this.id && transaction.begin) { - delete transaction.begin; - transaction.id = this.id; - } - return this.requestStream({ - client: 'SpannerClient', - method: 'streamingRead', - reqOpts: Object.assign({}, reqOpts, {resumeToken}), - gaxOpts: gaxOptions, - headers: headers, - }); - }; - - return partialResultStream(this._wrapWithIdWaiter(makeRequest), { - json, - jsonOptions, - maxResumeRetries, - columnsMetadata, - gaxOptions, - }) - ?.on('response', response => { - if (response.metadata && response.metadata!.transaction && !this.id) { - this._update(response.metadata!.transaction); + const q = {tableName: table, opts: this.observabilityOptions_}; + return startTrace('Snapshot.createReadStream', q, span => { + const makeRequest = (resumeToken?: ResumeToken): Readable => { + if (this.id && transaction.begin) { + delete transaction.begin; + transaction.id = this.id; } + return this.requestStream({ + client: 'SpannerClient', + method: 'streamingRead', + reqOpts: Object.assign({}, reqOpts, {resumeToken}), + gaxOpts: gaxOptions, + headers: headers, + }); + }; + + const prs = partialResultStream(this._wrapWithIdWaiter(makeRequest), { + json, + jsonOptions, + maxResumeRetries, + columnsMetadata, + gaxOptions, }) - .on('error', err => { - const isServiceError = err && typeof err === 'object' && 'code' in err; - if ( - !this.id && - this._useInRunner && - !( - isServiceError && - (err as grpc.ServiceError).code === grpc.status.ABORTED - ) - ) { - this.begin(); - } - }); + ?.on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', err => { + const isServiceError = + err && typeof err === 'object' && 'code' in err; + if ( + !this.id && + this._useInRunner && + !( + isServiceError && + (err as grpc.ServiceError).code === grpc.status.ABORTED + ) + ) { + this.begin(); + } + setSpanError(span, err); + }) + .on('end', err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); + + if (!prs) { + return prs; + } + + if (prs instanceof Stream) { + finished(prs, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); + } + + return prs; + }); } /** @@ -925,10 +962,19 @@ export class Snapshot extends EventEmitter { callback = cb as ReadCallback; } - this.createReadStream(table, request) - .on('error', callback!) - .on('data', row => rows.push(row)) - .on('end', () => callback!(null, rows)); + const q = {tableName: table, opts: this.observabilityOptions_}; + return startTrace('Snapshot.read', q, span => { + this.createReadStream(table, request) + .on('error', err => { + setSpanError(span, err); + callback!(err as grpc.ServiceError, null); + }) + .on('data', row => rows.push(row)) + .on('end', () => { + span.end(); + callback!(null, rows); + }); + }); } /** @@ -1018,19 +1064,29 @@ export class Snapshot extends EventEmitter { let stats: google.spanner.v1.ResultSetStats; let metadata: google.spanner.v1.ResultSetMetadata; - this.runStream(query) - .on('error', callback!) - .on('response', response => { - if (response.metadata) { - metadata = response.metadata; - if (metadata.transaction && !this.id) { - this._update(metadata.transaction); + const q = {sql: query, opts: this.observabilityOptions_}; + return startTrace('Snapshot.run', q, span => { + this.runStream(query) + .on('error', (err, rows, stats, metadata) => { + setSpanError(span, err); + span.end(); + callback!(err, rows, stats, metadata); + }) + .on('response', response => { + if (response.metadata) { + metadata = response.metadata; + if (metadata.transaction && !this.id) { + this._update(metadata.transaction); + } } - } - }) - .on('data', row => rows.push(row)) - .on('stats', _stats => (stats = _stats)) - .on('end', () => callback!(null, rows, stats, metadata)); + }) + .on('data', row => rows.push(row)) + .on('stats', _stats => (stats = _stats)) + .on('end', () => { + span.end(); + callback!(null, rows, stats, metadata); + }); + }); } /** @@ -1201,51 +1257,70 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - const makeRequest = (resumeToken?: ResumeToken): Readable => { - if (!reqOpts || (this.id && !reqOpts.transaction.id)) { - try { - sanitizeRequest(); - } catch (e) { - const errorStream = new PassThrough(); - setImmediate(() => errorStream.destroy(e as Error)); - return errorStream; + const q = {opts: this.observabilityOptions_}; + Object.assign(q, query); + return startTrace('Snapshot.runStream', q, span => { + const makeRequest = (resumeToken?: ResumeToken): Readable => { + if (!reqOpts || (this.id && !reqOpts.transaction.id)) { + try { + sanitizeRequest(); + } catch (e) { + const errorStream = new PassThrough(); + setSpanErrorAndException(span, e as Error); + span.end(); + setImmediate(() => errorStream.destroy(e as Error)); + return errorStream; + } } - } - - return this.requestStream({ - client: 'SpannerClient', - method: 'executeStreamingSql', - reqOpts: Object.assign({}, reqOpts, {resumeToken}), - gaxOpts: gaxOptions, - headers: headers, - }); - }; - return partialResultStream(this._wrapWithIdWaiter(makeRequest), { - json, - jsonOptions, - maxResumeRetries, - columnsMetadata, - gaxOptions, - }) - .on('response', response => { - if (response.metadata && response.metadata!.transaction && !this.id) { - this._update(response.metadata!.transaction); - } + return this.requestStream({ + client: 'SpannerClient', + method: 'executeStreamingSql', + reqOpts: Object.assign({}, reqOpts, {resumeToken}), + gaxOpts: gaxOptions, + headers: headers, + }); + }; + + const prs = partialResultStream(this._wrapWithIdWaiter(makeRequest), { + json, + jsonOptions, + maxResumeRetries, + columnsMetadata, + gaxOptions, }) - .on('error', err => { - const isServiceError = err && typeof err === 'object' && 'code' in err; - if ( - !this.id && - this._useInRunner && - !( - isServiceError && - (err as grpc.ServiceError).code === grpc.status.ABORTED - ) - ) { - this.begin(); - } - }); + .on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', err => { + setSpanError(span, err as Error); + const isServiceError = + err && typeof err === 'object' && 'code' in err; + if ( + !this.id && + this._useInRunner && + !( + isServiceError && + (err as grpc.ServiceError).code === grpc.status.ABORTED + ) + ) { + this.begin(); + } + }); + + if (prs instanceof Stream) { + finished(prs, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); + } + + return prs; + }); } /** @@ -1543,22 +1618,31 @@ export class Dml extends Snapshot { query = {sql: query} as ExecuteSqlRequest; } - this.run( - query, - ( - err: null | grpc.ServiceError, - rows: Rows, - stats: spannerClient.spanner.v1.ResultSetStats - ) => { - let rowCount = 0; - - if (stats && stats.rowCount) { - rowCount = Math.floor(stats[stats.rowCount] as number); - } + const q = {opts: this.observabilityOptions_}; + Object.assign(q, query); + return startTrace('Dml.runUpdate', q, span => { + this.run( + query, + ( + err: null | grpc.ServiceError, + rows: Rows, + stats: spannerClient.spanner.v1.ResultSetStats + ) => { + let rowCount = 0; + + if (stats && stats.rowCount) { + rowCount = Math.floor(stats[stats.rowCount] as number); + } - callback!(err, rowCount); - } - ); + if (err) { + setSpanError(span, err); + } + + span.end(); + callback!(err, rowCount); + } + ); + }); } } @@ -1812,57 +1896,64 @@ export class Transaction extends Dml { addLeaderAwareRoutingHeader(headers); } - this.request( - { - client: 'SpannerClient', - method: 'executeBatchDml', - reqOpts, - gaxOpts, - headers: headers, - }, - ( - err: null | grpc.ServiceError, - resp: spannerClient.spanner.v1.ExecuteBatchDmlResponse - ) => { - let batchUpdateError: BatchUpdateError; - - if (err) { - const rowCounts: number[] = []; - batchUpdateError = Object.assign(err, {rowCounts}); - callback!(batchUpdateError, rowCounts, resp); - return; - } + const q = {opts: this.observabilityOptions_}; + return startTrace('Transaction.batchUpdate', q, span => { + this.request( + { + client: 'SpannerClient', + method: 'executeBatchDml', + reqOpts, + gaxOpts, + headers: headers, + }, + ( + err: null | grpc.ServiceError, + resp: spannerClient.spanner.v1.ExecuteBatchDmlResponse + ) => { + let batchUpdateError: BatchUpdateError; + + if (err) { + const rowCounts: number[] = []; + batchUpdateError = Object.assign(err, {rowCounts}); + setSpanError(span, batchUpdateError); + span.end(); + callback!(batchUpdateError, rowCounts, resp); + return; + } - const {resultSets, status} = resp; - for (const resultSet of resultSets) { - if (!this.id && resultSet.metadata?.transaction) { - this._update(resultSet.metadata.transaction); + const {resultSets, status} = resp; + for (const resultSet of resultSets) { + if (!this.id && resultSet.metadata?.transaction) { + this._update(resultSet.metadata.transaction); + } + } + const rowCounts: number[] = resultSets.map(({stats}) => { + return ( + (stats && + Number( + stats[ + (stats as spannerClient.spanner.v1.ResultSetStats).rowCount! + ] + )) || + 0 + ); + }); + + if (status && status.code !== 0) { + const error = new Error(status.message!); + batchUpdateError = Object.assign(error, { + code: status.code, + metadata: Transaction.extractKnownMetadata(status.details!), + rowCounts, + }) as BatchUpdateError; + setSpanError(span, batchUpdateError); } - } - const rowCounts: number[] = resultSets.map(({stats}) => { - return ( - (stats && - Number( - stats[ - (stats as spannerClient.spanner.v1.ResultSetStats).rowCount! - ] - )) || - 0 - ); - }); - if (status && status.code !== 0) { - const error = new Error(status.message!); - batchUpdateError = Object.assign(error, { - code: status.code, - metadata: Transaction.extractKnownMetadata(status.details!), - rowCounts, - }) as BatchUpdateError; + span.end(); + callback!(batchUpdateError!, rowCounts, resp); } - - callback!(batchUpdateError!, rowCounts, resp); - } - ); + ); + }); } private static extractKnownMetadata( @@ -1963,81 +2054,91 @@ export class Transaction extends Dml { optionsOrCallback?: CommitOptions | CallOptions | CommitCallback, cb?: CommitCallback ): void | Promise { - const options = - typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; - const callback = - typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; - const gaxOpts = - 'gaxOptions' in options ? (options as CommitOptions).gaxOptions : options; - - const mutations = this._queuedMutations; - const session = this.session.formattedName_!; - const requestOptions = (options as CommitOptions).requestOptions; - const reqOpts: CommitRequest = {mutations, session, requestOptions}; - - const span = getActiveOrNoopSpan(); - - if (this.id) { - reqOpts.transactionId = this.id as Uint8Array; - } else if (!this._useInRunner) { - reqOpts.singleUseTransaction = this._options; - } else { - this.begin().then(() => this.commit(options, callback), callback); - return; - } + const q = {opts: this.observabilityOptions_}; + return startTrace('Transaction.commit', q, span => { + const options = + typeof optionsOrCallback === 'object' ? optionsOrCallback : {}; + const callback = + typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!; + const gaxOpts = + 'gaxOptions' in options + ? (options as CommitOptions).gaxOptions + : options; + + const mutations = this._queuedMutations; + const session = this.session.formattedName_!; + const requestOptions = (options as CommitOptions).requestOptions; + const reqOpts: CommitRequest = {mutations, session, requestOptions}; - if ( - 'returnCommitStats' in options && - (options as CommitOptions).returnCommitStats - ) { - reqOpts.returnCommitStats = (options as CommitOptions).returnCommitStats; - } - if ( - 'maxCommitDelay' in options && - (options as CommitOptions).maxCommitDelay - ) { - reqOpts.maxCommitDelay = (options as CommitOptions).maxCommitDelay; - } - reqOpts.requestOptions = Object.assign( - requestOptions || {}, - this.requestOptions - ); + if (this.id) { + reqOpts.transactionId = this.id as Uint8Array; + } else if (!this._useInRunner) { + reqOpts.singleUseTransaction = this._options; + } else { + this.begin().then(() => { + span.end(); + this.commit(options, callback); + }, callback); + return; + } - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + if ( + 'returnCommitStats' in options && + (options as CommitOptions).returnCommitStats + ) { + reqOpts.returnCommitStats = ( + options as CommitOptions + ).returnCommitStats; + } + if ( + 'maxCommitDelay' in options && + (options as CommitOptions).maxCommitDelay + ) { + reqOpts.maxCommitDelay = (options as CommitOptions).maxCommitDelay; + } + reqOpts.requestOptions = Object.assign( + requestOptions || {}, + this.requestOptions + ); - span.addEvent('Starting Commit'); + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } - this.request( - { - client: 'SpannerClient', - method: 'commit', - reqOpts, - gaxOpts: gaxOpts, - headers: headers, - }, - (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { - this.end(); + span.addEvent('Starting Commit'); + + this.request( + { + client: 'SpannerClient', + method: 'commit', + reqOpts, + gaxOpts: gaxOpts, + headers: headers, + }, + (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { + this.end(); + + if (err) { + span.addEvent('Commit failed'); + setSpanError(span, err); + } else { + span.addEvent('Commit Done'); + } - if (err) { - span.addEvent('Commit failed'); - } else { - span.addEvent('Commit Done'); - } + if (resp && resp.commitTimestamp) { + this.commitTimestampProto = resp.commitTimestamp; + this.commitTimestamp = new PreciseDate( + resp.commitTimestamp as DateStruct + ); + } + err = Transaction.decorateCommitError(err as ServiceError, mutations); - if (resp && resp.commitTimestamp) { - this.commitTimestampProto = resp.commitTimestamp; - this.commitTimestamp = new PreciseDate( - resp.commitTimestamp as DateStruct - ); + span.end(); + callback!(err as ServiceError | null, resp); } - err = Transaction.decorateCommitError(err as ServiceError, mutations); - - callback!(err as ServiceError | null, resp); - } - ); + ); + }); } /** @@ -2323,45 +2424,53 @@ export class Transaction extends Dml { | spannerClient.spanner.v1.Spanner.RollbackCallback, cb?: spannerClient.spanner.v1.Spanner.RollbackCallback ): void | Promise { - const gaxOpts = - typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {}; - const callback = - typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; - - if (!this.id) { - callback!( - new Error( + const q = {opts: this.observabilityOptions_}; + return startTrace('Transaction.rollback', q, span => { + const gaxOpts = + typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {}; + const callback = + typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; + + if (!this.id) { + const err = new Error( 'Transaction ID is unknown, nothing to rollback.' - ) as ServiceError - ); - return; - } - - const session = this.session.formattedName_!; - const transactionId = this.id; - const reqOpts: spannerClient.spanner.v1.IRollbackRequest = { - session, - transactionId, - }; + ) as ServiceError; + setSpanError(span, err); + span.end(); + callback!(err); + return; + } - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + const session = this.session.formattedName_!; + const transactionId = this.id; + const reqOpts: spannerClient.spanner.v1.IRollbackRequest = { + session, + transactionId, + }; - this.request( - { - client: 'SpannerClient', - method: 'rollback', - reqOpts, - gaxOpts, - headers: headers, - }, - (err: null | ServiceError) => { - this.end(); - callback!(err); + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); } - ); + + this.request( + { + client: 'SpannerClient', + method: 'rollback', + reqOpts, + gaxOpts, + headers: headers, + }, + (err: null | ServiceError) => { + this.end(); + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err); + } + ); + }); } /** @@ -2813,9 +2922,16 @@ export class PartitionedDml extends Dml { query: string | ExecuteSqlRequest, callback?: RunUpdateCallback ): void | Promise { - super.runUpdate(query, (err, count) => { - this.end(); - callback!(err, count); + const q = {sql: query, opts: this.observabilityOptions_}; + return startTrace('PartitionedDml.runUpdate', q, span => { + super.runUpdate(query, (err, count) => { + if (err) { + setSpanError(span, err); + } + this.end(); + span.end(); + callback!(err, count); + }); }); } } diff --git a/test/spanner.ts b/test/spanner.ts index cdb41b689..2b49a7904 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -5015,7 +5015,7 @@ describe('Spanner with mock server', () => { const opts: typeof ObservabilityOptions = {tracerProvider: provider}; startTrace('aSpan', {opts: opts}, span => { const database = newTestDatabase(); - database.observabilityConfig = opts; + database.observabilityOptions_ = opts; async function runIt() { const query = {