Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polish(incremental): small fixes #4141

Merged
merged 3 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 29 additions & 45 deletions src/execution/IncrementalGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class IncrementalGraph {

private _completedQueue: Array<IncrementalDataRecordResult>;
private _nextQueue: Array<
(iterable: IteratorResult<Iterable<IncrementalDataRecordResult>>) => void
(iterable: Iterable<IncrementalDataRecordResult> | undefined) => void
>;

constructor() {
Expand Down Expand Up @@ -70,37 +70,32 @@ export class IncrementalGraph {
}
}

completedIncrementalData() {
return {
[Symbol.asyncIterator]() {
return this;
},
next: (): Promise<
IteratorResult<Iterable<IncrementalDataRecordResult>>
> => {
const firstResult = this._completedQueue.shift();
if (firstResult !== undefined) {
return Promise.resolve({
value: this._yieldCurrentCompletedIncrementalData(firstResult),
done: false,
});
}
const { promise, resolve } =
promiseWithResolvers<
IteratorResult<Iterable<IncrementalDataRecordResult>>
>();
this._nextQueue.push(resolve);
return promise;
},
return: (): Promise<
IteratorResult<Iterable<IncrementalDataRecordResult>>
> => {
for (const resolve of this._nextQueue) {
resolve({ value: undefined, done: true });
}
return Promise.resolve({ value: undefined, done: true });
},
};
*currentCompletedBatch(): Generator<IncrementalDataRecordResult> {
let completed;
while ((completed = this._completedQueue.shift()) !== undefined) {
yield completed;
}
if (this._rootNodes.size === 0) {
for (const resolve of this._nextQueue) {
resolve(undefined);
}
}
}

nextCompletedBatch(): Promise<
Iterable<IncrementalDataRecordResult> | undefined
> {
const { promise, resolve } = promiseWithResolvers<
Iterable<IncrementalDataRecordResult> | undefined
>();
this._nextQueue.push(resolve);
return promise;
}

abort(): void {
for (const resolve of this._nextQueue) {
resolve(undefined);
}
}

hasNext(): boolean {
Expand Down Expand Up @@ -157,11 +152,6 @@ export class IncrementalGraph {
subsequentResultRecord: SubsequentResultRecord,
): void {
this._rootNodes.delete(subsequentResultRecord);
if (this._rootNodes.size === 0) {
for (const resolve of this._nextQueue) {
resolve({ value: undefined, done: true });
}
}
}

private _addIncrementalDataRecords(
Expand Down Expand Up @@ -332,19 +322,13 @@ export class IncrementalGraph {
first: IncrementalDataRecordResult,
): Generator<IncrementalDataRecordResult> {
yield first;
let completed;
while ((completed = this._completedQueue.shift()) !== undefined) {
yield completed;
}
yield* this.currentCompletedBatch();
}

private _enqueue(completed: IncrementalDataRecordResult): void {
const next = this._nextQueue.shift();
if (next !== undefined) {
next({
value: this._yieldCurrentCompletedIncrementalData(completed),
done: false,
});
next(this._yieldCurrentCompletedIncrementalData(completed));
return;
}
this._completedQueue.push(completed);
Expand Down
21 changes: 8 additions & 13 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,10 @@ class IncrementalPublisher {
completed: [],
};

const completedIncrementalData =
this._incrementalGraph.completedIncrementalData();
// use the raw iterator rather than 'for await ... of' so as not to trigger the
// '.return()' method on the iterator when exiting the loop with the next value
const asyncIterator = completedIncrementalData[Symbol.asyncIterator]();
let iteration = await asyncIterator.next();
while (!iteration.done) {
for (const completedResult of iteration.value) {
let batch: Iterable<IncrementalDataRecordResult> | undefined =
this._incrementalGraph.currentCompletedBatch();
do {
for (const completedResult of batch) {
this._handleCompletedIncrementalData(completedResult, context);
}

Expand All @@ -151,7 +147,6 @@ class IncrementalPublisher {
const hasNext = this._incrementalGraph.hasNext();

if (!hasNext) {
// eslint-disable-next-line require-atomic-updates
isDone = true;
}

Expand All @@ -173,8 +168,8 @@ class IncrementalPublisher {
}

// eslint-disable-next-line no-await-in-loop
iteration = await asyncIterator.next();
}
batch = await this._incrementalGraph.nextCompletedBatch();
} while (batch !== undefined);

await this._returnAsyncIteratorsIgnoringErrors();
return { value: undefined, done: true };
Expand All @@ -184,6 +179,7 @@ class IncrementalPublisher {
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
isDone = true;
this._incrementalGraph.abort();
await this._returnAsyncIterators();
return { value: undefined, done: true };
};
Expand All @@ -192,6 +188,7 @@ class IncrementalPublisher {
error?: unknown,
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
isDone = true;
this._incrementalGraph.abort();
await this._returnAsyncIterators();
return Promise.reject(error);
};
Expand Down Expand Up @@ -363,8 +360,6 @@ class IncrementalPublisher {
}

private async _returnAsyncIterators(): Promise<void> {
await this._incrementalGraph.completedIncrementalData().return();

const cancellableStreams = this._context.cancellableStreams;
if (cancellableStreams === undefined) {
return;
Expand Down
32 changes: 10 additions & 22 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2331,16 +2331,12 @@ describe('Execute: stream directive', () => {
});
it('Returns underlying async iterables when returned generator is returned', async () => {
let returned = false;
let index = 0;
const iterable = {
[Symbol.asyncIterator]: () => ({
next: () => {
const friend = friends[index++];
if (friend == null) {
return Promise.resolve({ done: true, value: undefined });
}
return Promise.resolve({ done: false, value: friend });
},
next: () =>
new Promise(() => {
/* never resolves */
}),
return: () => {
returned = true;
},
Expand All @@ -2349,11 +2345,8 @@ describe('Execute: stream directive', () => {

const document = parse(`
query {
friendList @stream(initialCount: 1) {
friendList @stream(initialCount: 0) {
id
... @defer {
name
}
}
}
`);
Expand All @@ -2371,21 +2364,16 @@ describe('Execute: stream directive', () => {
const result1 = executeResult.initialResult;
expectJSON(result1).toDeepEqual({
data: {
friendList: [
{
id: '1',
},
],
friendList: [],
},
pending: [
{ id: '0', path: ['friendList', 0] },
{ id: '1', path: ['friendList'] },
],
pending: [{ id: '0', path: ['friendList'] }],
hasNext: true,
});

const result2Promise = iterator.next();
const returnPromise = iterator.return();

const result2 = await iterator.next();
const result2 = await result2Promise;
expectJSON(result2).toDeepEqual({
done: true,
value: undefined,
Expand Down
Loading