From b2fb01a68d3f68f0c8bd524d15d4116f673b777e Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 1 May 2020 23:02:42 +0200 Subject: [PATCH] stream: make from read one at a time Currently from will eagerly buffer up items which means that errors are also eagerly encountered and items which are buffer when an error occurs will be discarded, which is inconsistent with how generators work. Fixes: https://github.com/nodejs/node/issues/29428 PR-URL: https://github.com/nodejs/node/pull/33201 Reviewed-By: Matteo Collina Reviewed-By: James M Snell Reviewed-By: Benjamin Gruenbaum Reviewed-By: Ruben Bridgewater --- lib/internal/streams/from.js | 2 ++ test/parallel/test-readable-from.js | 26 +++++++++++++++++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index 6752679ae3bc2b..13e8a73980ddd1 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -33,6 +33,8 @@ function from(Readable, iterable, opts) { const readable = new Readable({ objectMode: true, + highWaterMark: 1, + // TODO(ronag): What options should be allowed? ...opts }); diff --git a/test/parallel/test-readable-from.js b/test/parallel/test-readable-from.js index 6cc9216a306378..94bc2c1ae4b926 100644 --- a/test/parallel/test-readable-from.js +++ b/test/parallel/test-readable-from.js @@ -159,6 +159,29 @@ async function asTransformStream() { } } +async function endWithError() { + async function* generate() { + yield 1; + yield 2; + yield Promise.reject('Boum'); + } + + const stream = Readable.from(generate()); + + const expected = [1, 2]; + + try { + for await (const chunk of stream) { + strictEqual(chunk, expected.shift()); + } + throw new Error(); + } catch (err) { + strictEqual(expected.length, 0); + strictEqual(err, 'Boum'); + } +} + + Promise.all([ toReadableBasicSupport(), toReadableSyncIterator(), @@ -168,5 +191,6 @@ Promise.all([ toReadableOnData(), toReadableOnDataNonObject(), destroysTheStreamWhenThrowing(), - asTransformStream() + asTransformStream(), + endWithError() ]).then(mustCall());