Skip to content

Commit

Permalink
stream: compose with async functions
Browse files Browse the repository at this point in the history
Enables async function support for stream.compose.

PR-URL: #39435
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
ronag committed Jul 21, 2021
1 parent 5100c3c commit 48ff0a1
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 27 deletions.
55 changes: 50 additions & 5 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1861,7 +1861,7 @@ failure, this can cause event listener leaks and swallowed errors.
added: REPLACEME
-->

* `streams` {Stream[]}
* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]}
* Returns: {stream.Duplex}

Combines two or more streams into a `Duplex` stream that writes to the
Expand All @@ -1875,6 +1875,9 @@ when passing streams to `stream.pipeline`, typically the first stream is
a readable stream and the last a writable stream, forming a closed
circuit.

If passed a `Function` it must be a factory method taking a `source`
`Iterable`.

```mjs
import { compose, Transform } from 'stream';

Expand All @@ -1884,11 +1887,11 @@ const removeSpaces = new Transform({
}
});

const toUpper = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).toUpperCase());
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});
}

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
Expand All @@ -1898,6 +1901,48 @@ for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
console.log(res); // prints 'HELLOWORLD'
```

`stream.compose` can be used to convert async iterables, generators and
functions into streams.

* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
`null`.
* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`.
Must take a source `AsyncIterable` as first parameter. Cannot yield
`null`.
* `AsyncFunction` converts into a writable `Duplex`. Must return
either `null` or `undefined`.

```mjs
import { compose } from 'stream';
import { finished } from 'stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD'
```

### `stream.Readable.from(iterable, [options])`
<!-- YAML
added:
Expand Down
24 changes: 2 additions & 22 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,10 @@ const {
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
const _compose = require('internal/streams/compose');
const compose = require('internal/streams/compose');
const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');
const { isNodeStream } = require('internal/streams/utils');
const {
codes: {
ERR_INVALID_ARG_VALUE,
},
} = require('internal/errors');

const promises = require('stream/promises');

Expand All @@ -54,21 +48,7 @@ const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;
Stream.destroy = destroyer;

Stream.compose = function compose(...streams) {
// TODO (ronag): Remove this once async function API
// has been discussed.
for (let n = 0; n < streams.length; ++n) {
if (!isNodeStream(streams[n])) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
streams[n],
'must be stream'
);
}
}
return _compose(...streams);
};
Stream.compose = compose;

ObjectDefineProperty(Stream, 'promises', {
configurable: true,
Expand Down

0 comments on commit 48ff0a1

Please sign in to comment.