Skip to content

Commit

Permalink
Merge branch 'main' into refactor/tree-shakable-kind-2
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonkuhrt authored Nov 9, 2024
2 parents 05b06d4 + 831c121 commit 61c6351
Show file tree
Hide file tree
Showing 15 changed files with 1,777 additions and 378 deletions.
91 changes: 91 additions & 0 deletions src/execution/AbortSignalListener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';

/**
* A AbortSignalListener object can be used to trigger multiple responses
* in response to a single AbortSignal.
*
* @internal
*/
export class AbortSignalListener {
abortSignal: AbortSignal;
abort: () => void;

private _onAborts: Set<() => void>;

constructor(abortSignal: AbortSignal) {
this.abortSignal = abortSignal;
this._onAborts = new Set<() => void>();
this.abort = () => {
for (const abort of this._onAborts) {
abort();
}
};

abortSignal.addEventListener('abort', this.abort);
}

add(onAbort: () => void): void {
this._onAborts.add(onAbort);
}

delete(onAbort: () => void): void {
this._onAborts.delete(onAbort);
}

disconnect(): void {
this.abortSignal.removeEventListener('abort', this.abort);
}
}

export function cancellablePromise<T>(
originalPromise: Promise<T>,
abortSignalListener: AbortSignalListener,
): Promise<T> {
const abortSignal = abortSignalListener.abortSignal;
if (abortSignal.aborted) {
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
return Promise.reject(abortSignal.reason);
}

const { promise, resolve, reject } = promiseWithResolvers<T>();
const onAbort = () => reject(abortSignal.reason);
abortSignalListener.add(onAbort);
originalPromise.then(
(resolved) => {
abortSignalListener.delete(onAbort);
resolve(resolved);
},
(error: unknown) => {
abortSignalListener.delete(onAbort);
reject(error);
},
);

return promise;
}

export function cancellableIterable<T>(
iterable: AsyncIterable<T>,
abortSignalListener: AbortSignalListener,
): AsyncIterable<T> {
const iterator = iterable[Symbol.asyncIterator]();

const _next = iterator.next.bind(iterator);

if (iterator.return) {
const _return = iterator.return.bind(iterator);

return {
[Symbol.asyncIterator]: () => ({
next: () => cancellablePromise(_next(), abortSignalListener),
return: () => cancellablePromise(_return(), abortSignalListener),
}),
};
}

return {
[Symbol.asyncIterator]: () => ({
next: () => cancellablePromise(_next(), abortSignalListener),
}),
};
}
6 changes: 6 additions & 0 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { pathToArray } from '../jsutils/Path.js';

import type { GraphQLError } from '../error/GraphQLError.js';

import type { AbortSignalListener } from './AbortSignalListener.js';
import { IncrementalGraph } from './IncrementalGraph.js';
import type {
CancellableStreamRecord,
Expand Down Expand Up @@ -43,6 +44,7 @@ export function buildIncrementalResponse(
}

interface IncrementalPublisherContext {
abortSignalListener: AbortSignalListener | undefined;
cancellableStreams: Set<CancellableStreamRecord> | undefined;
}

Expand Down Expand Up @@ -125,6 +127,7 @@ class IncrementalPublisher {
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
if (isDone) {
this._context.abortSignalListener?.disconnect();
await this._returnAsyncIteratorsIgnoringErrors();
return { value: undefined, done: true };
}
Expand Down Expand Up @@ -171,6 +174,9 @@ class IncrementalPublisher {
batch = await this._incrementalGraph.nextCompletedBatch();
} while (batch !== undefined);

// TODO: add test for this case
/* c8 ignore next */
this._context.abortSignalListener?.disconnect();
await this._returnAsyncIteratorsIgnoringErrors();
return { value: undefined, done: true };
};
Expand Down
170 changes: 170 additions & 0 deletions src/execution/__tests__/AbortSignalListener-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import { expect } from 'chai';
import { describe, it } from 'mocha';

import { expectPromise } from '../../__testUtils__/expectPromise.js';

import {
AbortSignalListener,
cancellableIterable,
cancellablePromise,
} from '../AbortSignalListener.js';

describe('AbortSignalListener', () => {
it('works to add a listener', () => {
const abortController = new AbortController();

const abortSignalListener = new AbortSignalListener(abortController.signal);

let called = false;
const onAbort = () => {
called = true;
};
abortSignalListener.add(onAbort);

abortController.abort();

expect(called).to.equal(true);
});

it('works to delete a listener', () => {
const abortController = new AbortController();

const abortSignalListener = new AbortSignalListener(abortController.signal);

let called = false;
/* c8 ignore next 3 */
const onAbort = () => {
called = true;
};
abortSignalListener.add(onAbort);
abortSignalListener.delete(onAbort);

abortController.abort();

expect(called).to.equal(false);
});

it('works to disconnect a listener from the abortSignal', () => {
const abortController = new AbortController();

const abortSignalListener = new AbortSignalListener(abortController.signal);

let called = false;
/* c8 ignore next 3 */
const onAbort = () => {
called = true;
};
abortSignalListener.add(onAbort);

abortSignalListener.disconnect();

abortController.abort();

expect(called).to.equal(false);
});
});

describe('cancellablePromise', () => {
it('works to cancel an already resolved promise', async () => {
const abortController = new AbortController();

const abortSignalListener = new AbortSignalListener(abortController.signal);

const promise = Promise.resolve(1);

const withCancellation = cancellablePromise(promise, abortSignalListener);

abortController.abort(new Error('Cancelled!'));

await expectPromise(withCancellation).toRejectWith('Cancelled!');
});

it('works to cancel an already resolved promise after abort signal triggered', async () => {
const abortController = new AbortController();
const abortSignalListener = new AbortSignalListener(abortController.signal);

abortController.abort(new Error('Cancelled!'));

const promise = Promise.resolve(1);

const withCancellation = cancellablePromise(promise, abortSignalListener);

await expectPromise(withCancellation).toRejectWith('Cancelled!');
});

it('works to cancel a hanging promise', async () => {
const abortController = new AbortController();
const abortSignalListener = new AbortSignalListener(abortController.signal);

const promise = new Promise(() => {
/* never resolves */
});

const withCancellation = cancellablePromise(promise, abortSignalListener);

abortController.abort(new Error('Cancelled!'));

await expectPromise(withCancellation).toRejectWith('Cancelled!');
});

it('works to cancel a hanging promise created after abort signal triggered', async () => {
const abortController = new AbortController();
const abortSignalListener = new AbortSignalListener(abortController.signal);

abortController.abort(new Error('Cancelled!'));

const promise = new Promise(() => {
/* never resolves */
});

const withCancellation = cancellablePromise(promise, abortSignalListener);

await expectPromise(withCancellation).toRejectWith('Cancelled!');
});
});

describe('cancellableAsyncIterable', () => {
it('works to abort a next call', async () => {
const abortController = new AbortController();
const abortSignalListener = new AbortSignalListener(abortController.signal);

const asyncIterable = {
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ value: 1, done: false }),
}),
};

const withCancellation = cancellableIterable(
asyncIterable,
abortSignalListener,
);

const nextPromise = withCancellation[Symbol.asyncIterator]().next();

abortController.abort(new Error('Cancelled!'));

await expectPromise(nextPromise).toRejectWith('Cancelled!');
});

it('works to abort a next call when already aborted', async () => {
const abortController = new AbortController();
const abortSignalListener = new AbortSignalListener(abortController.signal);

abortController.abort(new Error('Cancelled!'));

const asyncIterable = {
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ value: 1, done: false }),
}),
};

const withCancellation = cancellableIterable(
asyncIterable,
abortSignalListener,
);

const nextPromise = withCancellation[Symbol.asyncIterator]().next();

await expectPromise(nextPromise).toRejectWith('Cancelled!');
});
});
Loading

0 comments on commit 61c6351

Please sign in to comment.