Skip to content

Commit

Permalink
stream: add toArray
Browse files Browse the repository at this point in the history
Add the toArray method from the TC39 iterator helper proposal to
Readable streams. This also enables a common-use case of converting a
stream to an array.

Co-Authored-By: Robert Nagy <[email protected]>
PR-URL: #41553
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
2 people authored and danielleadams committed Apr 21, 2022
1 parent adb88fc commit ac8526e
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 0 deletions.
40 changes: 40 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1834,6 +1834,46 @@ await dnsResults.forEach((result) => {
console.log('done'); // Stream has finished
```

### `readable.toArray([options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `options` {Object}
* `signal` {AbortSignal} allows cancelling the toArray operation if the
signal is aborted.
* Returns: {Promise} a promise containing an array (if the stream is in
object mode) or Buffer with the contents of the stream.

This method allows easily obtaining the contents of a stream. If the
stream is in [object mode][object-mode] an array of its contents is returned.
If the stream is not in object mode a Buffer containing its data is returned.

As this method reads the entire stream into memory, it negates the benefits of
streams. It's intended for interoperability and convenience, not as the primary
way to consume streams.

```mjs
import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]

// Make dns queries concurrently using .map and collect
// the results into an aray using toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 }).toArray();
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
17 changes: 17 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict';

const { AbortController } = require('internal/abort_controller');
const { Buffer } = require('buffer');

const {
codes: {
ERR_INVALID_ARG_TYPE,
Expand All @@ -10,6 +12,7 @@ const {
const { validateInteger } = require('internal/validators');

const {
ArrayPrototypePush,
MathFloor,
Promise,
PromiseReject,
Expand Down Expand Up @@ -174,11 +177,25 @@ async function * filter(fn, options) {
yield* this.map(filterFn, options);
}

async function toArray(options) {
const result = [];
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
}
ArrayPrototypePush(result, val);
}
if (!this.readableObjectMode) {
return Buffer.concat(result);
}
return result;
}
module.exports.streamReturningOperators = {
filter,
map,
};

module.exports.promiseReturningOperators = {
forEach,
toArray,
};
79 changes: 79 additions & 0 deletions test/parallel/test-stream-toArray.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');

{
// Works on a synchronous stream
(async () => {
const tests = [
[],
[1],
[1, 2, 3],
Array(100).fill().map((_, i) => i),
];
for (const test of tests) {
const stream = Readable.from(test);
const result = await stream.toArray();
assert.deepStrictEqual(result, test);
}
})().then(common.mustCall());
}

{
// Works on a non-object-mode stream and flattens it
(async () => {
const stream = Readable.from(
[Buffer.from([1, 2, 3]), Buffer.from([4, 5, 6])]
, { objectMode: false });
const result = await stream.toArray();
assert.strictEqual(Buffer.isBuffer(result), true);
assert.deepStrictEqual(Array.from(result), [1, 2, 3, 4, 5, 6]);
})().then(common.mustCall());
}

{
// Works on an asynchronous stream
(async () => {
const tests = [
[],
[1],
[1, 2, 3],
Array(100).fill().map((_, i) => i),
];
for (const test of tests) {
const stream = Readable.from(test).map((x) => Promise.resolve(x));
const result = await stream.toArray();
assert.deepStrictEqual(result, test);
}
})().then(common.mustCall());
}

{
// Support for AbortSignal
const ac = new AbortController();
let stream;
assert.rejects(async () => {
stream = Readable.from([1, 2, 3]).map(async (x) => {
if (x === 3) {
await new Promise(() => {}); // Explicitly do not pass signal here
}
return Promise.resolve(x);
});
await stream.toArray({ signal: ac.signal });
}, {
name: 'AbortError',
}).then(common.mustCall(() => {
// Only stops toArray, does not destory the stream
assert(stream.destroyed, false);
}));
ac.abort();
}
{
// Test result is a Promise
const result = Readable.from([1, 2, 3, 4, 5]).toArray();
assert.strictEqual(result instanceof Promise, true);
}

0 comments on commit ac8526e

Please sign in to comment.