Skip to content

Commit

Permalink
stream: add a non-destroying iterator to Readable
Browse files Browse the repository at this point in the history
add a non-destroying iterator to Readable

fixes: #38491

PR-URL: #38526
Fixes: #38491
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
  • Loading branch information
Linkgoron authored and danielleadams committed May 31, 2021
1 parent e2f28c8 commit 9054d25
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 10 deletions.
60 changes: 58 additions & 2 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1506,13 +1506,69 @@ async function print(readable) {
print(fs.createReadStream('file')).catch(console.error);
```

If the loop terminates with a `break` or a `throw`, the stream will be
destroyed. In other terms, iterating over a stream will consume the stream
If the loop terminates with a `break`, `return`, or a `throw`, the stream will
be destroyed. In other terms, iterating over a stream will consume the stream
fully. The stream will be read in chunks of size equal to the `highWaterMark`
option. In the code example above, data will be in a single chunk if the file
has less then 64KB of data because no `highWaterMark` option is provided to
[`fs.createReadStream()`][].

##### `readable.iterator([options])`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `options` {Object}
* `destroyOnReturn` {boolean} When set to `false`, calling `return` on the
async iterator, or exiting a `for await...of` iteration using a `break`,
`return`, or `throw` will not destroy the stream. **Default:** `true`.
* `destroyOnError` {boolean} When set to `false`, if the stream emits an
error while it's being iterated, the iterator will not destroy the stream.
**Default:** `true`.
* Returns: {AsyncIterator} to consume the stream.

The iterator created by this method gives users the option to cancel the
destruction of the stream if the `for await...of` loop is exited by `return`,
`break`, or `throw`, or if the iterator should destroy the stream if the stream
emitted an error during iteration.

```js
const { Readable } = require('stream');

async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // 1
break;
}

console.log(readable.destroyed); // false

for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // Will print 2 and then 3
}

console.log(readable.destroyed); // True, stream was totally consumed
}

async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk); // 1
break;
}

console.log(readable.destroyed); // true
}

async function showBoth() {
await printIterator(Readable.from([1, 2, 3]));
await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
}

showBoth();
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
38 changes: 30 additions & 8 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const {
ERR_METHOD_NOT_IMPLEMENTED,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
} = require('internal/errors').codes;
const { validateObject } = require('internal/validators');

const kPaused = Symbol('kPaused');

Expand Down Expand Up @@ -1062,8 +1063,17 @@ Readable.prototype.wrap = function(stream) {
};

Readable.prototype[SymbolAsyncIterator] = function() {
let stream = this;
return streamToAsyncIterator(this);
};

Readable.prototype.iterator = function(options) {
if (options !== undefined) {
validateObject(options, 'options');
}
return streamToAsyncIterator(this, options);
};

function streamToAsyncIterator(stream, options) {
if (typeof stream.read !== 'function') {
// v1 stream
const src = stream;
Expand All @@ -1076,14 +1086,20 @@ Readable.prototype[SymbolAsyncIterator] = function() {
}).wrap(src);
}

const iter = createAsyncIterator(stream);
const iter = createAsyncIterator(stream, options);
iter.stream = stream;
return iter;
};
}

async function* createAsyncIterator(stream) {
async function* createAsyncIterator(stream, options) {
let callback = nop;

const opts = {
destroyOnReturn: true,
destroyOnError: true,
...options,
};

function next(resolve) {
if (this === stream) {
callback();
Expand Down Expand Up @@ -1116,6 +1132,7 @@ async function* createAsyncIterator(stream) {
next.call(this);
});

let errorThrown = false;
try {
while (true) {
const chunk = stream.destroyed ? null : stream.read();
Expand All @@ -1132,12 +1149,17 @@ async function* createAsyncIterator(stream) {
}
}
} catch (err) {
destroyImpl.destroyer(stream, err);
if (opts.destroyOnError) {
destroyImpl.destroyer(stream, err);
}
errorThrown = true;
throw err;
} finally {
if (state.autoDestroy || !endEmitted) {
// TODO(ronag): ERR_PREMATURE_CLOSE?
destroyImpl.destroyer(stream, null);
if (!errorThrown && opts.destroyOnReturn) {
if (state.autoDestroy || !endEmitted) {
// TODO(ronag): ERR_PREMATURE_CLOSE?
destroyImpl.destroyer(stream, null);
}
}
}
}
Expand Down
116 changes: 116 additions & 0 deletions test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,122 @@ async function tests() {
});
}

// AsyncIterator non-destroying iterator
{
function createReadable() {
return Readable.from((async function* () {
await Promise.resolve();
yield 5;
await Promise.resolve();
yield 7;
await Promise.resolve();
})());
}

function createErrorReadable() {
const opts = { read() { throw new Error('inner'); } };
return new Readable(opts);
}

// Check default destroys on return
(async function() {
const readable = createReadable();
for await (const chunk of readable.iterator()) {
assert.strictEqual(chunk, 5);
break;
}

assert.ok(readable.destroyed);
})().then(common.mustCall());

// Check explicit destroying on return
(async function() {
const readable = createReadable();
for await (const chunk of readable.iterator({ destroyOnReturn: true })) {
assert.strictEqual(chunk, 5);
break;
}

assert.ok(readable.destroyed);
})().then(common.mustCall());

// Check default destroys on error
(async function() {
const readable = createErrorReadable();
try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable) { }
assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.message, 'inner');
}

assert.ok(readable.destroyed);
})().then(common.mustCall());

// Check explicit destroys on error
(async function() {
const readable = createErrorReadable();
const opts = { destroyOnError: true, destroyOnReturn: false };
try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable.iterator(opts)) { }
assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.message, 'inner');
}

assert.ok(readable.destroyed);
})().then(common.mustCall());

// Check explicit non-destroy with return true
(async function() {
const readable = createErrorReadable();
const opts = { destroyOnError: false, destroyOnReturn: true };
try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable.iterator(opts)) { }
assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.message, 'inner');
}

assert.ok(!readable.destroyed);
})().then(common.mustCall());

// Check explicit non-destroy with return true
(async function() {
const readable = createReadable();
const opts = { destroyOnReturn: false };
for await (const chunk of readable.iterator(opts)) {
assert.strictEqual(chunk, 5);
break;
}

assert.ok(!readable.destroyed);

for await (const chunk of readable.iterator(opts)) {
assert.strictEqual(chunk, 7);
}

assert.ok(readable.destroyed);
})().then(common.mustCall());

// Check non-object options.
{
const readable = createReadable();
assert.throws(
() => readable.iterator(42),
{
code: 'ERR_INVALID_ARG_TYPE',
name: 'TypeError',
message: 'The "options" argument must be of type object. Received ' +
'type number (42)',
}
);
}
}

{
let _req;
const server = http.createServer((request, response) => {
Expand Down

0 comments on commit 9054d25

Please sign in to comment.