diff --git a/.gitignore b/.gitignore index d4f03a0df..14050d4e4 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ system-test/*key.json .DS_Store package-lock.json __pycache__ +.vscode \ No newline at end of file diff --git a/src/transaction.ts b/src/transaction.ts index 5617b4a3c..2e2420c04 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -712,8 +712,16 @@ export class Snapshot extends EventEmitter { this._update(response.metadata!.transaction); } }) - .on('error', () => { - if (!this.id && this._useInRunner) { + .on('error', err => { + const isServiceError = err && typeof err === 'object' && 'code' in err; + if ( + !this.id && + this._useInRunner && + !( + isServiceError && + (err as grpc.ServiceError).code === grpc.status.ABORTED + ) + ) { this.begin(); } }); @@ -1219,8 +1227,16 @@ export class Snapshot extends EventEmitter { this._update(response.metadata!.transaction); } }) - .on('error', () => { - if (!this.id && this._useInRunner) { + .on('error', err => { + const isServiceError = err && typeof err === 'object' && 'code' in err; + if ( + !this.id && + this._useInRunner && + !( + isServiceError && + (err as grpc.ServiceError).code === grpc.status.ABORTED + ) + ) { this.begin(); } }); @@ -1437,6 +1453,7 @@ export class Snapshot extends EventEmitter { this._waitingRequests.push(() => { makeRequest(resumeToken) .on('data', chunk => streamProxy.emit('data', chunk)) + .on('error', err => streamProxy.emit('error', err)) .on('end', () => streamProxy.emit('end')); }); diff --git a/test/mockserver/mockspanner.ts b/test/mockserver/mockspanner.ts index 464027b7d..9e0c06996 100644 --- a/test/mockserver/mockspanner.ts +++ b/test/mockserver/mockspanner.ts @@ -540,6 +540,7 @@ export class MockSpanner { call.request!.transaction.id }`; if (this.abortedTransactions.has(fullTransactionId)) { + call.sendMetadata(new Metadata()); call.emit( 'error', MockSpanner.createTransactionAbortedError(`${fullTransactionId}`) @@ -556,6 +557,7 @@ export class MockSpanner { call.request!.transaction.begin ); if (txn instanceof Error) { + call.sendMetadata(new Metadata()); call.emit('error', txn); call.end(); return; @@ -593,6 +595,7 @@ export class MockSpanner { index ); if (streamErr) { + call.sendMetadata(new Metadata()); call.emit('error', streamErr); break; } @@ -610,6 +613,7 @@ export class MockSpanner { 1 ); if (streamErr) { + call.sendMetadata(new Metadata()); call.emit('error', streamErr); break; } diff --git a/test/spanner.ts b/test/spanner.ts index 4acb15fdf..b9c2ca682 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -1314,6 +1314,68 @@ describe('Spanner with mock server', () => { ); }); + it('should retry UNAVAILABLE during streaming with txn ID from inline begin response with parallel queries', async () => { + const err = { + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + streamIndex: index, + } as MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + SimulatedExecutionTime.ofError(err) + ); + const database = newTestDatabase(); + + await database.runTransactionAsync(async tx => { + const [rows1, rows2] = await Promise.all([ + tx!.run(selectSql), + tx!.run(selectSql), + ]); + assert.equal(rows1.length, 3); + assert.equal(rows2.length, 3); + await tx.commit(); + }); + await database.close(); + + const requests = spannerMock + .getRequests() + .filter(val => (val as v1.ExecuteSqlRequest).sql) + .map(req => req as v1.ExecuteSqlRequest); + assert.strictEqual(requests.length, 3); + assert.ok( + requests[0].transaction?.begin!.readWrite, + 'inline txn is not set.' + ); + assert.ok( + requests[1].transaction!.id, + 'Transaction ID is not used for retries.' + ); + assert.ok( + requests[1].resumeToken, + 'Resume token is not set for the retried' + ); + const commitRequests = spannerMock + .getRequests() + .filter(val => (val as v1.CommitRequest).mutations) + .map(req => req as v1.CommitRequest); + assert.strictEqual(commitRequests.length, 1); + assert.deepStrictEqual( + requests[1].transaction!.id, + requests[2].transaction!.id + ); + assert.deepStrictEqual( + requests[1].transaction!.id, + commitRequests[0].transactionId + ); + const beginTxnRequests = spannerMock + .getRequests() + .filter( + val => (val as v1.BeginTransactionRequest).options?.readWrite + ) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequests.length, 0); + }); + it('should not retry non-retryable error during streaming', async () => { const database = newTestDatabase(); const err = { @@ -2890,6 +2952,59 @@ describe('Spanner with mock server', () => { }); }); + it('should retry on aborted when running parallel query', async () => { + let attempts = 0; + const database = newTestDatabase(); + const rowCount = await database.runTransactionAsync( + (transaction): Promise => { + if (!attempts) { + spannerMock.abortTransaction(transaction); + } + attempts++; + return Promise.all([ + transaction!.run(selectSql), + transaction!.run(selectSql), + ]).then(([rows1, rows2]) => { + assert.strictEqual(rows1.length, 3); + assert.strictEqual(rows2.length, 3); + return transaction.commit().then(() => rows1.length + rows2.length); + }); + } + ); + assert.strictEqual(rowCount, 6); + assert.strictEqual(attempts, 2); + const requests = spannerMock + .getRequests() + .filter(val => { + return (val as v1.ExecuteSqlRequest).sql === selectSql; + }) + .map(req => req as v1.ExecuteSqlRequest); + + // First request will fail and second blocked request will get discarded, once Abort error is received. + assert.strictEqual(requests.length, 3); + assert.ok( + requests[0].transaction?.begin!.readWrite, + 'Inline txn is not set in request.' + ); + requests.slice(1, 3).forEach((request, index) => { + assert.ok( + request.transaction!.id, + `Transaction ID is not used for retries. ${index}.` + ); + }); + const beginTxnRequest = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequest.length, 1); + const commitRequests = spannerMock + .getRequests() + .filter(val => (val as v1.CommitRequest).mutations) + .map(req => req as v1.CommitRequest); + assert.strictEqual(commitRequests.length, 1); + await database.close(); + }); + it('should retry on aborted update statement', async () => { let attempts = 0; const database = newTestDatabase(); @@ -3211,6 +3326,19 @@ describe('Spanner with mock server', () => { assert.ok(!beginTxnRequest, 'beginTransaction was called'); }); + it('should catch an exception error during invalid queries while using inline begin transaction', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + try { + await Promise.all([tx!.run(selectSql), tx!.run(invalidSql)]); + await tx.commit(); + } catch (err) { + assert(err, 'Expected an error to be thrown'); + assert.match((err as Error).message, /Table FOO not found/); + } + }); + }); + it('should apply blind writes only once', async () => { const database = newTestDatabase(); let attempts = 0; @@ -3243,10 +3371,11 @@ describe('Spanner with mock server', () => { }) as v1.ExecuteSqlRequest; assert.ok(secondExecuteSqlRequest.transaction?.id); // Verify that we have a BeginTransaction request for the retry. - const beginTxnRequest = spannerMock.getRequests().find(val => { - return (val as v1.BeginTransactionRequest).options?.readWrite; - }) as v1.BeginTransactionRequest; - assert.ok(beginTxnRequest, 'beginTransaction was called'); + const beginTxnRequests = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequests.length, 1); // Verify that we have a single Commit request, and that the Commit request contains only one mutation. assert.strictEqual( 1, @@ -3541,11 +3670,31 @@ describe('Spanner with mock server', () => { await tx.commit(); }); await database.close(); + const beginTxnRequest = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequest.length, 1); + }); - const beginTxnRequest = spannerMock.getRequests().find(val => { - return (val as v1.BeginTransactionRequest).options?.readWrite; - }) as v1.BeginTransactionRequest; - assert.ok(beginTxnRequest, 'beginTransaction was called'); + it('should use beginTransaction on retry for parallel queries', async () => { + const database = newTestDatabase(); + let attempts = 0; + await database.runTransactionAsync(async tx => { + await Promise.all([tx!.run(selectSql), tx!.run(selectSql)]); + if (!attempts) { + spannerMock.abortTransaction(tx); + } + attempts++; + await Promise.all([tx!.run(insertSql), tx!.run(insertSql)]); + await tx.commit(); + }); + await database.close(); + const beginTxnRequest = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequest.length, 1); }); it('should use beginTransaction on retry with excludeTxnFromChangeStreams', async () => { @@ -3565,12 +3714,13 @@ describe('Spanner with mock server', () => { ); await database.close(); - const beginTxnRequest = spannerMock.getRequests().find(val => { - return (val as v1.BeginTransactionRequest).options?.readWrite; - }) as v1.BeginTransactionRequest; - assert.ok(beginTxnRequest, 'beginTransaction was called'); + const beginTxnRequest = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequest.length, 1); assert.strictEqual( - beginTxnRequest.options?.excludeTxnFromChangeStreams, + beginTxnRequest[0].options?.excludeTxnFromChangeStreams, true ); }); @@ -3589,12 +3739,13 @@ describe('Spanner with mock server', () => { }); await database.close(); - const beginTxnRequest = spannerMock.getRequests().find(val => { - return (val as v1.BeginTransactionRequest).options?.readWrite; - }) as v1.BeginTransactionRequest; - assert.ok(beginTxnRequest, 'beginTransaction was called'); + const beginTxnRequest = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequest.length, 1); assert.strictEqual( - beginTxnRequest.options!.readWrite!.readLockMode, + beginTxnRequest[0].options!.readWrite!.readLockMode, 'OPTIMISTIC' ); }); @@ -3616,10 +3767,11 @@ describe('Spanner with mock server', () => { }); await database.close(); - const beginTxnRequest = spannerMock.getRequests().find(val => { - return (val as v1.BeginTransactionRequest).options?.readWrite; - }) as v1.BeginTransactionRequest; - assert.ok(beginTxnRequest, 'beginTransaction was called'); + const beginTxnRequest = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequest.length, 1); }); it('should use beginTransaction on retry for unknown reason with excludeTxnFromChangeStreams', async () => { @@ -3644,12 +3796,13 @@ describe('Spanner with mock server', () => { ); await database.close(); - const beginTxnRequest = spannerMock.getRequests().find(val => { - return (val as v1.BeginTransactionRequest).options?.readWrite; - }) as v1.BeginTransactionRequest; - assert.ok(beginTxnRequest, 'beginTransaction was called'); + const beginTxnRequest = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequest.length, 1); assert.strictEqual( - beginTxnRequest.options?.excludeTxnFromChangeStreams, + beginTxnRequest[0].options?.excludeTxnFromChangeStreams, true ); }); @@ -3671,10 +3824,11 @@ describe('Spanner with mock server', () => { }); await database.close(); - const beginTxnRequest = spannerMock.getRequests().find(val => { - return (val as v1.BeginTransactionRequest).options?.readWrite; - }) as v1.BeginTransactionRequest; - assert.ok(beginTxnRequest, 'beginTransaction was called'); + const beginTxnRequest = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequest.length, 1); }); it('should use beginTransaction for streaming on retry for unknown reason with excludeTxnFromChangeStreams', async () => { @@ -3699,12 +3853,13 @@ describe('Spanner with mock server', () => { ); await database.close(); - const beginTxnRequest = spannerMock.getRequests().find(val => { - return (val as v1.BeginTransactionRequest).options?.readWrite; - }) as v1.BeginTransactionRequest; - assert.ok(beginTxnRequest, 'beginTransaction was called'); + const beginTxnRequest = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequest.length, 1); assert.strictEqual( - beginTxnRequest.options?.excludeTxnFromChangeStreams, + beginTxnRequest[0].options?.excludeTxnFromChangeStreams, true ); }); @@ -3769,10 +3924,11 @@ describe('Spanner with mock server', () => { }); await database.close(); - const beginTxnRequest = spannerMock.getRequests().find(val => { - return (val as v1.BeginTransactionRequest).options?.readWrite; - }) as v1.BeginTransactionRequest; - assert.ok(beginTxnRequest, 'beginTransaction was called'); + const beginTxnRequest = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequest.length, 1); }); it('should run begin transaction on blind commit with excludeTxnFromChangeStreams', async () => { @@ -3788,11 +3944,13 @@ describe('Spanner with mock server', () => { ); await database.close(); - const beginTxnRequest = spannerMock.getRequests().find(val => { - return (val as v1.BeginTransactionRequest).options?.readWrite; - }) as v1.BeginTransactionRequest; + const beginTxnRequest = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequest.length, 1); assert.strictEqual( - beginTxnRequest.options?.excludeTxnFromChangeStreams, + beginTxnRequest[0].options?.excludeTxnFromChangeStreams, true ); }); @@ -3841,12 +3999,13 @@ describe('Spanner with mock server', () => { } ); } catch (e) { - const beginTxnRequest = spannerMock.getRequests().find(val => { - return (val as v1.BeginTransactionRequest).options?.readWrite; - }) as v1.BeginTransactionRequest; - + const beginTxnRequest = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequest.length, 1); assert.strictEqual( - beginTxnRequest.options?.excludeTxnFromChangeStreams, + beginTxnRequest[0].options?.excludeTxnFromChangeStreams, true ); assert.strictEqual( @@ -3897,11 +4056,13 @@ describe('Spanner with mock server', () => { excludeTxnFromChangeStreams: true, } ); - const beginTxnRequest = spannerMock.getRequests().find(val => { - return (val as v1.BeginTransactionRequest).options?.readWrite; - }) as v1.BeginTransactionRequest; + const beginTxnRequest = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequest.length, 1); assert.strictEqual( - beginTxnRequest.options?.excludeTxnFromChangeStreams, + beginTxnRequest[0].options?.excludeTxnFromChangeStreams, true ); await database.close();