diff --git a/src/session-pool.ts b/src/session-pool.ts index 47fb8df9a..742e8b083 100644 --- a/src/session-pool.ts +++ b/src/session-pool.ts @@ -1159,7 +1159,6 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { const transaction = session.transaction( (session.parent as Database).queryOptions_ ); - await transaction.begin(); session.txn = transaction; } diff --git a/src/transaction-runner.ts b/src/transaction-runner.ts index 35b0a9c2d..91b43787c 100644 --- a/src/transaction-runner.ts +++ b/src/transaction-runner.ts @@ -120,6 +120,7 @@ export abstract class Runner { this.attempts = 0; this.session = session; this.transaction = transaction; + this.transaction.useInRunner(); const defaults = {timeout: 3600000}; @@ -194,7 +195,9 @@ export abstract class Runner { const transaction = this.session.transaction( (this.session.parent as Database).queryOptions_ ); - await transaction.begin(); + if (this.attempts > 0) { + await transaction.begin(); + } return transaction; } /** diff --git a/src/transaction.ts b/src/transaction.ts index fde8907de..aad555b29 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -216,6 +216,9 @@ export type CommitCallback = export class Snapshot extends EventEmitter { protected _options!: spannerClient.spanner.v1.ITransactionOptions; protected _seqno = 1; + protected _idWaiter: Readable; + protected _inlineBeginStarted; + protected _useInRunner = false; id?: Uint8Array | string; ended: boolean; metadata?: spannerClient.spanner.v1.ITransaction; @@ -289,6 +292,10 @@ export class Snapshot extends EventEmitter { this.resourceHeader_ = { [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_, }; + this._idWaiter = new Readable({ + read() {}, + }); + this._inlineBeginStarted = false; } /** @@ -378,17 +385,7 @@ export class Snapshot extends EventEmitter { callback!(err, resp); return; } - - const {id, readTimestamp} = resp; - - this.id = id!; - this.metadata = resp; - - if (readTimestamp) { - this.readTimestampProto = readTimestamp; - this.readTimestamp = new PreciseDate(readTimestamp as DateStruct); - } - + this._update(resp); callback!(null, resp); } ); @@ -573,6 +570,8 @@ export class Snapshot extends EventEmitter { if (this.id) { transaction.id = this.id as Uint8Array; + } else if (this._options.readWrite) { + transaction.begin = this._options; } else { transaction.singleUse = this._options; } @@ -603,6 +602,10 @@ export class Snapshot extends EventEmitter { ); const makeRequest = (resumeToken?: ResumeToken): Readable => { + if (this.id && transaction.begin) { + delete transaction.begin; + transaction.id = this.id; + } return this.requestStream({ client: 'SpannerClient', method: 'streamingRead', @@ -612,11 +615,21 @@ export class Snapshot extends EventEmitter { }); }; - return partialResultStream(makeRequest, { + return partialResultStream(this._wrapWithIdWaiter(makeRequest), { json, jsonOptions, maxResumeRetries, - }); + }) + ?.on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', () => { + if (!this.id && this._useInRunner) { + this.begin(); + } + }); } /** @@ -909,6 +922,9 @@ export class Snapshot extends EventEmitter { .on('response', response => { if (response.metadata) { metadata = response.metadata; + if (metadata.transaction && !this.id) { + this._update(metadata.transaction); + } } }) .on('data', row => rows.push(row)) @@ -1034,6 +1050,8 @@ export class Snapshot extends EventEmitter { const transaction: spannerClient.spanner.v1.ITransactionSelector = {}; if (this.id) { transaction.id = this.id as Uint8Array; + } else if (this._options.readWrite) { + transaction.begin = this._options; } else { transaction.singleUse = this._options; } @@ -1059,7 +1077,7 @@ export class Snapshot extends EventEmitter { }; const makeRequest = (resumeToken?: ResumeToken): Readable => { - if (!reqOpts) { + if (!reqOpts || (this.id && !reqOpts.transaction.id)) { try { sanitizeRequest(); } catch (e) { @@ -1078,11 +1096,21 @@ export class Snapshot extends EventEmitter { }); }; - return partialResultStream(makeRequest, { + return partialResultStream(this._wrapWithIdWaiter(makeRequest), { json, jsonOptions, maxResumeRetries, - }); + }) + .on('response', response => { + if (response.metadata && response.metadata!.transaction && !this.id) { + this._update(response.metadata!.transaction); + } + }) + .on('error', () => { + if (!this.id && this._useInRunner) { + this.begin(); + } + }); } /** @@ -1226,6 +1254,51 @@ export class Snapshot extends EventEmitter { return {params, paramTypes}; } + + /** + * Update transaction properties from the response. + * + * @private + * + * @param {spannerClient.spanner.v1.ITransaction} resp Response object. + */ + protected _update(resp: spannerClient.spanner.v1.ITransaction): void { + const {id, readTimestamp} = resp; + + this.id = id!; + this.metadata = resp; + + if (readTimestamp) { + this.readTimestampProto = readTimestamp; + this.readTimestamp = new PreciseDate(readTimestamp as DateStruct); + } + this._idWaiter.emit('notify'); + } + + /** + * Wrap `makeRequest` function with the lock to make sure the inline begin + * transaction can happen only once. + * + * @param makeRequest + * @private + */ + private _wrapWithIdWaiter( + makeRequest: (resumeToken?: ResumeToken) => Readable + ): (resumeToken?: ResumeToken) => Readable { + if (this.id || !this._options.readWrite) { + return makeRequest; + } + if (!this._inlineBeginStarted) { + this._inlineBeginStarted = true; + return makeRequest; + } + return (resumeToken?: ResumeToken): Readable => + this._idWaiter.once('notify', () => + makeRequest(resumeToken) + .on('data', chunk => this._idWaiter.emit('data', chunk)) + .once('end', () => this._idWaiter.emit('end')) + ); + } } /*! Developer Documentation @@ -1528,6 +1601,12 @@ export class Transaction extends Dml { return {sql, params, paramTypes}; }); + const transaction: spannerClient.spanner.v1.ITransactionSelector = {}; + if (this.id) { + transaction.id = this.id as Uint8Array; + } else { + transaction.begin = this._options; + } const reqOpts: spannerClient.spanner.v1.ExecuteBatchDmlRequest = { session: this.session.formattedName_!, requestOptions: this.configureTagOptions( @@ -1535,7 +1614,7 @@ export class Transaction extends Dml { this.requestOptions?.transactionTag ?? undefined, (options as BatchUpdateOptions).requestOptions ), - transaction: {id: this.id!}, + transaction, seqno: this._seqno++, statements, } as spannerClient.spanner.v1.ExecuteBatchDmlRequest; @@ -1562,6 +1641,11 @@ export class Transaction extends Dml { } const {resultSets, status} = resp; + for (const resultSet of resultSets) { + if (!this.id && resultSet.metadata?.transaction) { + this._update(resultSet.metadata.transaction); + } + } const rowCounts: number[] = resultSets.map(({stats}) => { return ( (stats && @@ -1686,8 +1770,11 @@ export class Transaction extends Dml { if (this.id) { reqOpts.transactionId = this.id as Uint8Array; - } else { + } else if (!this._useInRunner) { reqOpts.singleUseTransaction = this._options; + } else { + this.begin().then(() => this.commit(options, callback)); + return; } if ( @@ -2184,6 +2271,13 @@ export class Transaction extends Dml { const unique = new Set(allKeys); return Array.from(unique).sort(); } + + /** + * Mark transaction as started from the runner. + */ + useInRunner(): void { + this._useInRunner = true; + } } /*! Developer Documentation diff --git a/system-test/spanner.ts b/system-test/spanner.ts index 4c754408b..36432442d 100644 --- a/system-test/spanner.ts +++ b/system-test/spanner.ts @@ -6799,10 +6799,11 @@ describe('Spanner', () => { NumberValue: 0, }; - beforeEach(() => { - return googleSqlTable.update(defaultRowValues).then(() => { - postgreSqlTable.update(defaultRowValues); - }); + beforeEach(async () => { + await googleSqlTable.update(defaultRowValues); + if (!IS_EMULATOR_ENABLED) { + await postgreSqlTable.update(defaultRowValues); + } }); const readConcurrentTransaction = (done, database, table) => { diff --git a/test/database.ts b/test/database.ts index 31bbcbf5d..d7bf9cc55 100644 --- a/test/database.ts +++ b/test/database.ts @@ -91,16 +91,6 @@ class FakeSession { } } -interface ReadSessionCallback { - (err: Error, session?: null): void; - (err: null, session: FakeSession): void; -} - -interface WriteSessionCallback { - (err: Error, session?: null, transaction?: null): void; - (err: null, session: FakeSession, transaction: FakeTransaction): void; -} - class FakeSessionPool extends EventEmitter { calledWith_: IArguments; constructor() { diff --git a/test/mockserver/mockspanner.ts b/test/mockserver/mockspanner.ts index d11325703..93895f34b 100644 --- a/test/mockserver/mockspanner.ts +++ b/test/mockserver/mockspanner.ts @@ -16,7 +16,7 @@ import * as path from 'path'; import {google} from '../../protos/protos'; -import {grpc} from 'google-gax'; +import {grpc, ServiceError} from 'google-gax'; import * as protoLoader from '@grpc/proto-loader'; // eslint-disable-next-line node/no-extraneous-import import {Metadata} from '@grpc/grpc-js'; @@ -136,11 +136,12 @@ export class StatementResult { /** * Create a StatementResult that will return an update count. * @param updateCount The row count to return. + * @param error The status error to return. */ - static updateCount(updateCount: number): StatementResult { + static updateCount(updateCount: number, error?: Error): StatementResult { return new StatementResult( StatementResultType.UPDATE_COUNT, - null, + error || null, null, updateCount ); @@ -317,7 +318,7 @@ export class MockSpanner { abortTransaction(transaction: Transaction): void { const formattedId = `${transaction.session.formattedName_}/transactions/${transaction.id}`; - if (this.transactions.has(formattedId)) { + if (this.transactions.has(formattedId) || !transaction.id) { this.transactions.delete(formattedId); this.transactionOptions.delete(formattedId); this.abortedTransactions.add(formattedId); @@ -534,7 +535,7 @@ export class MockSpanner { this.pushRequest(call.request!, call.metadata); this.simulateExecutionTime(this.executeStreamingSql.name) .then(() => { - if (call.request!.transaction && call.request!.transaction.id) { + if (call.request!.transaction) { const fullTransactionId = `${call.request!.session}/transactions/${ call.request!.transaction.id }`; @@ -549,6 +550,20 @@ export class MockSpanner { } const res = this.statementResults.get(call.request!.sql); if (res) { + if (call.request!.transaction?.begin) { + const txn = this._updateTransaction( + call.request!.session, + call.request!.transaction.begin + ); + if (txn instanceof Error) { + call.emit('error', txn); + call.end(); + return; + } + if (res.type === StatementResultType.RESULT_SET) { + (res.resultSet as protobuf.ResultSet).metadata!.transaction = txn; + } + } let partialResultSets; let resumeIndex; let streamErr; @@ -706,7 +721,7 @@ export class MockSpanner { this.pushRequest(call.request!, call.metadata); this.simulateExecutionTime(this.executeBatchDml.name) .then(() => { - if (call.request!.transaction && call.request!.transaction.id) { + if (call.request!.transaction) { const fullTransactionId = `${call.request!.session}/transactions/${ call.request!.transaction.id }`; @@ -749,9 +764,24 @@ export class MockSpanner { case StatementResultType.RESULT_SET: callback(new Error('Wrong result type for batch DML')); break; - case StatementResultType.UPDATE_COUNT: - results.push(MockSpanner.toResultSet(res.updateCount)); + case StatementResultType.UPDATE_COUNT: { + const resultSet = MockSpanner.toResultSet(res.updateCount); + if (call.request!.transaction!.begin && i === 0) { + const transaction = this._updateTransaction( + call.request!.session, + call.request?.transaction!.begin + ); + if (transaction instanceof Error) { + callback(transaction); + break; + } + resultSet.metadata = protobuf.ResultSetMetadata.create({ + transaction, + }); + } + results.push(resultSet); break; + } case StatementResultType.ERROR: if ((res.error as grpc.ServiceError).code) { const serviceError = res.error as grpc.ServiceError; @@ -817,32 +847,14 @@ export class MockSpanner { this.pushRequest(call.request!, call.metadata); this.simulateExecutionTime(this.beginTransaction.name) .then(() => { - const session = this.sessions.get(call.request!.session); - if (session) { - let counter = this.transactionCounters.get(session.name); - if (!counter) { - counter = 0; - } - const id = ++counter; - this.transactionCounters.set(session.name, counter); - const transactionId = id.toString().padStart(12, '0'); - const fullTransactionId = - session.name + '/transactions/' + transactionId; - const readTimestamp = - call.request!.options && call.request!.options.readOnly - ? now() - : undefined; - const transaction = protobuf.Transaction.create({ - id: Buffer.from(transactionId), - readTimestamp, - }); - this.transactions.set(fullTransactionId, transaction); - this.transactionOptions.set(fullTransactionId, call.request!.options); - callback(null, transaction); + const res = this._updateTransaction( + call.request!.session, + call.request!.options + ); + if (res instanceof Error) { + callback(res); } else { - callback( - MockSpanner.createSessionNotFoundError(call.request!.session) - ); + callback(null, res); } }) .catch(err => { @@ -857,16 +869,14 @@ export class MockSpanner { this.pushRequest(call.request!, call.metadata); this.simulateExecutionTime(this.commit.name) .then(() => { - if (call.request!.transactionId) { - const fullTransactionId = `${call.request!.session}/transactions/${ - call.request!.transactionId - }`; - if (this.abortedTransactions.has(fullTransactionId)) { - callback( - MockSpanner.createTransactionAbortedError(`${fullTransactionId}`) - ); - return; - } + const fullTransactionId = `${call.request!.session}/transactions/${ + call.request!.transactionId + }`; + if (this.abortedTransactions.has(fullTransactionId)) { + callback( + MockSpanner.createTransactionAbortedError(`${fullTransactionId}`) + ); + return; } const session = this.sessions.get(call.request!.session); if (session) { @@ -947,6 +957,32 @@ export class MockSpanner { this.pushRequest(call.request!, call.metadata); callback(createUnimplementedError('PartitionQuery is not yet implemented')); } + + private _updateTransaction( + sessionName: string, + options: google.spanner.v1.ITransactionOptions | null | undefined + ): google.spanner.v1.Transaction | ServiceError { + const session = this.sessions.get(sessionName); + if (!session) { + return MockSpanner.createSessionNotFoundError(sessionName); + } + let counter = this.transactionCounters.get(session.name); + if (!counter) { + counter = 0; + } + const id = ++counter; + this.transactionCounters.set(session.name, counter); + const transactionId = id.toString().padStart(12, '0'); + const fullTransactionId = session.name + '/transactions/' + transactionId; + const readTimestamp = options && options.readOnly ? now() : undefined; + const transaction = protobuf.Transaction.create({ + id: Buffer.from(transactionId), + readTimestamp, + }); + this.transactions.set(fullTransactionId, transaction); + this.transactionOptions.set(fullTransactionId, options); + return transaction; + } } /** diff --git a/test/session-pool.ts b/test/session-pool.ts index 1ef996a2d..12be4ef15 100644 --- a/test/session-pool.ts +++ b/test/session-pool.ts @@ -1550,21 +1550,6 @@ describe('SessionPool', () => { }); }); - describe('_prepareTransaction', () => { - it('should prepare a transaction', async () => { - const fakeSession = createSession(); - const fakeTransaction = new FakeTransaction(); - const beginStub = sandbox.stub(fakeTransaction, 'begin').resolves(); - - fakeSession.transaction = sandbox.stub().returns(fakeTransaction); - - await sessionPool._prepareTransaction(fakeSession); - - assert.strictEqual(fakeSession.txn, fakeTransaction); - assert.strictEqual(beginStub.callCount, 1); - }); - }); - describe('_release', () => { it('should release the session', () => { const fakeSession = createSession('id', {type: types.ReadOnly}); diff --git a/test/spanner.ts b/test/spanner.ts index 6e412d4c2..76cba2213 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -17,7 +17,14 @@ import {after, before, beforeEach, describe, Done, it} from 'mocha'; import * as assert from 'assert'; import {grpc, Status, ServiceError} from 'google-gax'; -import {Database, Instance, SessionPool, Snapshot, Spanner} from '../src'; +import { + Database, + Instance, + SessionPool, + Snapshot, + Spanner, + Transaction, +} from '../src'; import * as mock from './mockserver/mockspanner'; import { MockError, @@ -282,6 +289,7 @@ describe('Spanner with mock server', () => { requestTag: 'request-tag', }, }); + await tx!.batchUpdate([insertSql, insertSql]); return await tx.commit(); } ); @@ -300,6 +308,16 @@ describe('Spanner with mock server', () => { request.requestOptions!.transactionTag, 'transaction-tag' ); + assert.ok(request.transaction?.begin, 'transaction is not empty'); + const nextBatchRequest = spannerMock + .getRequests() + .reverse() + .find(val => { + return (val as v1.ExecuteBatchDmlRequest).statements; + }) as v1.ExecuteBatchDmlRequest; + assert.ok(nextBatchRequest, 'no ExecuteBatchDmlRequest found'); + assert.ok(nextBatchRequest.transaction?.id, 'no transaction ID'); + const commitRequest = spannerMock.getRequests().find(val => { return (val as v1.CommitRequest).mutations; }) as v1.CommitRequest; @@ -310,6 +328,38 @@ describe('Spanner with mock server', () => { ); }); + it('should use txn ID from batchUpdate if non-ok status', async () => { + const sql = "INSERT INTO TBL (NUM, NAME) VALUES (14, 'Four')"; + const database = newTestDatabase(); + const err = { + message: 'Not OK', + } as MockError; + spannerMock.putStatementResult( + sql, + mock.StatementResult.updateCount(1, err) + ); + + await database.runTransactionAsync(async tx => { + await tx!.batchUpdate([sql, insertSql]); + await tx!.batchUpdate([sql, insertSql]); + return await tx.commit(); + }); + await database.close(); + const request = spannerMock.getRequests().find(val => { + return (val as v1.ExecuteBatchDmlRequest).statements; + }) as v1.ExecuteBatchDmlRequest; + assert.ok(request, 'no ExecuteBatchDmlRequest found'); + assert.ok(request.transaction?.begin, 'transaction is not empty'); + const nextBatchRequest = spannerMock + .getRequests() + .reverse() + .find(val => { + return (val as v1.ExecuteBatchDmlRequest).statements; + }) as v1.ExecuteBatchDmlRequest; + assert.ok(nextBatchRequest, 'no ExecuteBatchDmlRequest found'); + assert.ok(nextBatchRequest.transaction?.id, 'no transaction ID'); + }); + it('should execute update with requestOptions', async () => { const database = newTestDatabase(); await database.runTransactionAsync( @@ -336,6 +386,7 @@ describe('Spanner with mock server', () => { ); assert.strictEqual(request.requestOptions!.priority, 'PRIORITY_LOW'); assert.strictEqual(request.requestOptions!.requestTag, 'request-tag'); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); assert.strictEqual( request.requestOptions!.transactionTag, 'transaction-tag' @@ -388,6 +439,7 @@ describe('Spanner with mock server', () => { request.requestOptions!.transactionTag, 'transaction-tag' ); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); }); it('should return an array of json objects', async () => { @@ -1214,6 +1266,43 @@ describe('Spanner with mock server', () => { await database.close(); }); + it('should retry UNAVAILABLE during streaming with txn ID from inline begin response', async () => { + const err = { + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + streamIndex: index, + } as MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + SimulatedExecutionTime.ofError(err) + ); + const database = newTestDatabase(); + + await database.runTransactionAsync(async tx => { + await tx.run(selectSql); + await tx.commit(); + }); + await database.close(); + + const requests = spannerMock + .getRequests() + .filter(val => (val as v1.ExecuteSqlRequest).sql) + .map(req => req as v1.ExecuteSqlRequest); + assert.strictEqual(requests.length, 2); + assert.ok( + requests[0].transaction?.begin!.readWrite, + 'inline txn is not set.' + ); + assert.ok( + requests[1].transaction!.id, + 'Transaction ID is not used for retries.' + ); + assert.ok( + requests[1].resumeToken, + 'Resume token is not set for the retried' + ); + }); + it('should not retry non-retryable error during streaming', async () => { const database = newTestDatabase(); const err = { @@ -1898,23 +1987,6 @@ describe('Spanner with mock server', () => { }); }); - it('should retry "Session not found" errors on BeginTransaction during Database.runTransaction()', done => { - // Create a session pool with 1 read-only session. - const db = newTestDatabase({min: 1, incStep: 1, writes: 0.0}); - const pool = db.pool_ as SessionPool; - // Wait until one read-only session has been created. - pool.once('available', () => { - spannerMock.setExecutionTime( - spannerMock.beginTransaction, - SimulatedExecutionTime.ofError({ - code: grpc.status.NOT_FOUND, - message: 'Session not found', - } as MockError) - ); - runTransactionWithExpectedSessionRetry(db, done); - }); - }); - it('should retry "Session not found" errors for a query on a write-session on Database.runTransaction()', done => { const db = newTestDatabase({min: 1, incStep: 1, writes: 1.0}); const pool = db.pool_ as SessionPool; @@ -2071,23 +2143,6 @@ describe('Spanner with mock server', () => { }); }); - it('should retry "Session not found" errors on BeginTransaction during Database.runTransactionAsync()', done => { - // Create a session pool with 1 read-only session. - const db = newTestDatabase({min: 1, incStep: 1, writes: 0.0}); - const pool = db.pool_ as SessionPool; - // Wait until one read-only session has been created. - pool.once('available', async () => { - spannerMock.setExecutionTime( - spannerMock.beginTransaction, - SimulatedExecutionTime.ofError({ - code: grpc.status.NOT_FOUND, - message: 'Session not found', - } as MockError) - ); - runAsyncTransactionWithExpectedSessionRetry(db).then(done).catch(done); - }); - }); - it('should retry "Session not found" errors for a query on a write-session on Database.runTransactionAsync()', done => { const db = newTestDatabase({min: 1, incStep: 1, writes: 1.0}); const pool = db.pool_ as SessionPool; @@ -2709,6 +2764,7 @@ describe('Spanner with mock server', () => { // Execute an update. const [count] = await database.runTransactionAsync( (transaction): Promise<[number]> => { + transaction.begin(); return transaction.runUpdate(insertSql).then(updateCount => { transaction.commit(); return updateCount; @@ -2916,6 +2972,7 @@ describe('Spanner with mock server', () => { const database = newTestDatabase(); const [updated] = await database.runTransactionAsync( (transaction): Promise => { + transaction.begin(); return transaction.runUpdate(insertSql).then(updateCount => { if (!attempts) { spannerMock.abortTransaction(transaction); @@ -2937,6 +2994,7 @@ describe('Spanner with mock server', () => { await database.runTransactionAsync( {timeout: 1}, (transaction): Promise => { + transaction.begin(); attempts++; return transaction.runUpdate(insertSql).then(updateCount => { // Always abort the transaction. @@ -3086,6 +3144,164 @@ describe('Spanner with mock server', () => { ); }); + it('should use inline begin transaction', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + await tx!.run(selectSql); + await tx!.run(insertSql); + await tx.commit(); + }); + await database.close(); + + let request = spannerMock.getRequests().find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); + assert.strictEqual(request.sql, selectSql); + + request = spannerMock + .getRequests() + .slice() + .reverse() + .find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.strictEqual(request.sql, insertSql); + assert.ok(request.transaction!.id, 'TransactionID is not set.'); + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(!beginTxnRequest, 'beginTransaction was called'); + }); + + it('should only inline one begin transaction', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + const rowCount1 = getRowCountFromStreamingSql(tx!, {sql: selectSql}); + const rowCount2 = getRowCountFromStreamingSql(tx!, {sql: selectSql}); + await Promise.all([rowCount1, rowCount2]); + await tx.commit(); + }); + await database.close(); + + let request = spannerMock.getRequests().find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); + assert.strictEqual(request.sql, selectSql); + + request = spannerMock + .getRequests() + .slice() + .reverse() + .find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.strictEqual(request.sql, selectSql); + assert.ok(request.transaction!.id, 'TransactionID is not set.'); + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(!beginTxnRequest, 'beginTransaction was called'); + }); + + it('should use beginTransaction on retry', async () => { + const database = newTestDatabase(); + let attempts = 0; + await database.runTransactionAsync(async tx => { + await tx!.run(selectSql); + if (!attempts) { + spannerMock.abortTransaction(tx); + } + attempts++; + await tx!.run(insertSql); + await tx.commit(); + }); + await database.close(); + + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(beginTxnRequest, 'beginTransaction was called'); + }); + + it('should use beginTransaction on retry for unknown reason', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + try { + await tx.runUpdate(invalidSql); + assert.fail('missing expected error'); + } catch (e) { + assert.strictEqual( + (e as ServiceError).message, + `${grpc.status.NOT_FOUND} NOT_FOUND: ${fooNotFoundErr.message}` + ); + } + await tx.run(selectSql); + await tx.commit(); + }); + await database.close(); + + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(beginTxnRequest, 'beginTransaction was called'); + }); + + it('should use beginTransaction for streaming on retry for unknown reason', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + try { + await getRowCountFromStreamingSql(tx!, {sql: invalidSql}); + assert.fail('missing expected error'); + } catch (e) { + assert.strictEqual( + (e as ServiceError).message, + `${grpc.status.NOT_FOUND} NOT_FOUND: ${fooNotFoundErr.message}` + ); + } + await tx.run(selectSql); + await tx.commit(); + }); + await database.close(); + + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(beginTxnRequest, 'beginTransaction was called'); + }); + + it('should fail if beginTransaction fails', async () => { + const database = newTestDatabase(); + const err = { + message: 'Test error', + } as MockError; + spannerMock.setExecutionTime( + spannerMock.beginTransaction, + SimulatedExecutionTime.ofError(err) + ); + try { + await database.runTransactionAsync(async tx => { + await tx!.run(selectSql); + spannerMock.abortTransaction(tx); + await tx!.run(insertSql); + await tx.commit(); + }); + assert.fail('missing expected error'); + } catch (e) { + assert.strictEqual( + (e as ServiceError).message, + '2 UNKNOWN: Test error' + ); + } finally { + await database.close(); + } + }); + it('should use transactionTag on blind commit', async () => { const database = newTestDatabase({min: 0}); const [session] = await database.createSession({}); @@ -3110,6 +3326,20 @@ describe('Spanner with mock server', () => { 'transaction-tag' ); }); + + it('should run begin transaction on blind commit', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + tx.insert('foo', {id: 1, name: 'One'}); + await tx.commit(); + }); + await database.close(); + + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(beginTxnRequest, 'beginTransaction was called'); + }); }); describe('table', () => { @@ -3896,13 +4126,13 @@ function executeSimpleUpdate( } function getRowCountFromStreamingSql( - database: Database, + context: Database | Transaction, query: ExecuteSqlRequest ): Promise { return new Promise((resolve, reject) => { let rows = 0; let errored = false; - database + context .runStream(query) .on('error', err => { errored = true; diff --git a/test/transaction-runner.ts b/test/transaction-runner.ts index 4a28d9ea3..586501bd9 100644 --- a/test/transaction-runner.ts +++ b/test/transaction-runner.ts @@ -29,6 +29,7 @@ class FakeTransaction extends EventEmitter { async begin(): Promise {} request() {} requestStream() {} + useInRunner() {} } describe('TransactionRunner', () => { @@ -213,6 +214,9 @@ describe('TransactionRunner', () => { const transaction = await runner.getTransaction(); assert.strictEqual(transaction, expectedTransaction); + assert.strictEqual(beginStub.callCount, 0); + runner.attempts++; + await runner.getTransaction(); assert.strictEqual(beginStub.callCount, 1); }); }); diff --git a/test/transaction.ts b/test/transaction.ts index 387627e6f..739b68d0c 100644 --- a/test/transaction.ts +++ b/test/transaction.ts @@ -1949,7 +1949,7 @@ describe('Transaction', () => { }); }); - it('should not set transaction tag if `singleUse`', () => { + it('should set transaction tag if `begin`', () => { const TABLE = 'my-table-123'; const transactionTag = 'bar'; transaction.requestOptions = {transactionTag}; @@ -1958,7 +1958,9 @@ describe('Transaction', () => { const {reqOpts} = REQUEST_STREAM.lastCall.args[0]; - assert.deepStrictEqual(reqOpts.requestOptions, {}); + assert.deepStrictEqual(reqOpts.requestOptions, { + transactionTag, + }); }); }); });