From 87ebbf799f9a52ad12e6fa0bbc64a9e2d81ec393 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 20 Sep 2024 00:15:03 -1000 Subject: [PATCH] feat(observability): trace Database methods Adds trace spans for Database methods, as well as tests. Updates #2079 Built from PR #2087 Updates #2114 --- observability-test/database.ts | 336 +++++++++ src/database.ts | 1225 +++++++++++++++++++------------- 2 files changed, 1050 insertions(+), 511 deletions(-) create mode 100644 observability-test/database.ts diff --git a/observability-test/database.ts b/observability-test/database.ts new file mode 100644 index 000000000..328714143 --- /dev/null +++ b/observability-test/database.ts @@ -0,0 +1,336 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import {grpc} from 'google-gax'; +import {google} from '../protos/protos'; +import {Database, Instance, SessionPool, Snapshot, Spanner} from '../src'; +import protobuf = google.spanner.v1; +import * as mock from '../test/mockserver/mockspanner'; +import * as mockInstanceAdmin from '../test/mockserver/mockinstanceadmin'; +import * as mockDatabaseAdmin from '../test/mockserver/mockdatabaseadmin'; +const { + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +// eslint-disable-next-line n/no-extraneous-require +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); + +/** A simple result set for SELECT 1. */ +function createSelect1ResultSet(): protobuf.ResultSet { + const fields = [ + protobuf.StructType.Field.create({ + name: 'NUM', + type: protobuf.Type.create({code: protobuf.TypeCode.INT64}), + }), + ]; + const metadata = new protobuf.ResultSetMetadata({ + rowType: new protobuf.StructType({ + fields, + }), + }); + return protobuf.ResultSet.create({ + metadata, + rows: [{values: [{stringValue: '1'}]}], + }); +} + +interface setupResults { + server: grpc.Server; + spanner: Spanner; + spannerMock: mock.MockSpanner; +} + +async function setup(): Promise { + const server = new grpc.Server(); + + const spannerMock = mock.createMockSpanner(server); + mockInstanceAdmin.createMockInstanceAdmin(server); + mockDatabaseAdmin.createMockDatabaseAdmin(server); + + const port: number = await new Promise((resolve, reject) => { + server.bindAsync( + '0.0.0.0:0', + grpc.ServerCredentials.createInsecure(), + (err, assignedPort) => { + if (err) { + reject(err); + } else { + resolve(assignedPort); + } + } + ); + }); + + const selectSql = 'SELECT 1'; + const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; + spannerMock.putStatementResult( + selectSql, + mock.StatementResult.resultSet(createSelect1ResultSet()) + ); + spannerMock.putStatementResult( + updateSql, + mock.StatementResult.updateCount(1) + ); + + const spanner = new Spanner({ + projectId: 'observability-project-id', + servicePath: 'localhost', + port, + sslCreds: grpc.credentials.createInsecure(), + }); + + return Promise.resolve({ + spanner: spanner, + server: server, + spannerMock: spannerMock, + }); +} + +describe('Database', () => { + let server: grpc.Server; + let spanner: Spanner; + let database: Database; + let spannerMock: mock.MockSpanner; + let traceExporter: typeof InMemorySpanExporter; + + after(() => { + spanner.close(); + server.tryShutdown(() => {}); + }); + + before(async () => { + const setupResult = await setup(); + spanner = setupResult.spanner; + server = setupResult.server; + spannerMock = setupResult.spannerMock; + + const selectSql = 'SELECT 1'; + const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; + spannerMock.putStatementResult( + selectSql, + mock.StatementResult.resultSet(createSelect1ResultSet()) + ); + spannerMock.putStatementResult( + updateSql, + mock.StatementResult.updateCount(1) + ); + + traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + const instance = spanner.instance('instance'); + database = instance.database('database'); + database.observabilityConfig = { + tracerProvider: provider, + enableExtendedTracing: false, + }; + }); + + beforeEach(() => { + spannerMock.resetRequests(); + }); + + afterEach(() => { + traceExporter.reset(); + }); + + it('run', async () => { + const [rows] = await database.run('SELECT 1'); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); + + // Sort the spans by duration. + spans.sort((spanA, spanB) => { + spanA.duration < spanB.duration; + }); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.run', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that RunStream is a child span of createQueryPartitions. + const spanRunStream = spans[0]; + const spanRun = spans[1]; + assert.ok( + spanRun.spanContext().traceId, + 'Expected that createQueryPartitions has a defined traceId' + ); + assert.ok( + spanRunStream.spanContext().traceId, + 'Expected that RunStream has a defined traceId' + ); + assert.deepStrictEqual( + spanRunStream.spanContext().traceId, + spanRun.spanContext().traceId, + 'Expected that both spans share a traceId' + ); + assert.ok( + spanRun.spanContext().spanId, + 'Expected that createQueryPartitions has a defined spanId' + ); + assert.ok( + spanRunStream.spanContext().spanId, + 'Expected that RunStream has a defined spanId' + ); + assert.deepStrictEqual( + spanRunStream.parentSpanId, + spanRun.spanContext().spanId, + 'Expected that run is the parent to runStream' + ); + }); + + it('runTransaction', done => { + database.runTransaction((err, transaction) => { + assert.ifError(err); + transaction!.run('SELECT 1', (err, rows) => { + assert.ifError(err); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + // Sort the spans by duration. + spans.sort((spanA, spanB) => { + spanA.duration < spanB.duration; + }); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runTransaction']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); + }); + + it('getSessions', async () => { + const [rows] = await database.getSessions(); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + // Sort the spans by duration. + spans.sort((spanA, spanB) => { + spanA.duration < spanB.duration; + }); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.getSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + }); + + /* + it('getDatabaseRoles', done => { + database.getDatabaseRoles((err, roles) => { + assert.ifError(err); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + // Sort the spans by duration. + spans.sort((spanA, spanB) => { + spanA.duration < spanB.duration; + }); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.getSnapshot']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); + */ + + it('getSnapshot', done => { + database.getSnapshot((err, transaction) => { + assert.ifError(err); + + transaction!.run('SELECT 1', (err, rows) => { + assert.ifError(err); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + // Sort the spans by duration. + spans.sort((spanA, spanB) => { + spanA.duration < spanB.duration; + }); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.getSnapshot']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + done(); + }); + }); + }); +}); diff --git a/src/database.ts b/src/database.ts index 70a650675..50bc9e323 100644 --- a/src/database.ts +++ b/src/database.ts @@ -92,7 +92,7 @@ import { Schema, addLeaderAwareRoutingHeader, } from './common'; -import {Duplex, Readable, Transform} from 'stream'; +import {finished, Duplex, Readable, Transform} from 'stream'; import {PreciseDate} from '@google-cloud/precise-date'; import {EnumKey, RequestConfig, TranslateEnumKeys, Spanner} from '.'; import arrify = require('arrify'); @@ -102,7 +102,13 @@ import Policy = google.iam.v1.Policy; import FieldMask = google.protobuf.FieldMask; import IDatabase = google.spanner.admin.database.v1.IDatabase; import snakeCase = require('lodash.snakecase'); -import {getActiveOrNoopSpan} from './instrument'; +import { + ObservabilityOptions, + getActiveOrNoopSpan, + startTrace, + setSpanError, + setSpanErrorAndException, +} from './instrument'; export type GetDatabaseRolesCallback = RequestCallback< IDatabaseRole, @@ -337,6 +343,7 @@ class Database extends common.GrpcServiceObject { databaseDialect?: EnumKey< typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect > | null; + observabilityConfig: ObservabilityOptions | undefined; constructor( instance: Instance, name: string, @@ -459,6 +466,7 @@ class Database extends common.GrpcServiceObject { Object.assign({}, queryOptions), Database.getEnvironmentQueryOptions() ); + this.observabilityConfig = instance.observabilityConfig; } /** * @typedef {array} SetDatabaseMetadataResponse @@ -649,48 +657,57 @@ class Database extends common.GrpcServiceObject { options: number | BatchCreateSessionsOptions, callback?: BatchCreateSessionsCallback ): void | Promise { - if (typeof options === 'number') { - options = {count: options}; - } + const q = {opts: this.observabilityConfig}; + return startTrace('Database.batchCreateSessions', q, span => { + if (typeof options === 'number') { + options = {count: options}; + } - const count = options.count; - const labels = options.labels || {}; - const databaseRole = options.databaseRole || this.databaseRole || null; + const count = options.count; + span.setAttribute('session.count.requested', count); + const labels = options.labels || {}; + const databaseRole = options.databaseRole || this.databaseRole || null; - const reqOpts: google.spanner.v1.IBatchCreateSessionsRequest = { - database: this.formattedName_, - sessionTemplate: {labels: labels, creatorRole: databaseRole}, - sessionCount: count, - }; + const reqOpts: google.spanner.v1.IBatchCreateSessionsRequest = { + database: this.formattedName_, + sessionTemplate: {labels: labels, creatorRole: databaseRole}, + sessionCount: count, + }; - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } - this.request( - { - client: 'SpannerClient', - method: 'batchCreateSessions', - reqOpts, - gaxOpts: options.gaxOptions, - headers: headers, - }, - (err, resp) => { - if (err) { - callback!(err, null, resp!); - return; - } + this.request( + { + client: 'SpannerClient', + method: 'batchCreateSessions', + reqOpts, + gaxOpts: options.gaxOptions, + headers: headers, + }, + (err, resp) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, null, resp!); + return; + } - const sessions = (resp!.session || []).map(metadata => { - const session = this.session(metadata.name!); - session.metadata = metadata; - return session; - }); + const sessions = (resp!.session || []).map(metadata => { + const session = this.session(metadata.name!); + session.metadata = metadata; + return session; + }); - callback!(null, sessions, resp!); - } - ); + span.setAttribute('session.count.created', sessions.length); + + span.end(); + callback!(null, sessions, resp!); + } + ); + }); } /** @@ -817,26 +834,32 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - this.pool_.getSession((err, session) => { - if (err) { - callback!(err as ServiceError, null, undefined); - return; - } - const transaction = this.batchTransaction({session: session!}, options); - this._releaseOnEnd(session!, transaction); - transaction.begin((err, resp) => { - const span = getActiveOrNoopSpan(); + const q = {opts: this.observabilityConfig}; + return startTrace('Database.createBatchTransaction', q, span => { + this.pool_.getSession((err, session) => { if (err) { - if (isSessionNotFoundError(err)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - } - callback!(err, null, resp!); + setSpanError(span, err); + span.end(); + callback!(err as ServiceError, null, undefined); return; } - span.addEvent('Using Session', {'session.id': session?.id}); - callback!(null, transaction, resp!); + const transaction = this.batchTransaction({session: session!}, options); + this._releaseOnEnd(session!, transaction); + transaction.begin((err, resp) => { + if (err) { + if (isSessionNotFoundError(err)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + } + setSpanError(span, err); + span.end(); + callback!(err, null, resp!); + return; + } + span.end(); + callback!(null, transaction, resp!); + }); }); }); } @@ -943,29 +966,35 @@ class Database extends common.GrpcServiceObject { reqOpts.session.creatorRole = options.databaseRole || this.databaseRole || null; - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + const q = {opts: this.observabilityConfig}; + return startTrace('Database.createSession', q, span => { + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } - this.request( - { - client: 'SpannerClient', - method: 'createSession', - reqOpts, - gaxOpts: options.gaxOptions, - headers: headers, - }, - (err, resp) => { - if (err) { - callback(err, null, resp!); - return; + this.request( + { + client: 'SpannerClient', + method: 'createSession', + reqOpts, + gaxOpts: options.gaxOptions, + headers: headers, + }, + (err, resp) => { + if (err) { + setSpanError(span, err); + span.end(); + callback(err, null, resp!); + return; + } + const session = this.session(resp!.name!); + session.metadata = resp; + span.end(); + callback(null, session, resp!); } - const session = this.session(resp!.name!); - session.metadata = resp; - callback(null, session, resp!); - } - ); + ); + }); } /** * @typedef {array} CreateTableResponse @@ -1059,16 +1088,21 @@ class Database extends common.GrpcServiceObject { const callback = typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; - this.updateSchema(schema, gaxOptions, (err, operation, resp) => { - if (err) { - callback!(err, null, null, resp!); - return; - } - const tableName = (schema as string).match( - /CREATE TABLE `*([^\s`(]+)/ - )![1]; - const table = this.table(tableName!); - callback!(null, table, operation!, resp!); + const tableName = (schema as string).match(/CREATE TABLE `*([^\s`(]+)/)![1]; + + const q = {opts: this.observabilityConfig}; + return startTrace('Database.createTable', q, span => { + this.updateSchema(schema, gaxOptions, (err, operation, resp) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, null, null, resp!); + return; + } + const table = this.table(tableName!); + span.end(); + callback!(null, table, operation!, resp!); + }); }); } /** @@ -1153,17 +1187,28 @@ class Database extends common.GrpcServiceObject { { database: this.formattedName_, }; - this.close(() => { - this.request( - { - client: 'DatabaseAdminClient', - method: 'dropDatabase', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - callback! - ); + + const q = {opts: this.observabilityConfig}; + return startTrace('Database.delete', q, span => { + this.close(() => { + this.request( + { + client: 'DatabaseAdminClient', + method: 'dropDatabase', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }, + (err, apiResponse) => { + if (err) { + setSpanError(span, err); + span.end(); + } + + callback!(err, apiResponse); + } + ); + }); }); } /** @@ -1288,30 +1333,43 @@ class Database extends common.GrpcServiceObject { : ({} as GetDatabaseConfig); const callback = typeof optionsOrCallback === 'function' ? optionsOrCallback : cb; - this.getMetadata(options.gaxOptions!, (err, metadata) => { - if (err) { - if (options.autoCreate && (err as ApiError).code === 5) { - this.create( - options, - (err, database: Database, operation: GaxOperation) => { - if (err) { - callback!(err as grpc.ServiceError); - return; + const q = {opts: this.observabilityConfig}; + return startTrace('Database.get', q, span => { + this.getMetadata(options.gaxOptions!, (err, metadata) => { + if (err) { + setSpanError(span, err); + if (options.autoCreate && (err as ApiError).code === 5) { + this.create( + options, + (err, database: Database, operation: GaxOperation) => { + if (err) { + span.end(); + callback!(err as grpc.ServiceError); + return; + } + operation + .on('error', err => { + setSpanError(span, err); + span.end(); + callback!(err); + }) + .on('complete', (metadata: Metadata) => { + this.metadata = metadata; + span.end(); + callback!(null, this, metadata as r.Response); + }); } - operation - .on('error', callback!) - .on('complete', (metadata: Metadata) => { - this.metadata = metadata; - callback!(null, this, metadata as r.Response); - }); - } - ); + ); + return; + } + span.end(); + callback!(err); return; } - callback!(err); - return; - } - callback!(null, this, metadata as r.Response); + + span.end(); + callback!(null, this, metadata as r.Response); + }); }); } /** @@ -1386,21 +1444,29 @@ class Database extends common.GrpcServiceObject { { name: this.formattedName_, }; - return this.request( - { - client: 'DatabaseAdminClient', - method: 'getDatabase', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - (err, resp) => { - if (resp) { - this.metadata = resp; + + const q = {opts: this.observabilityConfig}; + return startTrace('Database.getMetadata', q, span => { + return this.request( + { + client: 'DatabaseAdminClient', + method: 'getDatabase', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }, + (err, resp) => { + if (resp) { + this.metadata = resp; + } + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err, resp); } - callback!(err, resp); - } - ); + ); + }); } /** @@ -1640,19 +1706,27 @@ class Database extends common.GrpcServiceObject { { database: this.formattedName_, }; - this.request( - { - client: 'DatabaseAdminClient', - method: 'getDatabaseDdl', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (err, statements, ...args: any[]) => { - callback!(err, statements ? statements.statements : null, ...args); - } - ); + + const q = {opts: this.observabilityConfig}; + return startTrace('Database.getSchema', q, span => { + this.request( + { + client: 'DatabaseAdminClient', + method: 'getDatabaseDdl', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (err, statements, ...args: any[]) => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err, statements ? statements.statements : null, ...args); + } + ); + }); } /** @@ -1720,18 +1794,25 @@ class Database extends common.GrpcServiceObject { requestedPolicyVersion: options.requestedPolicyVersion || null, }, }; - this.request( - { - client: 'DatabaseAdminClient', - method: 'getIamPolicy', - reqOpts, - gaxOpts: options.gaxOptions, - headers: this.resourceHeader_, - }, - (err, resp) => { - callback!(err, resp); - } - ); + const q = {opts: this.observabilityConfig}; + return startTrace('Database.getIamPolicy', q, span => { + this.request( + { + client: 'DatabaseAdminClient', + method: 'getIamPolicy', + reqOpts, + gaxOpts: options.gaxOptions, + headers: this.resourceHeader_, + }, + (err, resp) => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err, resp); + } + ); + }); } /** @@ -1855,32 +1936,39 @@ class Database extends common.GrpcServiceObject { delete (gaxOpts as GetSessionsOptions).pageToken; } - this.request< - google.spanner.v1.ISession, - google.spanner.v1.IListSessionsResponse - >( - { - client: 'SpannerClient', - method: 'listSessions', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - (err, sessions, nextPageRequest, ...args) => { - let sessionInstances: Session[] | null = null; - if (sessions) { - sessionInstances = sessions.map(metadata => { - const session = self.session(metadata.name!); - session.metadata = metadata; - return session; - }); + const q = {opts: this.observabilityConfig}; + return startTrace('Database.getSessions', q, span => { + this.request< + google.spanner.v1.ISession, + google.spanner.v1.IListSessionsResponse + >( + { + client: 'SpannerClient', + method: 'listSessions', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }, + (err, sessions, nextPageRequest, ...args) => { + let sessionInstances: Session[] | null = null; + if (sessions) { + sessionInstances = sessions.map(metadata => { + const session = self.session(metadata.name!); + session.metadata = metadata; + return session; + }); + } + const nextQuery = nextPageRequest! + ? extend({}, options, nextPageRequest!) + : null; + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err, sessionInstances!, nextQuery, ...args); } - const nextQuery = nextPageRequest! - ? extend({}, options, nextPageRequest!) - : null; - callback!(err, sessionInstances!, nextQuery, ...args); - } - ); + ); + }); } /** @@ -2021,43 +2109,51 @@ class Database extends common.GrpcServiceObject { optionsOrCallback?: TimestampBounds | GetSnapshotCallback, cb?: GetSnapshotCallback ): void | Promise<[Snapshot]> { - const callback = - typeof optionsOrCallback === 'function' - ? (optionsOrCallback as GetSnapshotCallback) - : cb; - const options = - typeof optionsOrCallback === 'object' - ? (optionsOrCallback as TimestampBounds) - : {}; - - this.pool_.getSession((err, session) => { - if (err) { - callback!(err as ServiceError); - return; - } - - const snapshot = session!.snapshot(options, this.queryOptions_); - - snapshot.begin(err => { - const span = getActiveOrNoopSpan(); + const q = {opts: this.observabilityConfig}; + return startTrace('Database.getSnapshot', q, span => { + const callback = + typeof optionsOrCallback === 'function' + ? (optionsOrCallback as GetSnapshotCallback) + : cb; + const options = + typeof optionsOrCallback === 'object' + ? (optionsOrCallback as TimestampBounds) + : {}; + + this.pool_.getSession((err, session) => { if (err) { - if (isSessionNotFoundError(err)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - session!.lastError = err; - this.pool_.release(session!); - this.getSnapshot(options, callback!); - } else { - span.addEvent('Using Session', {'session.id': session?.id}); - this.pool_.release(session!); - callback!(err); - } + setSpanError(span, err); + span.end(); + callback!(err as ServiceError); return; } - this._releaseOnEnd(session!, snapshot); - callback!(err, snapshot); + const snapshot = session!.snapshot(options, this.queryOptions_); + + snapshot.begin(err => { + if (err) { + setSpanError(span, err); + if (isSessionNotFoundError(err)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + session!.lastError = err; + this.pool_.release(session!); + this.getSnapshot(options, callback!); + span.end(); + } else { + span.addEvent('Using Session', {'session.id': session?.id}); + this.pool_.release(session!); + span.end(); + callback!(err); + } + return; + } + + this._releaseOnEnd(session!, snapshot); + span.end(); + callback!(err, snapshot); + }); }); }); } @@ -2133,12 +2229,13 @@ class Database extends common.GrpcServiceObject { if (options.excludeTxnFromChangeStreams) { transaction!.excludeTxnFromChangeStreams(); } + + const span = getActiveOrNoopSpan(); + if (!err) { - const span = getActiveOrNoopSpan(); span.addEvent('Using Session', {'session.id': session?.id}); this._releaseOnEnd(session!, transaction!); } else if (isSessionNotFoundError(err as grpc.ServiceError)) { - const span = getActiveOrNoopSpan(); span.addEvent('No session available', { 'session.id': session?.id, }); @@ -2324,25 +2421,33 @@ class Database extends common.GrpcServiceObject { delete (gaxOpts as GetDatabaseRolesOptions).pageSize; delete (gaxOpts as GetDatabaseRolesOptions).pageToken; } - this.request< - IDatabaseRole, - databaseAdmin.spanner.admin.database.v1.ListDatabaseRolesResponse - >( - { - client: 'DatabaseAdminClient', - method: 'listDatabaseRoles', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - (err, roles, nextPageRequest, ...args) => { - const nextQuery = nextPageRequest! - ? extend({}, gaxOpts, nextPageRequest!) - : null; - callback!(err, roles, nextQuery, ...args); - } - ); + const q = {opts: this.observabilityConfig}; + return startTrace('Database.getDatabaseRoles', q, span => { + this.request< + IDatabaseRole, + databaseAdmin.spanner.admin.database.v1.ListDatabaseRolesResponse + >( + { + client: 'DatabaseAdminClient', + method: 'listDatabaseRoles', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }, + (err, roles, nextPageRequest, ...args) => { + const nextQuery = nextPageRequest! + ? extend({}, gaxOpts, nextPageRequest!) + : null; + + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err, roles, nextQuery, ...args); + } + ); + }); } /** @@ -2363,12 +2468,14 @@ class Database extends common.GrpcServiceObject { callback?: PoolRequestCallback ): void | Promise { const pool = this.pool_; + const span = getActiveOrNoopSpan(); + pool.getSession((err, session) => { if (err) { callback!(err as ServiceError, null); return; } - const span = getActiveOrNoopSpan(); + span.addEvent('Using Session', {'session.id': session?.id}); config.reqOpts.session = session!.formattedName_; this.request(config, (err, ...args) => { @@ -2409,9 +2516,9 @@ class Database extends common.GrpcServiceObject { session = null; } } + const span = getActiveOrNoopSpan(); waitForSessionStream.on('reading', () => { pool.getSession((err, session_) => { - const span = getActiveOrNoopSpan(); if (err) { if (isSessionNotFoundError(err as grpc.ServiceError)) { span.addEvent('No session available', { @@ -2540,22 +2647,28 @@ class Database extends common.GrpcServiceObject { reqOpts.encryptionConfig = (options as RestoreOptions).encryptionConfig; } - return this.request( - { - client: 'DatabaseAdminClient', - method: 'restoreDatabase', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - (err, operation, resp) => { - if (err) { - callback!(err, null, null, resp); - return; + const q = {opts: this.observabilityConfig}; + return startTrace('Database.restore', q, span => { + return this.request( + { + client: 'DatabaseAdminClient', + method: 'restoreDatabase', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }, + (err, operation, resp) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, null, null, resp); + return; + } + span.end(); + callback!(null, this, operation, resp); } - callback!(null, this, operation, resp); - } - ); + ); + }); } /** @@ -2727,32 +2840,39 @@ class Database extends common.GrpcServiceObject { optionsOrCallback?: TimestampBounds | RunCallback, cb?: RunCallback ): void | Promise { - let stats: ResultSetStats; - let metadata: ResultSetMetadata; - const rows: Row[] = []; - const callback = - typeof optionsOrCallback === 'function' - ? (optionsOrCallback as RunCallback) - : cb; - const options = - typeof optionsOrCallback === 'object' - ? (optionsOrCallback as TimestampBounds) - : {}; - - this.runStream(query, options) - .on('error', callback!) - .on('response', response => { - if (response.metadata) { - metadata = response.metadata; - } - }) - .on('stats', _stats => (stats = _stats)) - .on('data', row => { - rows.push(row); - }) - .on('end', () => { - callback!(null, rows, stats, metadata); - }); + const q = {sql: query, opts: this.observabilityConfig}; + return startTrace('Database.run', q, span => { + let stats: ResultSetStats; + let metadata: ResultSetMetadata; + const rows: Row[] = []; + const callback = + typeof optionsOrCallback === 'function' + ? (optionsOrCallback as RunCallback) + : cb; + const options = + typeof optionsOrCallback === 'object' + ? (optionsOrCallback as TimestampBounds) + : {}; + + this.runStream(query, options) + .on('error', err => { + setSpanError(span, err); + callback!(err as grpc.ServiceError, rows, stats, metadata); + }) + .on('response', response => { + if (response.metadata) { + metadata = response.metadata; + } + }) + .on('stats', _stats => (stats = _stats)) + .on('data', row => { + rows.push(row); + }) + .on('end', () => { + span.end(); + callback!(null, rows, stats, metadata); + }); + }); } /** * Partitioned DML transactions are used to execute DML statements with a @@ -2779,13 +2899,23 @@ class Database extends common.GrpcServiceObject { query: string | RunPartitionedUpdateOptions, callback?: RunUpdateCallback ): void | Promise<[number]> { - this.pool_.getSession((err, session) => { - if (err) { - callback!(err as ServiceError, 0); - return; - } + const q = {sql: query, opts: this.observabilityConfig}; + return startTrace('Database.runPartitionedUpdate', q, span => { + this.pool_.getSession((err, session) => { + if (err) { + span.end(); + callback!(err as ServiceError, 0); + return; + } - this._runPartitionedUpdate(session!, query, callback); + this._runPartitionedUpdate(session!, query, (err, rowCount) => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err, rowCount); + }); + }); }); } @@ -2799,26 +2929,36 @@ class Database extends common.GrpcServiceObject { if (typeof query !== 'string' && query.excludeTxnFromChangeStreams) { transaction.excludeTxnFromChangeStreams(); } - transaction.begin(err => { - if (err) { - this.pool_.release(session!); - callback!(err, 0); - return; - } - transaction.runUpdate(query, (err, updateCount) => { + const q = {sql: query, opts: this.observabilityConfig}; + return startTrace('Database._runPartitionedUpdate', q, span => { + transaction.begin(err => { if (err) { - if (err.code !== grpc.status.ABORTED) { - this.pool_.release(session!); - callback!(err, 0); - return; - } - this._runPartitionedUpdate(session, query, callback); - } else { this.pool_.release(session!); - callback!(null, updateCount); + setSpanError(span, err); + span.end(); + callback!(err, 0); return; } + + transaction.runUpdate(query, (err, updateCount) => { + if (err) { + setSpanError(span, err); + + if (err.code !== grpc.status.ABORTED) { + this.pool_.release(session!); + callback!(err, 0); + span.end(); + return; + } + this._runPartitionedUpdate(session, query, callback); + } else { + this.pool_.release(session!); + callback!(null, updateCount); + span.end(); + return; + } + }); }); }); } @@ -2952,61 +3092,78 @@ class Database extends common.GrpcServiceObject { query: string | ExecuteSqlRequest, options?: TimestampBounds ): PartialResultStream { - const proxyStream: Transform = through.obj(); - const span = getActiveOrNoopSpan(); + const q = {sql: query, opts: this.observabilityConfig}; + return startTrace('Database.runStream', q, span => { + const proxyStream: Transform = through.obj(); - this.pool_.getSession((err, session) => { - if (err) { - proxyStream.destroy(err); - return; - } + this.pool_.getSession((err, session) => { + if (err) { + setSpanError(span, err); + proxyStream.destroy(err); + span.end(); + return; + } - const span = getActiveOrNoopSpan(); - span.addEvent('Using Session', {'session.id': session?.id}); + span.addEvent('Using Session', {'session.id': session?.id}); + + const snapshot = session!.snapshot(options, this.queryOptions_); + + this._releaseOnEnd(session!, snapshot); - const snapshot = session!.snapshot(options, this.queryOptions_); - - this._releaseOnEnd(session!, snapshot); - - let dataReceived = false; - let dataStream = snapshot.runStream(query); - const endListener = () => snapshot.end(); - dataStream - .once('data', () => (dataReceived = true)) - .once('error', err => { - if ( - !dataReceived && - isSessionNotFoundError(err as grpc.ServiceError) - ) { - // If it is a 'Session not found' error and we have not yet received - // any data, we can safely retry the query on a new session. - // Register the error on the session so the pool can discard it. - if (session) { - session.lastError = err as grpc.ServiceError; + let dataReceived = false; + let dataStream = snapshot.runStream(query); + + const endListener = () => { + snapshot.end(); + }; + dataStream + .once('data', () => (dataReceived = true)) + .once('error', err => { + setSpanError(span, err); + + if ( + !dataReceived && + isSessionNotFoundError(err as grpc.ServiceError) + ) { + // If it is a 'Session not found' error and we have not yet received + // any data, we can safely retry the query on a new session. + // Register the error on the session so the pool can discard it. + if (session) { + session.lastError = err as grpc.ServiceError; + } + span.addEvent('No session available', { + 'session.id': session?.id, + }); + // Remove the current data stream from the end user stream. + dataStream.unpipe(proxyStream); + dataStream.removeListener('end', endListener); + dataStream.end(); + snapshot.end(); + // Create a new data stream and add it to the end user stream. + dataStream = this.runStream(query, options); + dataStream.pipe(proxyStream); + } else { + proxyStream.destroy(err); + snapshot.end(); } - span.addEvent('No session available', { - 'session.id': session?.id, - }); - // Remove the current data stream from the end user stream. - dataStream.unpipe(proxyStream); - dataStream.removeListener('end', endListener); - dataStream.end(); - snapshot.end(); - // Create a new data stream and add it to the end user stream. - dataStream = this.runStream(query, options); - dataStream.pipe(proxyStream); - } else { - proxyStream.destroy(err); - snapshot.end(); - } - }) - .on('stats', stats => proxyStream.emit('stats', stats)) - .on('response', response => proxyStream.emit('response', response)) - .once('end', endListener) - .pipe(proxyStream); - }); + }) + .on('stats', stats => proxyStream.emit('stats', stats)) + .on('response', response => proxyStream.emit('response', response)) + .once('end', () => { + endListener(); + }) + .pipe(proxyStream); + }); + + finished(proxyStream, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); - return proxyStream as PartialResultStream; + return proxyStream as PartialResultStream; + }); } /** @@ -3107,58 +3264,75 @@ class Database extends common.GrpcServiceObject { optionsOrRunFn: RunTransactionOptions | RunTransactionCallback, fn?: RunTransactionCallback ): void { - const runFn = - typeof optionsOrRunFn === 'function' - ? (optionsOrRunFn as RunTransactionCallback) - : fn; - const options = - typeof optionsOrRunFn === 'object' && optionsOrRunFn - ? (optionsOrRunFn as RunTransactionOptions) - : {}; - - this.pool_.getSession((err, session?, transaction?) => { - const span = getActiveOrNoopSpan(); - if (err && isSessionNotFoundError(err as grpc.ServiceError)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - this.runTransaction(options, runFn!); - return; - } - if (err) { - runFn!(err as grpc.ServiceError); - return; - } - if (options.optimisticLock) { - transaction!.useOptimisticLock(); - } - if (options.excludeTxnFromChangeStreams) { - transaction!.excludeTxnFromChangeStreams(); - } - - const release = this.pool_.release.bind(this.pool_, session!); - const runner = new TransactionRunner( - session!, - transaction!, - runFn!, - options - ); + const q = {opts: this.observabilityConfig}; + startTrace('Database.runTransaction', q, span => { + const runFn = + typeof optionsOrRunFn === 'function' + ? (optionsOrRunFn as RunTransactionCallback) + : fn; + const options = + typeof optionsOrRunFn === 'object' && optionsOrRunFn + ? (optionsOrRunFn as RunTransactionOptions) + : {}; + + this.pool_.getSession((err, session?, transaction?) => { + if (err) { + setSpanError(span, err); + } - runner.run().then(release, err => { - const span = getActiveOrNoopSpan(); - if (isSessionNotFoundError(err)) { + if (err && isSessionNotFoundError(err as grpc.ServiceError)) { span.addEvent('No session available', { 'session.id': session?.id, }); - release(); + span.end(); this.runTransaction(options, runFn!); - } else { - if (!err) { - span.addEvent('Using Session', {'session.id': session!.id}); - } - setImmediate(runFn!, err); - release(); + return; + } + + if (err) { + span.end(); + runFn!(err as grpc.ServiceError); + return; } + if (options.optimisticLock) { + transaction!.useOptimisticLock(); + } + if (options.excludeTxnFromChangeStreams) { + transaction!.excludeTxnFromChangeStreams(); + } + + const release = () => { + span.end(); + this.pool_.release(session!); + }; + + const runner = new TransactionRunner( + session!, + transaction!, + (err, resp) => { + span.end(); + runFn!(err, resp); + }, + options + ); + + runner.run().then(release, err => { + setSpanError(span, err!); + + if (isSessionNotFoundError(err)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + release(); + this.runTransaction(options, runFn!); + } else { + if (!err) { + span.addEvent('Using Session', {'session.id': session!.id}); + } + setImmediate(runFn!, err); + release(); + } + }); }); }); } @@ -3243,47 +3417,55 @@ class Database extends common.GrpcServiceObject { : {}; let sessionId = ''; - const getSession = this.pool_.getSession.bind(this.pool_); - const span = getActiveOrNoopSpan(); - // Loop to retry 'Session not found' errors. - // (and yes, we like while (true) more than for (;;) here) - // eslint-disable-next-line no-constant-condition - while (true) { - try { - const [session, transaction] = await promisify(getSession)(); - transaction.requestOptions = Object.assign( - transaction.requestOptions || {}, - options.requestOptions - ); - if (options.optimisticLock) { - transaction.useOptimisticLock(); - } - if (options.excludeTxnFromChangeStreams) { - transaction.excludeTxnFromChangeStreams(); - } - sessionId = session?.id; - span.addEvent('Using Session', {'session.id': sessionId}); - const runner = new AsyncTransactionRunner( - session, - transaction, - runFn, - options - ); - + const q = {opts: this.observabilityConfig}; + return startTrace('Database.runTransactionAsync', q, async span => { + const getSession = this.pool_.getSession.bind(this.pool_); + // Loop to retry 'Session not found' errors. + // (and yes, we like while (true) more than for (;;) here) + // eslint-disable-next-line no-constant-condition + while (true) { try { - return await runner.run(); - } finally { - this.pool_.release(session); - } - } catch (e) { - if (!isSessionNotFoundError(e as ServiceError)) { - span.addEvent('No session available', { - 'session.id': sessionId, - }); - throw e; + const [session, transaction] = await promisify(getSession)(); + transaction.requestOptions = Object.assign( + transaction.requestOptions || {}, + options.requestOptions + ); + if (options.optimisticLock) { + transaction.useOptimisticLock(); + } + if (options.excludeTxnFromChangeStreams) { + transaction.excludeTxnFromChangeStreams(); + } + + sessionId = session?.id; + span.addEvent('Using Session', {'session.id': sessionId}); + const runner = new AsyncTransactionRunner( + session, + transaction, + runFn, + options + ); + + try { + return await runner.run(); + } catch (e) { + setSpanErrorAndException(span, e as Error); + } finally { + this.pool_.release(session); + span.end(); + } + } catch (e) { + if (!isSessionNotFoundError(e as ServiceError)) { + span.addEvent('No session available', { + 'session.id': sessionId, + }); + setSpanErrorAndException(span, e as Error); + span.end(); + throw e; + } } } - } + }); } /** @@ -3348,64 +3530,75 @@ class Database extends common.GrpcServiceObject { mutationGroups: MutationGroup[], options?: BatchWriteOptions ): NodeJS.ReadableStream { - const proxyStream: Transform = through.obj(); + const q = {opts: this.observabilityConfig}; + return startTrace('Database.batchWriteAtLeastOnce', q, span => { + const proxyStream: Transform = through.obj(); - this.pool_.getSession((err, session) => { - if (err) { - proxyStream.destroy(err); - return; - } + this.pool_.getSession((err, session) => { + if (err) { + setSpanError(span, err); + proxyStream.destroy(err); + span.end(); + return; + } - const span = getActiveOrNoopSpan(); - span.addEvent('Using Session', {'session.id': session?.id}); + span.addEvent('Using Session', {'session.id': session?.id}); + const gaxOpts = extend(true, {}, options?.gaxOptions); + const reqOpts = Object.assign( + {} as spannerClient.spanner.v1.BatchWriteRequest, + { + session: session!.formattedName_!, + mutationGroups: mutationGroups.map(mg => mg.proto()), + requestOptions: options?.requestOptions, + excludeTxnFromChangeStream: options?.excludeTxnFromChangeStreams, + } + ); + let dataReceived = false; + let dataStream = this.requestStream({ + client: 'SpannerClient', + method: 'batchWrite', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }); + dataStream + .once('data', () => (dataReceived = true)) + .once('error', err => { + if ( + !dataReceived && + isSessionNotFoundError(err as grpc.ServiceError) + ) { + // If there's a 'Session not found' error and we have not yet received + // any data, we can safely retry the writes on a new session. + // Register the error on the session so the pool can discard it. + if (session) { + session.lastError = err as grpc.ServiceError; + } + span.addEvent('No session available', { + 'session.id': session?.id, + }); + // Remove the current data stream from the end user stream. + dataStream.unpipe(proxyStream); + dataStream.end(); + // Create a new stream and add it to the end user stream. + dataStream = this.batchWriteAtLeastOnce(mutationGroups, options); + dataStream.pipe(proxyStream); + } else { + proxyStream.destroy(err); + } - const gaxOpts = extend(true, {}, options?.gaxOptions); - const reqOpts = Object.assign( - {} as spannerClient.spanner.v1.BatchWriteRequest, - { - session: session!.formattedName_!, - mutationGroups: mutationGroups.map(mg => mg.proto()), - requestOptions: options?.requestOptions, - excludeTxnFromChangeStream: options?.excludeTxnFromChangeStreams, - } - ); - let dataReceived = false; - let dataStream = this.requestStream({ - client: 'SpannerClient', - method: 'batchWrite', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, + setSpanError(span, err); + span.end(); + }) + .once('end', () => { + this.pool_.release(session!); + span.end(); + }) + .pipe(proxyStream); }); - dataStream - .once('data', () => (dataReceived = true)) - .once('error', err => { - if ( - !dataReceived && - isSessionNotFoundError(err as grpc.ServiceError) - ) { - // If there's a 'Session not found' error and we have not yet received - // any data, we can safely retry the writes on a new session. - // Register the error on the session so the pool can discard it. - if (session) { - session.lastError = err as grpc.ServiceError; - } - span.addEvent('No session available', {'session.id': session?.id}); - // Remove the current data stream from the end user stream. - dataStream.unpipe(proxyStream); - dataStream.end(); - // Create a new stream and add it to the end user stream. - dataStream = this.batchWriteAtLeastOnce(mutationGroups, options); - dataStream.pipe(proxyStream); - } else { - proxyStream.destroy(err); - } - }) - .once('end', () => this.pool_.release(session!)) - .pipe(proxyStream); - }); - return proxyStream as NodeJS.ReadableStream; + return proxyStream as NodeJS.ReadableStream; + }); } /** @@ -3474,31 +3667,41 @@ class Database extends common.GrpcServiceObject { optionsOrCallback?: CallOptions | CommitCallback, callback?: CommitCallback ): void | Promise { - const cb = - typeof optionsOrCallback === 'function' - ? (optionsOrCallback as CommitCallback) - : callback; - const options = - typeof optionsOrCallback === 'object' && optionsOrCallback - ? (optionsOrCallback as CallOptions) - : {}; - this.pool_.getSession((err, session?, transaction?) => { - const span = getActiveOrNoopSpan(); - if (err && isSessionNotFoundError(err as grpc.ServiceError)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - this.writeAtLeastOnce(mutations, options, cb!); - return; - } - if (err) { - cb!(err as grpc.ServiceError); - return; - } - span.addEvent('Using Session', {'session.id': session?.id}); - this._releaseOnEnd(session!, transaction!); - transaction?.setQueuedMutations(mutations.proto()); - return transaction?.commit(options, cb!); + const q = {opts: this.observabilityConfig}; + return startTrace('Database.writeAtLeastOnce', q, span => { + const cb = + typeof optionsOrCallback === 'function' + ? (optionsOrCallback as CommitCallback) + : callback; + const options = + typeof optionsOrCallback === 'object' && optionsOrCallback + ? (optionsOrCallback as CallOptions) + : {}; + this.pool_.getSession((err, session?, transaction?) => { + const spanEndAndCallback = (err, resp) => { + if (err) { + setSpanError(span, err); + } + span.end(); + cb!(err, resp); + }; + + if (err && isSessionNotFoundError(err as grpc.ServiceError)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + this.writeAtLeastOnce(mutations, options, spanEndAndCallback); + return; + } + if (err) { + cb!(err as grpc.ServiceError); + return; + } + span.addEvent('Using Session', {'session.id': session?.id}); + this._releaseOnEnd(session!, transaction!); + transaction?.setQueuedMutations(mutations.proto()); + return transaction?.commit(options, spanEndAndCallback); + }); }); }