diff --git a/src/partial-result-stream.ts b/src/partial-result-stream.ts index 616cdc88b..69439f534 100644 --- a/src/partial-result-stream.ts +++ b/src/partial-result-stream.ts @@ -22,8 +22,8 @@ import mergeStream = require('merge-stream'); import {common as p} from 'protobufjs'; import {Readable, Transform} from 'stream'; import * as streamEvents from 'stream-events'; -import {grpc} from 'google-gax'; -import {isRetryableInternalError} from './transaction-runner'; +import {grpc, CallOptions} from 'google-gax'; +import {DeadlineError, isRetryableInternalError} from './transaction-runner'; import {codec, JSONOptions, Json, Field, Value} from './codec'; import {google} from '../protos/protos'; @@ -96,6 +96,7 @@ export interface RowOptions { * }; */ columnsMetadata?: object; + gaxOptions?: CallOptions; } /** @@ -491,6 +492,8 @@ export function partialResultStream( const maxQueued = 10; let lastResumeToken: ResumeToken; let lastRequestStream: Readable; + const startTime = Date.now(); + const timeout = options?.gaxOptions?.timeout ?? Infinity; // mergeStream allows multiple streams to be connected into one. This is good; // if we need to retry a request and pipe more data to the user's stream. @@ -541,6 +544,17 @@ export function partialResultStream( }; const retry = (err: grpc.ServiceError): void => { + const elapsed = Date.now() - startTime; + if (elapsed >= timeout) { + // The timeout has reached so this will flush any rows the + // checkpoint stream has queued. After that, we will destroy the + // user's stream with the Deadline exceeded error. + setImmediate(() => + batchAndSplitOnTokenStream.destroy(new DeadlineError(err)) + ); + return; + } + if ( !( err.code && diff --git a/src/transaction.ts b/src/transaction.ts index d5ec34a30..479262a27 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -704,6 +704,7 @@ export class Snapshot extends EventEmitter { jsonOptions, maxResumeRetries, columnsMetadata, + gaxOptions, }) ?.on('response', response => { if (response.metadata && response.metadata!.transaction && !this.id) { @@ -1210,6 +1211,7 @@ export class Snapshot extends EventEmitter { jsonOptions, maxResumeRetries, columnsMetadata, + gaxOptions, }) .on('response', response => { if (response.metadata && response.metadata!.transaction && !this.id) { diff --git a/test/partial-result-stream.ts b/test/partial-result-stream.ts index 8256221f1..799d29b00 100644 --- a/test/partial-result-stream.ts +++ b/test/partial-result-stream.ts @@ -311,6 +311,38 @@ describe('PartialResultStream', () => { ); }); + it('should get Deadline exceeded error if timeout has reached', done => { + const fakeCheckpointStream = through.obj(); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (fakeCheckpointStream as any).reset = () => {}; + + sandbox.stub(checkpointStream, 'obj').returns(fakeCheckpointStream); + + const firstFakeRequestStream = through.obj(); + + const requestFnStub = sandbox.stub(); + + requestFnStub.onCall(0).callsFake(() => { + setTimeout(() => { + // This causes a new request stream to be created. + firstFakeRequestStream.emit('error', { + code: grpc.status.UNAVAILABLE, + message: 'Error.', + } as grpc.ServiceError); + }, 50); + + return firstFakeRequestStream; + }); + + partialResultStream(requestFnStub, {gaxOptions: {timeout: 0}}) + .on('data', row => {}) + .on('error', err => { + assert.strictEqual(err.code, grpc.status.DEADLINE_EXCEEDED); + assert.strictEqual(requestFnStub.callCount, 1); + done(); + }); + }); + it('should resume if there was a retryable error', done => { // This test will emit four rows total: // - Two rows diff --git a/test/transaction.ts b/test/transaction.ts index 392370ab7..28be1543f 100644 --- a/test/transaction.ts +++ b/test/transaction.ts @@ -391,11 +391,15 @@ describe('Transaction', () => { }); it('should pass along row options', () => { + const gaxOptions = { + timeout: 60, + }; const fakeOptions = { json: true, jsonOptions: {a: 'b'}, maxResumeRetries: 10, columnsMetadata: {column1: {test: 'ss'}, column2: Function}, + gaxOptions: gaxOptions, }; snapshot.createReadStream(TABLE, fakeOptions); @@ -766,11 +770,15 @@ describe('Transaction', () => { }); it('should pass along row options', () => { + const gaxOptions = { + timeout: 60, + }; const expectedOptions = { json: true, jsonOptions: {a: 'b'}, maxResumeRetries: 10, columnsMetadata: {column1: {test: 'ss'}, column2: Function}, + gaxOptions: gaxOptions, }; const fakeQuery = Object.assign({}, QUERY, expectedOptions);