Skip to content

Commit

Permalink
doc: update stream.reduce concurrency note
Browse files Browse the repository at this point in the history
PR-URL: #47166
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Mohammed Keyvanzadeh <[email protected]>
  • Loading branch information
rluvaton authored and RafaelGSS committed Apr 7, 2023
1 parent 8a69929 commit 908798a
Showing 1 changed file with 31 additions and 8 deletions.
39 changes: 31 additions & 8 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2499,21 +2499,44 @@ This method calls `fn` on each chunk of the stream in order, passing it the
result from the calculation on the previous element. It returns a promise for
the final value of the reduction.

The reducer function iterates the stream element-by-element which means that
there is no `concurrency` parameter or parallelism. To perform a `reduce`
concurrently, it can be chained to the [`readable.map`][] method.

If no `initial` value is supplied the first chunk of the stream is used as the
initial value. If the stream is empty, the promise is rejected with a
`TypeError` with the `ERR_INVALID_ARGS` code property.

```mjs
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';

const ten = await Readable.from([1, 2, 3, 4]).reduce((previous, data) => {
return previous + data;
});
console.log(ten); // 10
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);

const folderSize = await Readable.from(filesInDir)
.reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file));
return totalSize + size;
}, 0);

console.log(folderSize);
```

The reducer function iterates the stream element-by-element which means that
there is no `concurrency` parameter or parallelism. To perform a `reduce`
concurrently, you can extract the async function to [`readable.map`][] method.

```mjs
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';

const directoryPath = './src';
const filesInDir = await readdir(directoryPath);

const folderSize = await Readable.from(filesInDir)
.map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0);

console.log(folderSize);
```

### Duplex and transform streams
Expand Down

0 comments on commit 908798a

Please sign in to comment.