Complete stream write ChannelFuture on the actual write of a Muxer frame #316

Open
Nashatyrev opened this issue Sep 12, 2023 · 3 comments

@Nashatyrev
Collaborator

The issue closely relates to #282

The problem:

When writing anything to a stream [child] Channel, there is an associated ChannelFuture (either passed to or returned from the corresponding write() method).
That ChannelFuture should be completed when the associated message buffer is flushed to the wire. In other words, when the write operation of the associated muxer frame(s) is completed or, put yet another way, when all buffer(s) associated with the message are released.

That allows various backpressure mechanisms to function correctly on the client side.

Currently a stream write ChannelFuture is completed earlier here:


So the completion happens effectively right upon the stream Channel.flush() call. Thus, even when the client code respects backpressure, i.e. writes the next chunk of data (e.g. the next Ethereum block in a batch) only after the previous write has completed, it can still overflow the write buffers.

E.g. for the mplex muxer, the internal Netty write buffers fill up if the remote reader is slower than the local writer.
E.g. for the yamux muxer, its internal stream write buffer fills up.
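
To illustrate the effect, here is a minimal self-contained sketch, not the jvm-libp2p code. `ToyMuxer`, `sendAll` and `maxBuffered` are hypothetical names, and `CompletableFuture` stands in for Netty's ChannelFuture. The client writes the next chunk only after the previous future completes; the only difference between the two runs is whether the future completes on flush or when the (slow) remote side actually takes the frame.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for a muxer: frames queue up until the "wire" drains them.
class ToyMuxer(private val completeOnFlush: Boolean) {
    private val pending = ArrayDeque<Pair<ByteArray, CompletableFuture<Unit>>>()

    var maxBufferedBytes = 0
        private set

    fun hasPending() = pending.isNotEmpty()

    // Models write + flush on a stream channel.
    fun writeAndFlush(chunk: ByteArray): CompletableFuture<Unit> {
        val future = CompletableFuture<Unit>()
        pending.addLast(chunk to future)
        maxBufferedBytes = maxOf(maxBufferedBytes, pending.sumOf { it.first.size })
        if (completeOnFlush) future.complete(Unit) // the behaviour described in this issue
        return future
    }

    // Models the remote reader accepting exactly one frame.
    fun drainOne() {
        val (_, future) = pending.removeFirstOrNull() ?: return
        future.complete(Unit) // no-op if the future was already completed on flush
    }
}

// A client that respects backpressure: next chunk only after the previous future completes.
fun sendAll(muxer: ToyMuxer, chunks: Int, chunkSize: Int) {
    var sent = 0
    fun sendNext() {
        if (sent == chunks) return
        sent++
        muxer.writeAndFlush(ByteArray(chunkSize)).thenRun { sendNext() }
    }
    sendNext()
    // The remote reader only starts draining after the writer had its chance to run.
    while (muxer.hasPending()) muxer.drainOne()
}

fun maxBuffered(completeOnFlush: Boolean): Int {
    val muxer = ToyMuxer(completeOnFlush)
    sendAll(muxer, chunks = 100, chunkSize = 1024)
    return muxer.maxBufferedBytes
}

fun main() {
    println(maxBuffered(completeOnFlush = true))  // 102400: all chunks buffered at once
    println(maxBuffered(completeOnFlush = false)) // 1024: at most one chunk in flight
}
```

With completion on flush, the well-behaved client still buffers the whole batch; with completion on the actual frame write, it never holds more than one chunk.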

@Nashatyrev
Collaborator Author

The fix is complicated by the following:

override fun doWrite(buf: ChannelOutboundBuffer) {
    while (true) {
        val msg = buf.current() ?: break
        try {
            if (localDisconnected) {
                throw ConnectionClosedException("The stream was closed for writing locally: $id")
            }
            // the msg is released by both onChildWrite and buf.remove() so we need to retain
            // however it is still to be confirmed that no buf leaks happen here TODO
            ReferenceCountUtil.retain(msg)
            @Suppress("UNCHECKED_CAST")
            parent.onChildWrite(this, msg as TData)
            buf.remove()
        } catch (cause: Throwable) {
            buf.remove(cause)
        }
    }
}

doWrite() is the only entry point in AbstractChannel that is supposed to handle the flush operation. It receives a ChannelOutboundBuffer, where the only way to get to the next buffer is to call remove(), which in turn completes the ChannelFuture of the corresponding write operation.

Not sure if the AbstractChannel design is suitable for child channel usage scenarios at all
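
The constraint can be shown in isolation with a toy model. This is a sketch under assumptions: `ToyOutboundBuffer` and `ToyParent` are hypothetical classes that only mimic the current()/remove() shape of ChannelOutboundBuffer, and the future returned by the toy `onChildWrite` is an assumption made for the demo (the real method in the snippet above returns nothing usable here).

```kotlin
import java.util.concurrent.CompletableFuture

// Toy stand-in for ChannelOutboundBuffer: only the head entry is visible at any time.
class ToyOutboundBuffer {
    private class Entry(val msg: String, val promise: CompletableFuture<Unit>)

    private val entries = ArrayDeque<Entry>()

    fun add(msg: String): CompletableFuture<Unit> {
        val promise = CompletableFuture<Unit>()
        entries.addLast(Entry(msg, promise))
        return promise
    }

    fun current(): String? = entries.firstOrNull()?.msg

    // Advancing to the next entry and completing the promise are one inseparable step.
    fun remove(): Boolean {
        val entry = entries.removeFirstOrNull() ?: return false
        entry.promise.complete(Unit)
        return true
    }
}

// Toy parent: the returned future would complete only when the frame reaches the wire.
class ToyParent {
    val inFlight = mutableListOf<CompletableFuture<Unit>>()

    fun onChildWrite(msg: String): CompletableFuture<Unit> =
        CompletableFuture<Unit>().also { inFlight += it }
}

// Same loop shape as the doWrite() quoted above.
fun doWrite(buf: ToyOutboundBuffer, parent: ToyParent) {
    while (true) {
        val msg = buf.current() ?: break
        parent.onChildWrite(msg) // the parent write is still pending...
        buf.remove()             // ...but this already completes the caller's promise
    }
}

fun demo(): Pair<Boolean, Boolean> {
    val buf = ToyOutboundBuffer()
    val parent = ToyParent()
    val callerFutures = listOf(buf.add("a"), buf.add("b"))
    doWrite(buf, parent)
    return callerFutures.all { it.isDone } to parent.inFlight.none { it.isDone }
}

fun main() {
    val (callerDone, parentPending) = demo()
    // prints "caller futures done: true, parent writes still pending: true"
    println("caller futures done: $callerDone, parent writes still pending: $parentPending")
}
```

Because the loop cannot reach the second entry without calling remove() on the first, there is no point inside doWrite() where the caller's promise could be held back until the parent write finishes.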

@Nashatyrev
Collaborator Author

Another potential problem:
Channel.Unsafe.outboundBuffer() returns ChannelOutboundBuffer, which is a final class, so its remove() behaviour cannot be changed by subclassing.

Nashatyrev added a commit to Nashatyrev/jvm-libp2p that referenced this issue Sep 12, 2023
@Nashatyrev
Collaborator Author

A test and a draft attempt to address the issue: ad6c2d3
