From a464bdb5cbb7856b7a08dac3ff48132948b65792 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 8 Oct 2024 05:30:29 -0700 Subject: [PATCH] feat(observability): trace Transaction (#2122) This change adds observability tracing for Transaction along with tests. Updates #2079 Built from PR #2087 Updates #2114 --- observability-test/spanner.ts | 263 ++++++++++- observability-test/transaction.ts | 737 ++++++++++++++++++++++++++++++ src/database.ts | 32 +- src/transaction.ts | 643 +++++++++++++++----------- 4 files changed, 1388 insertions(+), 287 deletions(-) create mode 100644 observability-test/transaction.ts diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index bf3d93538..3e9cc295b 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -37,6 +37,9 @@ const { const {ObservabilityOptions} = require('../src/instrument'); +const selectSql = 'SELECT 1'; +const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; + /** A simple result set for SELECT 1. */ function createSelect1ResultSet(): protobuf.ResultSet { const fields = [ @@ -85,8 +88,6 @@ async function setup( ); }); - const selectSql = 'SELECT 1'; - const updateSql = 'UPDATE FOO SET BAR=1 WHERE BAZ=2'; spannerMock.putStatementResult( selectSql, mock.StatementResult.resultSet(createSelect1ResultSet()) @@ -205,7 +206,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -216,14 +216,19 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.getSnapshot']; + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Database.getSnapshot', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = []; + const expectedEventNames = ['Begin Transaction']; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -279,7 +284,6 @@ describe('EndToEnd', () => { .on('end', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -290,7 +294,10 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.runStream']; + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Database.runStream', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, @@ -313,7 +320,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); // Sort the spans by duration. spans.sort((spanA, spanB) => { @@ -330,6 +336,7 @@ describe('EndToEnd', () => { }); const expectedSpanNames = [ + 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Database.runStream', 'CloudSpanner.Database.run', ]; @@ -375,7 +382,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -386,7 +392,11 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.runTransaction']; + const expectedSpanNames = [ + 'CloudSpanner.Database.runTransaction', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, @@ -413,7 +423,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -424,14 +433,21 @@ describe('EndToEnd', () => { }); }); - const expectedSpanNames = ['CloudSpanner.Database.writeAtLeastOnce']; + const expectedSpanNames = [ + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Database.writeAtLeastOnce', + ]; assert.deepStrictEqual( actualSpanNames, expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['Using Session']; + const expectedEventNames = [ + 'Starting Commit', + 'Commit Done', + 'Using Session', + ]; assert.deepStrictEqual( actualEventNames, expectedEventNames, @@ -522,6 +538,226 @@ describe('ObservabilityOptions injection and propagation', async () => { done(); }); + afterEach(async () => { + await injectedTracerProvider.forceFlush(); + injectedTraceExporter.reset(); + }); + + let database: Database; + beforeEach(() => { + const instance = spanner.instance('instance'); + database = instance.database('db'); + }); + + describe('Transaction', () => { + const traceExporter = injectedTraceExporter; + + it('run', done => { + database.getTransaction((err, tx) => { + assert.ifError(err); + + tx!.run('SELECT 1', (err, rows) => { + traceExporter.forceFlush(); + + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.getTransaction', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + 'Transaction Creation Done', + ]; + assert.strictEqual( + actualEventNames.every(value => expectedEventNames.includes(value)), + true, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + + it('Transaction.begin+Dml.runUpdate', done => { + database.getTransaction((err, tx) => { + assert.ifError(err); + + // Firstly erase the prior spans so that we can have only Transaction spans. + traceExporter.reset(); + + tx!.begin(); + tx!.runUpdate(updateSql, (err, rowCount) => { + assert.ifError(err); + + traceExporter.forceFlush(); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 4); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Dml.runUpdate', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Begin Transaction', + 'Transaction Creation Done', + ]; + assert.strictEqual( + actualEventNames.every(value => expectedEventNames.includes(value)), + true, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + + it('runStream', done => { + let rowCount = 0; + database.getTransaction((err, tx) => { + assert.ifError(err); + tx! + .runStream(selectSql) + .on('data', () => rowCount++) + .on('error', assert.ifError) + .on('stats', _stats => {}) + .on('end', () => { + tx!.end(); + + traceExporter.forceFlush(); + + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.getTransaction', + 'CloudSpanner.Snapshot.runStream', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Using Session']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + + it('rollback', done => { + database.getTransaction((err, tx) => { + assert.ifError(err); + + // Firstly erase the prior spans so that we can have only Transaction spans. + traceExporter.reset(); + + tx!.begin(); + + tx!.runUpdate(updateSql, async (err, rowCount) => { + assert.ifError(err); + tx!.rollback(err => { + traceExporter.forceFlush(); + + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Dml.runUpdate', + 'CloudSpanner.Transaction.rollback', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Begin Transaction', + 'Transaction Creation Done', + ]; + assert.strictEqual( + actualEventNames.every(value => + expectedEventNames.includes(value) + ), + true, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + }); + }); + it('Propagates spans to the injected not global TracerProvider', done => { const instance = spanner.instance('instance'); const database = instance.database('database'); @@ -558,6 +794,7 @@ describe('ObservabilityOptions injection and propagation', async () => { }); const expectedSpanNames = [ + 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Database.runStream', 'CloudSpanner.Database.run', ]; diff --git a/observability-test/transaction.ts b/observability-test/transaction.ts new file mode 100644 index 000000000..7d1795e49 --- /dev/null +++ b/observability-test/transaction.ts @@ -0,0 +1,737 @@ +/*! + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as assert from 'assert'; +import {before, beforeEach, afterEach, describe, it} from 'mocha'; +import {EventEmitter} from 'events'; +import * as proxyquire from 'proxyquire'; +import * as sinon from 'sinon'; + +import {codec} from '../src/codec'; +const { + AlwaysOnSampler, + NodeTracerProvider, + InMemorySpanExporter, +} = require('@opentelemetry/sdk-trace-node'); +// eslint-disable-next-line n/no-extraneous-require +const {SpanStatusCode} = require('@opentelemetry/api'); +const { + ReadableSpan, + SimpleSpanProcessor, +} = require('@opentelemetry/sdk-trace-base'); + +describe('Transaction', () => { + const sandbox = sinon.createSandbox(); + + const REQUEST = sandbox.stub(); + const REQUEST_STREAM = sandbox.stub(); + const SESSION_NAME = 'session-123'; + + const SPANNER = { + routeToLeaderEnabled: true, + directedReadOptions: {}, + }; + + const INSTANCE = { + parent: SPANNER, + }; + + const DATABASE = { + formattedName_: 'formatted-database-name', + parent: INSTANCE, + }; + + const SESSION = { + parent: DATABASE, + formattedName_: SESSION_NAME, + request: REQUEST, + requestStream: REQUEST_STREAM, + }; + + const PARTIAL_RESULT_STREAM = sandbox.stub(); + const PROMISIFY_ALL = sandbox.stub(); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let Snapshot; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let Dml; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let Transaction; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let PartitionedDml; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let transaction; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let snapshot; + + before(() => { + const txns = proxyquire('../src/transaction', { + '@google-cloud/promisify': {promisifyAll: PROMISIFY_ALL}, + './codec': {codec}, + './partial-result-stream': {partialResultStream: PARTIAL_RESULT_STREAM}, + }); + + Snapshot = txns.Snapshot; + Dml = txns.Dml; + Transaction = txns.Transaction; + PartitionedDml = txns.PartitionedDml; + }); + + let traceExporter: typeof InMemorySpanExporter; + let tracerProvider: typeof NodeTracerProvider; + + beforeEach(() => { + traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + + tracerProvider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + const SNAPSHOT_OPTIONS = {a: 'b', c: 'd'}; + sandbox.stub(Snapshot, 'encodeTimestampBounds').returns(SNAPSHOT_OPTIONS); + snapshot = new Snapshot(SESSION); + snapshot._observabilityOptions = {tracerProvider: tracerProvider}; + + transaction = new Transaction(SESSION); + transaction._observabilityOptions = {tracerProvider: tracerProvider}; + }); + + afterEach(async () => { + sandbox.restore(); + await tracerProvider.forceFlush(); + traceExporter.reset(); + }); + + after(async () => { + await tracerProvider.shutdown(); + }); + + interface spanExportResults { + spans: (typeof ReadableSpan)[]; + spanNames: string[]; + spanEventNames: string[]; + } + + function extractExportedSpans(): spanExportResults { + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + // Sort the spans by startTime. + spans.sort((spanA, spanB) => { + spanA.startTime < spanB.startTime; + }); + + const spanNames: string[] = []; + const eventNames: string[] = []; + spans.forEach(span => { + spanNames.push(span.name); + span.events.forEach(event => { + eventNames.push(event.name); + }); + }); + + return { + spans: spans, + spanNames: spanNames, + spanEventNames: eventNames, + } as spanExportResults; + } + + describe('Snapshot', () => { + describe('begin', () => { + const BEGIN_RESPONSE = { + id: Buffer.from('transaction-id-123'), + }; + + it('without error', done => { + REQUEST.callsFake((_, callback) => callback(null, BEGIN_RESPONSE)); + + snapshot.begin((err, resp) => { + assert.ifError(err); + assert.strictEqual(resp, BEGIN_RESPONSE); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.begin']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Begin Transaction']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('with error', done => { + const fakeError = new Error('begin.error'); + + REQUEST.callsFake((_, callback) => callback(fakeError)); + + snapshot.begin(err => { + assert.strictEqual(err, fakeError); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.begin']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Begin Transaction']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + 'begin.error', + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + }); + }); + + describe('read', () => { + const TABLE = 'my-table-123'; + + let fakeStream; + let stub; + + beforeEach(() => { + fakeStream = new EventEmitter(); + stub = sandbox.stub(snapshot, 'createReadStream').returns(fakeStream); + }); + + it('with error', done => { + const fakeError = new Error('read.error'); + + snapshot.read(TABLE, {}, err => { + assert.strictEqual(err, fakeError); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.read']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + 'read.error', + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + + fakeStream.emit('error', fakeError); + }); + + it('without error', done => { + const fakeRows = [{a: 'b'}, {c: 'd'}, {e: 'f'}]; + + snapshot.read(TABLE, {}, (err, rows) => { + assert.ifError(err); + assert.deepStrictEqual(rows, fakeRows); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.read']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + + fakeRows.forEach(row => fakeStream.emit('data', row)); + fakeStream.emit('end'); + }); + }); + + describe('run', () => { + const QUERY = 'SELET * FROM `MyTable`'; + + let fakeStream; + let stub; + + beforeEach(() => { + fakeStream = new EventEmitter(); + stub = sandbox.stub(snapshot, 'runStream').returns(fakeStream); + }); + + it('without error', done => { + const fakeRows = [{a: 'b'}, {c: 'd'}, {e: 'f'}]; + + snapshot.run(QUERY, (err, rows) => { + assert.ifError(err); + assert.deepStrictEqual(rows, fakeRows); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.run']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + done(); + }); + + fakeRows.forEach(row => fakeStream.emit('data', row)); + fakeStream.emit('end'); + }); + + it('with errors', done => { + const fakeError = new Error('run.error'); + + snapshot.run(QUERY, err => { + assert.strictEqual(err, fakeError); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.run']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + 'run.error', + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + + fakeStream.emit('error', fakeError); + }); + }); + + describe('runStream', () => { + const QUERY = { + sql: 'SELECT * FROM `MyTable`', + }; + + beforeEach(() => { + PARTIAL_RESULT_STREAM.callsFake(makeRequest => makeRequest()); + }); + + it('with error', done => { + REQUEST_STREAM.resetHistory(); + + const fakeQuery = Object.assign({}, QUERY, { + params: {a: undefined}, + }); + + const stream = snapshot.runStream(fakeQuery); + stream.on('error', error => { + assert.strictEqual( + error.message, + 'Value of type undefined not recognized.' + ); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Snapshot.runStream']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['exception']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + 'Value of type undefined not recognized.', + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + assert.ok(!REQUEST_STREAM.called, 'No request should be made'); + }); + }); + }); + + describe('rollback', () => { + const ID = 'transaction-id-0xdedabeef'; + + beforeEach(() => { + transaction.id = ID; + }); + + it('error with unset `id`', done => { + const expectedError = new Error( + 'Transaction ID is unknown, nothing to rollback.' + ); + delete transaction.id; + + transaction.rollback(err => { + assert.deepStrictEqual(err, expectedError); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Transaction.rollback']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + expectedError.message, + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + }); + + it('with request error', done => { + const fakeError = new Error('our request error'); + transaction.request = (config, callback) => { + callback(fakeError); + }; + + transaction.rollback(err => { + assert.deepStrictEqual(err, fakeError); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Transaction.rollback']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected span status code' + ); + assert.strictEqual( + 'our request error', + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + }); + + it('with no error', done => { + transaction.request = (config, callback) => { + callback(null); + }; + + transaction.rollback(err => { + assert.ifError(err); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Transaction.rollback']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + }); + }); + + describe('commit', () => { + it('without error', done => { + const id = 'transaction-id-123'; + const transactionTag = 'bar'; + transaction.id = id; + transaction.requestOptions = {transactionTag}; + + transaction.request = (config, callback) => { + callback(null, {}); + }; + + transaction.commit(err => { + assert.ifError(err); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Transaction.commit']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Starting Commit', 'Commit Done']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + }); + + it('with generic error', () => { + const fakeError = new Error('commit.error'); + transaction.request = (config, callback) => { + callback(fakeError, {}); + }; + + transaction.commit(err => { + assert.strictEqual(err, fakeError); + + const exportResults = extractExportedSpans(); + const actualSpanNames = exportResults.spanNames; + const actualEventNames = exportResults.spanEventNames; + + const expectedSpanNames = ['CloudSpanner.Transaction.commit']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = ['Starting Commit', 'Commit failed']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + // Ensure that the final span that got retries did not error. + const spans = exportResults.spans; + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Unexpected span status code' + ); + assert.strictEqual( + fakeError.message, + firstSpan.status.message, + 'Unexpected span status message' + ); + }); + }); + }); +}); diff --git a/src/database.ts b/src/database.ts index 917bcabcf..aad10f111 100644 --- a/src/database.ts +++ b/src/database.ts @@ -828,8 +828,8 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const q = {opts: this._observabilityOptions}; - return startTrace('Database.createBatchTransaction', q, span => { + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Database.createBatchTransaction', traceConfig, span => { this.pool_.getSession((err, session) => { if (err) { setSpanError(span, err); @@ -1875,8 +1875,8 @@ class Database extends common.GrpcServiceObject { delete (gaxOpts as GetSessionsOptions).pageToken; } - const q = {opts: this._observabilityOptions}; - return startTrace('Database.getSessions', q, span => { + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Database.getSessions', traceConfig, span => { this.request< google.spanner.v1.ISession, google.spanner.v1.IListSessionsResponse @@ -2058,8 +2058,8 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const q = {opts: this._observabilityOptions}; - return startTrace('Database.getSnapshot', q, span => { + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Database.getSnapshot', traceConfig, span => { this.pool_.getSession((err, session) => { if (err) { setSpanError(span, err); @@ -2159,8 +2159,8 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as GetTransactionOptions) : {}; - const q = {opts: this._observabilityOptions}; - return startTrace('Database.getTransaction', q, span => { + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Database.getTransaction', traceConfig, span => { this.pool_.getSession((err, session, transaction) => { if (options.requestOptions) { transaction!.requestOptions = Object.assign( @@ -2786,8 +2786,8 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as TimestampBounds) : {}; - const q = {sql: query, opts: this._observabilityOptions}; - return startTrace('Database.run', q, span => { + const traceConfig = {sql: query, opts: this._observabilityOptions}; + return startTrace('Database.run', traceConfig, span => { this.runStream(query, options) .on('error', err => { setSpanError(span, err); @@ -3007,8 +3007,8 @@ class Database extends common.GrpcServiceObject { options?: TimestampBounds ): PartialResultStream { const proxyStream: Transform = through.obj(); - const q = {sql: query, opts: this._observabilityOptions}; - return startTrace('Database.runStream', q, span => { + const traceConfig = {sql: query, opts: this._observabilityOptions}; + return startTrace('Database.runStream', traceConfig, span => { this.pool_.getSession((err, session) => { if (err) { setSpanError(span, err); @@ -3185,8 +3185,8 @@ class Database extends common.GrpcServiceObject { ? (optionsOrRunFn as RunTransactionOptions) : {}; - const q = {opts: this._observabilityOptions}; - startTrace('Database.runTransaction', q, span => { + const traceConfig = {opts: this._observabilityOptions}; + startTrace('Database.runTransaction', traceConfig, span => { this.pool_.getSession((err, session?, transaction?) => { if (err) { setSpanError(span, err); @@ -3578,8 +3578,8 @@ class Database extends common.GrpcServiceObject { ? (optionsOrCallback as CallOptions) : {}; - const q = {opts: this._observabilityOptions}; - return startTrace('Database.writeAtLeastOnce', q, span => { + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Database.writeAtLeastOnce', traceConfig, span => { this.pool_.getSession((err, session?, transaction?) => { if (err && isSessionNotFoundError(err as grpc.ServiceError)) { span.addEvent('No session available', { diff --git a/src/transaction.ts b/src/transaction.ts index ca96864e1..7e0dbb40b 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -22,7 +22,7 @@ import {EventEmitter} from 'events'; import {grpc, CallOptions, ServiceError, Status, GoogleError} from 'google-gax'; import * as is from 'is'; import {common as p} from 'protobufjs'; -import {Readable, PassThrough} from 'stream'; +import {finished, Readable, PassThrough, Stream} from 'stream'; import {codec, Json, JSONOptions, Type, Value} from './codec'; import { @@ -46,7 +46,12 @@ import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions; import IRequestOptions = google.spanner.v1.IRequestOptions; import {Database, Spanner} from '.'; import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode; -import {ObservabilityOptions} from './instrument'; +import { + ObservabilityOptions, + startTrace, + setSpanError, + setSpanErrorAndException, +} from './instrument'; export type Rows = Array; const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo'; @@ -351,6 +356,7 @@ export class Snapshot extends EventEmitter { }; this._waitingRequests = []; this._inlineBeginStarted = false; + this._observabilityOptions = session._observabilityOptions; } /** @@ -416,9 +422,6 @@ export class Snapshot extends EventEmitter { options, }; - const span = getActiveOrNoopSpan(); - span.addEvent('Begin Transaction'); - // Only hand crafted read-write transactions will be able to set a // transaction tag for the BeginTransaction RPC. Also, this.requestOptions // is only set in the constructor of Transaction, which is the constructor @@ -436,26 +439,34 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - this.request( - { - client: 'SpannerClient', - method: 'beginTransaction', - reqOpts, - gaxOpts, - headers: headers, - }, - ( - err: null | grpc.ServiceError, - resp: spannerClient.spanner.v1.ITransaction - ) => { - if (err) { - callback!(err, resp); - return; + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Snapshot.begin', traceConfig, span => { + span.addEvent('Begin Transaction'); + + this.request( + { + client: 'SpannerClient', + method: 'beginTransaction', + reqOpts, + gaxOpts, + headers: headers, + }, + ( + err: null | grpc.ServiceError, + resp: spannerClient.spanner.v1.ITransaction + ) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err, resp); + return; + } + this._update(resp); + span.end(); + callback!(null, resp); } - this._update(resp); - callback!(null, resp); - } - ); + ); + }); } /** @@ -706,31 +717,56 @@ export class Snapshot extends EventEmitter { }); }; - return partialResultStream(this._wrapWithIdWaiter(makeRequest), { - json, - jsonOptions, - maxResumeRetries, - columnsMetadata, - gaxOptions, - }) - ?.on('response', response => { - if (response.metadata && response.metadata!.transaction && !this.id) { - this._update(response.metadata!.transaction); - } - }) - .on('error', err => { - const isServiceError = err && typeof err === 'object' && 'code' in err; - if ( - !this.id && - this._useInRunner && - !( - isServiceError && - (err as grpc.ServiceError).code === grpc.status.ABORTED - ) - ) { - this.begin(); + const traceConfig = {tableName: table, opts: this._observabilityOptions}; + return startTrace('Snapshot.createReadStream', traceConfig, span => { + const resultStream = partialResultStream( + this._wrapWithIdWaiter(makeRequest), + { + json, + jsonOptions, + maxResumeRetries, + columnsMetadata, + gaxOptions, } - }); + ) + ?.on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', err => { + const isServiceError = + err && typeof err === 'object' && 'code' in err; + if ( + !this.id && + this._useInRunner && + !( + isServiceError && + (err as grpc.ServiceError).code === grpc.status.ABORTED + ) + ) { + this.begin(); + } + setSpanError(span, err); + }) + .on('end', err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); + + if (resultStream instanceof Stream) { + finished(resultStream, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); + } + + return resultStream; + }); } /** @@ -925,10 +961,21 @@ export class Snapshot extends EventEmitter { callback = cb as ReadCallback; } - this.createReadStream(table, request) - .on('error', callback!) - .on('data', row => rows.push(row)) - .on('end', () => callback!(null, rows)); + const traceConfig = {tableName: table, opts: this._observabilityOptions}; + return startTrace('Snapshot.read', traceConfig, span => { + this.createReadStream(table, request) + .on('error', err => { + const e = err as grpc.ServiceError; + setSpanError(span, e); + span.end(); + callback!(e, null); + }) + .on('data', row => rows.push(row)) + .on('end', () => { + span.end(); + callback!(null, rows); + }); + }); } /** @@ -1018,19 +1065,29 @@ export class Snapshot extends EventEmitter { let stats: google.spanner.v1.ResultSetStats; let metadata: google.spanner.v1.ResultSetMetadata; - this.runStream(query) - .on('error', callback!) - .on('response', response => { - if (response.metadata) { - metadata = response.metadata; - if (metadata.transaction && !this.id) { - this._update(metadata.transaction); + const traceConfig = {sql: query, opts: this._observabilityOptions}; + startTrace('Snapshot.run', traceConfig, span => { + return this.runStream(query) + .on('error', (err, rows, stats, metadata) => { + setSpanError(span, err); + span.end(); + callback!(err, rows, stats, metadata); + }) + .on('response', response => { + if (response.metadata) { + metadata = response.metadata; + if (metadata.transaction && !this.id) { + this._update(metadata.transaction); + } } - } - }) - .on('data', row => rows.push(row)) - .on('stats', _stats => (stats = _stats)) - .on('end', () => callback!(null, rows, stats, metadata)); + }) + .on('data', row => rows.push(row)) + .on('stats', _stats => (stats = _stats)) + .on('end', () => { + span.end(); + callback!(null, rows, stats, metadata); + }); + }); } /** @@ -1201,51 +1258,78 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - const makeRequest = (resumeToken?: ResumeToken): Readable => { - if (!reqOpts || (this.id && !reqOpts.transaction.id)) { - try { - sanitizeRequest(); - } catch (e) { - const errorStream = new PassThrough(); - setImmediate(() => errorStream.destroy(e as Error)); - return errorStream; + const traceConfig = {opts: this._observabilityOptions, ...query}; + return startTrace('Snapshot.runStream', traceConfig, span => { + const makeRequest = (resumeToken?: ResumeToken): Readable => { + if (!reqOpts || (this.id && !reqOpts.transaction.id)) { + try { + sanitizeRequest(); + } catch (e) { + const errorStream = new PassThrough(); + setSpanErrorAndException(span, e as Error); + span.end(); + setImmediate(() => errorStream.destroy(e as Error)); + return errorStream; + } } - } - - return this.requestStream({ - client: 'SpannerClient', - method: 'executeStreamingSql', - reqOpts: Object.assign({}, reqOpts, {resumeToken}), - gaxOpts: gaxOptions, - headers: headers, - }); - }; - return partialResultStream(this._wrapWithIdWaiter(makeRequest), { - json, - jsonOptions, - maxResumeRetries, - columnsMetadata, - gaxOptions, - }) - .on('response', response => { - if (response.metadata && response.metadata!.transaction && !this.id) { - this._update(response.metadata!.transaction); - } - }) - .on('error', err => { - const isServiceError = err && typeof err === 'object' && 'code' in err; - if ( - !this.id && - this._useInRunner && - !( - isServiceError && - (err as grpc.ServiceError).code === grpc.status.ABORTED - ) - ) { - this.begin(); + return this.requestStream({ + client: 'SpannerClient', + method: 'executeStreamingSql', + reqOpts: Object.assign({}, reqOpts, {resumeToken}), + gaxOpts: gaxOptions, + headers: headers, + }); + }; + + const resultStream = partialResultStream( + this._wrapWithIdWaiter(makeRequest), + { + json, + jsonOptions, + maxResumeRetries, + columnsMetadata, + gaxOptions, } - }); + ) + .on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', err => { + setSpanError(span, err as Error); + const isServiceError = + err && typeof err === 'object' && 'code' in err; + if ( + !this.id && + this._useInRunner && + !( + isServiceError && + (err as grpc.ServiceError).code === grpc.status.ABORTED + ) + ) { + this.begin(); + } + }) + .on('end', err => { + if (err) { + setSpanError(span, err as Error); + } + span.end(); + }); + + if (resultStream instanceof Stream) { + finished(resultStream, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); + } + + return resultStream; + }); } /** @@ -1543,22 +1627,30 @@ export class Dml extends Snapshot { query = {sql: query} as ExecuteSqlRequest; } - this.run( - query, - ( - err: null | grpc.ServiceError, - rows: Rows, - stats: spannerClient.spanner.v1.ResultSetStats - ) => { - let rowCount = 0; - - if (stats && stats.rowCount) { - rowCount = Math.floor(stats[stats.rowCount] as number); - } + const traceConfig = {opts: this._observabilityOptions, ...query}; + return startTrace('Dml.runUpdate', traceConfig, span => { + this.run( + query, + ( + err: null | grpc.ServiceError, + rows: Rows, + stats: spannerClient.spanner.v1.ResultSetStats + ) => { + let rowCount = 0; + + if (stats && stats.rowCount) { + rowCount = Math.floor(stats[stats.rowCount] as number); + } - callback!(err, rowCount); - } - ); + if (err) { + setSpanError(span, err); + } + + span.end(); + callback!(err, rowCount); + } + ); + }); } } @@ -1812,57 +1904,64 @@ export class Transaction extends Dml { addLeaderAwareRoutingHeader(headers); } - this.request( - { - client: 'SpannerClient', - method: 'executeBatchDml', - reqOpts, - gaxOpts, - headers: headers, - }, - ( - err: null | grpc.ServiceError, - resp: spannerClient.spanner.v1.ExecuteBatchDmlResponse - ) => { - let batchUpdateError: BatchUpdateError; - - if (err) { - const rowCounts: number[] = []; - batchUpdateError = Object.assign(err, {rowCounts}); - callback!(batchUpdateError, rowCounts, resp); - return; - } + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Transaction.batchUpdate', traceConfig, span => { + this.request( + { + client: 'SpannerClient', + method: 'executeBatchDml', + reqOpts, + gaxOpts, + headers: headers, + }, + ( + err: null | grpc.ServiceError, + resp: spannerClient.spanner.v1.ExecuteBatchDmlResponse + ) => { + let batchUpdateError: BatchUpdateError; + + if (err) { + const rowCounts: number[] = []; + batchUpdateError = Object.assign(err, {rowCounts}); + setSpanError(span, batchUpdateError); + span.end(); + callback!(batchUpdateError, rowCounts, resp); + return; + } - const {resultSets, status} = resp; - for (const resultSet of resultSets) { - if (!this.id && resultSet.metadata?.transaction) { - this._update(resultSet.metadata.transaction); + const {resultSets, status} = resp; + for (const resultSet of resultSets) { + if (!this.id && resultSet.metadata?.transaction) { + this._update(resultSet.metadata.transaction); + } + } + const rowCounts: number[] = resultSets.map(({stats}) => { + return ( + (stats && + Number( + stats[ + (stats as spannerClient.spanner.v1.ResultSetStats).rowCount! + ] + )) || + 0 + ); + }); + + if (status && status.code !== 0) { + const error = new Error(status.message!); + batchUpdateError = Object.assign(error, { + code: status.code, + metadata: Transaction.extractKnownMetadata(status.details!), + rowCounts, + }) as BatchUpdateError; + setSpanError(span, batchUpdateError); } - } - const rowCounts: number[] = resultSets.map(({stats}) => { - return ( - (stats && - Number( - stats[ - (stats as spannerClient.spanner.v1.ResultSetStats).rowCount! - ] - )) || - 0 - ); - }); - if (status && status.code !== 0) { - const error = new Error(status.message!); - batchUpdateError = Object.assign(error, { - code: status.code, - metadata: Transaction.extractKnownMetadata(status.details!), - rowCounts, - }) as BatchUpdateError; + span.end(); + callback!(batchUpdateError!, rowCounts, resp); } - - callback!(batchUpdateError!, rowCounts, resp); - } - ); + ); + }); } private static extractKnownMetadata( @@ -1975,69 +2074,82 @@ export class Transaction extends Dml { const requestOptions = (options as CommitOptions).requestOptions; const reqOpts: CommitRequest = {mutations, session, requestOptions}; - const span = getActiveOrNoopSpan(); - - if (this.id) { - reqOpts.transactionId = this.id as Uint8Array; - } else if (!this._useInRunner) { - reqOpts.singleUseTransaction = this._options; - } else { - this.begin().then(() => this.commit(options, callback), callback); - return; - } - - if ( - 'returnCommitStats' in options && - (options as CommitOptions).returnCommitStats - ) { - reqOpts.returnCommitStats = (options as CommitOptions).returnCommitStats; - } - if ( - 'maxCommitDelay' in options && - (options as CommitOptions).maxCommitDelay - ) { - reqOpts.maxCommitDelay = (options as CommitOptions).maxCommitDelay; - } - reqOpts.requestOptions = Object.assign( - requestOptions || {}, - this.requestOptions - ); + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Transaction.commit', traceConfig, span => { + if (this.id) { + reqOpts.transactionId = this.id as Uint8Array; + } else if (!this._useInRunner) { + reqOpts.singleUseTransaction = this._options; + } else { + this.begin().then(() => { + this.commit(options, (err, resp) => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback(err, resp); + }); + }, callback); + return; + } - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } + if ( + 'returnCommitStats' in options && + (options as CommitOptions).returnCommitStats + ) { + reqOpts.returnCommitStats = ( + options as CommitOptions + ).returnCommitStats; + } + if ( + 'maxCommitDelay' in options && + (options as CommitOptions).maxCommitDelay + ) { + reqOpts.maxCommitDelay = (options as CommitOptions).maxCommitDelay; + } + reqOpts.requestOptions = Object.assign( + requestOptions || {}, + this.requestOptions + ); - span.addEvent('Starting Commit'); + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); + } - this.request( - { - client: 'SpannerClient', - method: 'commit', - reqOpts, - gaxOpts: gaxOpts, - headers: headers, - }, - (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { - this.end(); + span.addEvent('Starting Commit'); + + this.request( + { + client: 'SpannerClient', + method: 'commit', + reqOpts, + gaxOpts: gaxOpts, + headers: headers, + }, + (err: null | Error, resp: spannerClient.spanner.v1.ICommitResponse) => { + this.end(); + + if (err) { + span.addEvent('Commit failed'); + setSpanError(span, err); + } else { + span.addEvent('Commit Done'); + } - if (err) { - span.addEvent('Commit failed'); - } else { - span.addEvent('Commit Done'); - } + if (resp && resp.commitTimestamp) { + this.commitTimestampProto = resp.commitTimestamp; + this.commitTimestamp = new PreciseDate( + resp.commitTimestamp as DateStruct + ); + } + err = Transaction.decorateCommitError(err as ServiceError, mutations); - if (resp && resp.commitTimestamp) { - this.commitTimestampProto = resp.commitTimestamp; - this.commitTimestamp = new PreciseDate( - resp.commitTimestamp as DateStruct - ); + span.end(); + callback!(err as ServiceError | null, resp); } - err = Transaction.decorateCommitError(err as ServiceError, mutations); - - callback!(err as ServiceError | null, resp); - } - ); + ); + }); } /** @@ -2328,40 +2440,48 @@ export class Transaction extends Dml { const callback = typeof gaxOptionsOrCallback === 'function' ? gaxOptionsOrCallback : cb!; - if (!this.id) { - callback!( - new Error( + const traceConfig = {opts: this._observabilityOptions}; + return startTrace('Transaction.rollback', traceConfig, span => { + if (!this.id) { + const err = new Error( 'Transaction ID is unknown, nothing to rollback.' - ) as ServiceError - ); - return; - } + ) as ServiceError; + setSpanError(span, err); + span.end(); + callback!(err); + return; + } - const session = this.session.formattedName_!; - const transactionId = this.id; - const reqOpts: spannerClient.spanner.v1.IRollbackRequest = { - session, - transactionId, - }; + const session = this.session.formattedName_!; + const transactionId = this.id; + const reqOpts: spannerClient.spanner.v1.IRollbackRequest = { + session, + transactionId, + }; - const headers = this.resourceHeader_; - if (this._getSpanner().routeToLeaderEnabled) { - addLeaderAwareRoutingHeader(headers); - } - - this.request( - { - client: 'SpannerClient', - method: 'rollback', - reqOpts, - gaxOpts, - headers: headers, - }, - (err: null | ServiceError) => { - this.end(); - callback!(err); + const headers = this.resourceHeader_; + if (this._getSpanner().routeToLeaderEnabled) { + addLeaderAwareRoutingHeader(headers); } - ); + + this.request( + { + client: 'SpannerClient', + method: 'rollback', + reqOpts, + gaxOpts, + headers: headers, + }, + (err: null | ServiceError) => { + if (err) { + setSpanError(span, err); + } + span.end(); + this.end(); + callback!(err); + } + ); + }); } /** @@ -2813,9 +2933,16 @@ export class PartitionedDml extends Dml { query: string | ExecuteSqlRequest, callback?: RunUpdateCallback ): void | Promise { - super.runUpdate(query, (err, count) => { - this.end(); - callback!(err, count); + const traceConfig = {sql: query, opts: this._observabilityOptions}; + return startTrace('PartitionedDml.runUpdate', traceConfig, span => { + super.runUpdate(query, (err, count) => { + if (err) { + setSpanError(span, err); + } + this.end(); + span.end(); + callback!(err, count); + }); }); } }