From 0a005521229b7130f833b32af55dffcff0d78d97 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 2 Mar 2020 17:59:06 +0100 Subject: [PATCH] stream: do not swallow errors with async iterators and pipeline Before this patch, pipeline() could swallow errors by pre-emptively producing a ERR_STREAM_PREMATURE_CLOSE that was not really helpful to the user. Co-Authored-By: Robert Nagy PR-URL: https://github.com/nodejs/node/pull/32051 Reviewed-By: Robert Nagy Reviewed-By: Benjamin Gruenbaum --- lib/internal/streams/pipeline.js | 39 ++++++++++++++++----------- test/parallel/test-stream-pipeline.js | 27 +++++++++++++++++++ 2 files changed, 51 insertions(+), 15 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 069ce8e1d5d874..edbfed3e516c05 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -109,6 +109,7 @@ async function pump(iterable, writable, finish) { if (!EE) { EE = require('events'); } + let error; try { for await (const chunk of iterable) { if (!writable.write(chunk)) { @@ -118,7 +119,9 @@ async function pump(iterable, writable, finish) { } writable.end(); } catch (err) { - finish(err); + error = err; + } finally { + finish(error); } } @@ -135,15 +138,21 @@ function pipeline(...streams) { let value; const destroys = []; - function finish(err, final) { - if (!error && err) { + let finishCount = 0; + + function finish(err) { + const final = --finishCount === 0; + + if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) { error = err; } - if (error || final) { - for (const destroy of destroys) { - destroy(error); - } + if (!error && !final) { + return; + } + + while (destroys.length) { + destroys.shift()(error); } if (final) { @@ -151,12 +160,6 @@ function pipeline(...streams) { } } - function wrap(stream, reading, writing, final) { - destroys.push(destroyer(stream, reading, writing, final, (err) => { - finish(err, final); - })); - } - let ret; for (let i = 0; i < streams.length; i++) { const stream = streams[i]; @@ -164,7 +167,8 @@ function pipeline(...streams) { const writing = i > 0; if (isStream(stream)) { - wrap(stream, reading, writing, !reading); + finishCount++; + destroys.push(destroyer(stream, reading, writing, !reading, finish)); } if (i === 0) { @@ -210,6 +214,7 @@ function pipeline(...streams) { pt.destroy(err); }); } else if (isIterable(ret, true)) { + finishCount++; pump(ret, pt, finish); } else { throw new ERR_INVALID_RETURN_VALUE( @@ -217,13 +222,17 @@ function pipeline(...streams) { } ret = pt; - wrap(ret, false, true, true); + + finishCount++; + destroys.push(destroyer(ret, false, true, true, finish)); } } else if (isStream(stream)) { if (isReadable(ret)) { ret.pipe(stream); } else { ret = makeAsyncIterable(ret); + + finishCount++; pump(ret, stream, finish); } ret = stream; diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 0d90f65f5a6643..9939a16494499c 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -968,3 +968,30 @@ const { promisify } = require('util'); })); src.end(); } + +{ + let res = ''; + const rs = new Readable({ + read() { + setImmediate(() => { + rs.push('hello'); + }); + } + }); + const ws = new Writable({ + write: common.mustNotCall() + }); + pipeline(rs, async function*(stream) { + /* eslint no-unused-vars: off */ + for await (const chunk of stream) { + throw new Error('kaboom'); + } + }, async function *(source) { + for await (const chunk of source) { + res += chunk; + } + }, ws, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(res, ''); + })); +}