diff --git a/.changeset/few-baboons-suffer.md b/.changeset/few-baboons-suffer.md new file mode 100644 index 00000000000..2f3c105ff4d --- /dev/null +++ b/.changeset/few-baboons-suffer.md @@ -0,0 +1,5 @@ +--- +"@whatwg-node/server": patch +--- + +Send AbortSignal at correct time diff --git a/packages/node-fetch/src/fetchCurl.ts b/packages/node-fetch/src/fetchCurl.ts index a8ceeb10f6d..e12aa61d206 100644 --- a/packages/node-fetch/src/fetchCurl.ts +++ b/packages/node-fetch/src/fetchCurl.ts @@ -76,7 +76,9 @@ export function fetchCurl( if (fetchRequest['_signal']) { fetchRequest['_signal'].onabort = () => { if (streamResolved) { - curlHandle.pause(CurlPause.Recv); + if (curlHandle.isOpen) { + curlHandle.pause(CurlPause.Recv); + } } else { reject(new PonyfillAbortError()); curlHandle.close(); diff --git a/packages/server/src/utils.ts b/packages/server/src/utils.ts index 7f8bbc00ec3..b142f07061e 100644 --- a/packages/server/src/utils.ts +++ b/packages/server/src/utils.ts @@ -21,6 +21,7 @@ export interface NodeRequest { headers?: any; req?: IncomingMessage | Http2ServerRequest; raw?: IncomingMessage | Http2ServerRequest; + connection?: Socket; socket?: Socket; query?: any; once?(event: string, listener: (...args: any[]) => void): void; @@ -133,9 +134,10 @@ export function normalizeNodeRequest( if (RequestCtor !== globalThis.Request) { signal = new ServerAdapterRequestAbortSignal(); - if (rawRequest.once) { - rawRequest.once('end', () => (signal as ServerAdapterRequestAbortSignal).sendAbort()); - rawRequest.once('close', () => (signal as ServerAdapterRequestAbortSignal).sendAbort()); + if (rawRequest.connection?.once) { + rawRequest.connection?.once('close', () => + (signal as ServerAdapterRequestAbortSignal).sendAbort(), + ); } } else { const controller = new AbortController(); diff --git a/packages/server/test/proxy.spec.ts b/packages/server/test/proxy.spec.ts new file mode 100644 index 00000000000..20fc778b5df --- /dev/null +++ b/packages/server/test/proxy.spec.ts @@ -0,0 +1,86 @@ +import { createServer } from 'http'; +import { AddressInfo } from 'net'; +import { fetch } from '@whatwg-node/fetch'; +import { createServerAdapter } from '../src/createServerAdapter'; + +describe('Proxy', () => { + let aborted: boolean = false; + const originalAdapter = createServerAdapter(async request => { + if (request.url.endsWith('/delay')) { + await new Promise(resolve => setTimeout(resolve, 1000)); + aborted = request.signal.aborted; + } + return Response.json({ + method: request.method, + url: request.url, + headers: Object.fromEntries(request.headers.entries()), + body: await request.text(), + }); + }); + const originalServer = createServer(originalAdapter); + const proxyAdapter = createServerAdapter(request => { + const proxyUrl = new URL(request.url); + return fetch( + `http://localhost:${(originalServer.address() as AddressInfo).port}${proxyUrl.pathname}`, + { + method: request.method, + headers: Object.fromEntries( + [...request.headers.entries()].filter(([key]) => key !== 'host'), + ), + body: request.body, + signal: request.signal, + }, + ); + }); + const proxyServer = createServer(proxyAdapter); + let libcurl: any; + beforeAll(async () => { + libcurl = globalThis['libcurl']; + globalThis['libcurl'] = undefined; + aborted = false; + await new Promise(resolve => originalServer.listen(0, resolve)); + await new Promise(resolve => proxyServer.listen(0, resolve)); + }); + afterAll(async () => { + globalThis['libcurl'] = libcurl; + await new Promise(resolve => originalServer.close(resolve)); + await new Promise(resolve => proxyServer.close(resolve)); + }); + it('proxies requests', async () => { + const response = await fetch( + `http://localhost:${(proxyServer.address() as AddressInfo).port}/test`, + { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + test: true, + }), + }, + ); + expect(await response.json()).toMatchObject({ + method: 'POST', + url: `http://localhost:${(originalServer.address() as AddressInfo).port}/test`, + headers: { + 'content-type': 'application/json', + host: `localhost:${(originalServer.address() as AddressInfo).port}`, + }, + body: JSON.stringify({ + test: true, + }), + }); + expect(response.status).toBe(200); + }); + it('handles aborted requests', async () => { + const response = fetch( + `http://localhost:${(proxyServer.address() as AddressInfo).port}/delay`, + { + signal: AbortSignal.timeout(500), + }, + ); + await expect(response).rejects.toThrow(); + await new Promise(resolve => setTimeout(resolve, 1000)); + expect(aborted).toBe(true); + }); +});