From 650c9bd02298f7dd2458cbd99d8d88d1761b6785 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 19 Nov 2021 19:14:32 +0100 Subject: [PATCH] stream: pipeline with end option Currently pipeline cannot fully replace pipe due to the missing end option. This PR adds the end option to the promisified pipeline method. PR-URL: https://github.com/nodejs/node/pull/40886 Reviewed-By: Luigi Pinca Reviewed-By: Matteo Collina Reviewed-By: James M Snell --- lib/internal/streams/pipeline.js | 40 +++++++++++++++++---------- lib/stream/promises.js | 4 ++- test/parallel/test-stream-pipeline.js | 23 ++++++++++++++- 3 files changed, 50 insertions(+), 17 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index ab7e3cbb6c8a26..f5f489577854ac 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -109,7 +109,7 @@ async function* fromReadable(val) { yield* Readable.prototype[SymbolAsyncIterator].call(val); } -async function pump(iterable, writable, finish) { +async function pump(iterable, writable, finish, opts) { let error; let onresolve = null; @@ -153,7 +153,9 @@ async function pump(iterable, writable, finish) { } } - writable.end(); + if (opts?.end !== false) { + writable.end(); + } await wait(); @@ -227,17 +229,22 @@ function pipelineImpl(streams, callback, opts) { const stream = streams[i]; const reading = i < streams.length - 1; const writing = i > 0; + const end = reading || opts?.end !== false; if (isNodeStream(stream)) { - finishCount++; - destroys.push(destroyer(stream, reading, writing, (err) => { - if (!err && !reading && isReadableFinished(stream, false)) { - stream.read(0); - destroyer(stream, true, writing, finish); - } else { - finish(err); - } - })); + if (end) { + finishCount++; + destroys.push(destroyer(stream, reading, writing, (err) => { + if (!err && !reading && isReadableFinished(stream, false)) { + stream.read(0); + destroyer(stream, true, writing, finish); + } else { + finish(err); + } + })); + } else { + stream.on('error', finish); + } } if (i === 0) { @@ -282,14 +289,17 @@ function pipelineImpl(streams, callback, opts) { then.call(ret, (val) => { value = val; - pt.end(val); + pt.write(val); + if (end) { + pt.end(); + } }, (err) => { pt.destroy(err); }, ); } else if (isIterable(ret, true)) { finishCount++; - pump(ret, pt, finish); + pump(ret, pt, finish, { end }); } else { throw new ERR_INVALID_RETURN_VALUE( 'AsyncIterable or Promise', 'destination', ret); @@ -302,7 +312,7 @@ function pipelineImpl(streams, callback, opts) { } } else if (isNodeStream(stream)) { if (isReadableNodeStream(ret)) { - ret.pipe(stream); + ret.pipe(stream, { end }); // Compat. Before node v10.12.0 stdio used to throw an error so // pipe() did/does not end() stdio destinations. @@ -314,7 +324,7 @@ function pipelineImpl(streams, callback, opts) { ret = makeAsyncIterable(ret); finishCount++; - pump(ret, stream, finish); + pump(ret, stream, finish, { end }); } ret = stream; } else { diff --git a/lib/stream/promises.js b/lib/stream/promises.js index 0db01a8b208d60..2fdcad3cc4aa3a 100644 --- a/lib/stream/promises.js +++ b/lib/stream/promises.js @@ -16,11 +16,13 @@ const eos = require('internal/streams/end-of-stream'); function pipeline(...streams) { return new Promise((resolve, reject) => { let signal; + let end; const lastArg = streams[streams.length - 1]; if (lastArg && typeof lastArg === 'object' && !isNodeStream(lastArg) && !isIterable(lastArg)) { const options = ArrayPrototypePop(streams); signal = options.signal; + end = options.end; } pl(streams, (err, value) => { @@ -29,7 +31,7 @@ function pipeline(...streams) { } else { resolve(value); } - }, { signal }); + }, { signal, end }); }); } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 1fc3386fc16257..ae4e76352f3545 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1465,5 +1465,26 @@ const tsp = require('timers/promises'); assert.strictEqual(duplex.destroyed, true); } - run(); + run().then(common.mustCall()); +} + +{ + const pipelinePromise = promisify(pipeline); + + async function run() { + const read = new Readable({ + read() {} + }); + + const duplex = new PassThrough(); + + read.push(null); + + await pipelinePromise(read, duplex, { end: false }); + + assert.strictEqual(duplex.destroyed, false); + assert.strictEqual(duplex.writableEnded, false); + } + + run().then(common.mustCall()); }