diff --git a/doc/api/stream.md b/doc/api/stream.md index d530ed0d62a9e3..5cc312abb42ccb 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1861,7 +1861,7 @@ failure, this can cause event listener leaks and swallowed errors. added: REPLACEME --> -* `streams` {Stream[]} +* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]} * Returns: {stream.Duplex} Combines two or more streams into a `Duplex` stream that writes to the @@ -1875,6 +1875,9 @@ when passing streams to `stream.pipeline`, typically the first stream is a readable stream and the last a writable stream, forming a closed circuit. +If passed a `Function` it must be a factory method taking a `source` +`Iterable`. + ```mjs import { compose, Transform } from 'stream'; @@ -1884,11 +1887,11 @@ const removeSpaces = new Transform({ } }); -const toUpper = new Transform({ - transform(chunk, encoding, callback) { - callback(null, String(chunk).toUpperCase()); +async function* toUpper(source) { + for await (const chunk of source) { + yield String(chunk).toUpperCase(); } -}); +} let res = ''; for await (const buf of compose(removeSpaces, toUpper).end('hello world')) { @@ -1898,6 +1901,48 @@ for await (const buf of compose(removeSpaces, toUpper).end('hello world')) { console.log(res); // prints 'HELLOWORLD' ``` +`stream.compose` can be used to convert async iterables, generators and +functions into streams. + +* `AsyncIterable` converts into a readable `Duplex`. Cannot yield + `null`. +* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`. + Must take a source `AsyncIterable` as first parameter. Cannot yield + `null`. +* `AsyncFunction` converts into a writable `Duplex`. Must return + either `null` or `undefined`. + +```mjs +import { compose } from 'stream'; +import { finished } from 'stream/promises'; + +// Convert AsyncIterable into readable Duplex. +const s1 = compose(async function*() { + yield 'Hello'; + yield 'World'; +}()); + +// Convert AsyncGenerator into transform Duplex. +const s2 = compose(async function*(source) { + for await (const chunk of source) { + yield String(chunk).toUpperCase(); + } +}); + +let res = ''; + +// Convert AsyncFunction into writable Duplex. +const s3 = compose(async function(source) { + for await (const chunk of source) { + res += chunk; + } +}); + +await finished(compose(s1, s2, s3)); + +console.log(res); // prints 'HELLOWORLD' +``` + ### `stream.Readable.from(iterable, [options])`