Skip to content

Commit

Permalink
feat(node-fetch): WritableStream, TransformStream & `CompressionS…
Browse files Browse the repository at this point in the history
…tream` (#1495)
  • Loading branch information
ardatan authored Jul 26, 2024
1 parent c6ceb19 commit bebc159
Show file tree
Hide file tree
Showing 21 changed files with 554 additions and 94 deletions.
8 changes: 8 additions & 0 deletions .changeset/shaggy-taxis-love.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@whatwg-node/server-plugin-cookies': patch
'@whatwg-node/node-fetch': patch
'@whatwg-node/server': patch
'@whatwg-node/fetch': patch
---

Implement \`CompressionStream\`, \`WritableStream\` and \`TransformStream\`
36 changes: 27 additions & 9 deletions packages/fetch/dist/create-node-ponyfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,36 @@ const shouldSkipPonyfill = require('./shouldSkipPonyfill');
module.exports = function createNodePonyfill(opts = {}) {
const ponyfills = {};

ponyfills.URLPattern = globalThis.URLPattern;

// We call this previously to patch `Bun`
if (!ponyfills.URLPattern) {
const urlPatternModule = require('urlpattern-polyfill');
ponyfills.URLPattern = urlPatternModule.URLPattern;
}

if (opts.skipPonyfill || shouldSkipPonyfill()) {
return globalThis;
return {
fetch: globalThis.fetch,
Headers: globalThis.Headers,
Request: globalThis.Request,
Response: globalThis.Response,
FormData: globalThis.FormData,
ReadableStream: globalThis.ReadableStream,
WritableStream: globalThis.WritableStream,
TransformStream: globalThis.TransformStream,
CompressionStream: globalThis.CompressionStream,
DecompressionStream: globalThis.DecompressionStream,
Blob: globalThis.Blob,
File: globalThis.File,
crypto: globalThis.crypto,
btoa: globalThis.btoa,
TextEncoder: globalThis.TextEncoder,
TextDecoder: globalThis.TextDecoder,
URLPattern: ponyfills.URLPattern,
URL: globalThis.URL,
URLSearchParams: globalThis.URLSearchParams
};
}

const newNodeFetch = require('@whatwg-node/node-fetch');
Expand All @@ -25,14 +47,10 @@ module.exports = function createNodePonyfill(opts = {}) {
ponyfills.URL = newNodeFetch.URL;
ponyfills.URLSearchParams = newNodeFetch.URLSearchParams;

ponyfills.WritableStream = globalThis.WritableStream;
ponyfills.TransformStream = globalThis.TransformStream;

if (!ponyfills.WritableStream) {
const streamsWeb = require("stream/web");
ponyfills.WritableStream = streamsWeb.WritableStream;
ponyfills.TransformStream = streamsWeb.TransformStream;
}
ponyfills.WritableStream = newNodeFetch.WritableStream;
ponyfills.TransformStream = newNodeFetch.TransformStream;
ponyfills.CompressionStream = newNodeFetch.CompressionStream;
ponyfills.DecompressionStream = newNodeFetch.DecompressionStream;

ponyfills.Blob = newNodeFetch.Blob;
ponyfills.File = newNodeFetch.File;
Expand Down
6 changes: 6 additions & 0 deletions packages/fetch/dist/esm-ponyfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const FormData = window.FormData;
const ReadableStream = window.ReadableStream;
const WritableStream = window.WritableStream;
const TransformStream = window.TransformStream;
const CompressionStream = window.CompressionStream;
const DecompressionStream = window.DecompressionStream;
const Blob = window.Blob;
const File = window.File;
const crypto = window.crypto;
Expand All @@ -26,6 +28,8 @@ export {
ReadableStream,
WritableStream,
TransformStream,
CompressionStream,
DecompressionStream,
Blob,
File,
crypto,
Expand All @@ -47,6 +51,8 @@ export function createFetch() {
ReadableStream,
WritableStream,
TransformStream,
CompressionStream,
DecompressionStream,
Blob,
File,
crypto,
Expand Down
2 changes: 2 additions & 0 deletions packages/fetch/dist/global-ponyfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ module.exports.FormData = globalThis.FormData;
module.exports.ReadableStream = globalThis.ReadableStream;
module.exports.WritableStream = globalThis.WritableStream;
module.exports.TransformStream = globalThis.TransformStream;
module.exports.CompressionStream = globalThis.CompressionStream;
module.exports.DecompressionStream = globalThis.DecompressionStream;
module.exports.Blob = globalThis.Blob;
module.exports.File = globalThis.File;
module.exports.crypto = globalThis.crypto;
Expand Down
4 changes: 4 additions & 0 deletions packages/fetch/dist/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ declare module '@whatwg-node/fetch' {
export const ReadableStream: typeof globalThis.ReadableStream;
export const WritableStream: typeof globalThis.WritableStream;
export const TransformStream: typeof globalThis.TransformStream;
export const CompressionStream: typeof globalThis.CompressionStream;
export const DecompressionStream: typeof globalThis.DecompressionStream;
export const Blob: typeof globalThis.Blob;
export const File: typeof globalThis.File;
export const crypto: typeof globalThis.crypto;
Expand Down Expand Up @@ -53,6 +55,8 @@ declare module '@whatwg-node/fetch' {
ReadableStream: typeof ReadableStream;
WritableStream: typeof WritableStream;
TransformStream: typeof TransformStream;
CompressionStream: typeof CompressionStream;
DecompressionStream: typeof DecompressionStream;
Blob: typeof Blob;
File: typeof File;
crypto: typeof crypto;
Expand Down
2 changes: 2 additions & 0 deletions packages/fetch/dist/node-ponyfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ module.exports.FormData = ponyfills.FormData;
module.exports.ReadableStream = ponyfills.ReadableStream;
module.exports.WritableStream = ponyfills.WritableStream;
module.exports.TransformStream = ponyfills.TransformStream;
module.exports.CompressionStream = ponyfills.CompressionStream;
module.exports.DecompressionStream = ponyfills.DecompressionStream;
module.exports.Blob = ponyfills.Blob;
module.exports.File = ponyfills.File;
module.exports.crypto = ponyfills.crypto;
Expand Down
36 changes: 36 additions & 0 deletions packages/node-fetch/src/CompressionStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { createBrotliCompress, createDeflate, createDeflateRaw, createGzip } from 'node:zlib';
import { PonyfillTransformStream } from './TransformStream.js';

export type PonyfillCompressionFormat =
| 'x-gzip'
| 'gzip'
| 'x-deflate'
| 'deflate'
| 'deflate-raw'
| 'br';

export class PonyfillCompressionStream
extends PonyfillTransformStream
implements CompressionStream
{
constructor(compressionFormat: PonyfillCompressionFormat) {
switch (compressionFormat) {
case 'x-gzip':
case 'gzip':
super(createGzip());
break;
case 'x-deflate':
case 'deflate':
super(createDeflate());
break;
case 'deflate-raw':
super(createDeflateRaw());
break;
case 'br':
super(createBrotliCompress());
break;
default:
throw new Error(`Unsupported compression format: ${compressionFormat}`);
}
}
}
29 changes: 29 additions & 0 deletions packages/node-fetch/src/DecompressionStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { createBrotliDecompress, createGunzip, createInflate, createInflateRaw } from 'node:zlib';
import { PonyfillCompressionFormat } from './CompressionStream.js';
import { PonyfillTransformStream } from './TransformStream.js';

export class PonyfillDecompressionStream
extends PonyfillTransformStream
implements DecompressionStream
{
constructor(compressionFormat: PonyfillCompressionFormat) {
switch (compressionFormat) {
case 'x-gzip':
case 'gzip':
super(createGunzip());
break;
case 'x-deflate':
case 'deflate':
super(createInflate());
break;
case 'deflate-raw':
super(createInflateRaw());
break;
case 'br':
super(createBrotliDecompress());
break;
default:
throw new Error(`Unsupported compression format: ${compressionFormat}`);
}
}
}
27 changes: 21 additions & 6 deletions packages/node-fetch/src/ReadableStream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Readable } from 'stream';
import { PonyfillWritableStream } from './WritableStream';

function createController<T>(
desiredSize: number,
Expand Down Expand Up @@ -168,13 +169,23 @@ export class PonyfillReadableStream<T> implements ReadableStream<T> {
}

async pipeTo(destination: WritableStream<T>): Promise<void> {
const writer = destination.getWriter();
await writer.ready;
for await (const chunk of this.readable) {
await writer.write(chunk);
if (isPonyfillWritableStream(destination)) {
return new Promise((resolve, reject) => {
this.readable.pipe(destination.writable);
destination.writable.once('finish', resolve);
destination.writable.once('error', reject);
});
} else {
const writer = destination.getWriter();
try {
for await (const chunk of this) {
await writer.write(chunk);
}
await writer.close();
} catch (err) {
await writer.abort(err);
}
}
await writer.ready;
return writer.close();
}

pipeThrough<T2>({
Expand All @@ -192,3 +203,7 @@ export class PonyfillReadableStream<T> implements ReadableStream<T> {
return isReadableStream(instance);
}
}

function isPonyfillWritableStream(obj: any): obj is PonyfillWritableStream {
return obj?.writable != null;
}
76 changes: 76 additions & 0 deletions packages/node-fetch/src/TransformStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { Transform } from 'node:stream';
import { PonyfillReadableStream } from './ReadableStream.js';
import { PonyfillWritableStream } from './WritableStream.js';

export class PonyfillTransformStream<I = any, O = any> implements TransformStream<I, O> {
transform: Transform;
writable: PonyfillWritableStream<I>;
readable: PonyfillReadableStream<O>;

constructor(transformer?: Transformer<I, O> | Transform) {
if (transformer instanceof Transform) {
this.transform = transformer;
} else if (transformer) {
const controller: TransformStreamDefaultController = {
enqueue(chunk: O) {
transform.push(chunk);
},
error(reason: any) {
transform.destroy(reason);
},
terminate() {
transform.end();
},
get desiredSize() {
return transform.writableLength;
},
};
const transform = new Transform({
read() {},
write(chunk: I, _encoding: BufferEncoding, callback: (error?: Error | null) => void) {
try {
const result = transformer.transform?.(chunk, controller);
if (result instanceof Promise) {
result.then(
() => {
callback();
},
err => {
callback(err);
},
);
} else {
callback();
}
} catch (err) {
callback(err as Error);
}
},
final(callback: (error?: Error | null) => void) {
try {
const result = transformer.flush?.(controller);
if (result instanceof Promise) {
result.then(
() => {
callback();
},
err => {
callback(err);
},
);
} else {
callback();
}
} catch (err) {
callback(err as Error);
}
},
});
this.transform = transform;
} else {
this.transform = new Transform();
}
this.writable = new PonyfillWritableStream(this.transform);
this.readable = new PonyfillReadableStream(this.transform);
}
}
Loading

0 comments on commit bebc159

Please sign in to comment.