diff --git a/observability-test/database.ts b/observability-test/database.ts index f2884be1d..7351f5297 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -38,7 +38,10 @@ const { InMemorySpanExporter, } = require('@opentelemetry/sdk-trace-node'); // eslint-disable-next-line n/no-extraneous-require -const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); +const { + ReadableSpan, + SimpleSpanProcessor, +} = require('@opentelemetry/sdk-trace-base'); import * as db from '../src/database'; import {Instance, MutationGroup, Spanner} from '../src'; import * as pfy from '@google-cloud/promisify'; @@ -1953,4 +1956,220 @@ describe('Database', () => { fakeStream2.push(null); }); }); + + describe('runPartitionedUpdate', () => { + const QUERY = { + sql: 'INSERT INTO `MyTable` (Key, Thing) VALUES(@key, @thing)', + params: { + key: 'k999', + thing: 'abc', + }, + }; + + let fakePool: FakeSessionPool; + let fakeSession: FakeSession; + let fakePartitionedDml = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.PartitionedDml + ); + + let getSessionStub; + let beginStub; + let runUpdateStub; + + beforeEach(() => { + fakePool = database.pool_; + fakeSession = new FakeSession(); + fakePartitionedDml = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.PartitionedDml + ); + + getSessionStub = ( + sandbox.stub(fakePool, 'getSession') as sinon.SinonStub + ).callsFake(callback => { + callback(null, fakeSession); + }); + + sandbox.stub(fakeSession, 'partitionedDml').returns(fakePartitionedDml); + + beginStub = ( + sandbox.stub(fakePartitionedDml, 'begin') as sinon.SinonStub + ).callsFake(callback => callback(null)); + + runUpdateStub = ( + sandbox.stub(fakePartitionedDml, 'runUpdate') as sinon.SinonStub + ).callsFake((_, callback) => callback(null)); + }); + + interface traceExportResults { + spanNames: string[]; + spans: (typeof ReadableSpan)[]; + eventNames: string[]; + } + + async function getTraceExportResults(): Promise { + await provider.forceFlush(); + await traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + return Promise.resolve({ + spanNames: actualSpanNames, + spans: spans, + eventNames: actualEventNames, + }); + } + + it('with pool errors', done => { + const fakeError = new Error('err'); + const fakeCallback = sandbox.spy(); + + getSessionStub.callsFake(callback => callback(fakeError)); + database.runPartitionedUpdate(QUERY, async (err, rowCount) => { + assert.strictEqual(err, fakeError); + assert.strictEqual(rowCount, 0); + + const exportResults = await getTraceExportResults(); + const actualSpanNames = exportResults.spanNames; + const spans = exportResults.spans; + const actualEventNames = exportResults.eventNames; + + const expectedSpanNames = [ + 'CloudSpanner.Database.runPartitionedUpdate', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the first span actually produced an error that was recorded. + const parentSpan = spans[0]; + assert.deepStrictEqual( + SpanStatusCode.ERROR, + parentSpan.status.code, + 'Expected an ERROR span status' + ); + assert.deepStrictEqual( + fakeError.message, + parentSpan.status.message.toString(), + 'Mismatched span status message' + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('with begin errors', done => { + const fakeError = new Error('err'); + + beginStub.callsFake(callback => callback(fakeError)); + + const releaseStub = ( + sandbox.stub(fakePool, 'release') as sinon.SinonStub + ).withArgs(fakeSession); + + database.runPartitionedUpdate(QUERY, async (err, rowCount) => { + assert.strictEqual(err, fakeError); + assert.strictEqual(rowCount, 0); + assert.strictEqual(releaseStub.callCount, 1); + + const exportResults = await getTraceExportResults(); + const actualSpanNames = exportResults.spanNames; + const spans = exportResults.spans; + const actualEventNames = exportResults.eventNames; + + const expectedSpanNames = [ + 'CloudSpanner.Database.runPartitionedUpdate', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the first span actually produced an error that was recorded. + const parentSpan = spans[0]; + assert.deepStrictEqual( + SpanStatusCode.ERROR, + parentSpan.status.code, + 'Expected an ERROR span status' + ); + assert.deepStrictEqual( + fakeError.message, + parentSpan.status.message.toString(), + 'Mismatched span status message' + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + done(); + }); + }); + + it('session released on transaction end', done => { + const releaseStub = ( + sandbox.stub(fakePool, 'release') as sinon.SinonStub + ).withArgs(fakeSession); + + database.runPartitionedUpdate(QUERY, async (err, rowCount) => { + const exportResults = await getTraceExportResults(); + const actualSpanNames = exportResults.spanNames; + const spans = exportResults.spans; + const actualEventNames = exportResults.eventNames; + + const expectedSpanNames = [ + 'CloudSpanner.Database.runPartitionedUpdate', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the first span actually produced an error that was recorded. + const parentSpan = spans[0]; + assert.deepStrictEqual( + SpanStatusCode.UNSET, + parentSpan.status.code, + 'Unexpected span status' + ); + assert.deepStrictEqual( + undefined, + parentSpan.status.message, + 'Mismatched span status message' + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + done(); + }); + + fakePartitionedDml.emit('end'); + assert.strictEqual(releaseStub.callCount, 1); + }); + }); }); diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index d5fcd409a..ccbb0561c 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -53,8 +53,6 @@ const { } = require('@opentelemetry/context-async-hooks'); const {ObservabilityOptions} = require('../src/instrument'); -import {SessionPool} from '../src/session-pool'; - const selectSql = 'SELECT 1'; const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; @@ -267,70 +265,18 @@ describe('EndToEnd', async () => { }); it('run', async () => { - const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( - database.formattedName_ - ); const [rows] = await database.run('SELECT 1'); - - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - withAllSpansHaveDBName(spans); - - // Sort the spans by duration. - spans.sort((spanA, spanB) => { - spanA.duration < spanB.duration; - }); - - const actualEventNames: string[] = []; - const actualSpanNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); - }); - }); - const expectedSpanNames = [ 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Database.runStream', 'CloudSpanner.Database.run', ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - - // Ensure that RunStream is a child span of createQueryPartitions. - const spanRunStream = spans[0]; - const spanRun = spans[1]; - assert.ok( - spanRun.spanContext().traceId, - 'Expected that createQueryPartitions has a defined traceId' - ); - assert.ok( - spanRunStream.spanContext().traceId, - 'Expected that RunStream has a defined traceId' - ); - assert.ok( - spanRun.spanContext().spanId, - 'Expected that createQueryPartitions has a defined spanId' - ); - assert.ok( - spanRunStream.spanContext().spanId, - 'Expected that RunStream has a defined spanId' - ); - const expectedEventNames = [ 'Starting stream', ...cacheSessionEvents, 'Using Session', ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` - ); + await verifySpansAndEvents(traceExporter, expectedSpanNames, expectedEventNames); }); it('runTransaction', done => { @@ -422,82 +368,53 @@ describe('EndToEnd', async () => { done(); }); }); - }); -}); -describe('SessionPool', async () => { - const traceExporter = new InMemorySpanExporter(); - const sampler = new AlwaysOnSampler(); - const provider = new NodeTracerProvider({ - sampler: sampler, - exporter: traceExporter, - }); - provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); - - const setupResult = await setup({ - tracerProvider: provider, - enableExtendedTracing: false, - }); - - const spanner = setupResult.spanner; - const server = setupResult.server; - const spannerMock = setupResult.spannerMock; - const instance = spanner.instance('instance'); - - after(async () => { - traceExporter.reset(); - await provider.shutdown(); - spannerMock.resetRequests(); - spanner.close(); - server.tryShutdown(() => {}); - }); - - it('_createSessions', async () => { - // The first invocation of new SessionPool shall implicitly happen in here. - const database = instance.database('database'); - await database.run('SELECT 1'); - - await provider.forceFlush(); - traceExporter.reset(); - - // Explicitly invoking new SessionPool. - const sessionPool = new SessionPool(database); - - const OPTIONS = 3; - await sessionPool._createSessions(OPTIONS); + it('runPartitionedUpdate', async () => { + const [rowCount] = await database.runPartitionedUpdate({ + sql: updateSql, + }); - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); + await tracerProvider.forceFlush(); + await traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); - const actualSpanNames: string[] = []; - const actualEventNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); }); - }); - const expectedSpanNames = [ - 'CloudSpanner.Database.batchCreateSessions', - 'CloudSpanner.SessionPool.createSessions', - ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Dml.runUpdate', + 'CloudSpanner.PartitionedDml.runUpdate', + 'CloudSpanner.Database.runPartitionedUpdate', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); - const expectedEventNames = [ - 'Requesting 3 sessions', - 'Creating 3 sessions', - 'Requested for 3 sessions returned 3', - ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` - ); + const expectedEventNames = [ + 'Begin Transaction', + 'Transaction Creation Done', + 'Starting stream', + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); }); }); diff --git a/src/database.ts b/src/database.ts index 3d8321355..12db176c6 100644 --- a/src/database.ts +++ b/src/database.ts @@ -2858,13 +2858,27 @@ class Database extends common.GrpcServiceObject { query: string | RunPartitionedUpdateOptions, callback?: RunUpdateCallback ): void | Promise<[number]> { - this.pool_.getSession((err, session) => { - if (err) { - callback!(err as ServiceError, 0); - return; - } + const traceConfig = { + sql: query, + ...this._traceConfig, + }; + return startTrace('Database.runPartitionedUpdate', traceConfig, span => { + this.pool_.getSession((err, session) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err as ServiceError, 0); + return; + } - this._runPartitionedUpdate(session!, query, callback); + this._runPartitionedUpdate(session!, query, (err, count) => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err, count); + }); + }); }); }