Skip to content

Commit

Permalink
feat: implement Request.signal
Browse files Browse the repository at this point in the history
  • Loading branch information
ardatan committed Aug 9, 2023
1 parent 6508456 commit 4b67036
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 40 deletions.
6 changes: 6 additions & 0 deletions .changeset/spotty-cougars-yawn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@whatwg-node/node-fetch': patch
'@whatwg-node/server': patch
---

Handle AbortSignal
19 changes: 13 additions & 6 deletions packages/node-fetch/src/fetchCurl.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Readable } from 'node:stream';
import { PonyfillAbortError } from './AbortError.js';
import { PonyfillRequest } from './Request.js';
import { PonyfillResponse } from './Response.js';
import { defaultHeadersSerializer } from './utils.js';
Expand All @@ -10,12 +11,6 @@ export function fetchCurl<TResponseJSON = any, TRequestJSON = any>(

const curlHandle = new Curl();

if (fetchRequest['_signal']) {
fetchRequest['_signal'].onabort = () => {
curlHandle.pause(CurlPause.Recv);
};
}

curlHandle.enable(CurlFeature.NoDataParsing);

curlHandle.setOpt('URL', fetchRequest.url);
Expand Down Expand Up @@ -77,6 +72,17 @@ export function fetchCurl<TResponseJSON = any, TRequestJSON = any>(
curlHandle.enable(CurlFeature.NoHeaderParsing);

return new Promise(function promiseResolver(resolve, reject) {
let streamResolved = false;
if (fetchRequest['_signal']) {
fetchRequest['_signal'].onabort = () => {
if (streamResolved) {
curlHandle.pause(CurlPause.Recv);
} else {
reject(new PonyfillAbortError());
curlHandle.close();
}
};
}
curlHandle.once('end', function endListener() {
curlHandle.close();
});
Expand Down Expand Up @@ -115,6 +121,7 @@ export function fetchCurl<TResponseJSON = any, TRequestJSON = any>(
url: fetchRequest.url,
}),
);
streamResolved = true;
});
curlHandle.perform();
});
Expand Down
2 changes: 2 additions & 0 deletions packages/server/src/createServerAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
NodeResponse,
normalizeNodeRequest,
sendNodeResponse,
ServerAdapterRequestAbortSignal,
} from './utils.js';
import {
getRequestFromUWSRequest,
Expand Down Expand Up @@ -244,6 +245,7 @@ function createServerAdapter<
let resAborted = false;
res.onAborted(() => {
resAborted = true;
(request.signal as ServerAdapterRequestAbortSignal).sendAbort();
});
let response$: Response | Promise<Response> | undefined;
try {
Expand Down
42 changes: 42 additions & 0 deletions packages/server/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface NodeRequest {
raw?: IncomingMessage | Http2ServerRequest;
socket?: Socket;
query?: any;
once?(event: string, listener: (...args: any[]) => void): void;
}

export type NodeResponse = ServerResponse | Http2ServerResponse;
Expand Down Expand Up @@ -80,6 +81,36 @@ function isRequestBody(body: any): body is BodyInit {
return false;
}

export class ServerAdapterRequestAbortSignal extends EventTarget implements AbortSignal {
aborted = false;
_onabort: ((this: AbortSignal, ev: Event) => any) | null = null;
reason: any;

throwIfAborted(): void {
if (this.aborted) {
throw new DOMException('Aborted', 'AbortError');
}
}

sendAbort() {
this.aborted = true;
this.dispatchEvent(new Event('abort'));
}

get onabort() {
return this._onabort;
}

set onabort(value) {
this._onabort = value;
if (value) {
this.addEventListener('abort', value);
} else {
this.removeEventListener('abort', value);
}
}
}

export function normalizeNodeRequest(
nodeRequest: NodeRequest,
RequestCtor: typeof Request,
Expand All @@ -94,10 +125,18 @@ export function normalizeNodeRequest(
fullUrl = url.toString();
}

const signal = new ServerAdapterRequestAbortSignal();

if (rawRequest.once) {
rawRequest.once('end', () => signal.sendAbort());
rawRequest.once('close', () => signal.sendAbort());
}

if (nodeRequest.method === 'GET' || nodeRequest.method === 'HEAD') {
return new RequestCtor(fullUrl, {
method: nodeRequest.method,
headers: nodeRequest.headers,
signal,
});
}

Expand All @@ -114,11 +153,13 @@ export function normalizeNodeRequest(
method: nodeRequest.method,
headers: nodeRequest.headers,
body: maybeParsedBody,
signal,
});
}
const request = new RequestCtor(fullUrl, {
method: nodeRequest.method,
headers: nodeRequest.headers,
signal,
});
if (!request.headers.get('content-type')?.includes('json')) {
request.headers.set('content-type', 'application/json; charset=utf-8');
Expand All @@ -142,6 +183,7 @@ export function normalizeNodeRequest(
method: nodeRequest.method,
headers: nodeRequest.headers,
body: rawRequest as any,
signal,
});
}

Expand Down
36 changes: 2 additions & 34 deletions packages/server/src/uwebsockets.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Readable } from 'node:stream';
import type { FetchAPI } from './types.js';
import { ServerAdapterRequestAbortSignal } from './utils.js';

export interface UWSRequest {
getMethod(): string;
Expand Down Expand Up @@ -32,39 +33,6 @@ interface GetRequestFromUWSOpts {
fetchAPI: FetchAPI;
}

class UWSAbortSignal extends EventTarget implements AbortSignal {
aborted = false;
_onabort: ((this: AbortSignal, ev: Event) => any) | null = null;
reason: any;

throwIfAborted(): void {
if (this.aborted) {
throw new DOMException('Aborted', 'AbortError');
}
}

constructor(res: UWSResponse) {
super();
res.onAborted(() => {
this.aborted = true;
this.dispatchEvent(new Event('request aborted'));
});
}

get onabort() {
return this._onabort;
}

set onabort(value) {
this._onabort = value;
if (value) {
this.addEventListener('request aborted', value);
} else {
this.removeEventListener('request aborted', value);
}
}
}

export function getRequestFromUWSRequest({ req, res, fetchAPI }: GetRequestFromUWSOpts) {
let body: ReadableStream | undefined;
const method = req.getMethod();
Expand Down Expand Up @@ -97,7 +65,7 @@ export function getRequestFromUWSRequest({ req, res, fetchAPI }: GetRequestFromU
method,
headers,
body: body as any,
signal: new UWSAbortSignal(res),
signal: new ServerAdapterRequestAbortSignal(),
});
}

Expand Down
21 changes: 21 additions & 0 deletions packages/server/test/node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,27 @@ describe('Node Specific Cases', () => {
const response = await fetch(testServer.url);
expect(response.status).toBe(418);
});

it('handles AbortSignal correctly', async () => {
const abortListener = jest.fn();
const serverAdapter = createServerAdapter(
req =>
new Promise(resolve => {
req.signal.onabort = () => {
abortListener();
resolve(new Response('Hello World', { status: 200 }));
};
}),
);
testServer.addOnceHandler(serverAdapter);
const controller = new AbortController();
setTimeout(() => controller.abort(), 1000);
await expect(() => fetch(testServer.url, { signal: controller.signal })).rejects.toEqual(
new Error('The operation was aborted'),
);
await new Promise(resolve => setTimeout(resolve, 300));
expect(abortListener).toHaveBeenCalledTimes(1);
});
});
});

Expand Down

0 comments on commit 4b67036

Please sign in to comment.