Skip to content

Commit

Permalink
stream: add map method to Readable
Browse files Browse the repository at this point in the history
Implement the map method on readable stream. This starts the alignment
with the tc39-iterator-helpers proposal and adds a `.map` method to
every Node.js readable stream.

Co-Authored-By: Robert Nagy <[email protected]>

PR-URL: #40815
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
benjamingr authored and danielleadams committed Feb 1, 2022
1 parent f31a3a2 commit 9c718f8
Show file tree
Hide file tree
Showing 6 changed files with 317 additions and 3 deletions.
44 changes: 44 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1682,6 +1682,50 @@ async function showBoth() {
showBoth();
```

### `readable.map(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `fn` {Function|AsyncFunction} a function to map over every item in the stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream mapped with the function `fn`.

This method allows mapping over the stream. The `fn` function will be called
for every item in the stream. If the `fn` function returns a promise - that
promise will be `await`ed before being passed to the result stream.

```mjs
import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

// With a synchronous mapper.
for await (const item of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
console.log(item); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
for await (const result of dnsResults) {
console.log(result); // Logs the DNS result of resolver.resolve4.
}
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
152 changes: 152 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
'use strict';

const { AbortController } = require('internal/abort_controller');
const {
codes: {
ERR_INVALID_ARG_TYPE,
},
AbortError,
} = require('internal/errors');
const { validateInteger } = require('internal/validators');

const {
MathFloor,
Promise,
PromiseReject,
PromisePrototypeCatch,
Symbol,
} = primordials;

const kEmpty = Symbol('kEmpty');
const kEof = Symbol('kEof');

async function * map(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], this);
}

if (options != null && typeof options !== 'object') {
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
}

let concurrency = 1;
if (options?.concurrency != null) {
concurrency = MathFloor(options.concurrency);
}

validateInteger(concurrency, 'concurrency', 1);

const ac = new AbortController();
const stream = this;
const queue = [];
const signal = ac.signal;
const signalOpt = { signal };

const abort = () => ac.abort();
options?.signal?.addEventListener('abort', abort);

let next;
let resume;
let done = false;

function onDone() {
done = true;
}

async function pump() {
try {
for await (let val of stream) {
if (done) {
return;
}

if (signal.aborted) {
throw new AbortError();
}

try {
val = fn(val, signalOpt);
} catch (err) {
val = PromiseReject(err);
}

if (val === kEmpty) {
continue;
}

if (typeof val?.catch === 'function') {
val.catch(onDone);
}

queue.push(val);
if (next) {
next();
next = null;
}

if (!done && queue.length && queue.length >= concurrency) {
await new Promise((resolve) => {
resume = resolve;
});
}
}
queue.push(kEof);
} catch (err) {
const val = PromiseReject(err);
PromisePrototypeCatch(val, onDone);
queue.push(val);
} finally {
done = true;
if (next) {
next();
next = null;
}
options?.signal?.removeEventListener('abort', abort);
}
}

pump();

try {
while (true) {
while (queue.length > 0) {
const val = await queue[0];

if (val === kEof) {
return;
}

if (signal.aborted) {
throw new AbortError();
}

if (val !== kEmpty) {
yield val;
}

queue.shift();
if (resume) {
resume();
resume = null;
}
}

await new Promise((resolve) => {
next = resolve;
});
}
} finally {
ac.abort();

done = true;
if (resume) {
resume();
resume = null;
}
}
}

module.exports = {
map,
};
9 changes: 9 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@

const {
ObjectDefineProperty,
ObjectKeys,
ReflectApply,
} = primordials;

const {
promisify: { custom: customPromisify },
} = require('internal/util');

const operators = require('internal/streams/operators');
const compose = require('internal/streams/compose');
const { pipeline } = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
Expand All @@ -42,6 +45,12 @@ const Stream = module.exports = require('internal/streams/legacy').Stream;
Stream.isDisturbed = utils.isDisturbed;
Stream.isErrored = utils.isErrored;
Stream.Readable = require('internal/streams/readable');
for (const key of ObjectKeys(operators)) {
const op = operators[key];
Stream.Readable.prototype[key] = function(...args) {
return Stream.Readable.from(ReflectApply(op, this, args));
};
}
Stream.Writable = require('internal/streams/writable');
Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-bootstrap-modules.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ const expectedModules = new Set([
'NativeModule internal/streams/end-of-stream',
'NativeModule internal/streams/from',
'NativeModule internal/streams/legacy',
'NativeModule internal/streams/operators',
'NativeModule internal/streams/passthrough',
'NativeModule internal/streams/pipeline',
'NativeModule internal/streams/readable',
Expand Down
108 changes: 108 additions & 0 deletions test/parallel/test-stream-map.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');

{
// Map works on synchronous streams with a synchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
const result = [2, 4, 6, 8, 10];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Map works on synchronous streams with an asynchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
await Promise.resolve();
return x + x;
});
const result = [2, 4, 6, 8, 10];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Map works on asynchronous streams with a asynchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
return x + x;
}).map((x) => x + x);
const result = [4, 8, 12, 16, 20];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Concurrency + AbortSignal
const ac = new AbortController();
let calls = 0;
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => {
calls++;
await setTimeout(100, { signal });
}, { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
for await (const item of stream) {
// nope
console.log(item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());

setImmediate(() => {
ac.abort();
assert.strictEqual(calls, 2);
});
}

{
// Concurrency result order
const stream = Readable.from([1, 2]).map(async (item, { signal }) => {
await setTimeout(10 - item, { signal });
return item;
}, { concurrency: 2 });

(async () => {
const expected = [1, 2];
for await (const item of stream) {
assert.strictEqual(item, expected.shift());
}
})().then(common.mustCall());
}

{
// Error cases
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const unused of Readable.from([1]).map(1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).map((x) => x, {
concurrency: 'Foo'
}));
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).map((x) => x, 1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}
{
// Test result is a Readable
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x);
assert.strictEqual(stream.readable, true);
}
6 changes: 3 additions & 3 deletions tools/doc/type-parser.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,12 @@ const customTypesMap = {

'Stream': 'stream.html#stream',
'stream.Duplex': 'stream.html#class-streamduplex',
'stream.Readable': 'stream.html#class-streamreadable',
'stream.Transform': 'stream.html#class-streamtransform',
'stream.Writable': 'stream.html#class-streamwritable',
'Duplex': 'stream.html#class-streamduplex',
'stream.Readable': 'stream.html#class-streamreadable',
'Readable': 'stream.html#class-streamreadable',
'stream.Transform': 'stream.html#class-streamtransform',
'Transform': 'stream.html#class-streamtransform',
'stream.Writable': 'stream.html#class-streamwritable',
'Writable': 'stream.html#class-streamwritable',

'Immediate': 'timers.html#class-immediate',
Expand Down

0 comments on commit 9c718f8

Please sign in to comment.