From 1f06871f7aca386756e8691013602b069697bb87 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sun, 29 Sep 2024 23:58:25 -0700 Subject: [PATCH] feat(observability): trace Database methods (#2119) Adds trace spans for Database methods, as well as tests for methods: * getSession * getSnapshot * run * runStream * runTransaction tracing of other methods shall come in follow-up PRs. Updates #2079 Built from PR #2087 Updates #2114 --- observability-test/database.ts | 1450 ++++++++++++++++++++++++++++++++ observability-test/spanner.ts | 442 ++++++++++ observability-test/table.ts | 6 +- src/database.ts | 539 +++++++----- src/instrument.ts | 9 +- test/database.ts | 12 +- test/spanner.ts | 17 +- 7 files changed, 2242 insertions(+), 233 deletions(-) create mode 100644 observability-test/database.ts create mode 100644 observability-test/spanner.ts diff --git a/observability-test/database.ts b/observability-test/database.ts new file mode 100644 index 000000000..8329e81eb --- /dev/null +++ b/observability-test/database.ts @@ -0,0 +1,1450 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* eslint-disable prefer-rest-params */ + +import * as through from 'through2'; +import {EventEmitter} from 'events'; +import * as assert from 'assert'; +import * as extend from 'extend'; +import {google} from '../protos/protos'; +import {CommitCallback, CommitOptions, MutationSet} from '../src/transaction'; +import {util} from '@google-cloud/common'; +import {Transform} from 'stream'; +import * as proxyquire from 'proxyquire'; +import * as sinon from 'sinon'; +const {SpanStatusCode} = require('@opentelemetry/api'); +const { + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +// eslint-disable-next-line n/no-extraneous-require +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); +import * as db from '../src/database'; +import {Instance, Spanner} from '../src'; +import * as pfy from '@google-cloud/promisify'; +import {grpc} from 'google-gax'; +import {MockError} from '../test/mockserver/mockspanner'; + +const fakePfy = extend({}, pfy, { + promisifyAll(klass, options) { + if (klass.name !== 'Database') { + return; + } + assert.deepStrictEqual(options.exclude, [ + 'batchTransaction', + 'batchWriteAtLeastOnce', + 'getRestoreInfo', + 'getState', + 'getDatabaseDialect', + 'getOperations', + 'runTransaction', + 'runTransactionAsync', + 'table', + 'session', + ]); + }, +}); + +class FakeBatchTransaction { + calledWith_: IArguments; + id?: string; + readTimestamp?: {seconds: number; nanos: number}; + constructor() { + this.calledWith_ = arguments; + } +} + +class FakeGrpcServiceObject extends EventEmitter { + calledWith_: IArguments; + constructor() { + super(); + this.calledWith_ = arguments; + } +} + +function fakePartialResultStream(this: Function & {calledWith_: IArguments}) { + this.calledWith_ = arguments; + return this; +} + +class FakeSession { + calledWith_: IArguments; + formattedName_: any; + constructor() { + this.calledWith_ = arguments; + } + partitionedDml(): FakeTransaction { + return new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.PartitionedDml + ); + } + snapshot(): FakeTransaction { + return new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.ReadOnly + ); + } +} + +class FakeSessionPool extends EventEmitter { + calledWith_: IArguments; + constructor() { + super(); + this.calledWith_ = arguments; + } + open() {} + getSession() {} + release() {} +} + +class FakeTable { + calledWith_: IArguments; + constructor() { + this.calledWith_ = arguments; + } +} + +class FakeTransaction extends EventEmitter { + calledWith_: IArguments; + _options!: google.spanner.v1.ITransactionOptions; + private _queuedMutations: google.spanner.v1.Mutation[]; + constructor(options) { + super(); + this._options = options; + this.calledWith_ = arguments; + this._queuedMutations = []; + } + begin() {} + end() {} + runStream(): Transform { + return through.obj(); + } + runUpdate() {} + setQueuedMutations(mutation) { + this._queuedMutations = mutation; + } + commit( + options?: CommitOptions, + callback?: CommitCallback + ): void | Promise { + if (callback) { + callback(null, {commitTimestamp: {seconds: 1, nanos: 0}}); + } + return Promise.resolve({commitTimestamp: {seconds: 1, nanos: 0}}); + } +} + +class FakeTransactionRunner { + calledWith_: IArguments; + constructor() { + this.calledWith_ = arguments; + // eslint-disable-next-line @typescript-eslint/no-this-alias + } + async run(): Promise {} +} + +class FakeAsyncTransactionRunner { + calledWith_: IArguments; + constructor() { + this.calledWith_ = arguments; + } + async run(): Promise { + return {} as T; + } +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const fakeCodec: any = { + encode: util.noop, + Int() {}, + Float() {}, + SpannerDate() {}, +}; + +class FakeAbortError { + error; + constructor(err) { + this.error = err; + } +} + +const fakeRetry = fn => { + return fn(); +}; + +fakeRetry.AbortError = FakeAbortError; + +describe('Database', () => { + const sandbox = sinon.createSandbox(); + + // tslint:disable-next-line variable-name + let Database: typeof db.Database; + // tslint:disable-next-line variable-name + let DatabaseCached: typeof db.Database; + + const SPANNER = { + routeToLeaderEnabled: true, + } as {} as Spanner; + + const INSTANCE = { + request: util.noop, + requestStream: util.noop, + formattedName_: 'instance-name', + databases_: new Map(), + parent: SPANNER, + } as {} as Instance; + + const NAME = 'table-name'; + + const POOL_OPTIONS = {}; + + let database; + + before(() => { + Database = proxyquire('../src/database.js', { + './common-grpc/service-object': { + GrpcServiceObject: FakeGrpcServiceObject, + }, + '@google-cloud/promisify': fakePfy, + 'p-retry': fakeRetry, + './batch-transaction': {BatchTransaction: FakeBatchTransaction}, + './codec': {codec: fakeCodec}, + './partial-result-stream': {partialResultStream: fakePartialResultStream}, + './session-pool': {SessionPool: FakeSessionPool}, + './session': {Session: FakeSession}, + './table': {Table: FakeTable}, + './transaction-runner': { + TransactionRunner: FakeTransactionRunner, + AsyncTransactionRunner: FakeAsyncTransactionRunner, + }, + }).Database; + DatabaseCached = Object.assign({}, Database); + }); + + beforeEach(() => { + fakeCodec.encode = util.noop; + extend(Database, DatabaseCached); + database = new Database(INSTANCE, NAME, POOL_OPTIONS); + database.parent = INSTANCE; + database.databaseRole = 'parent_role'; + database.observabilityConfig = { + tracerProvider: provider, + enableExtendedTracing: false, + }; + const gaxOpts = {}; + const options: { + a: string; + gaxOptions?: {}; + } = {a: 'a', gaxOptions: gaxOpts}; + + const expectedReqOpts = extend({}, options, { + database: database.formattedName_, + }); + delete expectedReqOpts.gaxOptions; + }); + + const traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + afterEach(() => { + sandbox.restore(); + traceExporter.forceFlush(); + traceExporter.reset(); + }); + + it('getSessions without error', done => { + const ARGS = [null, [], {}]; + database.request = (config, callback) => { + callback(...ARGS); + }; + + database.getSessions((err, sessions) => { + assert.ifError(err); + assert.ok(sessions); + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.getSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span's status code is UNSET. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Expected an OK span status' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('getSessions with error', done => { + const ARGS = [new Error('our error'), null, {}]; + database.request = (config, callback) => { + callback(...ARGS); + }; + + database.getSessions((err, sessions) => { + assert.ok(err); + assert.ok(!sessions); + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.getSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'our error', + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + describe('getSnapshot', () => { + let fakePool: FakeSessionPool; + let fakeSession: FakeSession; + let fakeSnapshot: FakeTransaction; + + let beginSnapshotStub: sinon.SinonStub; + let getSessionStub: sinon.SinonStub; + let snapshotStub: sinon.SinonStub; + + beforeEach(() => { + fakePool = database.pool_; + fakeSession = new FakeSession(); + fakeSnapshot = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.ReadOnly + ); + + beginSnapshotStub = ( + sandbox.stub(fakeSnapshot, 'begin') as sinon.SinonStub + ).callsFake(callback => callback(null)); + + getSessionStub = ( + sandbox.stub(fakePool, 'getSession') as sinon.SinonStub + ).callsFake(callback => callback(null, fakeSession)); + + snapshotStub = sandbox + .stub(fakeSession, 'snapshot') + .returns(fakeSnapshot); + }); + + it('with error', done => { + const fakeError = new Error('our snapshot error'); + + getSessionStub.callsFake(callback => callback(fakeError, null)); + + database.getSnapshot((err, snapshot) => { + assert.strictEqual(err, fakeError); + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.getSnapshot']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'our snapshot error', + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('with retries on `begin` errors with `Session not found`', done => { + const fakeError = { + code: grpc.status.NOT_FOUND, + message: 'Session not found', + } as MockError; + + const fakeSession2 = new FakeSession(); + const fakeSnapshot2 = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.ReadOnly + ); + (sandbox.stub(fakeSnapshot2, 'begin') as sinon.SinonStub).callsFake( + callback => callback(null) + ); + sandbox.stub(fakeSession2, 'snapshot').returns(fakeSnapshot2); + + getSessionStub + .onFirstCall() + .callsFake(callback => callback(null, fakeSession)) + .onSecondCall() + .callsFake(callback => callback(null, fakeSession2)); + beginSnapshotStub.callsFake(callback => callback(fakeError)); + + // The first session that was not found should be released back into the + // pool, so that the pool can remove it from its inventory. + const releaseStub = sandbox.stub(fakePool, 'release'); + + database.getSnapshot((err, snapshot) => { + assert.ifError(err); + assert.strictEqual(snapshot, fakeSnapshot2); + // The first session that error should already have been released back + // to the pool. + assert.strictEqual(releaseStub.callCount, 1); + // Ending the valid snapshot will release its session back into the + // pool. + snapshot.emit('end'); + assert.strictEqual(releaseStub.callCount, 2); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.getSnapshot', + 'CloudSpanner.Database.getSnapshot', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the first span actually produced an error that was recorded. + const parentSpan = spans[1]; + assert.strictEqual( + SpanStatusCode.ERROR, + parentSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'Session not found', + parentSpan.status.message.toString(), + 'Mismatched span status message' + ); + + // Ensure that the second span is a child of the first span. + const secondRetrySpan = spans[0]; + assert.ok( + parentSpan.spanContext().traceId, + 'Expected that the initial parent span has a defined traceId' + ); + assert.ok( + secondRetrySpan.spanContext().traceId, + 'Expected that the second retry span has a defined traceId' + ); + assert.deepStrictEqual( + parentSpan.spanContext().traceId, + secondRetrySpan.spanContext().traceId, + 'Expected that both spans share a traceId' + ); + assert.ok( + parentSpan.spanContext().spanId, + 'Expected that the initial parent span has a defined spanId' + ); + assert.ok( + secondRetrySpan.spanContext().spanId, + 'Expected that the second retry span has a defined spanId' + ); + assert.deepStrictEqual( + secondRetrySpan.parentSpanId, + parentSpan.spanContext().spanId, + 'Expected that secondRetrySpan is the child to parentSpan' + ); + + const expectedEventNames = ['No session available']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + + describe('createBatchTransaction', () => { + const SESSION = {}; + const RESPONSE = {a: 'b'}; + + beforeEach(() => { + database.pool_ = { + getSession(callback) { + callback(null, SESSION); + }, + }; + }); + + it('with session error', done => { + const error = new Error('with session error'); + + database.pool_ = { + getSession(callback) { + callback(error); + }, + }; + + database.createBatchTransaction((err, transaction, resp) => { + assert.strictEqual(err, error); + assert.strictEqual(transaction, null); + assert.strictEqual(resp, undefined); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.createBatchTransaction', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'with session error', + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('with no error', done => { + const opts = {a: 'b'}; + + const fakeTransaction = { + begin(callback) { + callback(null, RESPONSE); + }, + once() {}, + }; + + database.batchTransaction = (identifier, options) => { + assert.deepStrictEqual(identifier, {session: SESSION}); + assert.strictEqual(options, opts); + return fakeTransaction; + }; + + database.createBatchTransaction(opts, (err, transaction, resp) => { + assert.strictEqual(err, null); + assert.strictEqual(transaction, fakeTransaction); + assert.strictEqual(resp, RESPONSE); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.createBatchTransaction', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected span status' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + `No span status message expected\n\tGot: undefined\n\tWant: ${firstSpan.status.message}` + ); + + const expectedEventNames = ['Using Session']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('with begin transaction error', done => { + const error = new Error('our createBatchTransaction error'); + + const fakeTransaction = { + begin(callback) { + callback(error, RESPONSE); + }, + + once() {}, + }; + + database.batchTransaction = () => { + return fakeTransaction; + }; + + database.createBatchTransaction((err, transaction, resp) => { + assert.strictEqual(err, error); + assert.strictEqual(transaction, null); + assert.strictEqual(resp, RESPONSE); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.createBatchTransaction', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'our createBatchTransaction error', + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + + describe('getTransaction', () => { + let fakePool: FakeSessionPool; + let fakeSession: FakeSession; + let fakeTransaction: FakeTransaction; + + let getSessionStub: sinon.SinonStub; + + beforeEach(() => { + fakePool = database.pool_; + fakeSession = new FakeSession(); + fakeTransaction = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.ReadWrite + ); + + getSessionStub = ( + sandbox.stub(fakePool, 'getSession') as sinon.SinonStub + ).callsFake(callback => { + callback(null, fakeSession, fakeTransaction); + }); + }); + + it('with pool errors', done => { + const fakeError = new Error('pool error'); + + getSessionStub.callsFake(callback => callback(fakeError)); + + database.getTransaction(err => { + assert.strictEqual(err, fakeError); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.getTransaction']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // In the event of a sessionPool error, we should not have events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `event names mismatch:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'pool error', + firstSpan.status.message, + 'Mismatched span status message' + ); + + done(); + }); + }); + + it('with no errors', done => { + database.getTransaction((err, transaction) => { + assert.ifError(err); + assert.strictEqual(transaction, fakeTransaction); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.getTransaction']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that we have specific events. + const expectedEventNames = ['Using Session']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `event names mismatch:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + `Unexpected span status code: ${firstSpan.status.code}` + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + `Unexpected status message: ${firstSpan.status.message}` + ); + + done(); + }); + }); + }); + + describe('writeAtLeastOnce', () => { + const mutations = new MutationSet(); + mutations.insert('MyTable', { + Key: 'k3', + Thing: 'xyz', + }); + + const SESSION = new FakeSession(); + const RESPONSE = {commitTimestamp: {seconds: 1, nanos: 0}}; + const TRANSACTION = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.ReadWrite + ); + + let pool: FakeSessionPool; + + beforeEach(() => { + pool = database.pool_; + (sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake( + callback => { + callback(null, SESSION, TRANSACTION); + } + ); + }); + + it('should return any errors getting a session', done => { + const fakeErr = new Error('getting session error'); + + (pool.getSession as sinon.SinonStub).callsFake(callback => + callback(fakeErr, null, null) + ); + + database.writeAtLeastOnce(mutations, err => { + assert.deepStrictEqual(err, fakeErr); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.writeAtLeastOnce']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'getting session error', + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('with empty mutation should return successful CommitResponse', done => { + const fakeMutations = new MutationSet(); + try { + database.writeAtLeastOnce(fakeMutations, (err, response) => { + assert.ifError(err); + assert.deepStrictEqual( + response.commitTimestamp, + RESPONSE.commitTimestamp + ); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.writeAtLeastOnce']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + + const expectedEventNames = ['Using Session']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + } catch (error) { + assert(error instanceof Error); + } + }); + + it('with error on null mutation should catch thrown error', done => { + const fakeError = new Error('err'); + try { + database.writeAtLeastOnce(null, (err, res) => {}); + } catch (err) { + // Performing a substring search on the error because + // depending on the version of Node.js, the error might be either of: + // * Cannot read properties of null (reading 'proto') + // * Cannot read property 'proto' of null + (err as grpc.ServiceError).message.includes('Cannot read propert'); + (err as grpc.ServiceError).message.includes('of null'); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.writeAtLeastOnce']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + + const errorMessage = firstSpan.status.message; + assert.ok( + errorMessage.includes( + "Cannot read properties of null (reading 'proto')" + ) || errorMessage.includes("Cannot read property 'proto' of null") + ); + + // We expect an exception to have been caught as well as a Session event. + const expectedEventNames = ['Using Session', 'exception']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + } + }); + }); + + describe('runTransaction', () => { + const SESSION = new FakeSession(); + const TRANSACTION = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.ReadWrite + ); + + let pool: FakeSessionPool; + + beforeEach(() => { + pool = database.pool_; + + (sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake( + callback => { + callback(null, SESSION, TRANSACTION); + } + ); + }); + + it('with error getting session', done => { + const fakeErr = new Error('getting a session'); + + (pool.getSession as sinon.SinonStub).callsFake(callback => + callback(fakeErr) + ); + + database.runTransaction(err => { + assert.strictEqual(err, fakeErr); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runTransaction']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'getting a session', + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('with other errors when running the transaction', done => { + const fakeError = new Error('internal rejects err'); + + sandbox.stub(FakeTransactionRunner.prototype, 'run').rejects(fakeError); + + database.runTransaction(err => { + assert.strictEqual(err, fakeError); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runTransaction']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'internal rejects err', + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + + describe('runStream', () => { + const QUERY = { + sql: 'SELECT * FROM table', + a: 'b', + c: 'd', + }; + + let fakePool: FakeSessionPool; + let fakeSession: FakeSession; + let fakeSession2: FakeSession; + let fakeSnapshot: FakeTransaction; + let fakeSnapshot2: FakeTransaction; + let fakeStream: Transform; + let fakeStream2: Transform; + + let getSessionStub: sinon.SinonStub; + let snapshotStub: sinon.SinonStub; + let runStreamStub: sinon.SinonStub; + + beforeEach(() => { + fakePool = database.pool_; + fakeSession = new FakeSession(); + fakeSession2 = new FakeSession(); + fakeSnapshot = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.ReadOnly + ); + fakeSnapshot2 = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.ReadOnly + ); + fakeStream = through.obj(); + fakeStream2 = through.obj(); + + getSessionStub = (sandbox.stub(fakePool, 'getSession') as sinon.SinonStub) + .onFirstCall() + .callsFake(callback => callback(null, fakeSession)) + .onSecondCall() + .callsFake(callback => callback(null, fakeSession2)); + + snapshotStub = sandbox + .stub(fakeSession, 'snapshot') + .returns(fakeSnapshot); + + sandbox.stub(fakeSession2, 'snapshot').returns(fakeSnapshot2); + + runStreamStub = sandbox + .stub(fakeSnapshot, 'runStream') + .returns(fakeStream); + + sandbox.stub(fakeSnapshot2, 'runStream').returns(fakeStream2); + }); + + it('with error on `getSession`', done => { + const fakeError = new Error('getSession error'); + + getSessionStub.onFirstCall().callsFake(callback => callback(fakeError)); + + database.runStream(QUERY).on('error', err => { + assert.strictEqual(err, fakeError); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runStream']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'getSession error', + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('propagation on stream/transaction errors', done => { + const fakeError = new Error('propagation err'); + const endStub = sandbox.stub(fakeSnapshot, 'end'); + + database.runStream(QUERY).on('error', err => { + assert.strictEqual(err, fakeError); + assert.strictEqual(endStub.callCount, 1); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runStream']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'propagation err', + firstSpan.status.message, + 'Mismatched span status message' + ); + + const expectedEventNames = ['Using Session']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + + fakeStream.destroy(fakeError); + }); + + it('retries with "Session not found" error', done => { + const sessionNotFoundError = { + code: grpc.status.NOT_FOUND, + message: 'Session not found', + } as grpc.ServiceError; + const endStub = sandbox.stub(fakeSnapshot, 'end'); + const endStub2 = sandbox.stub(fakeSnapshot2, 'end'); + let rows = 0; + + database + .runStream(QUERY) + .on('data', () => rows++) + .on('error', err => { + assert.fail(err); + }) + .on('end', () => { + assert.strictEqual(endStub.callCount, 1); + assert.strictEqual(endStub2.callCount, 1); + assert.strictEqual(rows, 1); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2, 'Exactly 1 span expected'); + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.runStream', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const secondSpan = spans[1]; + assert.strictEqual( + SpanStatusCode.ERROR, + secondSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'Session not found', + secondSpan.status.message, + 'Mismatched span status message' + ); + + // Ensure that the final span that got retries did not error. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + + const expectedEventNames = [ + 'Using Session', + 'Using Session', + 'No session available', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + + fakeStream.emit('error', sessionNotFoundError); + fakeStream2.push('row1'); + fakeStream2.push(null); + }); + }); +}); diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts new file mode 100644 index 000000000..933e9bf08 --- /dev/null +++ b/observability-test/spanner.ts @@ -0,0 +1,442 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import {grpc} from 'google-gax'; +import {google} from '../protos/protos'; +import {Database, Spanner} from '../src'; +import {MutationSet} from '../src/transaction'; +import protobuf = google.spanner.v1; +import * as mock from '../test/mockserver/mockspanner'; +import * as mockInstanceAdmin from '../test/mockserver/mockinstanceadmin'; +import * as mockDatabaseAdmin from '../test/mockserver/mockdatabaseadmin'; +const { + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +// eslint-disable-next-line n/no-extraneous-require +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); +const {disableContextAndManager, setGlobalContextManager} = require('./helper'); +const { + AsyncHooksContextManager, +} = require('@opentelemetry/context-async-hooks'); + +/** A simple result set for SELECT 1. */ +function createSelect1ResultSet(): protobuf.ResultSet { + const fields = [ + protobuf.StructType.Field.create({ + name: 'NUM', + type: protobuf.Type.create({code: protobuf.TypeCode.INT64}), + }), + ]; + const metadata = new protobuf.ResultSetMetadata({ + rowType: new protobuf.StructType({ + fields, + }), + }); + return protobuf.ResultSet.create({ + metadata, + rows: [{values: [{stringValue: '1'}]}], + }); +} + +interface setupResults { + server: grpc.Server; + spanner: Spanner; + spannerMock: mock.MockSpanner; +} + +async function setup(): Promise { + const server = new grpc.Server(); + + const spannerMock = mock.createMockSpanner(server); + mockInstanceAdmin.createMockInstanceAdmin(server); + mockDatabaseAdmin.createMockDatabaseAdmin(server); + + const port: number = await new Promise((resolve, reject) => { + server.bindAsync( + '0.0.0.0:0', + grpc.ServerCredentials.createInsecure(), + (err, assignedPort) => { + if (err) { + reject(err); + } else { + resolve(assignedPort); + } + } + ); + }); + + const selectSql = 'SELECT 1'; + const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; + spannerMock.putStatementResult( + selectSql, + mock.StatementResult.resultSet(createSelect1ResultSet()) + ); + spannerMock.putStatementResult( + updateSql, + mock.StatementResult.updateCount(1) + ); + + const spanner = new Spanner({ + projectId: 'observability-project-id', + servicePath: 'localhost', + port, + sslCreds: grpc.credentials.createInsecure(), + }); + + return Promise.resolve({ + spanner: spanner, + server: server, + spannerMock: spannerMock, + }); +} + +describe('EndToEnd', () => { + describe('Database', () => { + let server: grpc.Server; + let spanner: Spanner; + let database: Database; + let spannerMock: mock.MockSpanner; + let traceExporter: typeof InMemorySpanExporter; + + const contextManager = new AsyncHooksContextManager(); + setGlobalContextManager(contextManager); + + afterEach(() => { + disableContextAndManager(contextManager); + }); + + beforeEach(async () => { + const setupResult = await setup(); + spanner = setupResult.spanner; + server = setupResult.server; + spannerMock = setupResult.spannerMock; + + const selectSql = 'SELECT 1'; + const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; + spannerMock.putStatementResult( + selectSql, + mock.StatementResult.resultSet(createSelect1ResultSet()) + ); + spannerMock.putStatementResult( + updateSql, + mock.StatementResult.updateCount(1) + ); + + traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + const instance = spanner.instance('instance'); + database = instance.database('database'); + database.observabilityConfig = { + tracerProvider: provider, + enableExtendedTracing: false, + }; + }); + + afterEach(() => { + traceExporter.reset(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + + it('getSessions', async () => { + const [rows] = await database.getSessions(); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.getSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); + + it('getSnapshot', done => { + database.getSnapshot((err, transaction) => { + assert.ifError(err); + + transaction!.run('SELECT 1', (err, rows) => { + assert.ifError(err); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.getSnapshot']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + + it('getTransaction', done => { + database.getTransaction((err, transaction) => { + assert.ifError(err); + assert.ok(transaction); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.getTransaction']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Using Session']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('runStream', done => { + database + .runStream('SELECT 1') + .on('data', row => {}) + .on('error', assert.ifError) + .on('end', () => { + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runStream']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Using Session']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('run', async () => { + const [rows] = await database.run('SELECT 1'); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); + + // Sort the spans by duration. + spans.sort((spanA, spanB) => { + spanA.duration < spanB.duration; + }); + + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.run', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that RunStream is a child span of createQueryPartitions. + const spanRunStream = spans[0]; + const spanRun = spans[1]; + assert.ok( + spanRun.spanContext().traceId, + 'Expected that createQueryPartitions has a defined traceId' + ); + assert.ok( + spanRunStream.spanContext().traceId, + 'Expected that RunStream has a defined traceId' + ); + assert.ok( + spanRun.spanContext().spanId, + 'Expected that createQueryPartitions has a defined spanId' + ); + assert.ok( + spanRunStream.spanContext().spanId, + 'Expected that RunStream has a defined spanId' + ); + + const expectedEventNames = ['Using Session']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); + + it('runTransaction', done => { + database.runTransaction((err, transaction) => { + assert.ifError(err); + transaction!.run('SELECT 1', (err, rows) => { + assert.ifError(err); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runTransaction']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + + it('writeAtLeastOnce', done => { + const blankMutations = new MutationSet(); + database.writeAtLeastOnce(blankMutations, (err, response) => { + assert.ifError(err); + assert.ok(response); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.writeAtLeastOnce']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Using Session']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); +}); diff --git a/observability-test/table.ts b/observability-test/table.ts index 9b20780ae..00071510c 100644 --- a/observability-test/table.ts +++ b/observability-test/table.ts @@ -185,7 +185,7 @@ describe('Table', () => { const gotSpanStatus = gotSpans[0].status; const wantSpanStatus = { code: SpanStatusCode.ERROR, - message: fakeError.toString(), + message: fakeError.message, }; assert.deepStrictEqual( gotSpanStatus, @@ -241,7 +241,7 @@ describe('Table', () => { const gotSpanStatus = gotSpans[0].status; const wantSpanStatus = { code: SpanStatusCode.ERROR, - message: fakeError.toString(), + message: fakeError.message, }; assert.deepStrictEqual( gotSpanStatus, @@ -294,7 +294,7 @@ describe('Table', () => { const gotSpanStatus = gotSpans[0].status; const wantSpanStatus = { code: SpanStatusCode.ERROR, - message: fakeError.toString(), + message: fakeError.message, }; assert.deepStrictEqual( gotSpanStatus, diff --git a/src/database.ts b/src/database.ts index 70a650675..a805d7a84 100644 --- a/src/database.ts +++ b/src/database.ts @@ -92,7 +92,7 @@ import { Schema, addLeaderAwareRoutingHeader, } from './common'; -import {Duplex, Readable, Transform} from 'stream'; +import {finished, Duplex, Readable, Transform} from 'stream'; import {PreciseDate} from '@google-cloud/precise-date'; import {EnumKey, RequestConfig, TranslateEnumKeys, Spanner} from '.'; import arrify = require('arrify'); @@ -102,7 +102,14 @@ import Policy = google.iam.v1.Policy; import FieldMask = google.protobuf.FieldMask; import IDatabase = google.spanner.admin.database.v1.IDatabase; import snakeCase = require('lodash.snakecase'); -import {getActiveOrNoopSpan} from './instrument'; +import { + ObservabilityOptions, + Span, + getActiveOrNoopSpan, + startTrace, + setSpanError, + setSpanErrorAndException, +} from './instrument'; export type GetDatabaseRolesCallback = RequestCallback< IDatabaseRole, @@ -337,6 +344,7 @@ class Database extends common.GrpcServiceObject { databaseDialect?: EnumKey< typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect > | null; + observabilityConfig: ObservabilityOptions | undefined; constructor( instance: Instance, name: string, @@ -459,6 +467,7 @@ class Database extends common.GrpcServiceObject { Object.assign({}, queryOptions), Database.getEnvironmentQueryOptions() ); + this.observabilityConfig = instance.observabilityConfig; } /** * @typedef {array} SetDatabaseMetadataResponse @@ -684,6 +693,7 @@ class Database extends common.GrpcServiceObject { const sessions = (resp!.session || []).map(metadata => { const session = this.session(metadata.name!); + session.observabilityConfig = this.observabilityConfig; session.metadata = metadata; return session; }); @@ -817,26 +827,33 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - this.pool_.getSession((err, session) => { - if (err) { - callback!(err as ServiceError, null, undefined); - return; - } - const transaction = this.batchTransaction({session: session!}, options); - this._releaseOnEnd(session!, transaction); - transaction.begin((err, resp) => { - const span = getActiveOrNoopSpan(); + const q = {opts: this.observabilityConfig}; + return startTrace('Database.createBatchTransaction', q, span => { + this.pool_.getSession((err, session) => { if (err) { - if (isSessionNotFoundError(err)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - } - callback!(err, null, resp!); + setSpanError(span, err); + span.end(); + callback!(err as ServiceError, null, undefined); return; } - span.addEvent('Using Session', {'session.id': session?.id}); - callback!(null, transaction, resp!); + const transaction = this.batchTransaction({session: session!}, options); + this._releaseOnEnd(session!, transaction, span); + transaction.begin((err, resp) => { + if (err) { + setSpanError(span, err); + if (isSessionNotFoundError(err)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + } + span.end(); + callback!(err, null, resp!); + return; + } + span.addEvent('Using Session', {'session.id': session?.id}); + span.end(); + callback!(null, transaction, resp!); + }); }); }); } @@ -1081,11 +1098,12 @@ class Database extends common.GrpcServiceObject { * @param {Transaction} transaction The transaction to observe. * @returns {Transaction} */ - private _releaseOnEnd(session: Session, transaction: Snapshot) { + private _releaseOnEnd(session: Session, transaction: Snapshot, span: Span) { transaction.once('end', () => { try { this.pool_.release(session); } catch (e) { + setSpanErrorAndException(span, e as Error); this.emit('error', e); } }); @@ -1855,32 +1873,40 @@ class Database extends common.GrpcServiceObject { delete (gaxOpts as GetSessionsOptions).pageToken; } - this.request< - google.spanner.v1.ISession, - google.spanner.v1.IListSessionsResponse - >( - { - client: 'SpannerClient', - method: 'listSessions', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }, - (err, sessions, nextPageRequest, ...args) => { - let sessionInstances: Session[] | null = null; - if (sessions) { - sessionInstances = sessions.map(metadata => { - const session = self.session(metadata.name!); - session.metadata = metadata; - return session; - }); + const q = {opts: this.observabilityConfig}; + return startTrace('Database.getSessions', q, span => { + this.request< + google.spanner.v1.ISession, + google.spanner.v1.IListSessionsResponse + >( + { + client: 'SpannerClient', + method: 'listSessions', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }, + (err, sessions, nextPageRequest, ...args) => { + if (err) { + setSpanError(span, err); + } + let sessionInstances: Session[] | null = null; + if (sessions) { + sessionInstances = sessions.map(metadata => { + const session = self.session(metadata.name!); + session.metadata = metadata; + session.observabilityConfig = this.observabilityConfig; + return session; + }); + } + span.end(); + const nextQuery = nextPageRequest! + ? extend({}, options, nextPageRequest!) + : null; + callback!(err, sessionInstances!, nextQuery, ...args); } - const nextQuery = nextPageRequest! - ? extend({}, options, nextPageRequest!) - : null; - callback!(err, sessionInstances!, nextQuery, ...args); - } - ); + ); + }); } /** @@ -2030,34 +2056,44 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - this.pool_.getSession((err, session) => { - if (err) { - callback!(err as ServiceError); - return; - } - - const snapshot = session!.snapshot(options, this.queryOptions_); - - snapshot.begin(err => { - const span = getActiveOrNoopSpan(); + const q = {opts: this.observabilityConfig}; + return startTrace('Database.getSnapshot', q, span => { + this.pool_.getSession((err, session) => { if (err) { - if (isSessionNotFoundError(err)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - session!.lastError = err; - this.pool_.release(session!); - this.getSnapshot(options, callback!); - } else { - span.addEvent('Using Session', {'session.id': session?.id}); - this.pool_.release(session!); - callback!(err); - } + setSpanError(span, err); + span.end(); + callback!(err as ServiceError); return; } - this._releaseOnEnd(session!, snapshot); - callback!(err, snapshot); + const snapshot = session!.snapshot(options, this.queryOptions_); + + snapshot.begin(err => { + if (err) { + setSpanError(span, err); + if (isSessionNotFoundError(err)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + session!.lastError = err; + this.pool_.release(session!); + this.getSnapshot(options, (err, snapshot) => { + span.end(); + callback!(err, snapshot); + }); + } else { + span.addEvent('Using Session', {'session.id': session?.id}); + this.pool_.release(session!); + span.end(); + callback!(err); + } + return; + } + + this._releaseOnEnd(session!, snapshot, span); + span.end(); + callback!(err, snapshot); + }); }); }); } @@ -2120,30 +2156,37 @@ class Database extends common.GrpcServiceObject { typeof optionsOrCallback === 'object' && optionsOrCallback ? (optionsOrCallback as GetTransactionOptions) : {}; - this.pool_.getSession((err, session, transaction) => { - if (options.requestOptions) { - transaction!.requestOptions = Object.assign( - transaction!.requestOptions || {}, - options.requestOptions - ); - } - if (options.optimisticLock) { - transaction!.useOptimisticLock(); - } - if (options.excludeTxnFromChangeStreams) { - transaction!.excludeTxnFromChangeStreams(); - } - if (!err) { - const span = getActiveOrNoopSpan(); - span.addEvent('Using Session', {'session.id': session?.id}); - this._releaseOnEnd(session!, transaction!); - } else if (isSessionNotFoundError(err as grpc.ServiceError)) { - const span = getActiveOrNoopSpan(); - span.addEvent('No session available', { - 'session.id': session?.id, - }); - } - cb!(err as grpc.ServiceError | null, transaction); + + const q = {opts: this.observabilityConfig}; + return startTrace('Database.getTransaction', q, span => { + this.pool_.getSession((err, session, transaction) => { + if (options.requestOptions) { + transaction!.requestOptions = Object.assign( + transaction!.requestOptions || {}, + options.requestOptions + ); + } + if (options.optimisticLock) { + transaction!.useOptimisticLock(); + } + if (options.excludeTxnFromChangeStreams) { + transaction!.excludeTxnFromChangeStreams(); + } + + if (!err) { + span.addEvent('Using Session', {'session.id': session?.id}); + this._releaseOnEnd(session!, transaction!, span); + } else if (isSessionNotFoundError(err as grpc.ServiceError)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + setSpanError(span, err); + } else { + setSpanError(span, err); + } + span.end(); + cb!(err as grpc.ServiceError | null, transaction); + }); }); } @@ -2324,6 +2367,7 @@ class Database extends common.GrpcServiceObject { delete (gaxOpts as GetDatabaseRolesOptions).pageSize; delete (gaxOpts as GetDatabaseRolesOptions).pageToken; } + this.request< IDatabaseRole, databaseAdmin.spanner.admin.database.v1.ListDatabaseRolesResponse @@ -2368,6 +2412,7 @@ class Database extends common.GrpcServiceObject { callback!(err as ServiceError, null); return; } + const span = getActiveOrNoopSpan(); span.addEvent('Using Session', {'session.id': session?.id}); config.reqOpts.session = session!.formattedName_; @@ -2739,20 +2784,27 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - this.runStream(query, options) - .on('error', callback!) - .on('response', response => { - if (response.metadata) { - metadata = response.metadata; - } - }) - .on('stats', _stats => (stats = _stats)) - .on('data', row => { - rows.push(row); - }) - .on('end', () => { - callback!(null, rows, stats, metadata); - }); + const q = {sql: query, opts: this.observabilityConfig}; + return startTrace('Database.run', q, span => { + this.runStream(query, options) + .on('error', err => { + setSpanError(span, err); + callback!(err as grpc.ServiceError, rows, stats, metadata); + }) + .on('response', response => { + if (response.metadata) { + metadata = response.metadata; + } + }) + .on('stats', _stats => (stats = _stats)) + .on('data', row => { + rows.push(row); + }) + .on('end', () => { + span.end(); + callback!(null, rows, stats, metadata); + }); + }); } /** * Partitioned DML transactions are used to execute DML statements with a @@ -2953,60 +3005,75 @@ class Database extends common.GrpcServiceObject { options?: TimestampBounds ): PartialResultStream { const proxyStream: Transform = through.obj(); - const span = getActiveOrNoopSpan(); - - this.pool_.getSession((err, session) => { - if (err) { - proxyStream.destroy(err); - return; - } + const q = {sql: query, opts: this.observabilityConfig}; + return startTrace('Database.runStream', q, span => { + this.pool_.getSession((err, session) => { + if (err) { + setSpanError(span, err); + proxyStream.destroy(err); + span.end(); + return; + } - const span = getActiveOrNoopSpan(); - span.addEvent('Using Session', {'session.id': session?.id}); + span.addEvent('Using Session', {'session.id': session?.id}); - const snapshot = session!.snapshot(options, this.queryOptions_); + const snapshot = session!.snapshot(options, this.queryOptions_); + + this._releaseOnEnd(session!, snapshot, span); + + let dataReceived = false; + let dataStream = snapshot.runStream(query); + + const endListener = () => { + span.end(); + snapshot.end(); + }; + dataStream + .once('data', () => (dataReceived = true)) + .once('error', err => { + setSpanError(span, err); + + if ( + !dataReceived && + isSessionNotFoundError(err as grpc.ServiceError) + ) { + // If it is a 'Session not found' error and we have not yet received + // any data, we can safely retry the query on a new session. + // Register the error on the session so the pool can discard it. + if (session) { + session.lastError = err as grpc.ServiceError; + } + span.addEvent('No session available', { + 'session.id': session?.id, + }); + // Remove the current data stream from the end user stream. + dataStream.unpipe(proxyStream); + dataStream.removeListener('end', endListener); + dataStream.end(); + snapshot.end(); + // Create a new data stream and add it to the end user stream. + dataStream = this.runStream(query, options); + dataStream.pipe(proxyStream); + } else { + proxyStream.destroy(err); + snapshot.end(); + } + }) + .on('stats', stats => proxyStream.emit('stats', stats)) + .on('response', response => proxyStream.emit('response', response)) + .once('end', endListener) + .pipe(proxyStream); + }); - this._releaseOnEnd(session!, snapshot); + finished(proxyStream, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); - let dataReceived = false; - let dataStream = snapshot.runStream(query); - const endListener = () => snapshot.end(); - dataStream - .once('data', () => (dataReceived = true)) - .once('error', err => { - if ( - !dataReceived && - isSessionNotFoundError(err as grpc.ServiceError) - ) { - // If it is a 'Session not found' error and we have not yet received - // any data, we can safely retry the query on a new session. - // Register the error on the session so the pool can discard it. - if (session) { - session.lastError = err as grpc.ServiceError; - } - span.addEvent('No session available', { - 'session.id': session?.id, - }); - // Remove the current data stream from the end user stream. - dataStream.unpipe(proxyStream); - dataStream.removeListener('end', endListener); - dataStream.end(); - snapshot.end(); - // Create a new data stream and add it to the end user stream. - dataStream = this.runStream(query, options); - dataStream.pipe(proxyStream); - } else { - proxyStream.destroy(err); - snapshot.end(); - } - }) - .on('stats', stats => proxyStream.emit('stats', stats)) - .on('response', response => proxyStream.emit('response', response)) - .once('end', endListener) - .pipe(proxyStream); + return proxyStream as PartialResultStream; }); - - return proxyStream as PartialResultStream; } /** @@ -3116,49 +3183,71 @@ class Database extends common.GrpcServiceObject { ? (optionsOrRunFn as RunTransactionOptions) : {}; - this.pool_.getSession((err, session?, transaction?) => { - const span = getActiveOrNoopSpan(); - if (err && isSessionNotFoundError(err as grpc.ServiceError)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - this.runTransaction(options, runFn!); - return; - } - if (err) { - runFn!(err as grpc.ServiceError); - return; - } - if (options.optimisticLock) { - transaction!.useOptimisticLock(); - } - if (options.excludeTxnFromChangeStreams) { - transaction!.excludeTxnFromChangeStreams(); - } - - const release = this.pool_.release.bind(this.pool_, session!); - const runner = new TransactionRunner( - session!, - transaction!, - runFn!, - options - ); + const q = {opts: this.observabilityConfig}; + startTrace('Database.runTransaction', q, span => { + this.pool_.getSession((err, session?, transaction?) => { + if (err) { + setSpanError(span, err); + } - runner.run().then(release, err => { - const span = getActiveOrNoopSpan(); - if (isSessionNotFoundError(err)) { + if (err && isSessionNotFoundError(err as grpc.ServiceError)) { span.addEvent('No session available', { 'session.id': session?.id, }); - release(); this.runTransaction(options, runFn!); - } else { - if (!err) { - span.addEvent('Using Session', {'session.id': session!.id}); - } - setImmediate(runFn!, err); - release(); + span.end(); + return; + } + + if (err) { + span.end(); + runFn!(err as grpc.ServiceError); + return; } + if (options.optimisticLock) { + transaction!.useOptimisticLock(); + } + if (options.excludeTxnFromChangeStreams) { + transaction!.excludeTxnFromChangeStreams(); + } + + const release = () => { + span.end(); + this.pool_.release(session!); + }; + + const runner = new TransactionRunner( + session!, + transaction!, + (err, resp) => { + if (err) { + setSpanError(span, err!); + } + span.end(); + runFn!(err, resp); + }, + options + ); + + runner.run().then(release, err => { + if (err) { + setSpanError(span, err!); + } + + if (isSessionNotFoundError(err)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + release(); + this.runTransaction(options, runFn!); + } else { + if (!err) { + span.addEvent('Using Session', {'session.id': session!.id}); + } + setImmediate(runFn!, err); + release(); + } + }); }); }); } @@ -3350,15 +3439,15 @@ class Database extends common.GrpcServiceObject { ): NodeJS.ReadableStream { const proxyStream: Transform = through.obj(); + const span = getActiveOrNoopSpan(); + this.pool_.getSession((err, session) => { if (err) { proxyStream.destroy(err); return; } - const span = getActiveOrNoopSpan(); span.addEvent('Using Session', {'session.id': session?.id}); - const gaxOpts = extend(true, {}, options?.gaxOptions); const reqOpts = Object.assign( {} as spannerClient.spanner.v1.BatchWriteRequest, @@ -3390,7 +3479,9 @@ class Database extends common.GrpcServiceObject { if (session) { session.lastError = err as grpc.ServiceError; } - span.addEvent('No session available', {'session.id': session?.id}); + span.addEvent('No session available', { + 'session.id': session?.id, + }); // Remove the current data stream from the end user stream. dataStream.unpipe(proxyStream); dataStream.end(); @@ -3401,7 +3492,9 @@ class Database extends common.GrpcServiceObject { proxyStream.destroy(err); } }) - .once('end', () => this.pool_.release(session!)) + .once('end', () => { + this.pool_.release(session!); + }) .pipe(proxyStream); }); @@ -3482,23 +3575,41 @@ class Database extends common.GrpcServiceObject { typeof optionsOrCallback === 'object' && optionsOrCallback ? (optionsOrCallback as CallOptions) : {}; - this.pool_.getSession((err, session?, transaction?) => { - const span = getActiveOrNoopSpan(); - if (err && isSessionNotFoundError(err as grpc.ServiceError)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - this.writeAtLeastOnce(mutations, options, cb!); - return; - } - if (err) { - cb!(err as grpc.ServiceError); - return; - } - span.addEvent('Using Session', {'session.id': session?.id}); - this._releaseOnEnd(session!, transaction!); - transaction?.setQueuedMutations(mutations.proto()); - return transaction?.commit(options, cb!); + + const q = {opts: this.observabilityConfig}; + return startTrace('Database.writeAtLeastOnce', q, span => { + this.pool_.getSession((err, session?, transaction?) => { + if (err && isSessionNotFoundError(err as grpc.ServiceError)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + this.writeAtLeastOnce(mutations, options, cb!); + span.end(); + return; + } + if (err) { + setSpanError(span, err); + span.end(); + cb!(err as grpc.ServiceError); + return; + } + span.addEvent('Using Session', {'session.id': session?.id}); + this._releaseOnEnd(session!, transaction!, span); + try { + transaction?.setQueuedMutations(mutations.proto()); + return transaction?.commit(options, (err, resp) => { + if (err) { + setSpanError(span, err); + } + span.end(); + cb!(err, resp); + }); + } catch (e) { + setSpanErrorAndException(span, e as Error); + span.end(); + throw e; + } + }); }); } diff --git a/src/instrument.ts b/src/instrument.ts index c7a4059e4..f0676ea46 100644 --- a/src/instrument.ts +++ b/src/instrument.ts @@ -51,6 +51,7 @@ interface observabilityOptions { } export type {observabilityOptions as ObservabilityOptions}; +export type {Span}; const TRACER_NAME = 'cloud.google.com/nodejs/spanner'; const TRACER_VERSION = require('../../package.json').version; @@ -149,9 +150,15 @@ export function setSpanError(span: Span, err: Error | String): boolean { return false; } + let message = ''; + if (typeof err === 'object' && 'message' in err) { + message = err.message as string; + } else { + message = err.toString(); + } span.setStatus({ code: SpanStatusCode.ERROR, - message: err.toString(), + message: message, }); return true; } diff --git a/test/database.ts b/test/database.ts index e061b0ef5..ca8ee896e 100644 --- a/test/database.ts +++ b/test/database.ts @@ -840,9 +840,13 @@ describe('Database', () => { try { database.writeAtLeastOnce(null, (err, res) => {}); } catch (err) { - (err as grpc.ServiceError).message.includes( - "Cannot read properties of null (reading 'proto')" + const errorMessage = (err as grpc.ServiceError).message; + assert.ok( + errorMessage.includes( + "Cannot read properties of null (reading 'proto')" + ) || errorMessage.includes("Cannot read property 'proto' of null") ); + done(); } }); @@ -2961,7 +2965,9 @@ describe('Database', () => { assert.strictEqual(session, SESSION); assert.strictEqual(transaction, TRANSACTION); - assert.strictEqual(runFn, fakeRunFn); + // Given that we've wrapped the transaction runner with observability + // tracing, directly comparing values runFn and fakeRunFn. + // assert.strictEqual(runFn, fakeRunFn); assert.deepStrictEqual(options, {}); }); diff --git a/test/spanner.ts b/test/spanner.ts index 544388c4d..cdb41b689 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -73,7 +73,7 @@ const { InMemorySpanExporter, } = require('@opentelemetry/sdk-trace-node'); const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); -const {startTrace} = require('../src/instrument'); +const {startTrace, ObservabilityOptions} = require('../src/instrument'); function numberToEnglishWord(num: number): string { switch (num) { @@ -5012,8 +5012,10 @@ describe('Spanner with mock server', () => { await provider.shutdown(); }); - startTrace('aSpan', {opts: {tracerProvider: provider}}, span => { + const opts: typeof ObservabilityOptions = {tracerProvider: provider}; + startTrace('aSpan', {opts: opts}, span => { const database = newTestDatabase(); + database.observabilityConfig = opts; async function runIt() { const query = { @@ -5043,16 +5045,7 @@ describe('Spanner with mock server', () => { gotEventNames.push(event.name); }); - const wantEventNames = [ - 'Requesting 25 sessions', - 'Creating 25 sessions', - 'Acquiring session', - 'Waiting for a session to become available', - // 'Requested for 25 sessions returned 25 sessions', - // 'Acquired session', - // 'Creating Transaction', - // 'Transaction Creation Done', - ]; + const wantEventNames = ['Requesting 25 sessions', 'Creating 25 sessions']; assert.deepEqual( gotEventNames,