From 9054d25acc5d3b6dcb8ea53351d9b3027f9d68e1 Mon Sep 17 00:00:00 2001 From: Nitzan Uziely Date: Sat, 17 Apr 2021 21:16:46 +0300 Subject: [PATCH] stream: add a non-destroying iterator to Readable add a non-destroying iterator to Readable fixes: https://github.com/nodejs/node/issues/38491 PR-URL: https://github.com/nodejs/node/pull/38526 Fixes: https://github.com/nodejs/node/issues/38491 Reviewed-By: James M Snell Reviewed-By: Matteo Collina Reviewed-By: Robert Nagy Reviewed-By: Benjamin Gruenbaum --- doc/api/stream.md | 60 ++++++++- lib/internal/streams/readable.js | 38 ++++-- .../test-stream-readable-async-iterators.js | 116 ++++++++++++++++++ 3 files changed, 204 insertions(+), 10 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 4e35340c04812d..988790a6f097b8 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1506,13 +1506,69 @@ async function print(readable) { print(fs.createReadStream('file')).catch(console.error); ``` -If the loop terminates with a `break` or a `throw`, the stream will be -destroyed. In other terms, iterating over a stream will consume the stream +If the loop terminates with a `break`, `return`, or a `throw`, the stream will +be destroyed. In other terms, iterating over a stream will consume the stream fully. The stream will be read in chunks of size equal to the `highWaterMark` option. In the code example above, data will be in a single chunk if the file has less then 64KB of data because no `highWaterMark` option is provided to [`fs.createReadStream()`][]. +##### `readable.iterator([options])` + + +> Stability: 1 - Experimental + +* `options` {Object} + * `destroyOnReturn` {boolean} When set to `false`, calling `return` on the + async iterator, or exiting a `for await...of` iteration using a `break`, + `return`, or `throw` will not destroy the stream. **Default:** `true`. + * `destroyOnError` {boolean} When set to `false`, if the stream emits an + error while it's being iterated, the iterator will not destroy the stream. + **Default:** `true`. +* Returns: {AsyncIterator} to consume the stream. + +The iterator created by this method gives users the option to cancel the +destruction of the stream if the `for await...of` loop is exited by `return`, +`break`, or `throw`, or if the iterator should destroy the stream if the stream +emitted an error during iteration. + +```js +const { Readable } = require('stream'); + +async function printIterator(readable) { + for await (const chunk of readable.iterator({ destroyOnReturn: false })) { + console.log(chunk); // 1 + break; + } + + console.log(readable.destroyed); // false + + for await (const chunk of readable.iterator({ destroyOnReturn: false })) { + console.log(chunk); // Will print 2 and then 3 + } + + console.log(readable.destroyed); // True, stream was totally consumed +} + +async function printSymbolAsyncIterator(readable) { + for await (const chunk of readable) { + console.log(chunk); // 1 + break; + } + + console.log(readable.destroyed); // true +} + +async function showBoth() { + await printIterator(Readable.from([1, 2, 3])); + await printSymbolAsyncIterator(Readable.from([1, 2, 3])); +} + +showBoth(); +``` + ### Duplex and transform streams #### Class: `stream.Duplex` diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index aa0f5f94886427..2a818db22dfb1d 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -62,6 +62,7 @@ const { ERR_METHOD_NOT_IMPLEMENTED, ERR_STREAM_UNSHIFT_AFTER_END_EVENT } = require('internal/errors').codes; +const { validateObject } = require('internal/validators'); const kPaused = Symbol('kPaused'); @@ -1062,8 +1063,17 @@ Readable.prototype.wrap = function(stream) { }; Readable.prototype[SymbolAsyncIterator] = function() { - let stream = this; + return streamToAsyncIterator(this); +}; +Readable.prototype.iterator = function(options) { + if (options !== undefined) { + validateObject(options, 'options'); + } + return streamToAsyncIterator(this, options); +}; + +function streamToAsyncIterator(stream, options) { if (typeof stream.read !== 'function') { // v1 stream const src = stream; @@ -1076,14 +1086,20 @@ Readable.prototype[SymbolAsyncIterator] = function() { }).wrap(src); } - const iter = createAsyncIterator(stream); + const iter = createAsyncIterator(stream, options); iter.stream = stream; return iter; -}; +} -async function* createAsyncIterator(stream) { +async function* createAsyncIterator(stream, options) { let callback = nop; + const opts = { + destroyOnReturn: true, + destroyOnError: true, + ...options, + }; + function next(resolve) { if (this === stream) { callback(); @@ -1116,6 +1132,7 @@ async function* createAsyncIterator(stream) { next.call(this); }); + let errorThrown = false; try { while (true) { const chunk = stream.destroyed ? null : stream.read(); @@ -1132,12 +1149,17 @@ async function* createAsyncIterator(stream) { } } } catch (err) { - destroyImpl.destroyer(stream, err); + if (opts.destroyOnError) { + destroyImpl.destroyer(stream, err); + } + errorThrown = true; throw err; } finally { - if (state.autoDestroy || !endEmitted) { - // TODO(ronag): ERR_PREMATURE_CLOSE? - destroyImpl.destroyer(stream, null); + if (!errorThrown && opts.destroyOnReturn) { + if (state.autoDestroy || !endEmitted) { + // TODO(ronag): ERR_PREMATURE_CLOSE? + destroyImpl.destroyer(stream, null); + } } } } diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 7c457fdc3da24b..a497317565fb4c 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -693,6 +693,122 @@ async function tests() { }); } +// AsyncIterator non-destroying iterator +{ + function createReadable() { + return Readable.from((async function* () { + await Promise.resolve(); + yield 5; + await Promise.resolve(); + yield 7; + await Promise.resolve(); + })()); + } + + function createErrorReadable() { + const opts = { read() { throw new Error('inner'); } }; + return new Readable(opts); + } + + // Check default destroys on return + (async function() { + const readable = createReadable(); + for await (const chunk of readable.iterator()) { + assert.strictEqual(chunk, 5); + break; + } + + assert.ok(readable.destroyed); + })().then(common.mustCall()); + + // Check explicit destroying on return + (async function() { + const readable = createReadable(); + for await (const chunk of readable.iterator({ destroyOnReturn: true })) { + assert.strictEqual(chunk, 5); + break; + } + + assert.ok(readable.destroyed); + })().then(common.mustCall()); + + // Check default destroys on error + (async function() { + const readable = createErrorReadable(); + try { + // eslint-disable-next-line no-unused-vars + for await (const chunk of readable) { } + assert.fail('should have thrown'); + } catch (err) { + assert.strictEqual(err.message, 'inner'); + } + + assert.ok(readable.destroyed); + })().then(common.mustCall()); + + // Check explicit destroys on error + (async function() { + const readable = createErrorReadable(); + const opts = { destroyOnError: true, destroyOnReturn: false }; + try { + // eslint-disable-next-line no-unused-vars + for await (const chunk of readable.iterator(opts)) { } + assert.fail('should have thrown'); + } catch (err) { + assert.strictEqual(err.message, 'inner'); + } + + assert.ok(readable.destroyed); + })().then(common.mustCall()); + + // Check explicit non-destroy with return true + (async function() { + const readable = createErrorReadable(); + const opts = { destroyOnError: false, destroyOnReturn: true }; + try { + // eslint-disable-next-line no-unused-vars + for await (const chunk of readable.iterator(opts)) { } + assert.fail('should have thrown'); + } catch (err) { + assert.strictEqual(err.message, 'inner'); + } + + assert.ok(!readable.destroyed); + })().then(common.mustCall()); + + // Check explicit non-destroy with return true + (async function() { + const readable = createReadable(); + const opts = { destroyOnReturn: false }; + for await (const chunk of readable.iterator(opts)) { + assert.strictEqual(chunk, 5); + break; + } + + assert.ok(!readable.destroyed); + + for await (const chunk of readable.iterator(opts)) { + assert.strictEqual(chunk, 7); + } + + assert.ok(readable.destroyed); + })().then(common.mustCall()); + + // Check non-object options. + { + const readable = createReadable(); + assert.throws( + () => readable.iterator(42), + { + code: 'ERR_INVALID_ARG_TYPE', + name: 'TypeError', + message: 'The "options" argument must be of type object. Received ' + + 'type number (42)', + } + ); + } +} + { let _req; const server = http.createServer((request, response) => {