From 3d06c452c129b6e7a7df3aa2ee961c4e486b6f53 Mon Sep 17 00:00:00 2001 From: Roma Slyusarchuk Date: Sat, 6 Aug 2022 02:22:23 +0000 Subject: [PATCH 01/16] feat: inline BeginTransaction with first statement --- src/session-pool.ts | 1 - src/transaction.ts | 38 ++++++++++---- test/mockserver/mockspanner.ts | 96 ++++++++++++++++++++-------------- test/session-pool.ts | 15 ------ test/spanner.ts | 86 ++++++++++++++++++------------ test/transaction.ts | 6 ++- 6 files changed, 141 insertions(+), 101 deletions(-) diff --git a/src/session-pool.ts b/src/session-pool.ts index 47fb8df9a..742e8b083 100644 --- a/src/session-pool.ts +++ b/src/session-pool.ts @@ -1159,7 +1159,6 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { const transaction = session.transaction( (session.parent as Database).queryOptions_ ); - await transaction.begin(); session.txn = transaction; } diff --git a/src/transaction.ts b/src/transaction.ts index fde8907de..c4ac63c4e 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -378,17 +378,7 @@ export class Snapshot extends EventEmitter { callback!(err, resp); return; } - - const {id, readTimestamp} = resp; - - this.id = id!; - this.metadata = resp; - - if (readTimestamp) { - this.readTimestampProto = readTimestamp; - this.readTimestamp = new PreciseDate(readTimestamp as DateStruct); - } - + this._update(resp); callback!(null, resp); } ); @@ -573,6 +563,8 @@ export class Snapshot extends EventEmitter { if (this.id) { transaction.id = this.id as Uint8Array; + } else if (typeof this._options.readWrite !== 'undefined') { + transaction.begin = this._options; } else { transaction.singleUse = this._options; } @@ -909,6 +901,9 @@ export class Snapshot extends EventEmitter { .on('response', response => { if (response.metadata) { metadata = response.metadata; + if (metadata.transaction && !this.id) { + this._update(metadata.transaction); + } } }) .on('data', row => rows.push(row)) @@ -1034,6 +1029,8 @@ export class Snapshot extends EventEmitter { const transaction: spannerClient.spanner.v1.ITransactionSelector = {}; if (this.id) { transaction.id = this.id as Uint8Array; + } else if (typeof this._options.readWrite !== 'undefined') { + transaction.begin = this._options; } else { transaction.singleUse = this._options; } @@ -1226,6 +1223,25 @@ export class Snapshot extends EventEmitter { return {params, paramTypes}; } + + /** + * Update transaction properties from the response. + * + * @private + * + * @param {spannerClient.spanner.v1.ITransaction} resp Response object. + */ + private _update(resp: spannerClient.spanner.v1.ITransaction): void { + const {id, readTimestamp} = resp; + + this.id = id!; + this.metadata = resp; + + if (readTimestamp) { + this.readTimestampProto = readTimestamp; + this.readTimestamp = new PreciseDate(readTimestamp as DateStruct); + } + } } /*! Developer Documentation diff --git a/test/mockserver/mockspanner.ts b/test/mockserver/mockspanner.ts index d11325703..ad627da93 100644 --- a/test/mockserver/mockspanner.ts +++ b/test/mockserver/mockspanner.ts @@ -16,7 +16,7 @@ import * as path from 'path'; import {google} from '../../protos/protos'; -import {grpc} from 'google-gax'; +import {grpc, ServiceError} from 'google-gax'; import * as protoLoader from '@grpc/proto-loader'; // eslint-disable-next-line node/no-extraneous-import import {Metadata} from '@grpc/grpc-js'; @@ -317,7 +317,7 @@ export class MockSpanner { abortTransaction(transaction: Transaction): void { const formattedId = `${transaction.session.formattedName_}/transactions/${transaction.id}`; - if (this.transactions.has(formattedId)) { + if (this.transactions.has(formattedId) || !transaction.id) { this.transactions.delete(formattedId); this.transactionOptions.delete(formattedId); this.abortedTransactions.add(formattedId); @@ -534,7 +534,7 @@ export class MockSpanner { this.pushRequest(call.request!, call.metadata); this.simulateExecutionTime(this.executeStreamingSql.name) .then(() => { - if (call.request!.transaction && call.request!.transaction.id) { + if (call.request!.transaction) { const fullTransactionId = `${call.request!.session}/transactions/${ call.request!.transaction.id }`; @@ -549,6 +549,20 @@ export class MockSpanner { } const res = this.statementResults.get(call.request!.sql); if (res) { + if (call.request!.transaction?.begin) { + const txn = this._updateTransaction( + call.request!.session, + call.request!.transaction.begin + ); + if (txn instanceof Error) { + call.emit('error', txn); + call.end(); + return; + } + if (res.type === StatementResultType.RESULT_SET) { + (res.resultSet as protobuf.ResultSet).metadata!.transaction = txn; + } + } let partialResultSets; let resumeIndex; let streamErr; @@ -817,32 +831,14 @@ export class MockSpanner { this.pushRequest(call.request!, call.metadata); this.simulateExecutionTime(this.beginTransaction.name) .then(() => { - const session = this.sessions.get(call.request!.session); - if (session) { - let counter = this.transactionCounters.get(session.name); - if (!counter) { - counter = 0; - } - const id = ++counter; - this.transactionCounters.set(session.name, counter); - const transactionId = id.toString().padStart(12, '0'); - const fullTransactionId = - session.name + '/transactions/' + transactionId; - const readTimestamp = - call.request!.options && call.request!.options.readOnly - ? now() - : undefined; - const transaction = protobuf.Transaction.create({ - id: Buffer.from(transactionId), - readTimestamp, - }); - this.transactions.set(fullTransactionId, transaction); - this.transactionOptions.set(fullTransactionId, call.request!.options); - callback(null, transaction); + const res = this._updateTransaction( + call.request!.session, + call.request!.options + ); + if (res instanceof Error) { + callback(res); } else { - callback( - MockSpanner.createSessionNotFoundError(call.request!.session) - ); + callback(null, res); } }) .catch(err => { @@ -857,16 +853,14 @@ export class MockSpanner { this.pushRequest(call.request!, call.metadata); this.simulateExecutionTime(this.commit.name) .then(() => { - if (call.request!.transactionId) { - const fullTransactionId = `${call.request!.session}/transactions/${ - call.request!.transactionId - }`; - if (this.abortedTransactions.has(fullTransactionId)) { - callback( - MockSpanner.createTransactionAbortedError(`${fullTransactionId}`) - ); - return; - } + const fullTransactionId = `${call.request!.session}/transactions/${ + call.request!.transactionId + }`; + if (this.abortedTransactions.has(fullTransactionId)) { + callback( + MockSpanner.createTransactionAbortedError(`${fullTransactionId}`) + ); + return; } const session = this.sessions.get(call.request!.session); if (session) { @@ -947,6 +941,32 @@ export class MockSpanner { this.pushRequest(call.request!, call.metadata); callback(createUnimplementedError('PartitionQuery is not yet implemented')); } + + private _updateTransaction( + sessionName: string, + options: google.spanner.v1.ITransactionOptions | null | undefined + ): google.spanner.v1.Transaction | ServiceError { + const session = this.sessions.get(sessionName); + if (!session) { + return MockSpanner.createSessionNotFoundError(sessionName); + } + let counter = this.transactionCounters.get(session.name); + if (!counter) { + counter = 0; + } + const id = ++counter; + this.transactionCounters.set(session.name, counter); + const transactionId = id.toString().padStart(12, '0'); + const fullTransactionId = session.name + '/transactions/' + transactionId; + const readTimestamp = options && options.readOnly ? now() : undefined; + const transaction = protobuf.Transaction.create({ + id: Buffer.from(transactionId), + readTimestamp, + }); + this.transactions.set(fullTransactionId, transaction); + this.transactionOptions.set(fullTransactionId, options); + return transaction; + } } /** diff --git a/test/session-pool.ts b/test/session-pool.ts index 1ef996a2d..12be4ef15 100644 --- a/test/session-pool.ts +++ b/test/session-pool.ts @@ -1550,21 +1550,6 @@ describe('SessionPool', () => { }); }); - describe('_prepareTransaction', () => { - it('should prepare a transaction', async () => { - const fakeSession = createSession(); - const fakeTransaction = new FakeTransaction(); - const beginStub = sandbox.stub(fakeTransaction, 'begin').resolves(); - - fakeSession.transaction = sandbox.stub().returns(fakeTransaction); - - await sessionPool._prepareTransaction(fakeSession); - - assert.strictEqual(fakeSession.txn, fakeTransaction); - assert.strictEqual(beginStub.callCount, 1); - }); - }); - describe('_release', () => { it('should release the session', () => { const fakeSession = createSession('id', {type: types.ReadOnly}); diff --git a/test/spanner.ts b/test/spanner.ts index 6e412d4c2..72bf12c07 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -1898,23 +1898,6 @@ describe('Spanner with mock server', () => { }); }); - it('should retry "Session not found" errors on BeginTransaction during Database.runTransaction()', done => { - // Create a session pool with 1 read-only session. - const db = newTestDatabase({min: 1, incStep: 1, writes: 0.0}); - const pool = db.pool_ as SessionPool; - // Wait until one read-only session has been created. - pool.once('available', () => { - spannerMock.setExecutionTime( - spannerMock.beginTransaction, - SimulatedExecutionTime.ofError({ - code: grpc.status.NOT_FOUND, - message: 'Session not found', - } as MockError) - ); - runTransactionWithExpectedSessionRetry(db, done); - }); - }); - it('should retry "Session not found" errors for a query on a write-session on Database.runTransaction()', done => { const db = newTestDatabase({min: 1, incStep: 1, writes: 1.0}); const pool = db.pool_ as SessionPool; @@ -2071,23 +2054,6 @@ describe('Spanner with mock server', () => { }); }); - it('should retry "Session not found" errors on BeginTransaction during Database.runTransactionAsync()', done => { - // Create a session pool with 1 read-only session. - const db = newTestDatabase({min: 1, incStep: 1, writes: 0.0}); - const pool = db.pool_ as SessionPool; - // Wait until one read-only session has been created. - pool.once('available', async () => { - spannerMock.setExecutionTime( - spannerMock.beginTransaction, - SimulatedExecutionTime.ofError({ - code: grpc.status.NOT_FOUND, - message: 'Session not found', - } as MockError) - ); - runAsyncTransactionWithExpectedSessionRetry(db).then(done).catch(done); - }); - }); - it('should retry "Session not found" errors for a query on a write-session on Database.runTransactionAsync()', done => { const db = newTestDatabase({min: 1, incStep: 1, writes: 1.0}); const pool = db.pool_ as SessionPool; @@ -3086,6 +3052,58 @@ describe('Spanner with mock server', () => { ); }); + it('should use inline begin transaction', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + await tx!.run(selectSql); + await tx!.run(insertSql); + await tx.commit(); + }); + await database.close(); + + let request = spannerMock.getRequests().find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.deepStrictEqual(request.transaction!.begin!.readWrite, {}); + assert.strictEqual(request.sql, selectSql); + + request = spannerMock + .getRequests() + .slice() + .reverse() + .find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.strictEqual(request.sql, insertSql); + assert.ok(request.transaction!.id, 'TransactionID is not set.'); + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(!beginTxnRequest, 'beginTransaction was called'); + }); + + it('should use beginTransaction on retry', async () => { + const database = newTestDatabase(); + let attempts = 0; + await database.runTransactionAsync(async tx => { + await tx!.run(selectSql); + if (!attempts) { + spannerMock.abortTransaction(tx); + } + attempts++; + await tx!.run(insertSql); + await tx.commit(); + }); + await database.close(); + + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(beginTxnRequest, 'beginTransaction was called'); + }); + it('should use transactionTag on blind commit', async () => { const database = newTestDatabase({min: 0}); const [session] = await database.createSession({}); diff --git a/test/transaction.ts b/test/transaction.ts index 387627e6f..739b68d0c 100644 --- a/test/transaction.ts +++ b/test/transaction.ts @@ -1949,7 +1949,7 @@ describe('Transaction', () => { }); }); - it('should not set transaction tag if `singleUse`', () => { + it('should set transaction tag if `begin`', () => { const TABLE = 'my-table-123'; const transactionTag = 'bar'; transaction.requestOptions = {transactionTag}; @@ -1958,7 +1958,9 @@ describe('Transaction', () => { const {reqOpts} = REQUEST_STREAM.lastCall.args[0]; - assert.deepStrictEqual(reqOpts.requestOptions, {}); + assert.deepStrictEqual(reqOpts.requestOptions, { + transactionTag, + }); }); }); }); From 8d5c6d6281f5e53c09a0e769c77e81d4c379ff98 Mon Sep 17 00:00:00 2001 From: Roma Slyusarchuk Date: Fri, 12 Aug 2022 22:49:13 +0000 Subject: [PATCH 02/16] fix: prevent multiple begin transactions happen during a race condition happens; make sure stream requests uses a transaction id from the first response even if the stream fails halfway with an UNAVAILABLE error --- src/transaction.ts | 44 ++++++++++++++++++++++++++++--- test/spanner.ts | 66 +++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 104 insertions(+), 6 deletions(-) diff --git a/src/transaction.ts b/src/transaction.ts index c4ac63c4e..0faba34af 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -216,6 +216,8 @@ export type CommitCallback = export class Snapshot extends EventEmitter { protected _options!: spannerClient.spanner.v1.ITransactionOptions; protected _seqno = 1; + protected _idWaiter: Readable; + protected _inlineBeginStarted; id?: Uint8Array | string; ended: boolean; metadata?: spannerClient.spanner.v1.ITransaction; @@ -289,6 +291,11 @@ export class Snapshot extends EventEmitter { this.resourceHeader_ = { [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_, }; + this._idWaiter = new Readable({ + read() { + }, + }); + this._inlineBeginStarted = false; } /** @@ -604,10 +611,14 @@ export class Snapshot extends EventEmitter { }); }; - return partialResultStream(makeRequest, { + return partialResultStream(this._wrapWithIdWaiter(makeRequest), { json, jsonOptions, maxResumeRetries, + }).on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } }); } @@ -1056,7 +1067,7 @@ export class Snapshot extends EventEmitter { }; const makeRequest = (resumeToken?: ResumeToken): Readable => { - if (!reqOpts) { + if (!reqOpts || (this.id && !reqOpts.transaction.id)) { try { sanitizeRequest(); } catch (e) { @@ -1075,10 +1086,14 @@ export class Snapshot extends EventEmitter { }); }; - return partialResultStream(makeRequest, { + return partialResultStream(this._wrapWithIdWaiter(makeRequest), { json, jsonOptions, maxResumeRetries, + }).on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } }); } @@ -1241,6 +1256,29 @@ export class Snapshot extends EventEmitter { this.readTimestampProto = readTimestamp; this.readTimestamp = new PreciseDate(readTimestamp as DateStruct); } + this._idWaiter.emit('notify'); + } + + /** + * Wrap `makeRequest` function with the lock to make sure the inline begin + * transaction can happen only once. + * + * @param makeRequest + * @private + */ + private _wrapWithIdWaiter( + makeRequest: (resumeToken?: ResumeToken) => Readable): + (resumeToken?: ResumeToken) => Readable { + if (this.id || typeof this._options.readWrite === 'undefined') { + return makeRequest; + } + if (!this._inlineBeginStarted) { + this._inlineBeginStarted = true; + return makeRequest; + } + return (resumeToken?: ResumeToken): Readable => + this._idWaiter.once('notify', () => + this._idWaiter.wrap(makeRequest(resumeToken))); } } diff --git a/test/spanner.ts b/test/spanner.ts index 72bf12c07..dee678efc 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -17,7 +17,7 @@ import {after, before, beforeEach, describe, Done, it} from 'mocha'; import * as assert from 'assert'; import {grpc, Status, ServiceError} from 'google-gax'; -import {Database, Instance, SessionPool, Snapshot, Spanner} from '../src'; +import {Database, Instance, SessionPool, Snapshot, Spanner, Transaction} from '../src'; import * as mock from './mockserver/mockspanner'; import { MockError, @@ -1214,6 +1214,33 @@ describe('Spanner with mock server', () => { await database.close(); }); + it('should retry UNAVAILABLE during streaming with txn ID from inline begin response', async () => { + const err = { + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + streamIndex: index, + } as MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + SimulatedExecutionTime.ofError(err) + ); + const database = newTestDatabase(); + + await database.runTransactionAsync(async tx => { + await tx.run(selectSql); + await tx.commit(); + }); + await database.close(); + + const requests = spannerMock.getRequests().filter(val => + (val as v1.ExecuteSqlRequest).sql + ).map(req => req as v1.ExecuteSqlRequest); + assert.strictEqual(requests.length, 2); + assert.ok(requests[0].transaction?.begin!.readWrite, 'inline txn is not set.'); + assert.ok(requests[1].transaction!.id, 'Transaction ID is not used for retries.'); + assert.ok(requests[1].resumeToken, "Resume token is not set for the retried"); + }); + it('should not retry non-retryable error during streaming', async () => { const database = newTestDatabase(); const err = { @@ -3084,6 +3111,39 @@ describe('Spanner with mock server', () => { assert.ok(!beginTxnRequest, 'beginTransaction was called'); }); + it('should only inline one begin transaction', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + const rowCount1 = getRowCountFromStreamingSql(tx!, {sql: selectSql}); + const rowCount2 = getRowCountFromStreamingSql(tx!, {sql: selectSql}); + await Promise.all([rowCount1, rowCount2]); + await tx.commit(); + }); + await database.close(); + + let request = spannerMock.getRequests().find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.deepStrictEqual(request.transaction!.begin!.readWrite, {}); + assert.strictEqual(request.sql, selectSql); + + request = spannerMock + .getRequests() + .slice() + .reverse() + .find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.strictEqual(request.sql, selectSql); + assert.ok(request.transaction!.id, 'TransactionID is not set.'); + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(!beginTxnRequest, 'beginTransaction was called'); + }); + it('should use beginTransaction on retry', async () => { const database = newTestDatabase(); let attempts = 0; @@ -3914,13 +3974,13 @@ function executeSimpleUpdate( } function getRowCountFromStreamingSql( - database: Database, + context: Database | Transaction, query: ExecuteSqlRequest ): Promise { return new Promise((resolve, reject) => { let rows = 0; let errored = false; - database + context .runStream(query) .on('error', err => { errored = true; From dd9891ed4213ff805c5daa5a6d34c4dc7726a47e Mon Sep 17 00:00:00 2001 From: Roma Slyusarchuk Date: Fri, 26 Aug 2022 01:50:29 +0000 Subject: [PATCH 03/16] fix: minor --- src/transaction.ts | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/transaction.ts b/src/transaction.ts index 0faba34af..b741380b5 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -570,7 +570,7 @@ export class Snapshot extends EventEmitter { if (this.id) { transaction.id = this.id as Uint8Array; - } else if (typeof this._options.readWrite !== 'undefined') { + } else if (this._options.readWrite) { transaction.begin = this._options; } else { transaction.singleUse = this._options; @@ -615,7 +615,7 @@ export class Snapshot extends EventEmitter { json, jsonOptions, maxResumeRetries, - }).on('response', response => { + })?.on('response', response => { if (response.metadata && response.metadata!.transaction && !this.id) { this._update(response.metadata!.transaction); } @@ -1040,7 +1040,7 @@ export class Snapshot extends EventEmitter { const transaction: spannerClient.spanner.v1.ITransactionSelector = {}; if (this.id) { transaction.id = this.id as Uint8Array; - } else if (typeof this._options.readWrite !== 'undefined') { + } else if (this._options.readWrite) { transaction.begin = this._options; } else { transaction.singleUse = this._options; @@ -1269,7 +1269,7 @@ export class Snapshot extends EventEmitter { private _wrapWithIdWaiter( makeRequest: (resumeToken?: ResumeToken) => Readable): (resumeToken?: ResumeToken) => Readable { - if (this.id || typeof this._options.readWrite === 'undefined') { + if (this.id || !this._options.readWrite) { return makeRequest; } if (!this._inlineBeginStarted) { @@ -1277,8 +1277,10 @@ export class Snapshot extends EventEmitter { return makeRequest; } return (resumeToken?: ResumeToken): Readable => - this._idWaiter.once('notify', () => - this._idWaiter.wrap(makeRequest(resumeToken))); + this._idWaiter.once('notify', () => makeRequest(resumeToken) + .on('data', (chunk) => this._idWaiter.emit('data', chunk)) + .once('end', () => this._idWaiter.emit('end')) + ); } } From 6b57710c9e905c43649ae12c23fcd0e423ab7ba1 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 14 Sep 2022 09:40:00 +0000 Subject: [PATCH 04/16] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- src/transaction.ts | 16 ++++++++-------- test/spanner.ts | 47 +++++++++++++++++++++++++++++++--------------- 2 files changed, 40 insertions(+), 23 deletions(-) diff --git a/src/transaction.ts b/src/transaction.ts index b741380b5..f24cde8f6 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -292,8 +292,7 @@ export class Snapshot extends EventEmitter { [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_, }; this._idWaiter = new Readable({ - read() { - }, + read() {}, }); this._inlineBeginStarted = false; } @@ -1267,8 +1266,8 @@ export class Snapshot extends EventEmitter { * @private */ private _wrapWithIdWaiter( - makeRequest: (resumeToken?: ResumeToken) => Readable): - (resumeToken?: ResumeToken) => Readable { + makeRequest: (resumeToken?: ResumeToken) => Readable + ): (resumeToken?: ResumeToken) => Readable { if (this.id || !this._options.readWrite) { return makeRequest; } @@ -1277,10 +1276,11 @@ export class Snapshot extends EventEmitter { return makeRequest; } return (resumeToken?: ResumeToken): Readable => - this._idWaiter.once('notify', () => makeRequest(resumeToken) - .on('data', (chunk) => this._idWaiter.emit('data', chunk)) - .once('end', () => this._idWaiter.emit('end')) - ); + this._idWaiter.once('notify', () => + makeRequest(resumeToken) + .on('data', chunk => this._idWaiter.emit('data', chunk)) + .once('end', () => this._idWaiter.emit('end')) + ); } } diff --git a/test/spanner.ts b/test/spanner.ts index dee678efc..7a1974932 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -17,7 +17,14 @@ import {after, before, beforeEach, describe, Done, it} from 'mocha'; import * as assert from 'assert'; import {grpc, Status, ServiceError} from 'google-gax'; -import {Database, Instance, SessionPool, Snapshot, Spanner, Transaction} from '../src'; +import { + Database, + Instance, + SessionPool, + Snapshot, + Spanner, + Transaction, +} from '../src'; import * as mock from './mockserver/mockspanner'; import { MockError, @@ -1221,8 +1228,8 @@ describe('Spanner with mock server', () => { streamIndex: index, } as MockError; spannerMock.setExecutionTime( - spannerMock.executeStreamingSql, - SimulatedExecutionTime.ofError(err) + spannerMock.executeStreamingSql, + SimulatedExecutionTime.ofError(err) ); const database = newTestDatabase(); @@ -1232,13 +1239,23 @@ describe('Spanner with mock server', () => { }); await database.close(); - const requests = spannerMock.getRequests().filter(val => - (val as v1.ExecuteSqlRequest).sql - ).map(req => req as v1.ExecuteSqlRequest); + const requests = spannerMock + .getRequests() + .filter(val => (val as v1.ExecuteSqlRequest).sql) + .map(req => req as v1.ExecuteSqlRequest); assert.strictEqual(requests.length, 2); - assert.ok(requests[0].transaction?.begin!.readWrite, 'inline txn is not set.'); - assert.ok(requests[1].transaction!.id, 'Transaction ID is not used for retries.'); - assert.ok(requests[1].resumeToken, "Resume token is not set for the retried"); + assert.ok( + requests[0].transaction?.begin!.readWrite, + 'inline txn is not set.' + ); + assert.ok( + requests[1].transaction!.id, + 'Transaction ID is not used for retries.' + ); + assert.ok( + requests[1].resumeToken, + 'Resume token is not set for the retried' + ); }); it('should not retry non-retryable error during streaming', async () => { @@ -3129,12 +3146,12 @@ describe('Spanner with mock server', () => { assert.strictEqual(request.sql, selectSql); request = spannerMock - .getRequests() - .slice() - .reverse() - .find(val => { - return (val as v1.ExecuteSqlRequest).sql; - }) as v1.ExecuteSqlRequest; + .getRequests() + .slice() + .reverse() + .find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; assert.ok(request, 'no ExecuteSqlRequest found'); assert.strictEqual(request.sql, selectSql); assert.ok(request.transaction!.id, 'TransactionID is not set.'); From 154872b5134c7981d29997298aa5f40926391536 Mon Sep 17 00:00:00 2001 From: Roma Slyusarchuk Date: Wed, 14 Sep 2022 23:11:06 +0000 Subject: [PATCH 05/16] fix: call explicit begin transaction only after the first attempt. --- src/transaction-runner.ts | 4 +++- test/transaction-runner.ts | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/transaction-runner.ts b/src/transaction-runner.ts index 35b0a9c2d..2eb9e146b 100644 --- a/src/transaction-runner.ts +++ b/src/transaction-runner.ts @@ -194,7 +194,9 @@ export abstract class Runner { const transaction = this.session.transaction( (this.session.parent as Database).queryOptions_ ); - await transaction.begin(); + if (this.attempts > 0) { + await transaction.begin(); + } return transaction; } /** diff --git a/test/transaction-runner.ts b/test/transaction-runner.ts index 4a28d9ea3..c2743721d 100644 --- a/test/transaction-runner.ts +++ b/test/transaction-runner.ts @@ -213,6 +213,9 @@ describe('TransactionRunner', () => { const transaction = await runner.getTransaction(); assert.strictEqual(transaction, expectedTransaction); + assert.strictEqual(beginStub.callCount, 0); + runner.attempts++; + await runner.getTransaction(); assert.strictEqual(beginStub.callCount, 1); }); }); From 2caaa0fd233314386627e946499a9822391eef84 Mon Sep 17 00:00:00 2001 From: Roma Slyusarchuk Date: Tue, 20 Sep 2022 00:56:48 +0000 Subject: [PATCH 06/16] feat: add inline begin transaction for batch DML requests --- src/transaction.ts | 15 +++++++++++++-- test/mockserver/mockspanner.ts | 18 ++++++++++++++++-- test/spanner.ts | 8 ++++++++ 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/src/transaction.ts b/src/transaction.ts index f24cde8f6..0aaf12284 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -1245,7 +1245,7 @@ export class Snapshot extends EventEmitter { * * @param {spannerClient.spanner.v1.ITransaction} resp Response object. */ - private _update(resp: spannerClient.spanner.v1.ITransaction): void { + protected _update(resp: spannerClient.spanner.v1.ITransaction): void { const {id, readTimestamp} = resp; this.id = id!; @@ -1584,6 +1584,12 @@ export class Transaction extends Dml { return {sql, params, paramTypes}; }); + const transaction: spannerClient.spanner.v1.ITransactionSelector = {}; + if (this.id) { + transaction.id = this.id as Uint8Array; + } else { + transaction.begin = this._options; + } const reqOpts: spannerClient.spanner.v1.ExecuteBatchDmlRequest = { session: this.session.formattedName_!, requestOptions: this.configureTagOptions( @@ -1591,7 +1597,7 @@ export class Transaction extends Dml { this.requestOptions?.transactionTag ?? undefined, (options as BatchUpdateOptions).requestOptions ), - transaction: {id: this.id!}, + transaction, seqno: this._seqno++, statements, } as spannerClient.spanner.v1.ExecuteBatchDmlRequest; @@ -1618,6 +1624,11 @@ export class Transaction extends Dml { } const {resultSets, status} = resp; + for (const resultSet of resultSets) { + if (!this.id && resultSet.metadata && resultSet.metadata.transaction) { + this._update(resultSet.metadata.transaction); + } + } const rowCounts: number[] = resultSets.map(({stats}) => { return ( (stats && diff --git a/test/mockserver/mockspanner.ts b/test/mockserver/mockspanner.ts index ad627da93..9aa10601e 100644 --- a/test/mockserver/mockspanner.ts +++ b/test/mockserver/mockspanner.ts @@ -720,7 +720,7 @@ export class MockSpanner { this.pushRequest(call.request!, call.metadata); this.simulateExecutionTime(this.executeBatchDml.name) .then(() => { - if (call.request!.transaction && call.request!.transaction.id) { + if (call.request!.transaction) { const fullTransactionId = `${call.request!.session}/transactions/${ call.request!.transaction.id }`; @@ -764,7 +764,21 @@ export class MockSpanner { callback(new Error('Wrong result type for batch DML')); break; case StatementResultType.UPDATE_COUNT: - results.push(MockSpanner.toResultSet(res.updateCount)); + let resultSet = MockSpanner.toResultSet(res.updateCount); + if (call.request!.transaction!.begin && i === 0) { + const transaction = this._updateTransaction( + call.request!.session, + call.request?.transaction!.begin + ); + if (transaction instanceof Error) { + callback(transaction); + break; + } + resultSet.metadata = protobuf.ResultSetMetadata.create({ + transaction + }); + } + results.push(resultSet); break; case StatementResultType.ERROR: if ((res.error as grpc.ServiceError).code) { diff --git a/test/spanner.ts b/test/spanner.ts index 7a1974932..95a51fa35 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -289,6 +289,7 @@ describe('Spanner with mock server', () => { requestTag: 'request-tag', }, }); + await tx!.batchUpdate([insertSql, insertSql]); return await tx.commit(); } ); @@ -307,6 +308,13 @@ describe('Spanner with mock server', () => { request.requestOptions!.transactionTag, 'transaction-tag' ); + assert.ok(request.transaction?.begin, 'transaction is not empty'); + const nextBatchRequest = spannerMock.getRequests().reverse().find(val => { + return (val as v1.ExecuteBatchDmlRequest).statements; + }) as v1.ExecuteBatchDmlRequest; + assert.ok(nextBatchRequest, 'no ExecuteBatchDmlRequest found'); + assert.ok(nextBatchRequest.transaction?.id, 'no transaction ID'); + const commitRequest = spannerMock.getRequests().find(val => { return (val as v1.CommitRequest).mutations; }) as v1.CommitRequest; From f65e2d1dcd6ed62b4d4a359c3f3f94f1bb132650 Mon Sep 17 00:00:00 2001 From: Roma Slyusarchuk Date: Tue, 20 Sep 2022 17:34:19 +0000 Subject: [PATCH 07/16] fix: lint --- src/transaction.ts | 2 +- test/mockserver/mockspanner.ts | 11 ++++++----- test/spanner.ts | 9 ++++++--- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/transaction.ts b/src/transaction.ts index 0aaf12284..1536a0a6c 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -1625,7 +1625,7 @@ export class Transaction extends Dml { const {resultSets, status} = resp; for (const resultSet of resultSets) { - if (!this.id && resultSet.metadata && resultSet.metadata.transaction) { + if (!this.id && resultSet.metadata?.transaction) { this._update(resultSet.metadata.transaction); } } diff --git a/test/mockserver/mockspanner.ts b/test/mockserver/mockspanner.ts index 9aa10601e..8b5e3c36f 100644 --- a/test/mockserver/mockspanner.ts +++ b/test/mockserver/mockspanner.ts @@ -763,23 +763,24 @@ export class MockSpanner { case StatementResultType.RESULT_SET: callback(new Error('Wrong result type for batch DML')); break; - case StatementResultType.UPDATE_COUNT: - let resultSet = MockSpanner.toResultSet(res.updateCount); + case StatementResultType.UPDATE_COUNT: { + const resultSet = MockSpanner.toResultSet(res.updateCount); if (call.request!.transaction!.begin && i === 0) { const transaction = this._updateTransaction( - call.request!.session, - call.request?.transaction!.begin + call.request!.session, + call.request?.transaction!.begin ); if (transaction instanceof Error) { callback(transaction); break; } resultSet.metadata = protobuf.ResultSetMetadata.create({ - transaction + transaction, }); } results.push(resultSet); break; + } case StatementResultType.ERROR: if ((res.error as grpc.ServiceError).code) { const serviceError = res.error as grpc.ServiceError; diff --git a/test/spanner.ts b/test/spanner.ts index 95a51fa35..924fc7143 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -309,9 +309,12 @@ describe('Spanner with mock server', () => { 'transaction-tag' ); assert.ok(request.transaction?.begin, 'transaction is not empty'); - const nextBatchRequest = spannerMock.getRequests().reverse().find(val => { - return (val as v1.ExecuteBatchDmlRequest).statements; - }) as v1.ExecuteBatchDmlRequest; + const nextBatchRequest = spannerMock + .getRequests() + .reverse() + .find(val => { + return (val as v1.ExecuteBatchDmlRequest).statements; + }) as v1.ExecuteBatchDmlRequest; assert.ok(nextBatchRequest, 'no ExecuteBatchDmlRequest found'); assert.ok(nextBatchRequest.transaction?.id, 'no transaction ID'); From 6d83a946e0fe5fe1aaada99fd2b106e3b4e859ec Mon Sep 17 00:00:00 2001 From: Roma Slyusarchuk Date: Fri, 23 Sep 2022 04:58:15 +0000 Subject: [PATCH 08/16] fix: explicit begin transaction for blind commit if a transaction runs over the runner. --- src/transaction-runner.ts | 1 + src/transaction.ts | 13 ++++++++++++- test/spanner.ts | 17 +++++++++++++++++ test/transaction-runner.ts | 1 + 4 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/transaction-runner.ts b/src/transaction-runner.ts index 2eb9e146b..b6d5ece44 100644 --- a/src/transaction-runner.ts +++ b/src/transaction-runner.ts @@ -120,6 +120,7 @@ export abstract class Runner { this.attempts = 0; this.session = session; this.transaction = transaction; + this.transaction.disableSingleUse(); const defaults = {timeout: 3600000}; diff --git a/src/transaction.ts b/src/transaction.ts index 1536a0a6c..65d4a3669 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -1413,6 +1413,7 @@ export class Transaction extends Dml { commitTimestamp?: PreciseDate; commitTimestampProto?: spannerClient.protobuf.ITimestamp; private _queuedMutations: spannerClient.spanner.v1.Mutation[]; + private _allowSingleUse = true; /** * Timestamp at which the transaction was committed. Will be populated once @@ -1753,8 +1754,11 @@ export class Transaction extends Dml { if (this.id) { reqOpts.transactionId = this.id as Uint8Array; - } else { + } else if (this._allowSingleUse) { reqOpts.singleUseTransaction = this._options; + } else { + this.begin().then(() => this.commit(options, callback)); + return; } if ( @@ -2251,6 +2255,13 @@ export class Transaction extends Dml { const unique = new Set(allKeys); return Array.from(unique).sort(); } + + /** + * Make sure that single use is not used for a blind commit. + */ + disableSingleUse() : void { + this._allowSingleUse = false; + } } /*! Developer Documentation diff --git a/test/spanner.ts b/test/spanner.ts index 924fc7143..7ffd40b9c 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -2730,6 +2730,7 @@ describe('Spanner with mock server', () => { // Execute an update. const [count] = await database.runTransactionAsync( (transaction): Promise<[number]> => { + transaction.begin(); return transaction.runUpdate(insertSql).then(updateCount => { transaction.commit(); return updateCount; @@ -2937,6 +2938,7 @@ describe('Spanner with mock server', () => { const database = newTestDatabase(); const [updated] = await database.runTransactionAsync( (transaction): Promise => { + transaction.begin(); return transaction.runUpdate(insertSql).then(updateCount => { if (!attempts) { spannerMock.abortTransaction(transaction); @@ -2958,6 +2960,7 @@ describe('Spanner with mock server', () => { await database.runTransactionAsync( {timeout: 1}, (transaction): Promise => { + transaction.begin(); attempts++; return transaction.runUpdate(insertSql).then(updateCount => { // Always abort the transaction. @@ -3216,6 +3219,20 @@ describe('Spanner with mock server', () => { 'transaction-tag' ); }); + + it('should run begin transaction on blind commit', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + tx.insert('foo', {id: 1, name: 'One'}); + await tx.commit(); + }); + await database.close(); + + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(beginTxnRequest, 'beginTransaction was called'); + }); }); describe('table', () => { diff --git a/test/transaction-runner.ts b/test/transaction-runner.ts index c2743721d..f51498847 100644 --- a/test/transaction-runner.ts +++ b/test/transaction-runner.ts @@ -29,6 +29,7 @@ class FakeTransaction extends EventEmitter { async begin(): Promise {} request() {} requestStream() {} + disableSingleUse() {} } describe('TransactionRunner', () => { From eeef8cfad3ba0e3e1f566577a45c4da708b96cf8 Mon Sep 17 00:00:00 2001 From: Roma Slyusarchuk Date: Wed, 12 Oct 2022 06:29:15 +0000 Subject: [PATCH 09/16] feat: adding more unit tests --- src/transaction.ts | 2 +- test/database.ts | 10 ------ test/mockserver/mockspanner.ts | 5 +-- test/spanner.ts | 61 ++++++++++++++++++++++++++++++++++ 4 files changed, 65 insertions(+), 13 deletions(-) diff --git a/src/transaction.ts b/src/transaction.ts index 65d4a3669..bcc16e397 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -2259,7 +2259,7 @@ export class Transaction extends Dml { /** * Make sure that single use is not used for a blind commit. */ - disableSingleUse() : void { + disableSingleUse(): void { this._allowSingleUse = false; } } diff --git a/test/database.ts b/test/database.ts index 31bbcbf5d..d7bf9cc55 100644 --- a/test/database.ts +++ b/test/database.ts @@ -91,16 +91,6 @@ class FakeSession { } } -interface ReadSessionCallback { - (err: Error, session?: null): void; - (err: null, session: FakeSession): void; -} - -interface WriteSessionCallback { - (err: Error, session?: null, transaction?: null): void; - (err: null, session: FakeSession, transaction: FakeTransaction): void; -} - class FakeSessionPool extends EventEmitter { calledWith_: IArguments; constructor() { diff --git a/test/mockserver/mockspanner.ts b/test/mockserver/mockspanner.ts index 8b5e3c36f..93895f34b 100644 --- a/test/mockserver/mockspanner.ts +++ b/test/mockserver/mockspanner.ts @@ -136,11 +136,12 @@ export class StatementResult { /** * Create a StatementResult that will return an update count. * @param updateCount The row count to return. + * @param error The status error to return. */ - static updateCount(updateCount: number): StatementResult { + static updateCount(updateCount: number, error?: Error): StatementResult { return new StatementResult( StatementResultType.UPDATE_COUNT, - null, + error || null, null, updateCount ); diff --git a/test/spanner.ts b/test/spanner.ts index 7ffd40b9c..5656907e2 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -328,6 +328,38 @@ describe('Spanner with mock server', () => { ); }); + it('should use txn ID from batchUpdate if non-ok status', async () => { + const sql = "INSERT INTO TBL (NUM, NAME) VALUES (14, 'Four')"; + const database = newTestDatabase(); + const err = { + message: 'Not OK', + } as MockError; + spannerMock.putStatementResult( + sql, + mock.StatementResult.updateCount(1, err) + ); + + await database.runTransactionAsync(async tx => { + await tx!.batchUpdate([sql, insertSql]); + await tx!.batchUpdate([sql, insertSql]); + return await tx.commit(); + }); + await database.close(); + const request = spannerMock.getRequests().find(val => { + return (val as v1.ExecuteBatchDmlRequest).statements; + }) as v1.ExecuteBatchDmlRequest; + assert.ok(request, 'no ExecuteBatchDmlRequest found'); + assert.ok(request.transaction?.begin, 'transaction is not empty'); + const nextBatchRequest = spannerMock + .getRequests() + .reverse() + .find(val => { + return (val as v1.ExecuteBatchDmlRequest).statements; + }) as v1.ExecuteBatchDmlRequest; + assert.ok(nextBatchRequest, 'no ExecuteBatchDmlRequest found'); + assert.ok(nextBatchRequest.transaction?.id, 'no transaction ID'); + }); + it('should execute update with requestOptions', async () => { const database = newTestDatabase(); await database.runTransactionAsync( @@ -354,6 +386,7 @@ describe('Spanner with mock server', () => { ); assert.strictEqual(request.requestOptions!.priority, 'PRIORITY_LOW'); assert.strictEqual(request.requestOptions!.requestTag, 'request-tag'); + assert.deepStrictEqual(request.transaction!.begin!.readWrite, {}); assert.strictEqual( request.requestOptions!.transactionTag, 'transaction-tag' @@ -406,6 +439,7 @@ describe('Spanner with mock server', () => { request.requestOptions!.transactionTag, 'transaction-tag' ); + assert.deepStrictEqual(request.transaction!.begin!.readWrite, {}); }); it('should return an array of json objects', async () => { @@ -3195,6 +3229,33 @@ describe('Spanner with mock server', () => { assert.ok(beginTxnRequest, 'beginTransaction was called'); }); + it('should fail if beginTransaction fails', async () => { + const database = newTestDatabase(); + const err = { + message: 'Test error', + } as MockError; + spannerMock.setExecutionTime( + spannerMock.beginTransaction, + SimulatedExecutionTime.ofError(err) + ); + try { + await database.runTransactionAsync(async tx => { + await tx!.run(selectSql); + spannerMock.abortTransaction(tx); + await tx!.run(insertSql); + await tx.commit(); + }); + assert.fail('missing expected error'); + } catch (e) { + assert.strictEqual( + (e as ServiceError).message, + '2 UNKNOWN: Test error' + ); + } finally { + await database.close(); + } + }); + it('should use transactionTag on blind commit', async () => { const database = newTestDatabase({min: 0}); const [session] = await database.createSession({}); From 4fff8428db798b23c110c0443a1653c17913ff04 Mon Sep 17 00:00:00 2001 From: Roma Slyusarchuk Date: Mon, 17 Oct 2022 12:44:23 -0700 Subject: [PATCH 10/16] fix: explicit begin transaction for unknown error --- src/transaction-runner.ts | 2 +- src/transaction.ts | 38 ++++++++++++++++++------------- test/spanner.ts | 46 ++++++++++++++++++++++++++++++++++++++ test/transaction-runner.ts | 2 +- 4 files changed, 71 insertions(+), 17 deletions(-) diff --git a/src/transaction-runner.ts b/src/transaction-runner.ts index b6d5ece44..91b43787c 100644 --- a/src/transaction-runner.ts +++ b/src/transaction-runner.ts @@ -120,7 +120,7 @@ export abstract class Runner { this.attempts = 0; this.session = session; this.transaction = transaction; - this.transaction.disableSingleUse(); + this.transaction.useInRunner(); const defaults = {timeout: 3600000}; diff --git a/src/transaction.ts b/src/transaction.ts index bcc16e397..9994d2a64 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -218,6 +218,7 @@ export class Snapshot extends EventEmitter { protected _seqno = 1; protected _idWaiter: Readable; protected _inlineBeginStarted; + protected _useInRunner = false; id?: Uint8Array | string; ended: boolean; metadata?: spannerClient.spanner.v1.ITransaction; @@ -614,11 +615,15 @@ export class Snapshot extends EventEmitter { json, jsonOptions, maxResumeRetries, - })?.on('response', response => { - if (response.metadata && response.metadata!.transaction && !this.id) { - this._update(response.metadata!.transaction); - } - }); + }) + ?.on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', () => { + if (!this.id && this._useInRunner) this.begin(); + }); } /** @@ -1089,11 +1094,15 @@ export class Snapshot extends EventEmitter { json, jsonOptions, maxResumeRetries, - }).on('response', response => { - if (response.metadata && response.metadata!.transaction && !this.id) { - this._update(response.metadata!.transaction); - } - }); + }) + .on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', () => { + if (!this.id && this._useInRunner) this.begin(); + }); } /** @@ -1413,7 +1422,6 @@ export class Transaction extends Dml { commitTimestamp?: PreciseDate; commitTimestampProto?: spannerClient.protobuf.ITimestamp; private _queuedMutations: spannerClient.spanner.v1.Mutation[]; - private _allowSingleUse = true; /** * Timestamp at which the transaction was committed. Will be populated once @@ -1754,7 +1762,7 @@ export class Transaction extends Dml { if (this.id) { reqOpts.transactionId = this.id as Uint8Array; - } else if (this._allowSingleUse) { + } else if (!this._useInRunner) { reqOpts.singleUseTransaction = this._options; } else { this.begin().then(() => this.commit(options, callback)); @@ -2257,10 +2265,10 @@ export class Transaction extends Dml { } /** - * Make sure that single use is not used for a blind commit. + * Mark transaction as started from the runner. */ - disableSingleUse(): void { - this._allowSingleUse = false; + useInRunner(): void { + this._useInRunner = true; } } diff --git a/test/spanner.ts b/test/spanner.ts index 5656907e2..a249c302a 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -3229,6 +3229,52 @@ describe('Spanner with mock server', () => { assert.ok(beginTxnRequest, 'beginTransaction was called'); }); + it('should use beginTransaction on retry for unknown reason', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + try { + await tx.runUpdate(invalidSql); + assert.fail('missing expected error'); + } catch (e) { + assert.strictEqual( + (e as ServiceError).message, + `${grpc.status.NOT_FOUND} NOT_FOUND: ${fooNotFoundErr.message}` + ); + } + await tx.run(selectSql); + await tx.commit(); + }); + await database.close(); + + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(beginTxnRequest, 'beginTransaction was called'); + }); + + it('should use beginTransaction for streaming on retry for unknown reason', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + try { + await getRowCountFromStreamingSql(tx!, {sql: invalidSql}); + assert.fail('missing expected error'); + } catch (e) { + assert.strictEqual( + (e as ServiceError).message, + `${grpc.status.NOT_FOUND} NOT_FOUND: ${fooNotFoundErr.message}` + ); + } + await tx.run(selectSql); + await tx.commit(); + }); + await database.close(); + + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(beginTxnRequest, 'beginTransaction was called'); + }); + it('should fail if beginTransaction fails', async () => { const database = newTestDatabase(); const err = { diff --git a/test/transaction-runner.ts b/test/transaction-runner.ts index f51498847..586501bd9 100644 --- a/test/transaction-runner.ts +++ b/test/transaction-runner.ts @@ -29,7 +29,7 @@ class FakeTransaction extends EventEmitter { async begin(): Promise {} request() {} requestStream() {} - disableSingleUse() {} + useInRunner() {} } describe('TransactionRunner', () => { From 53f1e1ca83e7946699396f0f671c21313e6fc4cd Mon Sep 17 00:00:00 2001 From: Roma Slyusarchuk Date: Mon, 17 Oct 2022 12:44:23 -0700 Subject: [PATCH 11/16] fix: format --- src/transaction-runner.ts | 2 +- src/transaction.ts | 42 +++++++++++++++++++++------------- test/spanner.ts | 46 ++++++++++++++++++++++++++++++++++++++ test/transaction-runner.ts | 2 +- 4 files changed, 75 insertions(+), 17 deletions(-) diff --git a/src/transaction-runner.ts b/src/transaction-runner.ts index b6d5ece44..91b43787c 100644 --- a/src/transaction-runner.ts +++ b/src/transaction-runner.ts @@ -120,7 +120,7 @@ export abstract class Runner { this.attempts = 0; this.session = session; this.transaction = transaction; - this.transaction.disableSingleUse(); + this.transaction.useInRunner(); const defaults = {timeout: 3600000}; diff --git a/src/transaction.ts b/src/transaction.ts index bcc16e397..b2bcf9c7a 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -218,6 +218,7 @@ export class Snapshot extends EventEmitter { protected _seqno = 1; protected _idWaiter: Readable; protected _inlineBeginStarted; + protected _useInRunner = false; id?: Uint8Array | string; ended: boolean; metadata?: spannerClient.spanner.v1.ITransaction; @@ -614,11 +615,17 @@ export class Snapshot extends EventEmitter { json, jsonOptions, maxResumeRetries, - })?.on('response', response => { - if (response.metadata && response.metadata!.transaction && !this.id) { - this._update(response.metadata!.transaction); - } - }); + }) + ?.on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', () => { + if (!this.id && this._useInRunner) { + this.begin(); + } + }); } /** @@ -1089,11 +1096,17 @@ export class Snapshot extends EventEmitter { json, jsonOptions, maxResumeRetries, - }).on('response', response => { - if (response.metadata && response.metadata!.transaction && !this.id) { - this._update(response.metadata!.transaction); - } - }); + }) + .on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', () => { + if (!this.id && this._useInRunner) { + this.begin(); + } + }); } /** @@ -1413,7 +1426,6 @@ export class Transaction extends Dml { commitTimestamp?: PreciseDate; commitTimestampProto?: spannerClient.protobuf.ITimestamp; private _queuedMutations: spannerClient.spanner.v1.Mutation[]; - private _allowSingleUse = true; /** * Timestamp at which the transaction was committed. Will be populated once @@ -1754,7 +1766,7 @@ export class Transaction extends Dml { if (this.id) { reqOpts.transactionId = this.id as Uint8Array; - } else if (this._allowSingleUse) { + } else if (!this._useInRunner) { reqOpts.singleUseTransaction = this._options; } else { this.begin().then(() => this.commit(options, callback)); @@ -2257,10 +2269,10 @@ export class Transaction extends Dml { } /** - * Make sure that single use is not used for a blind commit. + * Mark transaction as started from the runner. */ - disableSingleUse(): void { - this._allowSingleUse = false; + useInRunner(): void { + this._useInRunner = true; } } diff --git a/test/spanner.ts b/test/spanner.ts index 5656907e2..a249c302a 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -3229,6 +3229,52 @@ describe('Spanner with mock server', () => { assert.ok(beginTxnRequest, 'beginTransaction was called'); }); + it('should use beginTransaction on retry for unknown reason', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + try { + await tx.runUpdate(invalidSql); + assert.fail('missing expected error'); + } catch (e) { + assert.strictEqual( + (e as ServiceError).message, + `${grpc.status.NOT_FOUND} NOT_FOUND: ${fooNotFoundErr.message}` + ); + } + await tx.run(selectSql); + await tx.commit(); + }); + await database.close(); + + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(beginTxnRequest, 'beginTransaction was called'); + }); + + it('should use beginTransaction for streaming on retry for unknown reason', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + try { + await getRowCountFromStreamingSql(tx!, {sql: invalidSql}); + assert.fail('missing expected error'); + } catch (e) { + assert.strictEqual( + (e as ServiceError).message, + `${grpc.status.NOT_FOUND} NOT_FOUND: ${fooNotFoundErr.message}` + ); + } + await tx.run(selectSql); + await tx.commit(); + }); + await database.close(); + + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(beginTxnRequest, 'beginTransaction was called'); + }); + it('should fail if beginTransaction fails', async () => { const database = newTestDatabase(); const err = { diff --git a/test/transaction-runner.ts b/test/transaction-runner.ts index f51498847..586501bd9 100644 --- a/test/transaction-runner.ts +++ b/test/transaction-runner.ts @@ -29,7 +29,7 @@ class FakeTransaction extends EventEmitter { async begin(): Promise {} request() {} requestStream() {} - disableSingleUse() {} + useInRunner() {} } describe('TransactionRunner', () => { From e5585ffe79e483b5281979e15c9d3c91890da08d Mon Sep 17 00:00:00 2001 From: Roma Slyusarchuk Date: Mon, 31 Oct 2022 10:35:02 -0700 Subject: [PATCH 12/16] fix: tests after merge --- test/spanner.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/spanner.ts b/test/spanner.ts index a249c302a..f7a69de7e 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -386,7 +386,7 @@ describe('Spanner with mock server', () => { ); assert.strictEqual(request.requestOptions!.priority, 'PRIORITY_LOW'); assert.strictEqual(request.requestOptions!.requestTag, 'request-tag'); - assert.deepStrictEqual(request.transaction!.begin!.readWrite, {}); + assert.ok(request.transaction!.begin!.readWrite, "ReadWrite is not set"); assert.strictEqual( request.requestOptions!.transactionTag, 'transaction-tag' @@ -439,7 +439,7 @@ describe('Spanner with mock server', () => { request.requestOptions!.transactionTag, 'transaction-tag' ); - assert.deepStrictEqual(request.transaction!.begin!.readWrite, {}); + assert.ok(request.transaction!.begin!.readWrite, "ReadWrite is not set"); }); it('should return an array of json objects', async () => { @@ -3157,7 +3157,7 @@ describe('Spanner with mock server', () => { return (val as v1.ExecuteSqlRequest).sql; }) as v1.ExecuteSqlRequest; assert.ok(request, 'no ExecuteSqlRequest found'); - assert.deepStrictEqual(request.transaction!.begin!.readWrite, {}); + assert.ok(request.transaction!.begin!.readWrite, "ReadWrite is not set"); assert.strictEqual(request.sql, selectSql); request = spannerMock @@ -3190,7 +3190,7 @@ describe('Spanner with mock server', () => { return (val as v1.ExecuteSqlRequest).sql; }) as v1.ExecuteSqlRequest; assert.ok(request, 'no ExecuteSqlRequest found'); - assert.deepStrictEqual(request.transaction!.begin!.readWrite, {}); + assert.ok(request.transaction!.begin!.readWrite, "ReadWrite is not set"); assert.strictEqual(request.sql, selectSql); request = spannerMock From 241394d029bafffd9b9b8e5d1c93a225ce74a7e2 Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Wed, 2 Nov 2022 10:23:30 +0530 Subject: [PATCH 13/16] Lint fix --- test/spanner.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/spanner.ts b/test/spanner.ts index f7a69de7e..76cba2213 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -386,7 +386,7 @@ describe('Spanner with mock server', () => { ); assert.strictEqual(request.requestOptions!.priority, 'PRIORITY_LOW'); assert.strictEqual(request.requestOptions!.requestTag, 'request-tag'); - assert.ok(request.transaction!.begin!.readWrite, "ReadWrite is not set"); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); assert.strictEqual( request.requestOptions!.transactionTag, 'transaction-tag' @@ -439,7 +439,7 @@ describe('Spanner with mock server', () => { request.requestOptions!.transactionTag, 'transaction-tag' ); - assert.ok(request.transaction!.begin!.readWrite, "ReadWrite is not set"); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); }); it('should return an array of json objects', async () => { @@ -3157,7 +3157,7 @@ describe('Spanner with mock server', () => { return (val as v1.ExecuteSqlRequest).sql; }) as v1.ExecuteSqlRequest; assert.ok(request, 'no ExecuteSqlRequest found'); - assert.ok(request.transaction!.begin!.readWrite, "ReadWrite is not set"); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); assert.strictEqual(request.sql, selectSql); request = spannerMock @@ -3190,7 +3190,7 @@ describe('Spanner with mock server', () => { return (val as v1.ExecuteSqlRequest).sql; }) as v1.ExecuteSqlRequest; assert.ok(request, 'no ExecuteSqlRequest found'); - assert.ok(request.transaction!.begin!.readWrite, "ReadWrite is not set"); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); assert.strictEqual(request.sql, selectSql); request = spannerMock From fa3c0248960dfdfb5c23bb16f79c63203dff3790 Mon Sep 17 00:00:00 2001 From: Roma Slyusarchuk Date: Mon, 31 Oct 2022 10:35:02 -0700 Subject: [PATCH 14/16] fix: lint --- test/spanner.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/spanner.ts b/test/spanner.ts index a249c302a..76cba2213 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -386,7 +386,7 @@ describe('Spanner with mock server', () => { ); assert.strictEqual(request.requestOptions!.priority, 'PRIORITY_LOW'); assert.strictEqual(request.requestOptions!.requestTag, 'request-tag'); - assert.deepStrictEqual(request.transaction!.begin!.readWrite, {}); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); assert.strictEqual( request.requestOptions!.transactionTag, 'transaction-tag' @@ -439,7 +439,7 @@ describe('Spanner with mock server', () => { request.requestOptions!.transactionTag, 'transaction-tag' ); - assert.deepStrictEqual(request.transaction!.begin!.readWrite, {}); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); }); it('should return an array of json objects', async () => { @@ -3157,7 +3157,7 @@ describe('Spanner with mock server', () => { return (val as v1.ExecuteSqlRequest).sql; }) as v1.ExecuteSqlRequest; assert.ok(request, 'no ExecuteSqlRequest found'); - assert.deepStrictEqual(request.transaction!.begin!.readWrite, {}); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); assert.strictEqual(request.sql, selectSql); request = spannerMock @@ -3190,7 +3190,7 @@ describe('Spanner with mock server', () => { return (val as v1.ExecuteSqlRequest).sql; }) as v1.ExecuteSqlRequest; assert.ok(request, 'no ExecuteSqlRequest found'); - assert.deepStrictEqual(request.transaction!.begin!.readWrite, {}); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); assert.strictEqual(request.sql, selectSql); request = spannerMock From 346abeacd81312dc0355c9f5c57d7070c4d33de7 Mon Sep 17 00:00:00 2001 From: Roma Slyusarchuk Date: Fri, 4 Nov 2022 17:08:25 -0700 Subject: [PATCH 15/16] fix: system test --- system-test/spanner.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/system-test/spanner.ts b/system-test/spanner.ts index 4c754408b..31ff2a85c 100644 --- a/system-test/spanner.ts +++ b/system-test/spanner.ts @@ -6799,10 +6799,9 @@ describe('Spanner', () => { NumberValue: 0, }; - beforeEach(() => { - return googleSqlTable.update(defaultRowValues).then(() => { - postgreSqlTable.update(defaultRowValues); - }); + beforeEach(async () => { + await googleSqlTable.update(defaultRowValues); + await postgreSqlTable.update(defaultRowValues); }); const readConcurrentTransaction = (done, database, table) => { From 311a80fce6c4471cda06bda2e9c1870f29682f9b Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Mon, 7 Nov 2022 10:40:50 +0530 Subject: [PATCH 16/16] fix: system emulator test --- system-test/spanner.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/system-test/spanner.ts b/system-test/spanner.ts index 31ff2a85c..36432442d 100644 --- a/system-test/spanner.ts +++ b/system-test/spanner.ts @@ -6801,7 +6801,9 @@ describe('Spanner', () => { beforeEach(async () => { await googleSqlTable.update(defaultRowValues); - await postgreSqlTable.update(defaultRowValues); + if (!IS_EMULATOR_ENABLED) { + await postgreSqlTable.update(defaultRowValues); + } }); const readConcurrentTransaction = (done, database, table) => {