diff --git a/observability-test/database.ts b/observability-test/database.ts index f938735e4..f092bfd42 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -19,7 +19,7 @@ import {EventEmitter} from 'events'; import * as assert from 'assert'; import * as extend from 'extend'; import {google} from '../protos/protos'; -import {CommitCallback, CommitOptions} from '../src/transaction'; +import {CommitCallback, CommitOptions, MutationSet} from '../src/transaction'; import {util} from '@google-cloud/common'; import {Transform} from 'stream'; import * as proxyquire from 'proxyquire'; @@ -810,4 +810,478 @@ describe('Database', () => { }); }); }); + + describe('writeAtLeastOnce', () => { + const mutations = new MutationSet(); + mutations.insert('MyTable', { + Key: 'k3', + Thing: 'xyz', + }); + + const SESSION = new FakeSession(); + const RESPONSE = {commitTimestamp: {seconds: 1, nanos: 0}}; + const TRANSACTION = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.ReadWrite + ); + + let pool: FakeSessionPool; + + beforeEach(() => { + pool = database.pool_; + (sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake( + callback => { + callback(null, SESSION, TRANSACTION); + } + ); + }); + + it('should return any errors getting a session', done => { + const fakeErr = new Error('getting session error'); + + (pool.getSession as sinon.SinonStub).callsFake(callback => + callback(fakeErr, null, null) + ); + + database.writeAtLeastOnce(mutations, err => { + assert.deepStrictEqual(err, fakeErr); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.writeAtLeastOnce']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'getting session error', + firstSpan.status.message, + 'Mismatched span status message' + ); + + done(); + }); + }); + + it('with empty mutation should return successful CommitResponse', done => { + const fakeMutations = new MutationSet(); + try { + database.writeAtLeastOnce(fakeMutations, (err, response) => { + assert.ifError(err); + assert.deepStrictEqual( + response.commitTimestamp, + RESPONSE.commitTimestamp + ); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.writeAtLeastOnce']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + } catch (error) { + assert(error instanceof Error); + } + }); + + it('with error on null mutation should catch thrown error', done => { + const fakeError = new Error('err'); + try { + database.writeAtLeastOnce(null, (err, res) => {}); + } catch (err) { + (err as grpc.ServiceError).message.includes( + "Cannot read properties of null (reading 'proto')" + ); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.writeAtLeastOnce']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + "Cannot read properties of null (reading 'proto')", + firstSpan.status.message, + 'Mismatched span status message' + ); + + done(); + } + }); + + it('should return CommitResponse on successful write using Callback', done => { + database.writeAtLeastOnce(mutations, (err, res) => { + assert.deepStrictEqual(err, null); + assert.deepStrictEqual(res, RESPONSE); + done(); + }); + }); + + it('should return CommitResponse on successful write using await', async () => { + sinon.stub(database, 'writeAtLeastOnce').resolves([RESPONSE]); + const [response, err] = await database.writeAtLeastOnce(mutations, {}); + assert.deepStrictEqual( + response.commitTimestamp, + RESPONSE.commitTimestamp + ); + }); + }); + + describe('runTransaction', () => { + const SESSION = new FakeSession(); + const TRANSACTION = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.ReadWrite + ); + + let pool: FakeSessionPool; + + beforeEach(() => { + pool = database.pool_; + + (sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake( + callback => { + callback(null, SESSION, TRANSACTION); + } + ); + }); + + it('with error getting session', done => { + const fakeErr = new Error('getting a session'); + + (pool.getSession as sinon.SinonStub).callsFake(callback => + callback(fakeErr) + ); + + database.runTransaction(err => { + assert.strictEqual(err, fakeErr); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runTransaction']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'getting a session', + firstSpan.status.message, + 'Mismatched span status message' + ); + + done(); + }); + }); + + it('with other errors when running the transaction', done => { + const releaseStub = ( + sandbox.stub(pool, 'release') as sinon.SinonStub + ).withArgs(SESSION); + const fakeError = new Error('internal rejects err'); + + sandbox.stub(FakeTransactionRunner.prototype, 'run').rejects(fakeError); + + database.runTransaction(err => { + assert.strictEqual(err, fakeError); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runTransaction']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'internal rejects err', + firstSpan.status.message, + 'Mismatched span status message' + ); + + done(); + }); + }); + }); + + describe('runStream', () => { + const QUERY = { + sql: 'SELECT * FROM table', + a: 'b', + c: 'd', + }; + + let fakePool: FakeSessionPool; + let fakeSession: FakeSession; + let fakeSession2: FakeSession; + let fakeSnapshot: FakeTransaction; + let fakeSnapshot2: FakeTransaction; + let fakeStream: Transform; + let fakeStream2: Transform; + + let getSessionStub: sinon.SinonStub; + let snapshotStub: sinon.SinonStub; + let runStreamStub: sinon.SinonStub; + + beforeEach(() => { + fakePool = database.pool_; + fakeSession = new FakeSession(); + fakeSession2 = new FakeSession(); + fakeSnapshot = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.ReadOnly + ); + fakeSnapshot2 = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.ReadOnly + ); + fakeStream = through.obj(); + fakeStream2 = through.obj(); + + getSessionStub = (sandbox.stub(fakePool, 'getSession') as sinon.SinonStub) + .onFirstCall() + .callsFake(callback => callback(null, fakeSession)) + .onSecondCall() + .callsFake(callback => callback(null, fakeSession2)); + + snapshotStub = sandbox + .stub(fakeSession, 'snapshot') + .returns(fakeSnapshot); + + sandbox.stub(fakeSession2, 'snapshot').returns(fakeSnapshot2); + + runStreamStub = sandbox + .stub(fakeSnapshot, 'runStream') + .returns(fakeStream); + + sandbox.stub(fakeSnapshot2, 'runStream').returns(fakeStream2); + }); + + it('with error on `getSession`', done => { + const fakeError = new Error('getSession error'); + + getSessionStub.onFirstCall().callsFake(callback => callback(fakeError)); + + database.runStream(QUERY).on('error', err => { + assert.strictEqual(err, fakeError); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runStream']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'getSession error', + firstSpan.status.message, + 'Mismatched span status message' + ); + + done(); + }); + }); + + it('propagation on stream/transaction errors', done => { + const fakeError = new Error('propagation err'); + const endStub = sandbox.stub(fakeSnapshot, 'end'); + + database.runStream(QUERY).on('error', err => { + assert.strictEqual(err, fakeError); + assert.strictEqual(endStub.callCount, 1); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.runStream', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'propagation err', + firstSpan.status.message, + 'Mismatched span status message' + ); + + done(); + }); + + fakeStream.destroy(fakeError); + }); + + it('retries with "Session not found" error', done => { + const sessionNotFoundError = { + code: grpc.status.NOT_FOUND, + message: 'Session not found', + } as grpc.ServiceError; + const endStub = sandbox.stub(fakeSnapshot, 'end'); + const endStub2 = sandbox.stub(fakeSnapshot2, 'end'); + let rows = 0; + + database + .runStream(QUERY) + .on('data', () => rows++) + .on('error', err => { + assert.fail(err); + }) + .on('end', () => { + assert.strictEqual(endStub.callCount, 1); + assert.strictEqual(endStub2.callCount, 1); + assert.strictEqual(rows, 1); + + const spans = traceExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 2, 'Exactly 1 span expected'); + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.runStream', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const secondSpan = spans[1]; + assert.strictEqual( + SpanStatusCode.ERROR, + secondSpan.status.code, + 'Expected an ERROR span status' + ); + assert.strictEqual( + 'Session not found', + secondSpan.status.message, + 'Mismatched span status message' + ); + + // Ensure that the final span that got retries did not error. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected an span status code' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + + done(); + }); + + fakeStream.emit('error', sessionNotFoundError); + fakeStream2.push('row1'); + fakeStream2.push(null); + }); + }); }); diff --git a/src/database.ts b/src/database.ts index 1d4d2fef4..a805d7a84 100644 --- a/src/database.ts +++ b/src/database.ts @@ -3595,14 +3595,20 @@ class Database extends common.GrpcServiceObject { } span.addEvent('Using Session', {'session.id': session?.id}); this._releaseOnEnd(session!, transaction!, span); - transaction?.setQueuedMutations(mutations.proto()); - return transaction?.commit(options, (err, resp) => { - if (err) { - setSpanError(span, err); - } + try { + transaction?.setQueuedMutations(mutations.proto()); + return transaction?.commit(options, (err, resp) => { + if (err) { + setSpanError(span, err); + } + span.end(); + cb!(err, resp); + }); + } catch (e) { + setSpanErrorAndException(span, e as Error); span.end(); - cb!(err, resp); - }); + throw e; + } }); }); }