From f489c9479fa5402f0c960cf896fd3be0e946f182 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Thu, 10 Oct 2024 02:44:26 -0700 Subject: [PATCH] feat: (observability) trace Database.batchCreateSessions + SessionPool.createSessions (#2145) This change adds tracing for Database.batchCreateSessions as well as SessionPool.createSessions which was raised as a big need. This change is a premise to finishing up tracing Transaction. While here, also folded in the async/await fix to avoid day+ long code review lag and then 3+ hours just to run tests per PR: OpenTelemetry cannot work correctly for async/await if there isn't a set AsyncHooksManager, but we should not burden our customers with this type of specialist knowledge, their code should just work and this change performs such a check. Later on we shall file a feature request with the OpenTelemetry-JS API group to give us a hook to detect if we've got a live asyncHooksManager instead of this mandatory comparison to ROOT_CONTEXT each time. Fixes #2146 Updates #2079 Spun out of PR #2122 Supersedes PR #2147 --- observability-test/database.ts | 112 +++++- observability-test/session-pool.ts | 222 +++++++++++ observability-test/spanner.ts | 583 ++++++++++++++++++++++++----- observability-test/transaction.ts | 5 +- package.json | 2 +- src/database.ts | 60 +-- src/instance.ts | 1 + src/instrument.ts | 27 ++ src/session-pool.ts | 84 +++-- src/table.ts | 1 + test/spanner.ts | 71 ++-- 11 files changed, 982 insertions(+), 186 deletions(-) create mode 100644 observability-test/session-pool.ts diff --git a/observability-test/database.ts b/observability-test/database.ts index cbcc73572..d4dcea825 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -375,6 +375,115 @@ describe('Database', () => { }); }); + describe('batchCreateSessions', () => { + it('without error', done => { + const ARGS = [null, [{}]]; + database.request = (config, callback) => { + callback(...ARGS); + }; + + database.batchCreateSessions(10, (err, sessions) => { + assert.ifError(err); + assert.ok(sessions); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.batchCreateSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span didn't encounter an error. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('with error', done => { + const ARGS = [new Error('batchCreateSessions.error'), null]; + database.request = (config, callback) => { + callback(...ARGS); + }; + + database.batchCreateSessions(10, (err, sessions) => { + assert.ok(err); + assert.ok(!sessions); + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.batchCreateSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'batchCreateSessions.error', + firstSpan.status.message, + 'Mismatched span status message' + ); + + // We don't expect events. + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + describe('getSnapshot', () => { let fakePool: FakeSessionPool; let fakeSession: FakeSession; @@ -409,7 +518,7 @@ describe('Database', () => { getSessionStub.callsFake(callback => callback(fakeError, null)); - database.getSnapshot((err, snapshot) => { + database.getSnapshot(err => { assert.strictEqual(err, fakeError); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); @@ -1027,7 +1136,6 @@ describe('Database', () => { }); it('with error on null mutation should catch thrown error', done => { - const fakeError = new Error('err'); try { database.writeAtLeastOnce(null, (err, res) => {}); } catch (err) { diff --git a/observability-test/session-pool.ts b/observability-test/session-pool.ts new file mode 100644 index 000000000..e92b42b0a --- /dev/null +++ b/observability-test/session-pool.ts @@ -0,0 +1,222 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import {before, beforeEach, afterEach, describe, it} from 'mocha'; +import * as extend from 'extend'; +import PQueue from 'p-queue'; +import * as proxyquire from 'proxyquire'; +import * as sinon from 'sinon'; +import stackTrace = require('stack-trace'); +const { + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +// eslint-disable-next-line n/no-extraneous-require +const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); +// eslint-disable-next-line n/no-extraneous-require +const {SpanStatusCode} = require('@opentelemetry/api'); + +import {Database} from '../src/database'; +import {Session} from '../src/session'; +import * as sp from '../src/session-pool'; + +let pQueueOverride: typeof PQueue | null = null; + +function FakePQueue(options) { + return new (pQueueOverride || PQueue)(options); +} + +FakePQueue.default = FakePQueue; + +class FakeTransaction { + options; + constructor(options?) { + this.options = options; + } + async begin(): Promise {} +} + +const fakeStackTrace = extend({}, stackTrace); + +describe('SessionPool', () => { + let sessionPool: sp.SessionPool; + // tslint:disable-next-line variable-name + let SessionPool: typeof sp.SessionPool; + + function noop() {} + const DATABASE = { + batchCreateSessions: noop, + databaseRole: 'parent_role', + } as unknown as Database; + + const sandbox = sinon.createSandbox(); + const shouldNotBeCalled = sandbox.stub().throws('Should not be called.'); + + const createSession = (name = 'id', props?): Session => { + props = props || {}; + + return Object.assign(new Session(DATABASE, name), props, { + create: sandbox.stub().resolves(), + delete: sandbox.stub().resolves(), + keepAlive: sandbox.stub().resolves(), + transaction: sandbox.stub().returns(new FakeTransaction()), + }); + }; + + before(() => { + SessionPool = proxyquire('../src/session-pool.js', { + 'p-queue': FakePQueue, + 'stack-trace': fakeStackTrace, + }).SessionPool; + }); + + afterEach(() => { + pQueueOverride = null; + sandbox.restore(); + }); + + const traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + beforeEach(() => { + DATABASE.session = createSession; + DATABASE._observabilityOptions = { + tracerProvider: provider, + }; + sessionPool = new SessionPool(DATABASE); + sessionPool._observabilityOptions = DATABASE._observabilityOptions; + traceExporter.reset(); + }); + + describe('_createSessions', () => { + const OPTIONS = 3; + it('on exception from Database.batchCreateSessions', async () => { + const ourException = new Error('this fails intentionally'); + const stub = sandbox + .stub(DATABASE, 'batchCreateSessions') + .throws(ourException); + const releaseStub = sandbox.stub(sessionPool, 'release'); + + assert.rejects(async () => { + await sessionPool._createSessions(OPTIONS); + }, ourException); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.SessionPool.createSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Requesting 3 sessions', + 'Creating 3 sessions', + 'Requested for 3 sessions returned 0', + 'exception', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + ourException.message, + firstSpan.status.message, + 'Unexpected span status message' + ); + }); + + it('without error', async () => { + const RESPONSE = [[{}, {}, {}]]; + + const stub = sandbox + .stub(DATABASE, 'batchCreateSessions') + .resolves(RESPONSE); + const releaseStub = sandbox.stub(sessionPool, 'release'); + + await sessionPool._createSessions(OPTIONS); + assert.strictEqual(sessionPool.size, 3); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.SessionPool.createSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Requesting 3 sessions', + 'Creating 3 sessions', + 'Requested for 3 sessions returned 3', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + }); + }); +}); diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 3e9cc295b..322fcc1b9 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -36,6 +36,7 @@ const { } = require('@opentelemetry/context-async-hooks'); const {ObservabilityOptions} = require('../src/instrument'); +import {SessionPool} from '../src/session-pool'; const selectSql = 'SELECT 1'; const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; @@ -113,65 +114,61 @@ async function setup( } describe('EndToEnd', () => { - describe('Database', () => { - let server: grpc.Server; - let spanner: Spanner; - let database: Database; - let spannerMock: mock.MockSpanner; - let traceExporter: typeof InMemorySpanExporter; + let server: grpc.Server; + let spanner: Spanner; + let database: Database; + let spannerMock: mock.MockSpanner; + let traceExporter: typeof InMemorySpanExporter; - const contextManager = new AsyncHooksContextManager(); - setGlobalContextManager(contextManager); + const contextManager = new AsyncHooksContextManager(); + setGlobalContextManager(contextManager); - afterEach(() => { - disableContextAndManager(contextManager); + afterEach(() => { + disableContextAndManager(contextManager); + }); + + beforeEach(async () => { + traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); - beforeEach(async () => { - traceExporter = new InMemorySpanExporter(); - const sampler = new AlwaysOnSampler(); - const provider = new NodeTracerProvider({ - sampler: sampler, - exporter: traceExporter, - }); - const setupResult = await setup({ - tracerProvider: provider, - enableExtendedTracing: false, - }); - spanner = setupResult.spanner; - server = setupResult.server; - spannerMock = setupResult.spannerMock; - - const selectSql = 'SELECT 1'; - const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; - spannerMock.putStatementResult( - selectSql, - mock.StatementResult.resultSet(createSelect1ResultSet()) - ); - spannerMock.putStatementResult( - updateSql, - mock.StatementResult.updateCount(1) - ); + const setupResult = await setup({ + tracerProvider: provider, + enableExtendedTracing: false, + }); - provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + spanner = setupResult.spanner; + server = setupResult.server; + spannerMock = setupResult.spannerMock; - const instance = spanner.instance('instance'); - database = instance.database('database'); - }); + const instance = spanner.instance('instance'); + database = instance.database('database'); + + // To deflake expectations of session creation, let's + // issue out a warm-up request request that'll ensure + // that the SessionPool is created deterministically. + const [rows] = await database.run('SELECT 1'); + // Clear out any present traces to make a clean slate for testing. + traceExporter.forceFlush(); + traceExporter.reset(); + }); - afterEach(() => { - traceExporter.reset(); - spannerMock.resetRequests(); - spanner.close(); - server.tryShutdown(() => {}); - }); + afterEach(() => { + traceExporter.reset(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + describe('Database', () => { it('getSessions', async () => { const [rows] = await database.getSessions(); - traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -228,7 +225,13 @@ describe('EndToEnd', () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Begin Transaction']; + const expectedEventNames = [ + 'Begin Transaction', + 'Transaction Creation Done', + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -247,7 +250,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -265,7 +267,12 @@ describe('EndToEnd', () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -304,7 +311,12 @@ describe('EndToEnd', () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -366,7 +378,12 @@ describe('EndToEnd', () => { 'Expected that RunStream has a defined spanId' ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -403,7 +420,12 @@ describe('EndToEnd', () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = []; + const expectedEventNames = [ + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + 'Transaction Creation Done', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -446,6 +468,9 @@ describe('EndToEnd', () => { const expectedEventNames = [ 'Starting Commit', 'Commit Done', + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', 'Using Session', ]; assert.deepStrictEqual( @@ -457,50 +482,146 @@ describe('EndToEnd', () => { done(); }); }); + + it('batchCreateSessions', done => { + database.batchCreateSessions(5, (err, sessions) => { + assert.ifError(err); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.batchCreateSessions']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); }); }); -describe('ObservabilityOptions injection and propagation', async () => { - const globalTraceExporter = new InMemorySpanExporter(); - const globalTracerProvider = new NodeTracerProvider({ - sampler: new AlwaysOnSampler(), - exporter: globalTraceExporter, +describe('SessionPool', async () => { + const traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + const provider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, }); - globalTracerProvider.addSpanProcessor( - new SimpleSpanProcessor(globalTraceExporter) - ); - globalTracerProvider.register(); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); - const injectedTraceExporter = new InMemorySpanExporter(); - const injectedTracerProvider = new NodeTracerProvider({ - sampler: new AlwaysOnSampler(), - exporter: injectedTraceExporter, + const setupResult = await setup({ + tracerProvider: provider, + enableExtendedTracing: false, }); - injectedTracerProvider.addSpanProcessor( - new SimpleSpanProcessor(injectedTraceExporter) - ); - const observabilityOptions: typeof ObservabilityOptions = { - tracerProvider: injectedTracerProvider, - enableExtendedTracing: true, - }; - - const setupResult = await setup(observabilityOptions); const spanner = setupResult.spanner; const server = setupResult.server; const spannerMock = setupResult.spannerMock; + const instance = spanner.instance('instance'); after(async () => { - globalTraceExporter.reset(); - injectedTraceExporter.reset(); - await globalTracerProvider.shutdown(); - await injectedTracerProvider.shutdown(); + traceExporter.reset(); + await provider.shutdown(); spannerMock.resetRequests(); spanner.close(); server.tryShutdown(() => {}); }); - it('Passed into Spanner, Instance and Database', done => { + it('_createSessions', async () => { + // The first invocation of new SessionPool shall implicitly happen in here. + const database = instance.database('database'); + await database.run('SELECT 1'); + + await provider.forceFlush(); + traceExporter.reset(); + + // Explicitly invoking new SessionPool. + const sessionPool = new SessionPool(database); + + const OPTIONS = 3; + await sessionPool._createSessions(OPTIONS); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Requesting 3 sessions', + 'Creating 3 sessions', + 'Requested for 3 sessions returned 3', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); +}); + +describe('ObservabilityOptions injection and propagation', async () => { + it('Passed into Spanner, Instance and Database', async () => { + const traceExporter = new InMemorySpanExporter(); + const tracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: traceExporter, + }); + tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + const observabilityOptions: typeof ObservabilityOptions = { + tracerProvider: tracerProvider, + enableExtendedTracing: true, + }; + + const setupResult = await setup(observabilityOptions); + const spanner = setupResult.spanner; + const server = setupResult.server; + const spannerMock = setupResult.spannerMock; + + after(async () => { + traceExporter.reset(); + await tracerProvider.shutdown(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + // Ensure that the same observability configuration is set on the Spanner client. assert.deepStrictEqual(spanner._observabilityOptions, observabilityOptions); @@ -534,23 +655,50 @@ describe('ObservabilityOptions injection and propagation', async () => { databaseByConstructor._observabilityOptions, observabilityOptions ); - - done(); }); - afterEach(async () => { - await injectedTracerProvider.forceFlush(); - injectedTraceExporter.reset(); - }); + describe('Transaction', async () => { + const traceExporter = new InMemorySpanExporter(); + const tracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: traceExporter, + }); + tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + const observabilityOptions: typeof ObservabilityOptions = { + tracerProvider: tracerProvider, + enableExtendedTracing: true, + }; + const setupResult = await setup(observabilityOptions); + const spanner = setupResult.spanner; + const server = setupResult.server; + const spannerMock = setupResult.spannerMock; + + after(async () => { + traceExporter.reset(); + await tracerProvider.shutdown(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); - let database: Database; - beforeEach(() => { - const instance = spanner.instance('instance'); - database = instance.database('db'); - }); + let database: Database; + beforeEach(async () => { + const instance = spanner.instance('instance'); + database = instance.database('database'); - describe('Transaction', () => { - const traceExporter = injectedTraceExporter; + // To deflake expectations of session creation, let's + // issue out a warm-up request request that'll ensure + // that the SessionPool is created deterministically. + const [rows] = await database.run('SELECT 1'); + // Clear out any present traces to make a clean slate for testing. + traceExporter.forceFlush(); + traceExporter.reset(); + }); + + afterEach(() => { + spannerMock.resetRequests(); + }); it('run', done => { database.getTransaction((err, tx) => { @@ -582,11 +730,8 @@ describe('ObservabilityOptions injection and propagation', async () => { ); const expectedEventNames = [ - 'Requesting 25 sessions', - 'Creating 25 sessions', - 'Requested for 25 sessions returned 25', 'Acquiring session', - 'Waiting for a session to become available', + 'Cache hit: has usable session', 'Acquired session', 'Using Session', 'Transaction Creation Done', @@ -643,7 +788,7 @@ describe('ObservabilityOptions injection and propagation', async () => { 'Begin Transaction', 'Transaction Creation Done', ]; - assert.strictEqual( + assert.deepStrictEqual( actualEventNames.every(value => expectedEventNames.includes(value)), true, `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` @@ -689,7 +834,12 @@ describe('ObservabilityOptions injection and propagation', async () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -758,7 +908,43 @@ describe('ObservabilityOptions injection and propagation', async () => { }); }); - it('Propagates spans to the injected not global TracerProvider', done => { + it('Propagates spans to the injected not global TracerProvider', async () => { + const globalTraceExporter = new InMemorySpanExporter(); + const globalTracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: globalTraceExporter, + }); + globalTracerProvider.addSpanProcessor( + new SimpleSpanProcessor(globalTraceExporter) + ); + globalTracerProvider.register(); + + const injectedTraceExporter = new InMemorySpanExporter(); + const injectedTracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: injectedTraceExporter, + }); + injectedTracerProvider.addSpanProcessor( + new SimpleSpanProcessor(injectedTraceExporter) + ); + + const observabilityOptions: typeof ObservabilityOptions = { + tracerProvider: injectedTracerProvider, + enableExtendedTracing: true, + }; + const setupResult = await setup(observabilityOptions); + const spanner = setupResult.spanner; + const server = setupResult.server; + const spannerMock = setupResult.spannerMock; + + after(async () => { + injectedTraceExporter.reset(); + await injectedTracerProvider.shutdown(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + const instance = spanner.instance('instance'); const database = instance.database('database'); @@ -794,6 +980,8 @@ describe('ObservabilityOptions injection and propagation', async () => { }); const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Database.runStream', 'CloudSpanner.Database.run', @@ -805,6 +993,9 @@ describe('ObservabilityOptions injection and propagation', async () => { ); const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', 'Acquiring session', 'Waiting for a session to become available', 'Acquired session', @@ -815,7 +1006,199 @@ describe('ObservabilityOptions injection and propagation', async () => { expectedEventNames, `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` ); + }); + }); +}); + +describe('E2E traces with async/await', async () => { + let server: grpc.Server; + let spanner: Spanner; + let database: Database; + let spannerMock: mock.MockSpanner; + let traceExporter: typeof InMemorySpanExporter; + let provider: typeof TracerProvider; + let observabilityOptions: typeof ObservabilityOptions; + + beforeEach(async () => { + traceExporter = new InMemorySpanExporter(); + provider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + observabilityOptions = { + tracerProvider: provider, + enableExtendedTracing: true, + }; + const setupResult = await setup(observabilityOptions); + spanner = setupResult.spanner; + server = setupResult.server; + spannerMock = setupResult.spannerMock; + }); + + afterEach(async () => { + traceExporter.reset(); + provider.shutdown(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + + function assertAsyncAwaitExpectations() { + // See https://github.com/googleapis/nodejs-spanner/issues/2146. + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.run', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // We need to ensure a strict relationship between the spans. + // runSpan -------------------| + // |-runStream ----------| + const runStreamSpan = spans[spans.length - 2]; + const runSpan = spans[spans.length - 1]; + assert.ok( + runSpan.spanContext().traceId, + 'Expected that runSpan has a defined traceId' + ); + assert.ok( + runStreamSpan.spanContext().traceId, + 'Expected that runStreamSpan has a defined traceId' + ); + assert.deepStrictEqual( + runStreamSpan.parentSpanId, + runSpan.spanContext().spanId, + `Expected that runSpan(spanId=${runSpan.spanContext().spanId}) is the parent to runStreamSpan(parentSpanId=${runStreamSpan.parentSpanId})` + ); + assert.deepStrictEqual( + runSpan.spanContext().traceId, + runStreamSpan.spanContext().traceId, + 'Expected that both spans share a traceId' + ); + assert.ok( + runStreamSpan.spanContext().spanId, + 'Expected that runStreamSpan has a defined spanId' + ); + assert.ok( + runSpan.spanContext().spanId, + 'Expected that runSpan has a defined spanId' + ); + + const databaseBatchCreateSessionsSpan = spans[0]; + assert.strictEqual( + databaseBatchCreateSessionsSpan.name, + 'CloudSpanner.Database.batchCreateSessions' + ); + const sessionPoolCreateSessionsSpan = spans[1]; + assert.strictEqual( + sessionPoolCreateSessionsSpan.name, + 'CloudSpanner.SessionPool.createSessions' + ); + assert.ok( + sessionPoolCreateSessionsSpan.spanContext().traceId, + 'Expecting a defined sessionPoolCreateSessions traceId' + ); + assert.deepStrictEqual( + sessionPoolCreateSessionsSpan.spanContext().traceId, + databaseBatchCreateSessionsSpan.spanContext().traceId, + 'Expected the same traceId' + ); + assert.deepStrictEqual( + databaseBatchCreateSessionsSpan.parentSpanId, + sessionPoolCreateSessionsSpan.spanContext().spanId, + 'Expected that sessionPool.createSessions is the parent to db.batchCreassionSessions' + ); + + // Assert that despite all being exported, SessionPool.createSessions + // is not in the same trace as runStream, createSessions is invoked at + // Spanner Client instantiation, thus before database.run is invoked. + assert.notEqual( + sessionPoolCreateSessionsSpan.spanContext().traceId, + runSpan.spanContext().traceId, + 'Did not expect the same traceId' + ); + + // Finally check for the collective expected event names. + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + } + + it('async/await correctly parents trace spans', async () => { + // See https://github.com/googleapis/nodejs-spanner/issues/2146. + async function main() { + const instance = spanner.instance('testing'); + const database = instance.database('db-1'); + + const query = { + sql: selectSql, + }; + + const [rows] = await database.run(query); + + rows.forEach(row => { + const json = row.toJSON(); + }); + + provider.forceFlush(); + } + + await main(); + assertAsyncAwaitExpectations(); + }); + + it('callback correctly parents trace spans', done => { + function main(onComplete) { + const instance = spanner.instance('testing'); + const database = instance.database('db-1'); + + const query = { + sql: selectSql, + }; + + database.run(query, (err, rows) => { + rows.forEach(row => { + const json = row.toJSON(); + }); + + provider.forceFlush(); + onComplete(); + }); + } + main(() => { + assertAsyncAwaitExpectations(); done(); }); }); diff --git a/observability-test/transaction.ts b/observability-test/transaction.ts index 7d1795e49..ad1fc0b47 100644 --- a/observability-test/transaction.ts +++ b/observability-test/transaction.ts @@ -178,7 +178,10 @@ describe('Transaction', () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Begin Transaction']; + const expectedEventNames = [ + 'Begin Transaction', + 'Transaction Creation Done', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, diff --git a/package.json b/package.json index ac111fc84..27ed2f743 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "@google-cloud/promisify": "^4.0.0", "@grpc/proto-loader": "^0.7.0", "@opentelemetry/api": "^1.9.0", + "@opentelemetry/context-async-hooks": "^1.26.0", "@opentelemetry/semantic-conventions": "^1.25.1", "@types/big.js": "^6.0.0", "@types/stack-trace": "0.0.33", @@ -85,7 +86,6 @@ "through2": "^4.0.0" }, "devDependencies": { - "@opentelemetry/context-async-hooks": "^1.25.1", "@opentelemetry/sdk-trace-base": "^1.26.0", "@opentelemetry/sdk-trace-node": "^1.26.0", "@types/concat-stream": "^2.0.0", diff --git a/src/database.ts b/src/database.ts index aad10f111..9f67bf01b 100644 --- a/src/database.ts +++ b/src/database.ts @@ -450,6 +450,11 @@ class Database extends common.GrpcServiceObject { typeof poolOptions === 'function' ? new (poolOptions as SessionPoolConstructor)(this, null) : new SessionPool(this, poolOptions); + const sessionPoolInstance = this.pool_ as SessionPool; + if (sessionPoolInstance) { + sessionPoolInstance._observabilityOptions = + instance._observabilityOptions; + } if (typeof poolOptions === 'object') { this.databaseRole = poolOptions.databaseRole || null; } @@ -459,6 +464,7 @@ class Database extends common.GrpcServiceObject { [CLOUD_RESOURCE_HEADER]: this.formattedName_, }; this.request = instance.request; + this._observabilityOptions = instance._observabilityOptions; // eslint-disable-next-line @typescript-eslint/no-explicit-any this.requestStream = instance.requestStream as any; this.pool_.on('error', this.emit.bind(this, 'error')); @@ -467,7 +473,6 @@ class Database extends common.GrpcServiceObject { Object.assign({}, queryOptions), Database.getEnvironmentQueryOptions() ); - this._observabilityOptions = instance._observabilityOptions; } /** * @typedef {array} SetDatabaseMetadataResponse @@ -677,30 +682,36 @@ class Database extends common.GrpcServiceObject { addLeaderAwareRoutingHeader(headers); } - this.request( - { - client: 'SpannerClient', - method: 'batchCreateSessions', - reqOpts, - gaxOpts: options.gaxOptions, - headers: headers, - }, - (err, resp) => { - if (err) { - callback!(err, null, resp!); - return; - } + const traceConfig = {opts: this._observabilityOptions}; + startTrace('Database.batchCreateSessions', traceConfig, span => { + this.request( + { + client: 'SpannerClient', + method: 'batchCreateSessions', + reqOpts, + gaxOpts: options.gaxOptions, + headers: headers, + }, + (err, resp) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, null, resp!); + return; + } - const sessions = (resp!.session || []).map(metadata => { - const session = this.session(metadata.name!); - session._observabilityOptions = this._observabilityOptions; - session.metadata = metadata; - return session; - }); + const sessions = (resp!.session || []).map(metadata => { + const session = this.session(metadata.name!); + session._observabilityOptions = this._observabilityOptions; + session.metadata = metadata; + return session; + }); - callback!(null, sessions, resp!); - } - ); + span.end(); + callback!(null, sessions, resp!); + } + ); + }); } /** @@ -2177,6 +2188,7 @@ class Database extends common.GrpcServiceObject { if (!err) { span.addEvent('Using Session', {'session.id': session?.id}); + transaction!._observabilityOptions = this._observabilityOptions; this._releaseOnEnd(session!, transaction!, span); } else if (isSessionNotFoundError(err as grpc.ServiceError)) { span.addEvent('No session available', { @@ -3206,6 +3218,8 @@ class Database extends common.GrpcServiceObject { runFn!(err as grpc.ServiceError); return; } + + transaction!._observabilityOptions = this._observabilityOptions; if (options.optimisticLock) { transaction!.useOptimisticLock(); } diff --git a/src/instance.ts b/src/instance.ts index b72f24622..4986e3ecd 100644 --- a/src/instance.ts +++ b/src/instance.ts @@ -1363,6 +1363,7 @@ class Instance extends common.GrpcServiceObject { databases = rowDatabases.map(database => { const databaseInstance = self.database(database.name!, {min: 0}); databaseInstance.metadata = database; + databaseInstance._observabilityOptions = this._observabilityOptions; return databaseInstance; }); } diff --git a/src/instrument.ts b/src/instrument.ts index 6cad7bc4a..99b260bf4 100644 --- a/src/instrument.ts +++ b/src/instrument.ts @@ -26,8 +26,10 @@ import { import { Span, SpanStatusCode, + context, trace, INVALID_SPAN_CONTEXT, + ROOT_CONTEXT, SpanAttributes, TimeInput, TracerProvider, @@ -93,6 +95,29 @@ interface traceConfig { const SPAN_NAMESPACE_PREFIX = 'CloudSpanner'; // TODO: discuss & standardize this prefix. export {SPAN_NAMESPACE_PREFIX, traceConfig}; +const { + AsyncHooksContextManager, +} = require('@opentelemetry/context-async-hooks'); + +/* + * This function ensures that async/await works correctly by + * checking if context.active() returns an invalid/unset context + * and if so, sets a global AsyncHooksContextManager otherwise + * spans resulting from async/await invocations won't be correctly + * associated in their respective hierarchies. + */ +function ensureInitialContextManagerSet() { + if (context.active() === ROOT_CONTEXT) { + // If no active context was set previously, trace context propagation cannot + // function correctly with async/await for OpenTelemetry + // See {@link https://opentelemetry.io/docs/languages/js/context/#active-context} + context.disable(); // Disable any prior contextManager. + const contextManager = new AsyncHooksContextManager(); + contextManager.enable(); + context.setGlobalContextManager(contextManager); + } +} + /** * startTrace begins an active span in the current active context * and passes it back to the set callback function. Each span will @@ -111,6 +136,8 @@ export function startTrace( config = {} as traceConfig; } + ensureInitialContextManagerSet(); + return getTracer(config.opts?.tracerProvider).startActiveSpan( SPAN_NAMESPACE_PREFIX + '.' + spanNameSuffix, {kind: SpanKind.CLIENT}, diff --git a/src/session-pool.ts b/src/session-pool.ts index 09300ecde..71be508da 100644 --- a/src/session-pool.ts +++ b/src/session-pool.ts @@ -24,7 +24,12 @@ import {Transaction} from './transaction'; import {NormalCallback} from './common'; import {GoogleError, grpc, ServiceError} from 'google-gax'; import trace = require('stack-trace'); -import {getActiveOrNoopSpan} from './instrument'; +import { + ObservabilityOptions, + getActiveOrNoopSpan, + setSpanErrorAndException, + startTrace, +} from './instrument'; /** * @callback SessionPoolCloseCallback @@ -353,6 +358,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { _pingHandle!: NodeJS.Timer; _requests: PQueue; _traces: Map; + _observabilityOptions?: ObservabilityOptions; /** * Formats stack trace objects into Node-like stack trace. @@ -485,6 +491,7 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { }); this._traces = new Map(); + this._observabilityOptions = database._observabilityOptions; } /** @@ -738,9 +745,6 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { * @emits SessionPool#createError */ async _createSessions(amount: number): Promise { - const span = getActiveOrNoopSpan(); - span.addEvent(`Requesting ${amount} sessions`); - const labels = this.options.labels!; const databaseRole = this.options.databaseRole!; @@ -752,41 +756,51 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { let nReturned = 0; const nRequested: number = amount; - // while we can request as many sessions be created as we want, the backend - // will return at most 100 at a time, hence the need for a while loop. - while (amount > 0) { - let sessions: Session[] | null = null; - - span.addEvent(`Creating ${amount} sessions`); + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('SessionPool.createSessions', traceConfig, async span => { + span.addEvent(`Requesting ${amount} sessions`); + + // while we can request as many sessions be created as we want, the backend + // will return at most 100 at a time, hence the need for a while loop. + while (amount > 0) { + let sessions: Session[] | null = null; + + span.addEvent(`Creating ${amount} sessions`); + + try { + [sessions] = await this.database.batchCreateSessions({ + count: amount, + labels: labels, + databaseRole: databaseRole, + }); + + amount -= sessions.length; + nReturned += sessions.length; + } catch (e) { + this._pending -= amount; + this.emit('createError', e); + span.addEvent( + `Requested for ${nRequested} sessions returned ${nReturned}` + ); + setSpanErrorAndException(span, e as Error); + span.end(); + throw e; + } - try { - [sessions] = await this.database.batchCreateSessions({ - count: amount, - labels: labels, - databaseRole: databaseRole, + sessions.forEach((session: Session) => { + setImmediate(() => { + this._inventory.borrowed.add(session); + this._pending -= 1; + this.release(session); + }); }); - - amount -= sessions.length; - nReturned += sessions.length; - } catch (e) { - this._pending -= amount; - this.emit('createError', e); - span.addEvent( - `Requested for ${nRequested} sessions returned ${nReturned}` - ); - throw e; } - sessions.forEach((session: Session) => { - setImmediate(() => { - this._inventory.borrowed.add(session); - this._pending -= 1; - this.release(session); - }); - }); - } - - span.addEvent(`Requested for ${nRequested} sessions returned ${nReturned}`); + span.addEvent( + `Requested for ${nRequested} sessions returned ${nReturned}` + ); + span.end(); + }); } /** diff --git a/src/table.ts b/src/table.ts index a435b7a40..74d0e0375 100644 --- a/src/table.ts +++ b/src/table.ts @@ -1082,6 +1082,7 @@ class Table { ): void { const traceConfig: traceConfig = { opts: this._observabilityOptions, + tableName: this.name, }; startTrace('Table.' + method, traceConfig, span => { diff --git a/test/spanner.ts b/test/spanner.ts index fc4e11b91..032d18493 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -4999,7 +4999,7 @@ describe('Spanner with mock server', () => { // and tests the database/instance suffix is an iteration of // each afresh invocation of newTestDatabase, which has been // causing test flakes. - it('Check for span annotations', () => { + it('Check for span annotations', done => { const exporter = new InMemorySpanExporter(); const provider = new NodeTracerProvider({ sampler: new AlwaysOnSampler(), @@ -5013,45 +5013,68 @@ describe('Spanner with mock server', () => { }); const opts: typeof ObservabilityOptions = {tracerProvider: provider}; - startTrace('aSpan', {opts: opts}, span => { + startTrace('aSpan', {opts: opts}, async span => { + instance._observabilityOptions = opts; const database = newTestDatabase(); database._observabilityOptions = opts; - async function runIt() { - const query = { - sql: 'SELECT 1', - }; - - const [rows] = await database.run(query); - assert.strictEqual(rows.length, 1); - } + const query = { + sql: 'SELECT 1', + }; - runIt(); + const [rows] = await database.run(query); + assert.strictEqual(rows.length, 1); span.end(); + exporter.forceFlush(); const spans = exporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span'); - const span0 = spans[0]; - const events = span0.events; - // Sort the events by earliest time of occurence. - events.sort((evtA, evtB) => { - return evtA.time < evtB.time; + // Sort the spans by startTime. + spans.sort((spanA, spanB) => { + spanA.startTime < spanB.startTime; }); - const gotEventNames: string[] = []; - events.forEach(event => { - gotEventNames.push(event.name); + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); }); - const wantEventNames = ['Requesting 25 sessions', 'Creating 25 sessions']; + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.run', + 'CloudSpanner.aSpan', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; assert.deepEqual( - gotEventNames, - wantEventNames, - `Mismatched events\n\tGot: ${gotEventNames}\n\tWant: ${wantEventNames}` + actualEventNames, + expectedEventNames, + `Mismatched events\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` ); + + done(); }); }); });