From 32254988ba7c0c0a68d8dfbc32430466370bf170 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Wed, 18 Jan 2023 13:26:49 +0530 Subject: [PATCH] stream: implement finished() for ReadableStream and WritableStream Refs: https://github.com/nodejs/node/issues/39316 PR-URL: https://github.com/nodejs/node/pull/46205 Reviewed-By: Robert Nagy Reviewed-By: Matteo Collina Reviewed-By: Darshan Sen Reviewed-By: James M Snell --- lib/internal/streams/end-of-stream.js | 25 ++- lib/internal/streams/utils.js | 25 +++ lib/internal/webstreams/readablestream.js | 14 +- lib/internal/webstreams/writablestream.js | 14 +- test/parallel/test-webstreams-finished.js | 232 ++++++++++++++++++++++ 5 files changed, 301 insertions(+), 9 deletions(-) create mode 100644 test/parallel/test-webstreams-finished.js diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index ca42174c86459a..07f80aedc69cd5 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -22,20 +22,23 @@ const { validateBoolean } = require('internal/validators'); -const { Promise } = primordials; +const { Promise, PromisePrototypeThen } = primordials; const { isClosed, isReadable, isReadableNodeStream, + isReadableStream, isReadableFinished, isReadableErrored, isWritable, isWritableNodeStream, + isWritableStream, isWritableFinished, isWritableErrored, isNodeStream, willEmitClose: _willEmitClose, + kIsClosedPromise, } = require('internal/streams/utils'); function isRequest(stream) { @@ -58,14 +61,17 @@ function eos(stream, options, callback) { callback = once(callback); - const readable = options.readable ?? isReadableNodeStream(stream); - const writable = options.writable ?? isWritableNodeStream(stream); + if (isReadableStream(stream) || isWritableStream(stream)) { + return eosWeb(stream, options, callback); + } if (!isNodeStream(stream)) { - // TODO: Webstreams. - throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream); + throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream); } + const readable = options.readable ?? isReadableNodeStream(stream); + const writable = options.writable ?? isWritableNodeStream(stream); + const wState = stream._writableState; const rState = stream._readableState; @@ -255,6 +261,15 @@ function eos(stream, options, callback) { return cleanup; } +function eosWeb(stream, opts, callback) { + PromisePrototypeThen( + stream[kIsClosedPromise].promise, + () => process.nextTick(() => callback.call(stream)), + (err) => process.nextTick(() => callback.call(stream, err)), + ); + return nop; +} + function finished(stream, opts) { let autoCleanup = false; if (opts === null) { diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 4d4f00ab456fa7..9d08af6f31a280 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -4,6 +4,7 @@ const { Symbol, SymbolAsyncIterator, SymbolIterator, + SymbolFor, } = primordials; const kDestroyed = Symbol('kDestroyed'); @@ -11,6 +12,8 @@ const kIsErrored = Symbol('kIsErrored'); const kIsReadable = Symbol('kIsReadable'); const kIsDisturbed = Symbol('kIsDisturbed'); +const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise'); + function isReadableNodeStream(obj, strict = false) { return !!( obj && @@ -55,6 +58,25 @@ function isNodeStream(obj) { ); } +function isReadableStream(obj) { + return !!( + obj && + !isNodeStream(obj) && + typeof obj.pipeThrough === 'function' && + typeof obj.getReader === 'function' && + typeof obj.cancel === 'function' + ); +} + +function isWritableStream(obj) { + return !!( + obj && + !isNodeStream(obj) && + typeof obj.getWriter === 'function' && + typeof obj.abort === 'function' + ); +} + function isIterable(obj, isAsync) { if (obj == null) return false; if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'; @@ -269,18 +291,21 @@ module.exports = { kIsErrored, isReadable, kIsReadable, + kIsClosedPromise, isClosed, isDestroyed, isDuplexNodeStream, isFinished, isIterable, isReadableNodeStream, + isReadableStream, isReadableEnded, isReadableFinished, isReadableErrored, isNodeStream, isWritable, isWritableNodeStream, + isWritableStream, isWritableEnded, isWritableFinished, isWritableErrored, diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 59155c76a4a262..109332a4de8f6d 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -85,6 +85,7 @@ const { kIsDisturbed, kIsErrored, kIsReadable, + kIsClosedPromise, } = require('internal/streams/utils'); const { @@ -258,9 +259,11 @@ class ReadableStream { port1: undefined, port2: undefined, promise: undefined, - } + }, }; + this[kIsClosedPromise] = createDeferredPromise(); + // The spec requires handling of the strategy first // here. Specifically, if getting the size and // highWaterMark from the strategy fail, that has @@ -652,8 +655,9 @@ function TransferredReadableStream() { writable: undefined, port: undefined, promise: undefined, - } + }, }; + this[kIsClosedPromise] = createDeferredPromise(); }, [], ReadableStream)); } @@ -1213,8 +1217,9 @@ function createTeeReadableStream(start, pull, cancel) { writable: undefined, port: undefined, promise: undefined, - } + }, }; + this[kIsClosedPromise] = createDeferredPromise(); setupReadableStreamDefaultControllerFromSource( this, ObjectCreate(null, { @@ -1887,6 +1892,7 @@ function readableStreamCancel(stream, reason) { function readableStreamClose(stream) { assert(stream[kState].state === 'readable'); stream[kState].state = 'closed'; + stream[kIsClosedPromise].resolve(); const { reader, @@ -1908,6 +1914,8 @@ function readableStreamError(stream, error) { assert(stream[kState].state === 'readable'); stream[kState].state = 'errored'; stream[kState].storedError = error; + stream[kIsClosedPromise].reject(error); + setPromiseHandled(stream[kIsClosedPromise].promise); const { reader diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index 3d5851753057a9..f0fdf35d643695 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -69,6 +69,10 @@ const { kState, } = require('internal/webstreams/util'); +const { + kIsClosedPromise, +} = require('internal/streams/utils'); + const { AbortController, } = require('internal/abort_controller'); @@ -191,9 +195,11 @@ class WritableStream { port1: undefined, port2: undefined, promise: undefined, - } + }, }; + this[kIsClosedPromise] = createDeferredPromise(); + const size = extractSizeAlgorithm(strategy?.size); const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1); @@ -363,6 +369,7 @@ function TransferredWritableStream() { readable: undefined, }, }; + this[kIsClosedPromise] = createDeferredPromise(); }, [], WritableStream)); } @@ -742,6 +749,10 @@ function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { resolve: undefined, }; } + + stream[kIsClosedPromise].reject(stream[kState]?.storedError); + setPromiseHandled(stream[kIsClosedPromise].promise); + const { writer, } = stream[kState]; @@ -855,6 +866,7 @@ function writableStreamFinishInFlightClose(stream) { stream[kState].state = 'closed'; if (stream[kState].writer !== undefined) stream[kState].writer[kState].close.resolve?.(); + stream[kIsClosedPromise].resolve?.(); assert(stream[kState].pendingAbortRequest.abort.promise === undefined); assert(stream[kState].storedError === undefined); } diff --git a/test/parallel/test-webstreams-finished.js b/test/parallel/test-webstreams-finished.js new file mode 100644 index 00000000000000..65a14d863eb922 --- /dev/null +++ b/test/parallel/test-webstreams-finished.js @@ -0,0 +1,232 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { ReadableStream, WritableStream } = require('stream/web'); +const { finished } = require('stream'); +const { finished: finishedPromise } = require('stream/promises'); + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + }, + }); + finished(rs, common.mustSucceed()); + async function test() { + const values = []; + for await (const chunk of rs) { + values.push(chunk); + } + assert.deepStrictEqual(values, ['asd']); + } + test(); +} + +{ + const rs = new ReadableStream({ + start(controller) { + controller.error(new Error('asd')); + } + }); + + finished(rs, common.mustCall((err) => { + assert.strictEqual(err?.message, 'asd'); + })); +} + +{ + const rs = new ReadableStream({ + async start(controller) { + throw new Error('asd'); + } + }); + + finished(rs, common.mustCall((err) => { + assert.strictEqual(err?.message, 'asd'); + })); +} + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + } + }); + + async function test() { + const values = []; + for await (const chunk of rs) { + values.push(chunk); + } + assert.deepStrictEqual(values, ['asd']); + } + + finishedPromise(rs).then(common.mustSucceed()); + + test(); +} + +{ + const rs = new ReadableStream({ + start(controller) { + controller.error(new Error('asd')); + } + }); + + finishedPromise(rs).then(common.mustNotCall()).catch(common.mustCall((err) => { + assert.strictEqual(err?.message, 'asd'); + })); +} + +{ + const rs = new ReadableStream({ + async start(controller) { + throw new Error('asd'); + } + }); + + finishedPromise(rs).then(common.mustNotCall()).catch(common.mustCall((err) => { + assert.strictEqual(err?.message, 'asd'); + })); +} + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + } + }); + + const { 0: s1, 1: s2 } = rs.tee(); + + finished(s1, common.mustSucceed()); + finished(s2, common.mustSucceed()); + + async function test(stream) { + const values = []; + for await (const chunk of stream) { + values.push(chunk); + } + assert.deepStrictEqual(values, ['asd']); + } + + Promise.all([ + test(s1), + test(s2), + ]).then(common.mustCall()); +} + +{ + const rs = new ReadableStream({ + start(controller) { + controller.error(new Error('asd')); + } + }); + + const { 0: s1, 1: s2 } = rs.tee(); + + finished(s1, common.mustCall((err) => { + assert.strictEqual(err?.message, 'asd'); + })); + + finished(s2, common.mustCall((err) => { + assert.strictEqual(err?.message, 'asd'); + })); +} + +{ + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + } + }); + + finished(rs, common.mustSucceed()); + + rs.cancel(); +} + +{ + let str = ''; + const ws = new WritableStream({ + write(chunk) { + str += chunk; + } + }); + + finished(ws, common.mustSucceed(() => { + assert.strictEqual(str, 'asd'); + })); + + const writer = ws.getWriter(); + writer.write('asd'); + writer.close(); +} + +{ + const ws = new WritableStream({ + async write(chunk) { + throw new Error('asd'); + } + }); + + finished(ws, common.mustCall((err) => { + assert.strictEqual(err?.message, 'asd'); + })); + + const writer = ws.getWriter(); + writer.write('asd').catch((err) => { + assert.strictEqual(err?.message, 'asd'); + }); +} + +{ + let str = ''; + const ws = new WritableStream({ + write(chunk) { + str += chunk; + } + }); + + finishedPromise(ws).then(common.mustSucceed(() => { + assert.strictEqual(str, 'asd'); + })); + + const writer = ws.getWriter(); + writer.write('asd'); + writer.close(); +} + +{ + const ws = new WritableStream({ + write(chunk) { } + }); + finished(ws, common.mustCall((err) => { + assert.strictEqual(err?.message, 'asd'); + })); + + const writer = ws.getWriter(); + writer.abort(new Error('asd')); +} + +{ + const ws = new WritableStream({ + async write(chunk) { + throw new Error('asd'); + } + }); + + finishedPromise(ws).then(common.mustNotCall()).catch(common.mustCall((err) => { + assert.strictEqual(err?.message, 'asd'); + })); + + const writer = ws.getWriter(); + writer.write('asd').catch((err) => { + assert.strictEqual(err?.message, 'asd'); + }); +}