Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multiple readers to iterate over a stream at the same time #52086

Open
ehmicky opened this issue Mar 14, 2024 · 9 comments
Open

Allow multiple readers to iterate over a stream at the same time #52086

ehmicky opened this issue Mar 14, 2024 · 9 comments
Labels
feature request Issues that request new features to be added to Node.js. stream Issues and PRs related to the stream subsystem.

Comments

@ehmicky
Copy link

ehmicky commented Mar 14, 2024

What is the problem this feature will solve?

When multiple callers use Readable[Symbol.asyncIterator], each caller receives a partial result.

import {Readable} from 'node:stream'

const stream = Readable.from(['a', 'b', 'c'])

const iterate = async (readerName) => {
  for await (const chunk of stream) {
    console.log(readerName, chunk)
  }
}

await Promise.all([
  iterate('one'),
  iterate('two'),
])
one a
two b
one c

Some callers might expect that result. But others might expect the following result instead.

one a
two a
one b
two b
one c
two c

What is the feature you are proposing to solve the problem?

Adding an option to readable.iterator() to use readable.on('data') instead of readable.read(). This would enable the above behavior.

What alternatives have you considered?

Implementing this user-land.

See an example of it at sindresorhus/get-stream#121

@ehmicky ehmicky added the feature request Issues that request new features to be added to Node.js. label Mar 14, 2024
@juanarbol juanarbol added the stream Issues and PRs related to the stream subsystem. label Mar 14, 2024
@jakecastelli
Copy link
Member

Hi @ehmicky, I had a look at your implementation and realised you took the effort to make it work with both node 18 and 20, I think if this is implemented in nodejs core it shouldn't have any backward compatibility concern right (since it was a breaking change)? Correct me if I am wrong.

@ehmicky
Copy link
Author

ehmicky commented Jun 29, 2024

Hi @jakecastelli,

The implementation in get-stream works with Node 18, 20 and 22. However, it would have some subtle differences and breaking changes from the current implementation in Node.js.

  • The stream is in flowing mode
  • Error handling and stream termination detection might behave slightly differently, since it relies on events on() instead of using stream finished(). For example, multiple errors are currently aggregated.
  • Different event listeners are setup on the stream, which might be breaking if some users are directly manipulating stream event listeners of specific types.
  • When stream.destroy() is called might be slightly different. Also, it does not take into account any destroyOnReturn option nor whether autoDestroy is set.

Overall, IMHO, it would be safer to keep the current Node.js implementation, and provide this new behavior as an opt-in, for backward compatibility. For example, using a boolean option to stream.iterator().


As a side note, please note that the implementation in get-stream does not pass the highWaterMark option to events.on() (see sindresorhus/get-stream#125). That's because get-stream consumes the iterable synchronously and right away.

However, since events.on() buffers incoming chunks, it might be a good idea to allow the user to specify the highWaterMark option, in case they are consuming the iterable too slowly. This will automatically pause/resume the stream in order to prevent the buffer from leaking memory. This is different from stream.readableHighWaterMark: it is measured in number of chunks (not number of bytes), and it paces events.on() (not stream.on('data')). This might presumably be confusing to some users, so I am not sure whether a different name would help clear that confusion.

@mycoleb
Copy link

mycoleb commented Jul 1, 2024

My team is claiming this issue.

@MoLow
Copy link
Member

MoLow commented Jul 1, 2024

@nodejs/streams should Readable have a tee() helper/method like webstream readable?

@benjamingr
Copy link
Member

@mycoleb your what is what?

@benjamingr
Copy link
Member

@nodejs/streams should Readable have a tee() helper/method like webstream readable?

Maybe? It's just tricky to get semantics right namely backpressure/watermarking and cleanup/resources. A tee'd stream should get destroyed when all its forks get destroyed, it should communicate backpressure etc - and we need to decide what happens when one "fork" is flowing and one is not

@MoLow
Copy link
Member

MoLow commented Jul 1, 2024

Maybe? It's just tricky to get semantics right namely backpressure/watermarking and cleanup/resources. A tee'd stream should get destroyed when all its forks get destroyed, it should communicate backpressure etc - and we need to decide what happens when one "fork" is flowing and one is not

I didn't say it is simple, but this is a very common need

@benjamingr
Copy link
Member

Yeah my point is we shouldn't land an implementation that doesn't deal with these things

@mcollina
Copy link
Member

mcollina commented Jul 1, 2024

Unfortunately, using on('data') would be complex and prone to errors due to backpressure requirements. Using .read() and on('readable') simplified quite a lot of that handling.

A proper implementation of this is exceptionally hard.
https://www.npmjs.com/package/cloneable-readable works, but it's complex, and I couldn't compress it down in a nice API like .tee() without adding more state varibles.

I'm not opposed to add cloneable-readable to Node.js core, or moving it to the Node.js org, as I always considered it as part of my activities in Node.js.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request Issues that request new features to be added to Node.js. stream Issues and PRs related to the stream subsystem.
Projects
Status: Awaiting Triage
Development

No branches or pull requests

7 participants