From c62584c861b36516410f41dd8ba426b17f27497a Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Thu, 11 Jan 2024 15:17:16 +0530 Subject: [PATCH 1/4] chore: integration test fix --- system-test/spanner.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/system-test/spanner.ts b/system-test/spanner.ts index d2bde030f..5575d5c5f 100644 --- a/system-test/spanner.ts +++ b/system-test/spanner.ts @@ -8731,10 +8731,10 @@ describe('Spanner', () => { }, err => { assert.strictEqual(err?.details, expectedErrorMessage); + transaction!.end(); + done(); } ); - transaction!.end(); - done(); }); }); }); From 4e85d4518d77b4c71e9b5b7d254d6d2f6f6d3209 Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Mon, 28 Oct 2024 19:34:52 +0530 Subject: [PATCH 2/4] traces tests refactoring --- observability-test/batch-transaction.ts | 174 +++++----- observability-test/database.ts | 5 +- observability-test/helper.ts | 46 +++ observability-test/observability.ts | 2 - observability-test/session-pool.ts | 10 +- observability-test/spanner.ts | 406 +++++------------------- src/partial-result-stream.ts | 3 + src/transaction-runner.ts | 3 + test/partial-result-stream.ts | 2 +- test/spanner.ts | 2 +- 10 files changed, 220 insertions(+), 433 deletions(-) diff --git a/observability-test/batch-transaction.ts b/observability-test/batch-transaction.ts index 763fb7e36..619f705d7 100644 --- a/observability-test/batch-transaction.ts +++ b/observability-test/batch-transaction.ts @@ -153,68 +153,63 @@ describe('BatchTransaction', () => { }; it('createQueryPartitions', done => { - const REQUEST = sandbox.stub(); - - const res = batchTransaction.createQueryPartitions( - QUERY, - (err, part, resp) => { - assert.ifError(err); - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); - - // Sort the spans by duration. - spans.sort((spanA, spanB) => { - spanA.duration < spanB.duration; - }); - - const actualSpanNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - }); - - const expectedSpanNames = [ - 'CloudSpanner.BatchTransaction.createPartitions_', - 'CloudSpanner.BatchTransaction.createQueryPartitions', - ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - - // Ensure that createPartitions_ is a child span of createQueryPartitions. 
- const spanCreatePartitions_ = spans[0]; - const spanCreateQueryPartitions = spans[1]; - assert.ok( - spanCreateQueryPartitions.spanContext().traceId, - 'Expected that createQueryPartitions has a defined traceId' - ); - assert.ok( - spanCreatePartitions_.spanContext().traceId, - 'Expected that createPartitions_ has a defined traceId' - ); - assert.deepStrictEqual( - spanCreatePartitions_.spanContext().traceId, - spanCreateQueryPartitions.spanContext().traceId, - 'Expected that both spans share a traceId' - ); - assert.ok( - spanCreateQueryPartitions.spanContext().spanId, - 'Expected that createQueryPartitions has a defined spanId' - ); - assert.ok( - spanCreatePartitions_.spanContext().spanId, - 'Expected that createPartitions_ has a defined spanId' - ); - assert.deepStrictEqual( - spanCreatePartitions_.parentSpanId, - spanCreateQueryPartitions.spanContext().spanId, - 'Expected that createQueryPartitions is the parent to createPartitions_' - ); - done(); - } - ); + const res = batchTransaction.createQueryPartitions(QUERY, err => { + assert.ifError(err); + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); + + // Sort the spans by duration. + spans.sort((spanA, spanB) => { + spanA.duration < spanB.duration; + }); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = [ + 'CloudSpanner.BatchTransaction.createPartitions_', + 'CloudSpanner.BatchTransaction.createQueryPartitions', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that createPartitions_ is a child span of createQueryPartitions. + const spanCreatePartitions_ = spans[0]; + const spanCreateQueryPartitions = spans[1]; + assert.ok( + spanCreateQueryPartitions.spanContext().traceId, + 'Expected that createQueryPartitions has a defined traceId' + ); + assert.ok( + spanCreatePartitions_.spanContext().traceId, + 'Expected that createPartitions_ has a defined traceId' + ); + assert.deepStrictEqual( + spanCreatePartitions_.spanContext().traceId, + spanCreateQueryPartitions.spanContext().traceId, + 'Expected that both spans share a traceId' + ); + assert.ok( + spanCreateQueryPartitions.spanContext().spanId, + 'Expected that createQueryPartitions has a defined spanId' + ); + assert.ok( + spanCreatePartitions_.spanContext().spanId, + 'Expected that createPartitions_ has a defined spanId' + ); + assert.deepStrictEqual( + spanCreatePartitions_.parentSpanId, + spanCreateQueryPartitions.spanContext().spanId, + 'Expected that createQueryPartitions is the parent to createPartitions_' + ); + done(); + }); }); it('createReadPartitions', done => { @@ -222,34 +217,31 @@ describe('BatchTransaction', () => { const response = {}; REQUEST.callsFake((_, callback) => callback(null, response)); - const res = batchTransaction.createReadPartitions( - QUERY, - (err, part, resp) => { - assert.ifError(err); - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); - - // Sort the spans by duration. 
- spans.sort((spanA, spanB) => { - spanA.duration < spanB.duration; - }); - - const actualSpanNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - }); - const expectedSpanNames = [ - 'CloudSpanner.BatchTransaction.createPartitions_', - 'CloudSpanner.BatchTransaction.createReadPartitions', - ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - done(); - } - ); + const res = batchTransaction.createReadPartitions(QUERY, err => { + assert.ifError(err); + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); + + // Sort the spans by duration. + spans.sort((spanA, spanB) => { + spanA.duration < spanB.duration; + }); + + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + const expectedSpanNames = [ + 'CloudSpanner.BatchTransaction.createPartitions_', + 'CloudSpanner.BatchTransaction.createReadPartitions', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + done(); + }); }); }); diff --git a/observability-test/database.ts b/observability-test/database.ts index 3b6811e8c..7351f5297 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -1175,7 +1175,7 @@ describe('Database', () => { it('with error on null mutation should catch thrown error', done => { try { - database.writeAtLeastOnce(null, (err, res) => {}); + database.writeAtLeastOnce(null, () => {}); } catch (err) { // Performing a substring search on the error because // depending on the version of Node.js, the error might be either of: @@ -1320,7 +1320,6 @@ describe('Database', () => { 'Expected an ERROR span status' ); - const errorMessage = firstSpan.status.message; assert.deepStrictEqual( firstSpan.status.message, sessionNotFoundError.message @@ -1658,7 +1657,7 @@ describe('Database', () => { .throws(ourException); assert.rejects(async () => { - const value = await database.runTransactionAsync(async txn => { + await database.runTransactionAsync(async txn => { const result = await txn.run('SELECT 1'); await txn.commit(); return result; diff --git a/observability-test/helper.ts b/observability-test/helper.ts index b6d429d32..591171666 100644 --- a/observability-test/helper.ts +++ b/observability-test/helper.ts @@ -19,6 +19,25 @@ import * as assert from 'assert'; const {ReadableSpan} = require('@opentelemetry/sdk-trace-base'); import {SEMATTRS_DB_NAME} from '@opentelemetry/semantic-conventions'; +export const batchCreateSessionsEvents = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', +]; + +export const waitingSessionsEvents = [ + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', +]; + +export const cacheSessionEvents = [ + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', +]; + /** * This utility exists as a test helper because mocha has builtin "context" * and referring to context causes type/value collision errors. 
@@ -47,3 +66,30 @@ export function generateWithAllSpansHaveDBName(dbName: String): Function { }); }; } + +export async function verifySpansAndEvents( + traceExporter, + expectedSpans, + expectedEvents +) { + await traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + assert.deepStrictEqual( + actualSpanNames, + expectedSpans, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpans}` + ); + assert.deepStrictEqual( + actualEventNames, + expectedEvents, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEvents}` + ); +} diff --git a/observability-test/observability.ts b/observability-test/observability.ts index 576005687..224001951 100644 --- a/observability-test/observability.ts +++ b/observability-test/observability.ts @@ -382,7 +382,6 @@ describe('setError', () => { it('a non-empty string should set the message', () => { startTrace('aSpan', {opts: {tracerProvider: provider}}, span => { - const status1 = span.status; const res = setSpanError(span, 'this one'); assert.strictEqual(res, true, 'value was set'); span.end(); @@ -438,7 +437,6 @@ describe('setErrorAndException', () => { it('a non-empty string should set the message', () => { startTrace('aSpan', {opts: {tracerProvider: provider}}, span => { - const status1 = span.status; const res = setSpanErrorAndException(span, 'this one'); assert.strictEqual(res, true, 'value was set'); span.end(); diff --git a/observability-test/session-pool.ts b/observability-test/session-pool.ts index e92b42b0a..77a47a5af 100644 --- a/observability-test/session-pool.ts +++ b/observability-test/session-pool.ts @@ -65,7 +65,7 @@ describe('SessionPool', () => { } as unknown as Database; const sandbox = sinon.createSandbox(); - const shouldNotBeCalled = sandbox.stub().throws('Should not be called.'); + sandbox.stub().throws('Should not be called.'); const createSession = (name = 'id', props?): Session => { props = props || {}; @@ -112,9 +112,7 @@ describe('SessionPool', () => { const OPTIONS = 3; it('on exception from Database.batchCreateSessions', async () => { const ourException = new Error('this fails intentionally'); - const stub = sandbox - .stub(DATABASE, 'batchCreateSessions') - .throws(ourException); + sandbox.stub(DATABASE, 'batchCreateSessions').throws(ourException); const releaseStub = sandbox.stub(sessionPool, 'release'); assert.rejects(async () => { @@ -168,9 +166,7 @@ describe('SessionPool', () => { it('without error', async () => { const RESPONSE = [[{}, {}, {}]]; - const stub = sandbox - .stub(DATABASE, 'batchCreateSessions') - .resolves(RESPONSE); + sandbox.stub(DATABASE, 'batchCreateSessions').resolves(RESPONSE); const releaseStub = sandbox.stub(sessionPool, 'release'); await sessionPool._createSessions(OPTIONS); diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index c9ce60df2..e94e5bc23 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -33,6 +33,9 @@ const { NodeTracerProvider, InMemorySpanExporter, } = require('@opentelemetry/sdk-trace-node'); +const { + TraceExporter, +} = require('@google-cloud/opentelemetry-cloud-trace-exporter'); // eslint-disable-next-line n/no-extraneous-require const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); const {SpanStatusCode} = 
require('@opentelemetry/api'); @@ -40,6 +43,10 @@ const { disableContextAndManager, generateWithAllSpansHaveDBName, setGlobalContextManager, + verifySpansAndEvents, + batchCreateSessionsEvents, + waitingSessionsEvents, + cacheSessionEvents, } = require('./helper'); const { AsyncHooksContextManager, @@ -125,12 +132,14 @@ async function setup( describe('EndToEnd', async () => { const contextManager = new AsyncHooksContextManager(); + const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)); setGlobalContextManager(contextManager); afterEach(() => { disableContextAndManager(contextManager); }); const traceExporter = new InMemorySpanExporter(); + //const traceExporter = new TraceExporter(); const sampler = new AlwaysOnSampler(); const tracerProvider = new NodeTracerProvider({ sampler: sampler, @@ -156,6 +165,7 @@ describe('EndToEnd', async () => { afterEach(async () => { await tracerProvider.forceFlush(); await traceExporter.reset(); + // await sleep(80000); spannerMock.resetRequests(); }); @@ -174,186 +184,82 @@ describe('EndToEnd', async () => { describe('Database', () => { it('getSessions', async () => { const [rows] = await database.getSessions(); - - const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( - database.formattedName_ - ); - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); - withAllSpansHaveDBName(spans); - - const actualSpanNames: string[] = []; - const actualEventNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); - }); - }); - const expectedSpanNames = ['CloudSpanner.Database.getSessions']; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - const expectedEventNames = []; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + + await verifySpansAndEvents( + traceExporter, + expectedSpanNames, + expectedEventNames ); }); it('getSnapshot', done => { - const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( - database.formattedName_ - ); - database.getSnapshot((err, transaction) => { assert.ifError(err); transaction!.run('SELECT 1', async (err, rows) => { assert.ifError(err); transaction!.end(); - - await tracerProvider.forceFlush(); - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - withAllSpansHaveDBName(spans); - - const actualSpanNames: string[] = []; - const actualEventNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); - }); - }); - const expectedSpanNames = [ 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Database.getSnapshot', 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Snapshot.run', ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - const expectedEventNames = [ 'Begin Transaction', 'Transaction Creation Done', - 'Acquiring session', - 'Cache hit: has usable session', - 'Acquired session', + ...cacheSessionEvents, 'Starting stream', ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + await verifySpansAndEvents( + 
traceExporter, + expectedSpanNames, + expectedEventNames ); - done(); }); }); }); it('getTransaction', done => { - const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( - database.formattedName_ - ); database.getTransaction(async (err, transaction) => { assert.ifError(err); assert.ok(transaction); transaction!.end(); transaction!.commit(); - await tracerProvider.forceFlush(); - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - withAllSpansHaveDBName(spans); - - const actualEventNames: string[] = []; - const actualSpanNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); - }); - }); - const expectedSpanNames = ['CloudSpanner.Database.getTransaction']; - assert.deepStrictEqual( - actualSpanNames, + const expectedEventNames = [...cacheSessionEvents, 'Using Session']; + await verifySpansAndEvents( + traceExporter, expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - - const expectedEventNames = [ - 'Acquiring session', - 'Cache hit: has usable session', - 'Acquired session', - 'Using Session', - ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + expectedEventNames ); - done(); }); }); it('runStream', done => { - const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( - database.formattedName_ - ); database .runStream('SELECT 1') .on('data', row => {}) .once('error', assert.ifError) - .on('end', () => { - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - withAllSpansHaveDBName(spans); - - const actualSpanNames: string[] = []; - const actualEventNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); - }); - }); - + .on('end', async () => { const expectedSpanNames = [ 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Database.runStream', ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - const expectedEventNames = [ 'Starting stream', - 'Acquiring session', - 'Cache hit: has usable session', - 'Acquired session', + ...cacheSessionEvents, 'Using Session', ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + await verifySpansAndEvents( + traceExporter, + expectedSpanNames, + expectedEventNames ); done(); @@ -417,9 +323,7 @@ describe('EndToEnd', async () => { const expectedEventNames = [ 'Starting stream', - 'Acquiring session', - 'Cache hit: has usable session', - 'Acquired session', + ...cacheSessionEvents, 'Using Session', ]; assert.deepStrictEqual( @@ -430,186 +334,91 @@ describe('EndToEnd', async () => { }); it('runTransaction', done => { - const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( - database.formattedName_ - ); - database.runTransaction(async (err, transaction) => { assert.ifError(err); await transaction!.run('SELECT 1'); await transaction!.commit(); await transaction!.end(); - await traceExporter.forceFlush(); - - const spans = traceExporter.getFinishedSpans(); - withAllSpansHaveDBName(spans); - - const actualEventNames: string[] = []; - const actualSpanNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - 
span.events.forEach(event => { - actualEventNames.push(event.name); - }); - }); - const expectedSpanNames = [ 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Snapshot.run', 'CloudSpanner.Transaction.commit', 'CloudSpanner.Database.runTransaction', ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - const expectedEventNames = [ 'Starting stream', 'Transaction Creation Done', 'Starting Commit', 'Commit Done', - 'Acquiring session', - 'Cache hit: has usable session', - 'Acquired session', + ...cacheSessionEvents, ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + + await verifySpansAndEvents( + traceExporter, + expectedSpanNames, + expectedEventNames ); done(); }); }); it('runTransactionAsync', async () => { - const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( - database.formattedName_ - ); await database.runTransactionAsync(async transaction => { await transaction!.run('SELECT 1'); }); - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - withAllSpansHaveDBName(spans); - - const actualEventNames: string[] = []; - const actualSpanNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); - }); - }); - const expectedSpanNames = [ 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Snapshot.run', 'CloudSpanner.Database.runTransactionAsync', ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - const expectedEventNames = [ 'Starting stream', 'Transaction Creation Done', - 'Acquiring session', - 'Cache hit: has usable session', - 'Acquired session', + ...cacheSessionEvents, 'Using Session', ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + await verifySpansAndEvents( + traceExporter, + expectedSpanNames, + expectedEventNames ); }); it('writeAtLeastOnce', done => { - const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( - database.formattedName_ - ); const blankMutations = new MutationSet(); - database.writeAtLeastOnce(blankMutations, (err, response) => { + database.writeAtLeastOnce(blankMutations, async (err, response) => { assert.ifError(err); assert.ok(response); - - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - withAllSpansHaveDBName(spans); - - const actualEventNames: string[] = []; - const actualSpanNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); - }); - }); - const expectedSpanNames = [ 'CloudSpanner.Transaction.commit', 'CloudSpanner.Database.writeAtLeastOnce', ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - const expectedEventNames = [ 'Starting Commit', 'Commit Done', - 'Acquiring session', - 'Cache hit: has usable session', - 'Acquired session', + ...cacheSessionEvents, 'Using Session', ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + await verifySpansAndEvents( + traceExporter, + 
expectedSpanNames, + expectedEventNames ); - done(); }); }); it('batchCreateSessions', done => { - database.batchCreateSessions(5, (err, sessions) => { + database.batchCreateSessions(5, async (err, sessions) => { assert.ifError(err); - - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - - const actualEventNames: string[] = []; - const actualSpanNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); - }); - }); - const expectedSpanNames = ['CloudSpanner.Database.batchCreateSessions']; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - const expectedEventNames = []; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + await verifySpansAndEvents( + traceExporter, + expectedSpanNames, + expectedEventNames ); - done(); }); }); @@ -883,9 +692,7 @@ describe('ObservabilityOptions injection and propagation', async () => { ); const expectedEventNames = [ - 'Acquiring session', - 'Cache hit: has usable session', - 'Acquired session', + ...cacheSessionEvents, 'Using Session', 'Starting stream', 'Transaction Creation Done', @@ -994,9 +801,7 @@ describe('ObservabilityOptions injection and propagation', async () => { ); const expectedEventNames = [ - 'Acquiring session', - 'Cache hit: has usable session', - 'Acquired session', + ...cacheSessionEvents, 'Using Session', 'Starting stream', ]; @@ -1162,14 +967,9 @@ describe('ObservabilityOptions injection and propagation', async () => { ); const expectedEventNames = [ - 'Requesting 25 sessions', - 'Creating 25 sessions', - 'Requested for 25 sessions returned 25', + ...batchCreateSessionsEvents, 'Starting stream', - 'Acquiring session', - 'Waiting for a session to become available', - 'Acquired session', - 'Using Session', + ...waitingSessionsEvents, ]; assert.deepStrictEqual( actualEventNames, @@ -1310,14 +1110,9 @@ describe('E2E traces with async/await', async () => { // Finally check for the collective expected event names. const expectedEventNames = [ - 'Requesting 25 sessions', - 'Creating 25 sessions', - 'Requested for 25 sessions returned 25', + ...batchCreateSessionsEvents, 'Starting stream', - 'Acquiring session', - 'Waiting for a session to become available', - 'Acquired session', - 'Using Session', + ...waitingSessionsEvents, ]; assert.deepStrictEqual( actualEventNames, @@ -1548,14 +1343,10 @@ SELECT 1p // Finally check for the collective expected event names. const expectedEventNames = [ - 'Requesting 25 sessions', - 'Creating 25 sessions', - 'Requested for 25 sessions returned 25', + ...batchCreateSessionsEvents, 'Starting stream', - 'Acquiring session', - 'Waiting for a session to become available', - 'Acquired session', - 'Using Session', + 'exception', + ...waitingSessionsEvents, ]; assert.deepStrictEqual( actualEventNames, @@ -1704,10 +1495,9 @@ SELECT 1p // Finally check for the collective expected event names. const expectedEventNames = [ - 'Requesting 25 sessions', - 'Creating 25 sessions', - 'Requested for 25 sessions returned 25', + ...batchCreateSessionsEvents, 'Starting stream', + 'exception', 'Stream broken. 
Safe to retry', 'Begin Transaction', 'Transaction Creation Done', @@ -1715,10 +1505,7 @@ SELECT 1p 'Transaction Creation Done', 'Starting Commit', 'Commit Done', - 'Acquiring session', - 'Waiting for a session to become available', - 'Acquired session', - 'Using Session', + ...waitingSessionsEvents, 'exception', ]; assert.deepStrictEqual( @@ -2101,14 +1888,10 @@ describe('Traces for ExecuteStream broken stream retries', () => { // Finally check for the collective expected event names. const expectedEventNames = [ - 'Requesting 25 sessions', - 'Creating 25 sessions', - 'Requested for 25 sessions returned 25', + ...batchCreateSessionsEvents, 'Starting stream', - 'Acquiring session', - 'Waiting for a session to become available', - 'Acquired session', - 'Using Session', + 'exception', + ...waitingSessionsEvents, 'Transaction Creation Done', ]; assert.deepStrictEqual( @@ -2172,19 +1955,14 @@ describe('Traces for ExecuteStream broken stream retries', () => { // Finally check for the collective expected event names. const expectedEventNames = [ - 'Requesting 25 sessions', - 'Creating 25 sessions', - 'Requested for 25 sessions returned 25', + ...batchCreateSessionsEvents, 'Starting stream', 'Re-attempting start stream', 'Resuming stream', 'Resuming stream', 'Resuming stream', 'Resuming stream', - 'Acquiring session', - 'Waiting for a session to become available', - 'Acquired session', - 'Using Session', + ...waitingSessionsEvents, ]; assert.deepStrictEqual( actualEventNames, @@ -2243,19 +2021,14 @@ describe('Traces for ExecuteStream broken stream retries', () => { // Finally check for the collective expected event names. const expectedEventNames = [ - 'Requesting 25 sessions', - 'Creating 25 sessions', - 'Requested for 25 sessions returned 25', + ...batchCreateSessionsEvents, 'Starting stream', 'Re-attempting start stream', 'Begin Transaction', 'Transaction Creation Done', 'Starting Commit', 'Commit Done', - 'Acquiring session', - 'Waiting for a session to become available', - 'Acquired session', - 'Using Session', + ...waitingSessionsEvents, ]; assert.deepStrictEqual( actualEventNames, @@ -2298,43 +2071,20 @@ describe('Traces for ExecuteStream broken stream retries', () => { 1, 'runTransactionAsync.attempt must be 1' ); - - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - - const actualSpanNames: string[] = []; - const actualEventNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); - }); - }); - const expectedSpanNames = [ 'CloudSpanner.Database.batchCreateSessions', 'CloudSpanner.SessionPool.createSessions', 'CloudSpanner.Database.runTransactionAsync', ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); const expectedEventNames = [ - 'Requesting 25 sessions', - 'Creating 25 sessions', - 'Requested for 25 sessions returned 25', - 'Acquiring session', - 'Waiting for a session to become available', - 'Acquired session', - 'Using Session', + ...batchCreateSessionsEvents, + ...waitingSessionsEvents, ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + await verifySpansAndEvents( + traceExporter, + expectedSpanNames, + expectedEventNames ); }); }); diff --git a/src/partial-result-stream.ts b/src/partial-result-stream.ts index 69439f534..8bf414c7a 100644 --- 
a/src/partial-result-stream.ts +++ b/src/partial-result-stream.ts @@ -28,6 +28,7 @@ import {DeadlineError, isRetryableInternalError} from './transaction-runner'; import {codec, JSONOptions, Json, Field, Value} from './codec'; import {google} from '../protos/protos'; import * as stream from 'stream'; +import {getActiveOrNoopSpan, setSpanErrorAndException} from './instrument'; export type ResumeToken = string | Uint8Array; @@ -494,6 +495,7 @@ export function partialResultStream( let lastRequestStream: Readable; const startTime = Date.now(); const timeout = options?.gaxOptions?.timeout ?? Infinity; + const span = getActiveOrNoopSpan(); // mergeStream allows multiple streams to be connected into one. This is good; // if we need to retry a request and pipe more data to the user's stream. @@ -568,6 +570,7 @@ export function partialResultStream( // checkpoint stream has queued. After that, we will destroy the // user's stream with the same error. setImmediate(() => batchAndSplitOnTokenStream.destroy(err)); + setSpanErrorAndException(span, err as Error); return; } diff --git a/src/transaction-runner.ts b/src/transaction-runner.ts index 61d979e8c..a99e6bd12 100644 --- a/src/transaction-runner.ts +++ b/src/transaction-runner.ts @@ -25,6 +25,7 @@ import {NormalCallback} from './common'; import {isSessionNotFoundError} from './session-pool'; import {Database} from './database'; import {google} from '../protos/protos'; +import {getActiveOrNoopSpan} from './instrument'; import IRequestOptions = google.spanner.v1.IRequestOptions; // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -226,6 +227,7 @@ export abstract class Runner { const timeout = this.options.timeout!; let lastError: grpc.ServiceError; + const span = getActiveOrNoopSpan(); // The transaction runner should always execute at least one attempt before // timing out. @@ -250,6 +252,7 @@ export abstract class Runner { } this.attempts += 1; + span.addEvent('Retrying transaction'); const delay = this.getNextDelay(lastError); await new Promise(resolve => setTimeout(resolve, delay)); diff --git a/test/partial-result-stream.ts b/test/partial-result-stream.ts index 799d29b00..0154f048c 100644 --- a/test/partial-result-stream.ts +++ b/test/partial-result-stream.ts @@ -335,7 +335,7 @@ describe('PartialResultStream', () => { }); partialResultStream(requestFnStub, {gaxOptions: {timeout: 0}}) - .on('data', row => {}) + .on('data', () => {}) .on('error', err => { assert.strictEqual(err.code, grpc.status.DEADLINE_EXCEEDED); assert.strictEqual(requestFnStub.callCount, 1); diff --git a/test/spanner.ts b/test/spanner.ts index d324ed911..0212f6b92 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -3556,7 +3556,7 @@ describe('Spanner with mock server', () => { requestOptions: {transactionTag: 'transaction-tag'}, }); const transaction = promise[0]; - await transaction.run('SELECT 1').then(results => { + await transaction.run('SELECT 1').then(() => { const request = spannerMock.getRequests().find(val => { return (val as v1.ExecuteSqlRequest).sql; }) as v1.ExecuteSqlRequest; From a588465ff9612406dc4b1084eaad73cc57f46bf1 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 28 Oct 2024 06:48:24 -0700 Subject: [PATCH 3/4] feat: (observability): trace Database.runPartitionedUpdate (#2176) This change traces Database.runPartitionedUpdate along with the appropriate tests for it with and without errors. 
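As an illustration of the surface being traced (a minimal sketch, not part of the change itself; the span names, SQL text and instance/database identifiers below are the ones used by the new tests in this patch):

    // Assumes a Spanner client constructed with observabilityOptions
    // ({tracerProvider, enableExtendedTracing}), as in the test setup.
    const database = spanner.instance('instance').database('database');

    // The whole call is now wrapped in CloudSpanner.Database.runPartitionedUpdate.
    const [rowCount] = await database.runPartitionedUpdate({
      sql: 'UPDATE FOO SET BAR=1 WHERE BAZ=2',
    });

    // Child spans asserted by the new tests:
    //   CloudSpanner.Snapshot.begin
    //   CloudSpanner.Snapshot.runStream
    //   CloudSpanner.Snapshot.run
    //   CloudSpanner.Dml.runUpdate
    //   CloudSpanner.PartitionedDml.runUpdate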
Updates #2079 --- observability-test/batch-transaction.ts | 4 +- observability-test/database.ts | 26 +-- observability-test/observability.ts | 4 +- observability-test/session-pool.ts | 4 +- observability-test/spanner.ts | 290 ++++++++---------------- observability-test/transaction.ts | 17 +- src/instrument.ts | 1 - src/table.ts | 1 - test/database.ts | 4 +- 9 files changed, 104 insertions(+), 247 deletions(-) diff --git a/observability-test/batch-transaction.ts b/observability-test/batch-transaction.ts index 619f705d7..89cd49b17 100644 --- a/observability-test/batch-transaction.ts +++ b/observability-test/batch-transaction.ts @@ -153,7 +153,7 @@ describe('BatchTransaction', () => { }; it('createQueryPartitions', done => { - const res = batchTransaction.createQueryPartitions(QUERY, err => { + batchTransaction.createQueryPartitions(QUERY, err => { assert.ifError(err); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); @@ -217,7 +217,7 @@ describe('BatchTransaction', () => { const response = {}; REQUEST.callsFake((_, callback) => callback(null, response)); - const res = batchTransaction.createReadPartitions(QUERY, err => { + batchTransaction.createReadPartitions(QUERY, err => { assert.ifError(err); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); diff --git a/observability-test/database.ts b/observability-test/database.ts index 7351f5297..39ebe9afc 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -507,7 +507,6 @@ describe('Database', () => { let beginSnapshotStub: sinon.SinonStub; let getSessionStub: sinon.SinonStub; - let snapshotStub: sinon.SinonStub; beforeEach(() => { fakePool = database.pool_; @@ -524,9 +523,7 @@ describe('Database', () => { sandbox.stub(fakePool, 'getSession') as sinon.SinonStub ).callsFake(callback => callback(null, fakeSession)); - snapshotStub = sandbox - .stub(fakeSession, 'snapshot') - .returns(fakeSnapshot); + sandbox.stub(fakeSession, 'snapshot').returns(fakeSnapshot); }); it('with error', done => { @@ -1250,7 +1247,6 @@ describe('Database', () => { let fakeSession: FakeSession; let fakeDataStream: Transform; let getSessionStub: sinon.SinonStub; - let requestStreamStub: sinon.SinonStub; const options = { requestOptions: { @@ -1269,9 +1265,7 @@ describe('Database', () => { sandbox.stub(fakePool, 'getSession') as sinon.SinonStub ).callsFake(callback => callback(null, fakeSession)); - requestStreamStub = sandbox - .stub(database, 'requestStream') - .returns(fakeDataStream); + sandbox.stub(database, 'requestStream').returns(fakeDataStream); }); it('on retry with "Session not found" error', done => { @@ -1723,8 +1717,6 @@ describe('Database', () => { let fakeStream2: Transform; let getSessionStub: sinon.SinonStub; - let snapshotStub: sinon.SinonStub; - let runStreamStub: sinon.SinonStub; beforeEach(() => { fakePool = database.pool_; @@ -1745,15 +1737,11 @@ describe('Database', () => { .onSecondCall() .callsFake(callback => callback(null, fakeSession2)); - snapshotStub = sandbox - .stub(fakeSession, 'snapshot') - .returns(fakeSnapshot); + sandbox.stub(fakeSession, 'snapshot').returns(fakeSnapshot); sandbox.stub(fakeSession2, 'snapshot').returns(fakeSnapshot2); - runStreamStub = sandbox - .stub(fakeSnapshot, 'runStream') - .returns(fakeStream); + sandbox.stub(fakeSnapshot, 'runStream').returns(fakeStream); sandbox.stub(fakeSnapshot2, 'runStream').returns(fakeStream2); }); @@ -1974,7 +1962,6 @@ describe('Database', () => { let getSessionStub; let beginStub; - let runUpdateStub; 
beforeEach(() => { fakePool = database.pool_; @@ -1995,7 +1982,7 @@ describe('Database', () => { sandbox.stub(fakePartitionedDml, 'begin') as sinon.SinonStub ).callsFake(callback => callback(null)); - runUpdateStub = ( + ( sandbox.stub(fakePartitionedDml, 'runUpdate') as sinon.SinonStub ).callsFake((_, callback) => callback(null)); }); @@ -2030,7 +2017,6 @@ describe('Database', () => { it('with pool errors', done => { const fakeError = new Error('err'); - const fakeCallback = sandbox.spy(); getSessionStub.callsFake(callback => callback(fakeError)); database.runPartitionedUpdate(QUERY, async (err, rowCount) => { @@ -2131,7 +2117,7 @@ describe('Database', () => { sandbox.stub(fakePool, 'release') as sinon.SinonStub ).withArgs(fakeSession); - database.runPartitionedUpdate(QUERY, async (err, rowCount) => { + database.runPartitionedUpdate(QUERY, async () => { const exportResults = await getTraceExportResults(); const actualSpanNames = exportResults.spanNames; const spans = exportResults.spans; diff --git a/observability-test/observability.ts b/observability-test/observability.ts index 224001951..8270cd7e9 100644 --- a/observability-test/observability.ts +++ b/observability-test/observability.ts @@ -99,7 +99,7 @@ describe('startTrace', () => { 'aSpan', {opts: {tracerProvider: overridingProvider}}, async span => { - await new Promise((resolve, reject) => setTimeout(resolve, 400)); + await new Promise(resolve => setTimeout(resolve, 400)); span.end(); const gotSpansFromGlobal = globalExporter.getFinishedSpans(); @@ -250,7 +250,7 @@ describe('startTrace', () => { 'aSpan', {opts: {tracerProvider: overridingProvider}}, async span => { - await new Promise((resolve, reject) => setTimeout(resolve, 400)); + await new Promise(resolve => setTimeout(resolve, 400)); span.end(); const gotSpansFromGlobal = globalExporter.getFinishedSpans(); diff --git a/observability-test/session-pool.ts b/observability-test/session-pool.ts index 77a47a5af..f60553dc0 100644 --- a/observability-test/session-pool.ts +++ b/observability-test/session-pool.ts @@ -113,7 +113,7 @@ describe('SessionPool', () => { it('on exception from Database.batchCreateSessions', async () => { const ourException = new Error('this fails intentionally'); sandbox.stub(DATABASE, 'batchCreateSessions').throws(ourException); - const releaseStub = sandbox.stub(sessionPool, 'release'); + sandbox.stub(sessionPool, 'release'); assert.rejects(async () => { await sessionPool._createSessions(OPTIONS); @@ -167,7 +167,7 @@ describe('SessionPool', () => { const RESPONSE = [[{}, {}, {}]]; sandbox.stub(DATABASE, 'batchCreateSessions').resolves(RESPONSE); - const releaseStub = sandbox.stub(sessionPool, 'release'); + sandbox.stub(sessionPool, 'release'); await sessionPool._createSessions(OPTIONS); assert.strictEqual(sessionPool.size, 3); diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index e94e5bc23..c250683ab 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -21,21 +21,16 @@ import {Database, Instance, Spanner} from '../src'; import {MutationSet} from '../src/transaction'; import protobuf = google.spanner.v1; import v1 = google.spanner.v1; -import PartialResultSet = google.spanner.v1.PartialResultSet; import * as mock from '../test/mockserver/mockspanner'; import * as mockInstanceAdmin from '../test/mockserver/mockinstanceadmin'; import * as mockDatabaseAdmin from '../test/mockserver/mockdatabaseadmin'; import * as sinon from 'sinon'; import {Row} from '../src/partial-result-stream'; -import {Json} from 
'../src/codec'; const { AlwaysOnSampler, NodeTracerProvider, InMemorySpanExporter, } = require('@opentelemetry/sdk-trace-node'); -const { - TraceExporter, -} = require('@google-cloud/opentelemetry-cloud-trace-exporter'); // eslint-disable-next-line n/no-extraneous-require const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); const {SpanStatusCode} = require('@opentelemetry/api'); @@ -53,8 +48,6 @@ const { } = require('@opentelemetry/context-async-hooks'); const {ObservabilityOptions} = require('../src/instrument'); -import {SessionPool} from '../src/session-pool'; - const selectSql = 'SELECT 1'; const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; @@ -132,14 +125,12 @@ async function setup( describe('EndToEnd', async () => { const contextManager = new AsyncHooksContextManager(); - const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)); setGlobalContextManager(contextManager); afterEach(() => { disableContextAndManager(contextManager); }); const traceExporter = new InMemorySpanExporter(); - //const traceExporter = new TraceExporter(); const sampler = new AlwaysOnSampler(); const tracerProvider = new NodeTracerProvider({ sampler: sampler, @@ -165,7 +156,6 @@ describe('EndToEnd', async () => { afterEach(async () => { await tracerProvider.forceFlush(); await traceExporter.reset(); - // await sleep(80000); spannerMock.resetRequests(); }); @@ -175,7 +165,7 @@ describe('EndToEnd', async () => { // To deflake expectations of session creation, let's // issue out a warm-up request request that'll ensure // that the SessionPool is created deterministically. - const [rows] = await database.run('SELECT 1'); + await database.run('SELECT 1'); // Clear out any present traces to make a clean slate for testing. traceExporter.forceFlush(); traceExporter.reset(); @@ -183,7 +173,7 @@ describe('EndToEnd', async () => { describe('Database', () => { it('getSessions', async () => { - const [rows] = await database.getSessions(); + await database.getSessions(); const expectedSpanNames = ['CloudSpanner.Database.getSessions']; const expectedEventNames = []; @@ -198,7 +188,7 @@ describe('EndToEnd', async () => { database.getSnapshot((err, transaction) => { assert.ifError(err); - transaction!.run('SELECT 1', async (err, rows) => { + transaction!.run('SELECT 1', async err => { assert.ifError(err); transaction!.end(); const expectedSpanNames = [ @@ -244,7 +234,7 @@ describe('EndToEnd', async () => { it('runStream', done => { database .runStream('SELECT 1') - .on('data', row => {}) + .on('data', () => {}) .once('error', assert.ifError) .on('end', async () => { const expectedSpanNames = [ @@ -267,69 +257,21 @@ describe('EndToEnd', async () => { }); it('run', async () => { - const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( - database.formattedName_ - ); - const [rows] = await database.run('SELECT 1'); - - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - withAllSpansHaveDBName(spans); - - // Sort the spans by duration. 
- spans.sort((spanA, spanB) => { - spanA.duration < spanB.duration; - }); - - const actualEventNames: string[] = []; - const actualSpanNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); - }); - }); - + await database.run('SELECT 1'); const expectedSpanNames = [ 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Database.runStream', 'CloudSpanner.Database.run', ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - - // Ensure that RunStream is a child span of createQueryPartitions. - const spanRunStream = spans[0]; - const spanRun = spans[1]; - assert.ok( - spanRun.spanContext().traceId, - 'Expected that createQueryPartitions has a defined traceId' - ); - assert.ok( - spanRunStream.spanContext().traceId, - 'Expected that RunStream has a defined traceId' - ); - assert.ok( - spanRun.spanContext().spanId, - 'Expected that createQueryPartitions has a defined spanId' - ); - assert.ok( - spanRunStream.spanContext().spanId, - 'Expected that RunStream has a defined spanId' - ); - const expectedEventNames = [ 'Starting stream', ...cacheSessionEvents, 'Using Session', ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + await verifySpansAndEvents( + traceExporter, + expectedSpanNames, + expectedEventNames ); }); @@ -385,6 +327,49 @@ describe('EndToEnd', async () => { ); }); + it('runTransactionAsync with abort', async () => { + let attempts = 0; + await database.runTransactionAsync((transaction): Promise => { + if (!attempts) { + spannerMock.abortTransaction(transaction); + } + attempts++; + return transaction.run(selectSql).then(([rows]) => { + let count = 0; + rows.forEach(() => count++); + return transaction.commit().then(() => count); + }); + }); + assert.strictEqual(attempts, 2); + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Database.runTransactionAsync', + ]; + const expectedEventNames = [ + 'Starting stream', + 'exception', + 'Stream broken. 
Not safe to retry', + 'Begin Transaction', + 'Transaction Creation Done', + 'Starting stream', + 'Starting Commit', + 'Commit Done', + ...cacheSessionEvents, + 'Using Session', + 'Retrying transaction', + ]; + await verifySpansAndEvents( + traceExporter, + expectedSpanNames, + expectedEventNames + ); + }); + it('writeAtLeastOnce', done => { const blankMutations = new MutationSet(); database.writeAtLeastOnce(blankMutations, async (err, response) => { @@ -410,7 +395,7 @@ describe('EndToEnd', async () => { }); it('batchCreateSessions', done => { - database.batchCreateSessions(5, async (err, sessions) => { + database.batchCreateSessions(5, async err => { assert.ifError(err); const expectedSpanNames = ['CloudSpanner.Database.batchCreateSessions']; const expectedEventNames = []; @@ -420,134 +405,39 @@ describe('EndToEnd', async () => { expectedEventNames ); done(); - }); - }); - it('runPartitionedUpdate', async () => { - const [rowCount] = await database.runPartitionedUpdate({ - sql: updateSql, - }); - - await tracerProvider.forceFlush(); - await traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); + it('runPartitionedUpdate', async () => { + await database.runPartitionedUpdate({ + sql: updateSql, + }); - const actualEventNames: string[] = []; - const actualSpanNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Dml.runUpdate', + 'CloudSpanner.PartitionedDml.runUpdate', + 'CloudSpanner.Database.runPartitionedUpdate', + ]; + const expectedEventNames = [ + 'Begin Transaction', + 'Transaction Creation Done', + 'Starting stream', + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + ]; + verifySpansAndEvents( + traceExporter, + expectedSpanNames, + expectedEventNames + ); }); }); - - const expectedSpanNames = [ - 'CloudSpanner.Snapshot.begin', - 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', - 'CloudSpanner.Dml.runUpdate', - 'CloudSpanner.PartitionedDml.runUpdate', - 'CloudSpanner.Database.runPartitionedUpdate', - ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - - const expectedEventNames = [ - 'Begin Transaction', - 'Transaction Creation Done', - 'Starting stream', - 'Acquiring session', - 'Cache hit: has usable session', - 'Acquired session', - ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` - ); }); }); }); -describe('SessionPool', async () => { - const traceExporter = new InMemorySpanExporter(); - const sampler = new AlwaysOnSampler(); - const provider = new NodeTracerProvider({ - sampler: sampler, - exporter: traceExporter, - }); - provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); - - const setupResult = await setup({ - tracerProvider: provider, - enableExtendedTracing: false, - }); - - const spanner = setupResult.spanner; - const server = setupResult.server; - const spannerMock = setupResult.spannerMock; - const instance = spanner.instance('instance'); - - after(async () => { - traceExporter.reset(); - await provider.shutdown(); - spannerMock.resetRequests(); - spanner.close(); - server.tryShutdown(() => {}); - }); - - it('_createSessions', async () => 
{ - // The first invocation of new SessionPool shall implicitly happen in here. - const database = instance.database('database'); - await database.run('SELECT 1'); - - await provider.forceFlush(); - traceExporter.reset(); - - // Explicitly invoking new SessionPool. - const sessionPool = new SessionPool(database); - - const OPTIONS = 3; - await sessionPool._createSessions(OPTIONS); - - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - - const actualSpanNames: string[] = []; - const actualEventNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); - }); - }); - - const expectedSpanNames = [ - 'CloudSpanner.Database.batchCreateSessions', - 'CloudSpanner.SessionPool.createSessions', - ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - - const expectedEventNames = [ - 'Requesting 3 sessions', - 'Creating 3 sessions', - 'Requested for 3 sessions returned 3', - ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` - ); - }); -}); - describe('ObservabilityOptions injection and propagation', async () => { it('Passed into Spanner, Instance and Database', async () => { const traceExporter = new InMemorySpanExporter(); @@ -643,7 +533,7 @@ describe('ObservabilityOptions injection and propagation', async () => { // To deflake expectations of session creation, let's // issue out a warm-up request request that'll ensure // that the SessionPool is created deterministically. - const [rows] = await database.run('SELECT 1'); + await database.run('SELECT 1'); // Clear out any present traces to make a clean slate for testing. 
traceExporter.forceFlush(); traceExporter.reset(); @@ -662,7 +552,7 @@ describe('ObservabilityOptions injection and propagation', async () => { database.getTransaction((err, tx) => { assert.ifError(err); - tx!.run('SELECT 1', async (err, rows) => { + tx!.run('SELECT 1', async () => { tx!.end(); await tracerProvider.forceFlush(); @@ -716,7 +606,7 @@ describe('ObservabilityOptions injection and propagation', async () => { traceExporter.reset(); tx!.begin(); - tx!.runUpdate(updateSql, async (err, rowCount) => { + tx!.runUpdate(updateSql, async err => { assert.ifError(err); tx!.end(); @@ -771,7 +661,7 @@ describe('ObservabilityOptions injection and propagation', async () => { .runStream(selectSql) .on('data', () => rowCount++) .on('error', assert.ifError) - .on('stats', _stats => {}) + .on('stats', () => {}) .on('end', async () => { tx!.end(); @@ -825,9 +715,9 @@ describe('ObservabilityOptions injection and propagation', async () => { tx!.begin(); - tx!.runUpdate(updateSql, async (err, rowCount) => { + tx!.runUpdate(updateSql, async err => { assert.ifError(err); - tx!.rollback(async err => { + tx!.rollback(async () => { tx!.end(); await tracerProvider.forceFlush(); traceExporter.forceFlush(); @@ -921,7 +811,7 @@ describe('ObservabilityOptions injection and propagation', async () => { database.formattedName_ ); - database.run('SELECT 1', (err, rows) => { + database.run('SELECT 1', err => { assert.ifError(err); injectedTraceExporter.forceFlush(); @@ -983,7 +873,6 @@ describe('ObservabilityOptions injection and propagation', async () => { describe('E2E traces with async/await', async () => { let server: grpc.Server; let spanner: Spanner; - let database: Database; let spannerMock: mock.MockSpanner; let traceExporter: typeof InMemorySpanExporter; let provider: typeof TracerProvider; @@ -1134,7 +1023,7 @@ describe('E2E traces with async/await', async () => { const [rows] = await database.run(query); rows.forEach(row => { - const json = row.toJSON(); + row.toJSON(); }); provider.forceFlush(); @@ -1155,7 +1044,7 @@ describe('E2E traces with async/await', async () => { database.run(query, (err, rows) => { rows.forEach(row => { - const json = row.toJSON(); + row.toJSON(); }); provider.forceFlush(); @@ -1173,7 +1062,6 @@ describe('E2E traces with async/await', async () => { describe('Negative cases', async () => { let server: grpc.Server; let spanner: Spanner; - let database: Database; let spannerMock: mock.MockSpanner; let traceExporter: typeof InMemorySpanExporter; let provider: typeof TracerProvider; @@ -1360,7 +1248,7 @@ SELECT 1p const database = instance.database('database'); try { - const [rows] = await database.run(selectSql1p); + await database.run(selectSql1p); } catch (e) { // This catch is meant to ensure that we // can assert on the generated spans. 
@@ -1375,7 +1263,7 @@ SELECT 1p const instance = spanner.instance('instance'); const database = instance.database('database'); - database.run(selectSql1p, (err, rows) => { + database.run(selectSql1p, err => { assert.ok(err); provider.forceFlush(); assertRunBadSyntaxExpectations(); diff --git a/observability-test/transaction.ts b/observability-test/transaction.ts index 233728784..20e604d94 100644 --- a/observability-test/transaction.ts +++ b/observability-test/transaction.ts @@ -69,18 +69,9 @@ describe('Transaction', () => { const PARTIAL_RESULT_STREAM = sandbox.stub(); const PROMISIFY_ALL = sandbox.stub(); - // eslint-disable-next-line @typescript-eslint/no-explicit-any let Snapshot; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - let Dml; - // eslint-disable-next-line @typescript-eslint/no-explicit-any let Transaction; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - let PartitionedDml; - // eslint-disable-next-line @typescript-eslint/no-explicit-any let transaction; - - // eslint-disable-next-line @typescript-eslint/no-explicit-any let snapshot; before(() => { @@ -91,9 +82,7 @@ describe('Transaction', () => { }); Snapshot = txns.Snapshot; - Dml = txns.Dml; Transaction = txns.Transaction; - PartitionedDml = txns.PartitionedDml; }); let traceExporter: typeof InMemorySpanExporter; @@ -246,11 +235,10 @@ describe('Transaction', () => { const TABLE = 'my-table-123'; let fakeStream; - let stub; beforeEach(() => { fakeStream = new EventEmitter(); - stub = sandbox.stub(snapshot, 'createReadStream').returns(fakeStream); + sandbox.stub(snapshot, 'createReadStream').returns(fakeStream); }); it('with error', done => { @@ -348,11 +336,10 @@ describe('Transaction', () => { const QUERY = 'SELET * FROM `MyTable`'; let fakeStream; - let stub; beforeEach(() => { fakeStream = new EventEmitter(); - stub = sandbox.stub(snapshot, 'runStream').returns(fakeStream); + sandbox.stub(snapshot, 'runStream').returns(fakeStream); }); it('without error', done => { diff --git a/src/instrument.ts b/src/instrument.ts index 15fe7b8e3..8ad123bf6 100644 --- a/src/instrument.ts +++ b/src/instrument.ts @@ -131,7 +131,6 @@ export function startTrace( config: traceConfig | undefined, cb: (span: Span) => T ): T { - const origConfig = config; if (!config) { config = {} as traceConfig; } diff --git a/src/table.ts b/src/table.ts index 8f87ba00b..227f8d107 100644 --- a/src/table.ts +++ b/src/table.ts @@ -35,7 +35,6 @@ import { ObservabilityOptions, startTrace, setSpanError, - setSpanErrorAndException, traceConfig, } from './instrument'; diff --git a/test/database.ts b/test/database.ts index c1fa54ea1..d5f5ec6db 100644 --- a/test/database.ts +++ b/test/database.ts @@ -46,7 +46,6 @@ import { CommitOptions, MutationSet, } from '../src/transaction'; -import {error} from 'is'; let promisified = false; const fakePfy = extend({}, pfy, { @@ -836,9 +835,8 @@ describe('Database', () => { }); it('should return an error when passing null mutation', done => { - const fakeError = new Error('err'); try { - database.writeAtLeastOnce(null, (err, res) => {}); + database.writeAtLeastOnce(null, () => {}); } catch (err) { const errorMessage = (err as grpc.ServiceError).message; assert.ok( From 530d664936c50ac5d69a33edd275ab17523ec28b Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Wed, 30 Oct 2024 10:57:08 +0530 Subject: [PATCH 4/4] moving additional attributes to separate PR --- observability-test/spanner.ts | 46 ----------------------------------- src/partial-result-stream.ts | 3 --- src/transaction-runner.ts | 3 
--- 3 files changed, 52 deletions(-) diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index c250683ab..c60549776 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -327,49 +327,6 @@ describe('EndToEnd', async () => { ); }); - it('runTransactionAsync with abort', async () => { - let attempts = 0; - await database.runTransactionAsync((transaction): Promise => { - if (!attempts) { - spannerMock.abortTransaction(transaction); - } - attempts++; - return transaction.run(selectSql).then(([rows]) => { - let count = 0; - rows.forEach(() => count++); - return transaction.commit().then(() => count); - }); - }); - assert.strictEqual(attempts, 2); - const expectedSpanNames = [ - 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', - 'CloudSpanner.Snapshot.begin', - 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', - 'CloudSpanner.Transaction.commit', - 'CloudSpanner.Database.runTransactionAsync', - ]; - const expectedEventNames = [ - 'Starting stream', - 'exception', - 'Stream broken. Not safe to retry', - 'Begin Transaction', - 'Transaction Creation Done', - 'Starting stream', - 'Starting Commit', - 'Commit Done', - ...cacheSessionEvents, - 'Using Session', - 'Retrying transaction', - ]; - await verifySpansAndEvents( - traceExporter, - expectedSpanNames, - expectedEventNames - ); - }); - it('writeAtLeastOnce', done => { const blankMutations = new MutationSet(); database.writeAtLeastOnce(blankMutations, async (err, response) => { @@ -1233,7 +1190,6 @@ SELECT 1p const expectedEventNames = [ ...batchCreateSessionsEvents, 'Starting stream', - 'exception', ...waitingSessionsEvents, ]; assert.deepStrictEqual( @@ -1385,7 +1341,6 @@ SELECT 1p const expectedEventNames = [ ...batchCreateSessionsEvents, 'Starting stream', - 'exception', 'Stream broken. Safe to retry', 'Begin Transaction', 'Transaction Creation Done', @@ -1778,7 +1733,6 @@ describe('Traces for ExecuteStream broken stream retries', () => { const expectedEventNames = [ ...batchCreateSessionsEvents, 'Starting stream', - 'exception', ...waitingSessionsEvents, 'Transaction Creation Done', ]; diff --git a/src/partial-result-stream.ts b/src/partial-result-stream.ts index 8bf414c7a..69439f534 100644 --- a/src/partial-result-stream.ts +++ b/src/partial-result-stream.ts @@ -28,7 +28,6 @@ import {DeadlineError, isRetryableInternalError} from './transaction-runner'; import {codec, JSONOptions, Json, Field, Value} from './codec'; import {google} from '../protos/protos'; import * as stream from 'stream'; -import {getActiveOrNoopSpan, setSpanErrorAndException} from './instrument'; export type ResumeToken = string | Uint8Array; @@ -495,7 +494,6 @@ export function partialResultStream( let lastRequestStream: Readable; const startTime = Date.now(); const timeout = options?.gaxOptions?.timeout ?? Infinity; - const span = getActiveOrNoopSpan(); // mergeStream allows multiple streams to be connected into one. This is good; // if we need to retry a request and pipe more data to the user's stream. @@ -570,7 +568,6 @@ export function partialResultStream( // checkpoint stream has queued. After that, we will destroy the // user's stream with the same error. 
setImmediate(() => batchAndSplitOnTokenStream.destroy(err)); - setSpanErrorAndException(span, err as Error); return; } diff --git a/src/transaction-runner.ts b/src/transaction-runner.ts index a99e6bd12..61d979e8c 100644 --- a/src/transaction-runner.ts +++ b/src/transaction-runner.ts @@ -25,7 +25,6 @@ import {NormalCallback} from './common'; import {isSessionNotFoundError} from './session-pool'; import {Database} from './database'; import {google} from '../protos/protos'; -import {getActiveOrNoopSpan} from './instrument'; import IRequestOptions = google.spanner.v1.IRequestOptions; // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -227,7 +226,6 @@ export abstract class Runner { const timeout = this.options.timeout!; let lastError: grpc.ServiceError; - const span = getActiveOrNoopSpan(); // The transaction runner should always execute at least one attempt before // timing out. @@ -252,7 +250,6 @@ export abstract class Runner { } this.attempts += 1; - span.addEvent('Retrying transaction'); const delay = this.getNextDelay(lastError); await new Promise(resolve => setTimeout(resolve, delay));
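Taken together, these patches converge the observability tests on a single tracing harness plus the shared helpers added in observability-test/helper.ts. A condensed sketch of that setup, using only names that appear in the diffs above (the warm-up and reset mirror the beforeEach hooks; the expected spans and events are those asserted by the Database.run test):

    const {
      AlwaysOnSampler,
      NodeTracerProvider,
      InMemorySpanExporter,
    } = require('@opentelemetry/sdk-trace-node');
    const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');
    const {verifySpansAndEvents, cacheSessionEvents} = require('./helper');

    // In-memory exporter so the tests can assert on finished spans.
    const traceExporter = new InMemorySpanExporter();
    const tracerProvider = new NodeTracerProvider({
      sampler: new AlwaysOnSampler(),
      exporter: traceExporter,
    });
    tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter));

    // `setup` is the mock-server helper in observability-test/spanner.ts; it
    // threads the provider into the client via observabilityOptions.
    const {spanner} = await setup({
      tracerProvider: tracerProvider,
      enableExtendedTracing: false,
    });
    const database = spanner.instance('instance').database('database');

    // Warm-up query so session creation does not leak into the asserted
    // trace, then clear the exporter (as in the beforeEach hooks).
    await database.run('SELECT 1');
    await traceExporter.forceFlush();
    traceExporter.reset();

    await database.run('SELECT 1');
    await verifySpansAndEvents(
      traceExporter,
      [
        'CloudSpanner.Snapshot.runStream',
        'CloudSpanner.Database.runStream',
        'CloudSpanner.Database.run',
      ],
      ['Starting stream', ...cacheSessionEvents, 'Using Session']
    );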