Skip to content

Commit

Permalink
fix: retry error with timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
surbhigarg92 committed Jun 26, 2024
1 parent 055f2f7 commit 407b870
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 2 deletions.
18 changes: 16 additions & 2 deletions src/partial-result-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import mergeStream = require('merge-stream');
import {common as p} from 'protobufjs';
import {Readable, Transform} from 'stream';
import * as streamEvents from 'stream-events';
import {grpc} from 'google-gax';
import {isRetryableInternalError} from './transaction-runner';
import {grpc, CallOptions} from 'google-gax';
import {DeadlineError, isRetryableInternalError} from './transaction-runner';

import {codec, JSONOptions, Json, Field, Value} from './codec';
import {google} from '../protos/protos';
Expand Down Expand Up @@ -96,6 +96,7 @@ export interface RowOptions {
* };
*/
columnsMetadata?: object;
gaxOptions?: CallOptions;
}

/**
Expand Down Expand Up @@ -491,6 +492,8 @@ export function partialResultStream(
const maxQueued = 10;
let lastResumeToken: ResumeToken;
let lastRequestStream: Readable;
const startTime = Date.now();
const timeout = options?.gaxOptions?.timeout ?? Infinity;

// mergeStream allows multiple streams to be connected into one. This is good;
// if we need to retry a request and pipe more data to the user's stream.
Expand Down Expand Up @@ -541,6 +544,17 @@ export function partialResultStream(
};

const retry = (err: grpc.ServiceError): void => {
const elapsed = Date.now() - startTime;
if (elapsed >= timeout) {
// The timeout has reached so this will flush any rows the
// checkpoint stream has queued. After that, we will destroy the
// user's stream with the Deadline exceeded error.
setImmediate(() =>
batchAndSplitOnTokenStream.destroy(new DeadlineError(err))
);
return;
}

if (
!(
err.code &&
Expand Down
2 changes: 2 additions & 0 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ export class Snapshot extends EventEmitter {
jsonOptions,
maxResumeRetries,
columnsMetadata,
gaxOptions,
})
?.on('response', response => {
if (response.metadata && response.metadata!.transaction && !this.id) {
Expand Down Expand Up @@ -1210,6 +1211,7 @@ export class Snapshot extends EventEmitter {
jsonOptions,
maxResumeRetries,
columnsMetadata,
gaxOptions,
})
.on('response', response => {
if (response.metadata && response.metadata!.transaction && !this.id) {
Expand Down
32 changes: 32 additions & 0 deletions test/partial-result-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,38 @@ describe('PartialResultStream', () => {
);
});

it('should get Deadline exceeded error if timeout has reached', done => {
const fakeCheckpointStream = through.obj();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(fakeCheckpointStream as any).reset = () => {};

sandbox.stub(checkpointStream, 'obj').returns(fakeCheckpointStream);

const firstFakeRequestStream = through.obj();

const requestFnStub = sandbox.stub();

requestFnStub.onCall(0).callsFake(() => {
setTimeout(() => {
// This causes a new request stream to be created.
firstFakeRequestStream.emit('error', {
code: grpc.status.UNAVAILABLE,
message: 'Error.',
} as grpc.ServiceError);
}, 50);

return firstFakeRequestStream;
});

partialResultStream(requestFnStub, {gaxOptions: {timeout: 0}})
.on('data', row => {})
.on('error', err => {
assert.strictEqual(err.code, grpc.status.DEADLINE_EXCEEDED);
assert.strictEqual(requestFnStub.callCount, 1);
done();
});
});

it('should resume if there was a retryable error', done => {
// This test will emit four rows total:
// - Two rows
Expand Down
8 changes: 8 additions & 0 deletions test/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,15 @@ describe('Transaction', () => {
});

it('should pass along row options', () => {
const gaxOptions = {
timeout: 60,
};
const fakeOptions = {
json: true,
jsonOptions: {a: 'b'},
maxResumeRetries: 10,
columnsMetadata: {column1: {test: 'ss'}, column2: Function},
gaxOptions: gaxOptions,
};

snapshot.createReadStream(TABLE, fakeOptions);
Expand Down Expand Up @@ -766,11 +770,15 @@ describe('Transaction', () => {
});

it('should pass along row options', () => {
const gaxOptions = {
timeout: 60,
};
const expectedOptions = {
json: true,
jsonOptions: {a: 'b'},
maxResumeRetries: 10,
columnsMetadata: {column1: {test: 'ss'}, column2: Function},
gaxOptions: gaxOptions,
};

const fakeQuery = Object.assign({}, QUERY, expectedOptions);
Expand Down

0 comments on commit 407b870

Please sign in to comment.