diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 441fcb471858bc..8b44c41426d280 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -16,18 +16,21 @@ const { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, - ERR_STREAM_DESTROYED + ERR_STREAM_DESTROYED, + ERR_STREAM_PREMATURE_CLOSE } = require('internal/errors').codes; const { validateCallback } = require('internal/validators'); +function noop() {} + const { isIterable, isReadable, isStream, } = require('internal/streams/utils'); +const assert = require('internal/assert'); -let EE; let PassThrough; let Readable; @@ -102,25 +105,52 @@ async function* fromReadable(val) { } async function pump(iterable, writable, finish) { - if (!EE) { - EE = require('events'); - } let error; + let callback = noop; + const resume = (err) => { + if (!error && err) { + error = err; + } + const _callback = callback; + callback = noop; + _callback(); + }; + const onClose = () => { + resume(new ERR_STREAM_PREMATURE_CLOSE()); + }; + + const waitForDrain = () => new Promise((resolve) => { + assert.strictEqual(callback, noop); + if (error || writable.destroyed) { + resolve(); + } else { + callback = resolve; + } + }); + + writable + .on('drain', resume) + .on('error', resume) + .on('close', onClose); + try { - if (writable.writableNeedDrain === true) { - await EE.once(writable, 'drain'); + if (writable.writableNeedDrain) { + await waitForDrain(); } for await (const chunk of iterable) { if (!writable.write(chunk)) { - if (writable.destroyed) return; - await EE.once(writable, 'drain'); + await waitForDrain(); } } writable.end(); } catch (err) { error = err; } finally { + writable + .off('drain', resume) + .off('error', resume) + .off('close', onClose); finish(error); } }