From bebc159e0a470a0ea89a8575f620ead3f1b6b594 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Fri, 26 Jul 2024 05:16:06 +0300 Subject: [PATCH] feat(node-fetch): `WritableStream`, `TransformStream` & `CompressionStream` (#1495) --- .changeset/shaggy-taxis-love.md | 8 + packages/fetch/dist/create-node-ponyfill.js | 36 +++- packages/fetch/dist/esm-ponyfill.js | 6 + packages/fetch/dist/global-ponyfill.js | 2 + packages/fetch/dist/index.d.ts | 4 + packages/fetch/dist/node-ponyfill.js | 2 + packages/node-fetch/src/CompressionStream.ts | 36 ++++ .../node-fetch/src/DecompressionStream.ts | 29 +++ packages/node-fetch/src/ReadableStream.ts | 27 ++- packages/node-fetch/src/TransformStream.ts | 76 ++++++++ packages/node-fetch/src/WritableStream.ts | 170 ++++++++++++++++++ packages/node-fetch/src/fetchNodeHttp.ts | 6 +- packages/node-fetch/src/index.ts | 4 + .../test/useCookies.spec.ts | 14 +- .../server/src/plugins/useContentEncoding.ts | 28 ++- packages/server/src/utils.ts | 58 +++--- packages/server/src/uwebsockets.ts | 46 +++-- packages/server/test/compression.spec.ts | 67 ++++--- packages/server/test/request-listener.spec.ts | 18 +- packages/server/test/test-fetch.ts | 4 +- packages/server/test/useErrorHandling.spec.ts | 7 +- 21 files changed, 554 insertions(+), 94 deletions(-) create mode 100644 .changeset/shaggy-taxis-love.md create mode 100644 packages/node-fetch/src/CompressionStream.ts create mode 100644 packages/node-fetch/src/DecompressionStream.ts create mode 100644 packages/node-fetch/src/TransformStream.ts create mode 100644 packages/node-fetch/src/WritableStream.ts diff --git a/.changeset/shaggy-taxis-love.md b/.changeset/shaggy-taxis-love.md new file mode 100644 index 00000000000..be665940c6e --- /dev/null +++ b/.changeset/shaggy-taxis-love.md @@ -0,0 +1,8 @@ +--- +'@whatwg-node/server-plugin-cookies': patch +'@whatwg-node/node-fetch': patch +'@whatwg-node/server': patch +'@whatwg-node/fetch': patch +--- + +Implement \`CompressionStream\`, \`WritableStream\` and \`TransformStream\` diff --git a/packages/fetch/dist/create-node-ponyfill.js b/packages/fetch/dist/create-node-ponyfill.js index b4196b036d2..9009bd37550 100644 --- a/packages/fetch/dist/create-node-ponyfill.js +++ b/packages/fetch/dist/create-node-ponyfill.js @@ -3,6 +3,8 @@ const shouldSkipPonyfill = require('./shouldSkipPonyfill'); module.exports = function createNodePonyfill(opts = {}) { const ponyfills = {}; + ponyfills.URLPattern = globalThis.URLPattern; + // We call this previously to patch `Bun` if (!ponyfills.URLPattern) { const urlPatternModule = require('urlpattern-polyfill'); @@ -10,7 +12,27 @@ module.exports = function createNodePonyfill(opts = {}) { } if (opts.skipPonyfill || shouldSkipPonyfill()) { - return globalThis; + return { + fetch: globalThis.fetch, + Headers: globalThis.Headers, + Request: globalThis.Request, + Response: globalThis.Response, + FormData: globalThis.FormData, + ReadableStream: globalThis.ReadableStream, + WritableStream: globalThis.WritableStream, + TransformStream: globalThis.TransformStream, + CompressionStream: globalThis.CompressionStream, + DecompressionStream: globalThis.DecompressionStream, + Blob: globalThis.Blob, + File: globalThis.File, + crypto: globalThis.crypto, + btoa: globalThis.btoa, + TextEncoder: globalThis.TextEncoder, + TextDecoder: globalThis.TextDecoder, + URLPattern: ponyfills.URLPattern, + URL: globalThis.URL, + URLSearchParams: globalThis.URLSearchParams + }; } const newNodeFetch = require('@whatwg-node/node-fetch'); @@ -25,14 +47,10 @@ module.exports = function createNodePonyfill(opts = {}) { ponyfills.URL = newNodeFetch.URL; ponyfills.URLSearchParams = newNodeFetch.URLSearchParams; - ponyfills.WritableStream = globalThis.WritableStream; - ponyfills.TransformStream = globalThis.TransformStream; - - if (!ponyfills.WritableStream) { - const streamsWeb = require("stream/web"); - ponyfills.WritableStream = streamsWeb.WritableStream; - ponyfills.TransformStream = streamsWeb.TransformStream; - } + ponyfills.WritableStream = newNodeFetch.WritableStream; + ponyfills.TransformStream = newNodeFetch.TransformStream; + ponyfills.CompressionStream = newNodeFetch.CompressionStream; + ponyfills.DecompressionStream = newNodeFetch.DecompressionStream; ponyfills.Blob = newNodeFetch.Blob; ponyfills.File = newNodeFetch.File; diff --git a/packages/fetch/dist/esm-ponyfill.js b/packages/fetch/dist/esm-ponyfill.js index 94b67852df0..219be36d5c6 100644 --- a/packages/fetch/dist/esm-ponyfill.js +++ b/packages/fetch/dist/esm-ponyfill.js @@ -6,6 +6,8 @@ const FormData = window.FormData; const ReadableStream = window.ReadableStream; const WritableStream = window.WritableStream; const TransformStream = window.TransformStream; +const CompressionStream = window.CompressionStream; +const DecompressionStream = window.DecompressionStream; const Blob = window.Blob; const File = window.File; const crypto = window.crypto; @@ -26,6 +28,8 @@ export { ReadableStream, WritableStream, TransformStream, + CompressionStream, + DecompressionStream, Blob, File, crypto, @@ -47,6 +51,8 @@ export function createFetch() { ReadableStream, WritableStream, TransformStream, + CompressionStream, + DecompressionStream, Blob, File, crypto, diff --git a/packages/fetch/dist/global-ponyfill.js b/packages/fetch/dist/global-ponyfill.js index acd56cf3eb1..80b2ac974e2 100644 --- a/packages/fetch/dist/global-ponyfill.js +++ b/packages/fetch/dist/global-ponyfill.js @@ -6,6 +6,8 @@ module.exports.FormData = globalThis.FormData; module.exports.ReadableStream = globalThis.ReadableStream; module.exports.WritableStream = globalThis.WritableStream; module.exports.TransformStream = globalThis.TransformStream; +module.exports.CompressionStream = globalThis.CompressionStream; +module.exports.DecompressionStream = globalThis.DecompressionStream; module.exports.Blob = globalThis.Blob; module.exports.File = globalThis.File; module.exports.crypto = globalThis.crypto; diff --git a/packages/fetch/dist/index.d.ts b/packages/fetch/dist/index.d.ts index aaf2dd66432..d16ddf2f4e3 100644 --- a/packages/fetch/dist/index.d.ts +++ b/packages/fetch/dist/index.d.ts @@ -15,6 +15,8 @@ declare module '@whatwg-node/fetch' { export const ReadableStream: typeof globalThis.ReadableStream; export const WritableStream: typeof globalThis.WritableStream; export const TransformStream: typeof globalThis.TransformStream; + export const CompressionStream: typeof globalThis.CompressionStream; + export const DecompressionStream: typeof globalThis.DecompressionStream; export const Blob: typeof globalThis.Blob; export const File: typeof globalThis.File; export const crypto: typeof globalThis.crypto; @@ -53,6 +55,8 @@ declare module '@whatwg-node/fetch' { ReadableStream: typeof ReadableStream; WritableStream: typeof WritableStream; TransformStream: typeof TransformStream; + CompressionStream: typeof CompressionStream; + DecompressionStream: typeof DecompressionStream; Blob: typeof Blob; File: typeof File; crypto: typeof crypto; diff --git a/packages/fetch/dist/node-ponyfill.js b/packages/fetch/dist/node-ponyfill.js index 601112710aa..fc41a0c43ab 100644 --- a/packages/fetch/dist/node-ponyfill.js +++ b/packages/fetch/dist/node-ponyfill.js @@ -18,6 +18,8 @@ module.exports.FormData = ponyfills.FormData; module.exports.ReadableStream = ponyfills.ReadableStream; module.exports.WritableStream = ponyfills.WritableStream; module.exports.TransformStream = ponyfills.TransformStream; +module.exports.CompressionStream = ponyfills.CompressionStream; +module.exports.DecompressionStream = ponyfills.DecompressionStream; module.exports.Blob = ponyfills.Blob; module.exports.File = ponyfills.File; module.exports.crypto = ponyfills.crypto; diff --git a/packages/node-fetch/src/CompressionStream.ts b/packages/node-fetch/src/CompressionStream.ts new file mode 100644 index 00000000000..11f7b5bc184 --- /dev/null +++ b/packages/node-fetch/src/CompressionStream.ts @@ -0,0 +1,36 @@ +import { createBrotliCompress, createDeflate, createDeflateRaw, createGzip } from 'node:zlib'; +import { PonyfillTransformStream } from './TransformStream.js'; + +export type PonyfillCompressionFormat = + | 'x-gzip' + | 'gzip' + | 'x-deflate' + | 'deflate' + | 'deflate-raw' + | 'br'; + +export class PonyfillCompressionStream + extends PonyfillTransformStream + implements CompressionStream +{ + constructor(compressionFormat: PonyfillCompressionFormat) { + switch (compressionFormat) { + case 'x-gzip': + case 'gzip': + super(createGzip()); + break; + case 'x-deflate': + case 'deflate': + super(createDeflate()); + break; + case 'deflate-raw': + super(createDeflateRaw()); + break; + case 'br': + super(createBrotliCompress()); + break; + default: + throw new Error(`Unsupported compression format: ${compressionFormat}`); + } + } +} diff --git a/packages/node-fetch/src/DecompressionStream.ts b/packages/node-fetch/src/DecompressionStream.ts new file mode 100644 index 00000000000..4d39fa78d5b --- /dev/null +++ b/packages/node-fetch/src/DecompressionStream.ts @@ -0,0 +1,29 @@ +import { createBrotliDecompress, createGunzip, createInflate, createInflateRaw } from 'node:zlib'; +import { PonyfillCompressionFormat } from './CompressionStream.js'; +import { PonyfillTransformStream } from './TransformStream.js'; + +export class PonyfillDecompressionStream + extends PonyfillTransformStream + implements DecompressionStream +{ + constructor(compressionFormat: PonyfillCompressionFormat) { + switch (compressionFormat) { + case 'x-gzip': + case 'gzip': + super(createGunzip()); + break; + case 'x-deflate': + case 'deflate': + super(createInflate()); + break; + case 'deflate-raw': + super(createInflateRaw()); + break; + case 'br': + super(createBrotliDecompress()); + break; + default: + throw new Error(`Unsupported compression format: ${compressionFormat}`); + } + } +} diff --git a/packages/node-fetch/src/ReadableStream.ts b/packages/node-fetch/src/ReadableStream.ts index eca72f21c71..6313bd6f7f0 100644 --- a/packages/node-fetch/src/ReadableStream.ts +++ b/packages/node-fetch/src/ReadableStream.ts @@ -1,4 +1,5 @@ import { Readable } from 'stream'; +import { PonyfillWritableStream } from './WritableStream'; function createController( desiredSize: number, @@ -168,13 +169,23 @@ export class PonyfillReadableStream implements ReadableStream { } async pipeTo(destination: WritableStream): Promise { - const writer = destination.getWriter(); - await writer.ready; - for await (const chunk of this.readable) { - await writer.write(chunk); + if (isPonyfillWritableStream(destination)) { + return new Promise((resolve, reject) => { + this.readable.pipe(destination.writable); + destination.writable.once('finish', resolve); + destination.writable.once('error', reject); + }); + } else { + const writer = destination.getWriter(); + try { + for await (const chunk of this) { + await writer.write(chunk); + } + await writer.close(); + } catch (err) { + await writer.abort(err); + } } - await writer.ready; - return writer.close(); } pipeThrough({ @@ -192,3 +203,7 @@ export class PonyfillReadableStream implements ReadableStream { return isReadableStream(instance); } } + +function isPonyfillWritableStream(obj: any): obj is PonyfillWritableStream { + return obj?.writable != null; +} diff --git a/packages/node-fetch/src/TransformStream.ts b/packages/node-fetch/src/TransformStream.ts new file mode 100644 index 00000000000..2e54c8ef6de --- /dev/null +++ b/packages/node-fetch/src/TransformStream.ts @@ -0,0 +1,76 @@ +import { Transform } from 'node:stream'; +import { PonyfillReadableStream } from './ReadableStream.js'; +import { PonyfillWritableStream } from './WritableStream.js'; + +export class PonyfillTransformStream implements TransformStream { + transform: Transform; + writable: PonyfillWritableStream; + readable: PonyfillReadableStream; + + constructor(transformer?: Transformer | Transform) { + if (transformer instanceof Transform) { + this.transform = transformer; + } else if (transformer) { + const controller: TransformStreamDefaultController = { + enqueue(chunk: O) { + transform.push(chunk); + }, + error(reason: any) { + transform.destroy(reason); + }, + terminate() { + transform.end(); + }, + get desiredSize() { + return transform.writableLength; + }, + }; + const transform = new Transform({ + read() {}, + write(chunk: I, _encoding: BufferEncoding, callback: (error?: Error | null) => void) { + try { + const result = transformer.transform?.(chunk, controller); + if (result instanceof Promise) { + result.then( + () => { + callback(); + }, + err => { + callback(err); + }, + ); + } else { + callback(); + } + } catch (err) { + callback(err as Error); + } + }, + final(callback: (error?: Error | null) => void) { + try { + const result = transformer.flush?.(controller); + if (result instanceof Promise) { + result.then( + () => { + callback(); + }, + err => { + callback(err); + }, + ); + } else { + callback(); + } + } catch (err) { + callback(err as Error); + } + }, + }); + this.transform = transform; + } else { + this.transform = new Transform(); + } + this.writable = new PonyfillWritableStream(this.transform); + this.readable = new PonyfillReadableStream(this.transform); + } +} diff --git a/packages/node-fetch/src/WritableStream.ts b/packages/node-fetch/src/WritableStream.ts new file mode 100644 index 00000000000..24b387293d6 --- /dev/null +++ b/packages/node-fetch/src/WritableStream.ts @@ -0,0 +1,170 @@ +import { Writable } from 'stream'; + +export class PonyfillWritableStream implements WritableStream { + writable: Writable; + constructor(underlyingSink?: UnderlyingSink | Writable) { + if (underlyingSink instanceof Writable) { + this.writable = underlyingSink; + } else if (underlyingSink) { + const writable = new Writable({ + write(chunk: W, _encoding: BufferEncoding, callback: (error?: Error | null) => void) { + try { + const result = underlyingSink.write?.(chunk, controller); + if (result instanceof Promise) { + result.then( + () => { + callback(); + }, + err => { + callback(err); + }, + ); + } else { + callback(); + } + } catch (err) { + callback(err as Error); + } + }, + final(callback: (error?: Error | null) => void) { + const result = underlyingSink.close?.(); + if (result instanceof Promise) { + result.then( + () => { + callback(); + }, + err => { + callback(err); + }, + ); + } else { + callback(); + } + }, + }); + this.writable = writable; + let onabort: EventListener | null; + let reason: any; + const controller: WritableStreamDefaultController = { + signal: { + any(signals) { + return AbortSignal.any([...signals]); + }, + get reason() { + return reason; + }, + get aborted() { + return writable.destroyed; + }, + addEventListener: (_event: string, eventListener: EventListener) => { + writable.once('error', eventListener); + writable.once('close', eventListener); + }, + removeEventListener: (_event: string, eventListener: EventListener) => { + writable.off('error', eventListener); + writable.off('close', eventListener); + }, + dispatchEvent: (_event: Event) => { + return false; + }, + get onabort() { + return onabort; + }, + set onabort(value) { + if (onabort) { + this.removeEventListener('abort', onabort); + } + onabort = value; + if (onabort) { + this.addEventListener('abort', onabort); + } + }, + throwIfAborted() { + if (writable.destroyed) { + throw reason; + } + }, + }, + error: e => { + this.writable.destroy(e); + }, + }; + this.writable.once('error', err => { + reason = err; + }); + } else { + this.writable = new Writable(); + } + } + + getWriter(): WritableStreamDefaultWriter { + const writable = this.writable; + return { + closed: new Promise(resolve => { + writable.once('close', () => { + resolve(undefined); + }); + }), + get desiredSize() { + return writable.writableLength; + }, + ready: new Promise(resolve => { + writable.once('drain', () => { + resolve(undefined); + }); + }), + releaseLock() { + // no-op + }, + write(chunk: W) { + return new Promise((resolve, reject) => { + writable.write(chunk, (err: Error | null | undefined) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + }, + close() { + return new Promise((resolve, reject) => { + writable.end((err: Error | null) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + }, + abort(reason) { + return new Promise(resolve => { + writable.destroy(reason); + resolve(); + }); + }, + }; + } + + close(): Promise { + return new Promise((resolve, reject) => { + this.writable.end((err: Error | null) => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + } + + abort(reason: any): Promise { + return new Promise(resolve => { + this.writable.destroy(reason); + resolve(); + }); + } + + locked = false; +} diff --git a/packages/node-fetch/src/fetchNodeHttp.ts b/packages/node-fetch/src/fetchNodeHttp.ts index 16fe74bb193..5883a09f265 100644 --- a/packages/node-fetch/src/fetchNodeHttp.ts +++ b/packages/node-fetch/src/fetchNodeHttp.ts @@ -1,7 +1,7 @@ import { request as httpRequest } from 'http'; import { request as httpsRequest } from 'https'; import { PassThrough, Readable } from 'stream'; -import { createBrotliDecompress, createGunzip, createInflate } from 'zlib'; +import { createBrotliDecompress, createGunzip, createInflate, createInflateRaw } from 'zlib'; import { PonyfillAbortError } from './AbortError.js'; import { PonyfillRequest } from './Request.js'; import { PonyfillResponse } from './Response.js'; @@ -57,6 +57,10 @@ export function fetchNodeHttp( case 'deflate': responseBody = nodeResponse.pipe(createInflate()); break; + case 'x-deflate-raw': + case 'deflate-raw': + responseBody = nodeResponse.pipe(createInflateRaw()); + break; case 'br': responseBody = nodeResponse.pipe(createBrotliDecompress()); break; diff --git a/packages/node-fetch/src/index.ts b/packages/node-fetch/src/index.ts index b5e49ad6ee6..de504e5a94d 100644 --- a/packages/node-fetch/src/index.ts +++ b/packages/node-fetch/src/index.ts @@ -14,3 +14,7 @@ export { } from './TextEncoderDecoder.js'; export { PonyfillURL as URL } from './URL.js'; export { PonyfillURLSearchParams as URLSearchParams } from './URLSearchParams.js'; +export { PonyfillWritableStream as WritableStream } from './WritableStream.js'; +export { PonyfillTransformStream as TransformStream } from './TransformStream.js'; +export { PonyfillCompressionStream as CompressionStream } from './CompressionStream.js'; +export { PonyfillDecompressionStream as DecompressionStream } from './DecompressionStream.js'; diff --git a/packages/server-plugin-cookies/test/useCookies.spec.ts b/packages/server-plugin-cookies/test/useCookies.spec.ts index f479f3eb600..5af188ec752 100644 --- a/packages/server-plugin-cookies/test/useCookies.spec.ts +++ b/packages/server-plugin-cookies/test/useCookies.spec.ts @@ -13,7 +13,7 @@ describe('Cookie Management', () => { plugins: [useCookies()], }, ); - const response = await serverAdapter.fetch('/', { + const response = await serverAdapter.fetch('http://localhost', { headers: { cookie: 'foo=bar', }, @@ -31,7 +31,7 @@ describe('Cookie Management', () => { plugins: [useCookies()], }, ); - const response = await serverAdapter.fetch('/'); + const response = await serverAdapter.fetch('http://localhost'); await response.text(); expect(response.headers.getSetCookie?.()).toMatchInlineSnapshot(` [ @@ -57,7 +57,7 @@ describe('Cookie Management', () => { plugins: [useCookies()], }, ); - const response = await serverAdapter.fetch('/'); + const response = await serverAdapter.fetch('http://localhost'); await response.text(); expect(response.headers.getSetCookie?.()).toMatchInlineSnapshot(` [ @@ -75,7 +75,7 @@ describe('Cookie Management', () => { plugins: [useCookies()], }, ); - const response = await serverAdapter.fetch('/'); + const response = await serverAdapter.fetch('http://localhost'); await response.text(); expect(response.headers.getSetCookie?.()).toMatchInlineSnapshot(` [ @@ -93,7 +93,7 @@ describe('Cookie Management', () => { plugins: [useCookies()], }, ); - const response = await serverAdapter.fetch('/', { + const response = await serverAdapter.fetch('http://localhost', { headers: { cookie: 'foo=bar', }, @@ -116,7 +116,7 @@ describe('Cookie Management', () => { plugins: [useCookies()], }, ); - const response = await serverAdapter.fetch('/'); + const response = await serverAdapter.fetch('http://localhost'); await response.text(); expect(response.headers.getSetCookie?.()).toMatchInlineSnapshot(` [ @@ -129,7 +129,7 @@ describe('Cookie Management', () => { const serverAdapter = createServerAdapter(() => { return new Response('OK'); }); - const response = await serverAdapter.fetch('/'); + const response = await serverAdapter.fetch('http://localhost'); await response.text(); expect(response.headers.getSetCookie?.()).toMatchInlineSnapshot(`[]`); }); diff --git a/packages/server/src/plugins/useContentEncoding.ts b/packages/server/src/plugins/useContentEncoding.ts index 04c07b5d52c..0a5cb5ec260 100644 --- a/packages/server/src/plugins/useContentEncoding.ts +++ b/packages/server/src/plugins/useContentEncoding.ts @@ -11,7 +11,7 @@ export function useContentEncoding(): ServerAdapterPlugin - getSupportedEncodings().includes(encoding as CompressionFormat), + getSupportedEncodings(fetchAPI).includes(encoding as CompressionFormat), ) ) { endResponse( @@ -25,12 +25,25 @@ export function useContentEncoding(): ServerAdapterPlugin(): ServerAdapterPlugin - getSupportedEncodings().includes(encoding as CompressionFormat), + getSupportedEncodings(fetchAPI).includes(encoding as CompressionFormat), ); if (supportedEncoding) { - const compressionStream = new CompressionStream(supportedEncoding as CompressionFormat); + const compressionStream = new fetchAPI.CompressionStream( + supportedEncoding as CompressionFormat, + ); // To calculate final content-length const contentLength = response.headers.get('content-length'); if (contentLength) { @@ -89,7 +104,8 @@ export function useContentEncoding(): ServerAdapterPlugin { return ( @@ -140,6 +140,15 @@ export function normalizeNodeRequest( const nodeResponse = nodeRequestResponseMap.get(nodeRequest); nodeRequestResponseMap.delete(nodeRequest); + let normalizedHeaders: Record = nodeRequest.headers; + if (nodeRequest.headers?.[':method']) { + normalizedHeaders = {}; + for (const key in nodeRequest.headers) { + if (!key.startsWith(':')) { + normalizedHeaders[key] = nodeRequest.headers[key]; + } + } + } if (nodeResponse?.once) { let sendAbortSignal: VoidFunction; @@ -171,7 +180,7 @@ export function normalizeNodeRequest( if (nodeRequest.method === 'GET' || nodeRequest.method === 'HEAD') { return new RequestCtor(fullUrl, { method: nodeRequest.method, - headers: nodeRequest.headers, + headers: normalizedHeaders, signal, }); } @@ -187,14 +196,14 @@ export function normalizeNodeRequest( if (isRequestBody(maybeParsedBody)) { return new RequestCtor(fullUrl, { method: nodeRequest.method, - headers: nodeRequest.headers, + headers: normalizedHeaders, body: maybeParsedBody, signal, }); } const request = new RequestCtor(fullUrl, { method: nodeRequest.method, - headers: nodeRequest.headers, + headers: normalizedHeaders, signal, }); if (!request.headers.get('content-type')?.includes('json')) { @@ -225,7 +234,7 @@ It will affect your performance. Please check our Bun integration recipe, and av } return new RequestCtor(fullUrl, { method: nodeRequest.method, - headers: nodeRequest.headers, + headers: normalizedHeaders, duplex: 'half', body: new ReadableStream({ start(controller) { @@ -250,7 +259,7 @@ It will affect your performance. Please check our Bun integration recipe, and av // perf: instead of spreading the object, we can just pass it as is and it performs better return new RequestCtor(fullUrl, { method: nodeRequest.method, - headers: nodeRequest.headers, + headers: normalizedHeaders, body: rawRequest as any, duplex: 'half', signal, @@ -595,26 +604,35 @@ export function handleAbortSignalAndPromiseResponse( export const decompressedResponseMap = new WeakMap(); -let SUPPORTED_ENCODINGS: CompressionFormat[]; - -export function getSupportedEncodings() { - if (!SUPPORTED_ENCODINGS) { - // TODO: deflate-raw is buggy in Node.js - const possibleEncodings: CompressionFormat[] = ['deflate', 'gzip' /* 'deflate-raw' */]; - SUPPORTED_ENCODINGS = possibleEncodings.filter(encoding => { +const supportedEncodingsByFetchAPI = new WeakMap(); + +export function getSupportedEncodings(fetchAPI: FetchAPI) { + let supportedEncodings = supportedEncodingsByFetchAPI.get(fetchAPI); + if (!supportedEncodings) { + const possibleEncodings = ['deflate', 'gzip', 'deflate-raw', 'br'] as CompressionFormat[]; + supportedEncodings = possibleEncodings.filter(encoding => { + // deflate-raw is not supported in Node.js >v20 + if ( + globalThis.process?.version?.startsWith('v2') && + fetchAPI.DecompressionStream === globalThis.DecompressionStream && + encoding === 'deflate-raw' + ) { + return false; + } try { // eslint-disable-next-line no-new - new DecompressionStream(encoding); + new fetchAPI.DecompressionStream(encoding); return true; } catch { return false; } }); + supportedEncodingsByFetchAPI.set(fetchAPI, supportedEncodings); } - return SUPPORTED_ENCODINGS; + return supportedEncodings; } -export function handleResponseDecompression(response: Response, ResponseCtor: typeof Response) { +export function handleResponseDecompression(response: Response, fetchAPI: FetchAPI) { const contentEncodingHeader = response?.headers.get('content-encoding'); if (!contentEncodingHeader || contentEncodingHeader === 'none') { return response; @@ -628,20 +646,20 @@ export function handleResponseDecompression(response: Response, ResponseCtor: ty const contentEncodings = contentEncodingHeader.split(','); if ( !contentEncodings.every(encoding => - getSupportedEncodings().includes(encoding as CompressionFormat), + getSupportedEncodings(fetchAPI).includes(encoding as CompressionFormat), ) ) { - return new ResponseCtor(`Unsupported 'Content-Encoding': ${contentEncodingHeader}`, { + return new fetchAPI.Response(`Unsupported 'Content-Encoding': ${contentEncodingHeader}`, { status: 415, statusText: 'Unsupported Media Type', }); } for (const contentEncoding of contentEncodings) { decompressedBody = decompressedBody.pipeThrough( - new DecompressionStream(contentEncoding as CompressionFormat), + new fetchAPI.DecompressionStream(contentEncoding as CompressionFormat), ); } - decompressedResponse = new ResponseCtor(decompressedBody, response); + decompressedResponse = new fetchAPI.Response(decompressedBody, response); decompressedResponseMap.set(response, decompressedResponse); } return decompressedResponse; diff --git a/packages/server/src/uwebsockets.ts b/packages/server/src/uwebsockets.ts index 6adbe8e06f5..968939716a8 100644 --- a/packages/server/src/uwebsockets.ts +++ b/packages/server/src/uwebsockets.ts @@ -39,18 +39,41 @@ export function getRequestFromUWSRequest({ req, res, fetchAPI, signal }: GetRequ let body: ReadableStream | undefined; const method = req.getMethod(); if (method !== 'get' && method !== 'head') { - body = new fetchAPI.ReadableStream({}); - const readable = (body as any).readable as Readable; - signal.addEventListener('abort', () => { - readable.push(null); + let controller: ReadableStreamDefaultController; + body = new fetchAPI.ReadableStream({ + start(c) { + controller = c; + }, }); - res.onData(function (ab, isLast) { - const chunk = Buffer.from(ab, 0, ab.byteLength); - readable.push(Buffer.from(chunk)); - if (isLast) { + const readable = (body as any).readable as Readable; + if (readable) { + signal.addEventListener('abort', () => { readable.push(null); - } - }); + }); + res.onData(function (ab, isLast) { + const chunk = Buffer.from(ab, 0, ab.byteLength); + readable.push(Buffer.from(chunk)); + if (isLast) { + readable.push(null); + } + }); + } else { + let closed = false; + signal.addEventListener('abort', () => { + if (!closed) { + closed = true; + controller.close(); + } + }); + res.onData(function (ab, isLast) { + const chunk = Buffer.from(ab, 0, ab.byteLength); + controller.enqueue(Buffer.from(chunk)); + if (isLast) { + closed = true; + controller.close(); + } + }); + } } const headers = new fetchAPI.Headers(); req.forEach((key, value) => { @@ -66,6 +89,9 @@ export function getRequestFromUWSRequest({ req, res, fetchAPI, signal }: GetRequ headers, body: body as any, signal, + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore - not in the TS types yet + duplex: 'half', }); } diff --git a/packages/server/test/compression.spec.ts b/packages/server/test/compression.spec.ts index 3547ee1e004..28199b69981 100644 --- a/packages/server/test/compression.spec.ts +++ b/packages/server/test/compression.spec.ts @@ -5,22 +5,22 @@ import { runTestsForEachServerImpl } from './test-server'; describe('Compression', () => { const exampleData = JSON.stringify(new Array(1000).fill('Hello, World!').join('')); - const encodings = [...getSupportedEncodings(), 'none']; describe('Adapter', () => { runTestsForEachFetchImpl( (_, { fetchAPI, createServerAdapter }) => { + const encodings = [...getSupportedEncodings(fetchAPI), 'none']; for (const encoding of encodings) { describe(encoding, () => { it('from the server to the client with "accept-encoding"', async () => { const adapter = createServerAdapter(() => new fetchAPI.Response(exampleData), { plugins: [useContentEncoding()], }); - let res = await adapter.fetch('/', { + let res = await adapter.fetch('http://localhost', { headers: { 'accept-encoding': encoding, }, }); - res = handleResponseDecompression(res, fetchAPI.Response); + res = handleResponseDecompression(res, fetchAPI); expect(res.status).toEqual(200); await expect(res.text()).resolves.toEqual(exampleData); }); @@ -38,9 +38,12 @@ describe('Compression', () => { }, ); if (encoding === 'none') { - const res = await adapter.fetch('/', { + const res = await adapter.fetch('http://localhost', { method: 'POST', body: exampleData, + headers: { + 'content-length': String(Buffer.byteLength(exampleData)), + }, }); await expect(res.json()).resolves.toEqual({ body: exampleData, @@ -48,7 +51,7 @@ describe('Compression', () => { }); return; } - const stream = new CompressionStream(encoding as CompressionFormat); + const stream = new fetchAPI.CompressionStream(encoding as CompressionFormat); const writer = stream.writable.getWriter(); writer.write(exampleData); writer.close(); @@ -64,14 +67,14 @@ describe('Compression', () => { } } const uint8Array = new Uint8Array(chunks); - let res = await adapter.fetch('/', { + let res = await adapter.fetch('http://localhost', { method: 'POST', headers: { 'content-encoding': encoding, }, body: uint8Array, }); - res = handleResponseDecompression(res, fetchAPI.Response); + res = handleResponseDecompression(res, fetchAPI); const { body, contentLength } = await res.json(); expect(body).toEqual(exampleData); expect(Number(contentLength)).toBeLessThan(Buffer.byteLength(body)); @@ -82,7 +85,7 @@ describe('Compression', () => { { noLibCurl: true }, ); }); - runTestsForEachFetchImpl((_, { fetchAPI, createServerAdapter }) => { + runTestsForEachFetchImpl((implName, { fetchAPI, createServerAdapter }) => { runTestsForEachServerImpl(server => { it(`from the server to the client without 'accept-encoding'`, async () => { let req: Request | undefined; @@ -109,7 +112,12 @@ describe('Compression', () => { Buffer.byteLength(exampleData), ); }); + const encodings = [...getSupportedEncodings(fetchAPI), 'none']; for (const encoding of encodings) { + // Skip deflate-raw with libcurl because it doesn't support it + if (encoding === 'deflate-raw' && implName === 'libcurl') { + continue; + } describe(encoding, () => { it(`from the server to the client`, async () => { const adapter = createServerAdapter( @@ -136,14 +144,12 @@ describe('Compression', () => { const returnedData = await res.text(); expect(returnedData).toEqual(exampleData); const contentLength = res.headers.get('content-length'); - if (contentLength) { - const numberContentLength = Number(contentLength); - const origSize = Buffer.byteLength(exampleData); - if (encoding === 'none') { - expect(numberContentLength).toEqual(origSize); - } else { - expect(numberContentLength).toBeLessThan(origSize); - } + const numberContentLength = Number(contentLength); + const origSize = Buffer.byteLength(exampleData); + if (encoding === 'none' && contentLength) { + expect(numberContentLength).toEqual(origSize); + } else { + expect(numberContentLength).toBeLessThan(origSize); } }); it(`from the client to the server`, async () => { @@ -164,13 +170,22 @@ describe('Compression', () => { const res = await fetchAPI.fetch(server.url, { method: 'POST', body: exampleData, + headers: { + 'content-length': String(Buffer.byteLength(exampleData)), + }, }); - const { body, contentLength } = await res.json(); - expect(body).toEqual(exampleData); - expect(Number(contentLength)).toEqual(Buffer.byteLength(exampleData)); + const resText = await res.text(); + let resJson: { body: string; contentLength: string }; + try { + resJson = JSON.parse(resText); + } catch { + throw new Error(`Could not parse JSON: ${resText}`); + } + expect(resJson.body).toEqual(exampleData); + expect(Number(resJson.contentLength)).toEqual(Buffer.byteLength(exampleData)); return; } - const stream = new CompressionStream(encoding as CompressionFormat); + const stream = new fetchAPI.CompressionStream(encoding as CompressionFormat); const writer = stream.writable.getWriter(); writer.write(exampleData); writer.close(); @@ -193,9 +208,15 @@ describe('Compression', () => { }, body: uint8Array, }); - const { body, contentLength } = await res.json(); - expect(body).toEqual(exampleData); - expect(Number(contentLength)).toBeLessThan(Buffer.byteLength(body)); + const resText = await res.text(); + let resJson: { body: string; contentLength: string }; + try { + resJson = JSON.parse(resText); + } catch { + throw new Error(`Could not parse JSON: ${resText}`); + } + expect(resJson.body).toEqual(exampleData); + expect(Number(resJson.contentLength)).toBeLessThan(Buffer.byteLength(resJson.body)); }); }); } diff --git a/packages/server/test/request-listener.spec.ts b/packages/server/test/request-listener.spec.ts index 9931f0bbdc6..87b6d33b9ab 100644 --- a/packages/server/test/request-listener.spec.ts +++ b/packages/server/test/request-listener.spec.ts @@ -6,15 +6,17 @@ const methodsWithoutBody = ['GET', 'DELETE']; const methodsWithBody = ['POST', 'PUT', 'PATCH']; describe('Request Listener', () => { - runTestsForEachFetchImpl((_, { createServerAdapter, fetchAPI }) => { + runTestsForEachFetchImpl((impl, { createServerAdapter, fetchAPI }) => { runTestsForEachServerImpl(testServer => { [...methodsWithBody, ...methodsWithoutBody].forEach(method => { + // PATCH is buggy in Native + if (impl === 'native' && method === 'PATCH') return; it(`should handle regular requests with ${method}`, () => { const requestInit: RequestInit = { method, headers: { - accept: 'application/json', - 'content-type': 'application/json', + accept: 'application/json;charset=utf-8', + 'content-type': 'application/json;charset=utf-8', 'random-header': Date.now().toString(), }, }; @@ -24,7 +26,7 @@ describe('Request Listener', () => { const expectedResponse = new fetchAPI.Response(getRegularResponseBody(), { status: 200, headers: { - 'content-type': 'application/json', + accept: 'application/json;charset=utf-8', 'random-header': Date.now().toString(), }, }); @@ -41,7 +43,7 @@ describe('Request Listener', () => { const requestInit: RequestInit = { method, headers: { - accept: 'application/json', + accept: 'application/json;charset=utf-8', 'random-header': Date.now().toString(), }, }; @@ -51,7 +53,7 @@ describe('Request Listener', () => { const expectedResponse = new fetchAPI.Response(getIncrementalResponseBody(), { status: 200, headers: { - 'content-type': 'application/json', + accept: 'application/json;charset=utf-8', 'random-header': Date.now().toString(), }, }); @@ -68,7 +70,7 @@ describe('Request Listener', () => { const requestInit: RequestInit = { method, headers: { - accept: 'application/json', + accept: 'application/json;charset=utf-8', 'random-header': Date.now().toString(), }, // @ts-expect-error duplex is not part of the RequestInit type yet @@ -80,7 +82,7 @@ describe('Request Listener', () => { const expectedResponse = new fetchAPI.Response(getRegularResponseBody(), { status: 200, headers: { - 'content-type': 'application/json', + 'content-type': 'application/json;charset=utf-8', 'random-header': Date.now().toString(), }, }); diff --git a/packages/server/test/test-fetch.ts b/packages/server/test/test-fetch.ts index 87f40d92115..bbe43eae6ae 100644 --- a/packages/server/test/test-fetch.ts +++ b/packages/server/test/test-fetch.ts @@ -66,7 +66,9 @@ export function runTestsForEachFetchImpl( }); }); }); - if (!opts.noNativeFetch) { + const nodeMajor = parseInt(process.versions.node.split('.')[0], 10); + // Node 18 is leaking memory with native fetch + if (!opts.noNativeFetch && process.env.LEAK_TEST && nodeMajor >= 22) { describe('Native', () => { const fetchAPI = createFetch({ skipPonyfill: true }); callback('native', { diff --git a/packages/server/test/useErrorHandling.spec.ts b/packages/server/test/useErrorHandling.spec.ts index b9f2b494ba1..9fed1be0229 100644 --- a/packages/server/test/useErrorHandling.spec.ts +++ b/packages/server/test/useErrorHandling.spec.ts @@ -4,7 +4,7 @@ import { runTestsForEachFetchImpl } from './test-fetch.js'; describe('useErrorHandling', () => { runTestsForEachFetchImpl( (_, { createServerAdapter, fetchAPI }) => { - it('should return 500 when error is thrown', async () => { + it('should return error response when error is thrown', async () => { const errorHandler = jest.fn(); let request: Request | undefined; const router = createServerAdapter( @@ -18,8 +18,9 @@ describe('useErrorHandling', () => { }, ); const response = await router.fetch('http://localhost/greetings/John'); - expect(response.status).toBe(500); - expect(response.statusText).toBe('Internal Server Error'); + const errRes = fetchAPI.Response.error(); + expect(response.status).toBe(errRes.status); + expect(response.statusText).toBe(errRes.statusText); const text = await response.text(); expect(text).toHaveLength(0); expect(errorHandler).toHaveBeenCalledWith(new Error('Unexpected error'), request, {});