diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 441fcb471858bc..5f63e1c7d29b76 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -5,6 +5,7 @@ const { ArrayIsArray, + Promise, SymbolAsyncIterator, } = primordials; @@ -13,21 +14,27 @@ let eos; const { once } = require('internal/util'); const destroyImpl = require('internal/streams/destroy'); const { - ERR_INVALID_ARG_TYPE, - ERR_INVALID_RETURN_VALUE, - ERR_MISSING_ARGS, - ERR_STREAM_DESTROYED -} = require('internal/errors').codes; + aggregateTwoErrors, + codes: { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_RETURN_VALUE, + ERR_MISSING_ARGS, + ERR_STREAM_DESTROYED, + ERR_STREAM_PREMATURE_CLOSE, + }, +} = require('internal/errors'); const { validateCallback } = require('internal/validators'); +function noop() {} + const { isIterable, isReadable, isStream, } = require('internal/streams/utils'); +const assert = require('internal/assert'); -let EE; let PassThrough; let Readable; @@ -102,25 +109,62 @@ async function* fromReadable(val) { } async function pump(iterable, writable, finish) { - if (!EE) { - EE = require('events'); - } let error; + let callback = noop; + const resume = (err) => { + error = aggregateTwoErrors(error, err); + const _callback = callback; + callback = noop; + _callback(); + }; + const onClose = () => { + resume(new ERR_STREAM_PREMATURE_CLOSE()); + }; + + const waitForDrain = () => new Promise((resolve) => { + assert(callback === noop); + if (error || writable.destroyed) { + resolve(); + } else { + callback = resolve; + } + }); + + writable + .on('drain', resume) + .on('error', resume) + .on('close', onClose); + try { - if (writable.writableNeedDrain === true) { - await EE.once(writable, 'drain'); + if (writable.writableNeedDrain) { + await waitForDrain(); + } + + if (error) { + return; } for await (const chunk of iterable) { if (!writable.write(chunk)) { - if (writable.destroyed) return; - await EE.once(writable, 'drain'); + await waitForDrain(); + } + if (error) { + return; } } + + if (error) { + return; + } + writable.end(); } catch (err) { - error = err; + error = aggregateTwoErrors(error, err); } finally { + writable + .off('drain', resume) + .off('error', resume) + .off('close', onClose); finish(error); } } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index e2e5fe2e0d561a..aaf726ea5a0350 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1387,3 +1387,36 @@ const net = require('net'); assert.strictEqual(res, content); })); } + +{ + const writableLike = new Stream(); + writableLike.writableNeedDrain = true; + + pipeline( + async function *() {}, + writableLike, + common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + }) + ); + + writableLike.emit('close'); +} + +{ + const writableLike = new Stream(); + writableLike.write = () => false; + + pipeline( + async function *() { + yield null; + yield null; + }, + writableLike, + common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + }) + ); + + writableLike.emit('close'); +}