Concurrent polling on async methods #424
Conversation
MethodCallback::Sync(callback) => (callback)(req.id.clone(), params, tx, conn_id),
match self {
    MethodCallback::Sync(callback) => {
        (callback)(id, params, tx, conn_id);
Currying?
This is just how you have to call a variable that happens to be a function, IIRC :D
(At least, I'm sure I've had to do something similar before, but offhand I can't think why.. and you don't have to do the same for closures that you declare... maybe I've just hit it when calling e.g. (something.variable_that_is_fn)(params) .. so is it actually needed here?)
Ok cool, this totally makes sense now; it's just calling the callback and passing in the params. I was distracted by the syntax for a second. Thanks for the clarification, I definitely overlooked that.
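For illustration, a minimal standalone sketch of why the parentheses are needed (the names here are made up, not from this PR): without them, the compiler looks for a method with that name instead of calling the value stored in the field.

```rust
// Illustrative example: calling a function stored in a struct field requires
// parentheses around the field access.
struct Handler {
    callback: Box<dyn Fn(u64) -> u64>,
}

fn main() {
    let h = Handler { callback: Box::new(|x| x + 1) };
    // `h.callback(2)` would not compile: Rust looks for a *method* named
    // `callback` on `Handler`. The parentheses mean "read the field, then
    // call the value it holds".
    let out = (h.callback)(2);
    assert_eq!(out, 3);
}
```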
A couple of comments/questions, at least one of them unrelated to your actual review changes :D
@@ -0,0 +1,132 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
Should this be updated to 2021? (good reminder; need to add these to telemetry files..)
We haven't updated those headers yet, I think it's just a copy-paste thingy.
I guess we should run a script and update all headers in another PR :)
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Utilities for handling async code.
Nit: Just the one utility by the looks of it :D
pub(crate) struct FutureDriver<F> {
    futures: Vec<F>,
}
I keep thinking that this is quite similar in some ways to https://docs.rs/futures/0.3.16/futures/stream/struct.FuturesUnordered.html, and wonder whether it could be used to simplify some of the code here (I had a look at its implementation though; it's way more complicated than I'd have assumed!).
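For reference, a minimal sketch of the kind of FuturesUnordered usage I mean (not code from this PR): it owns a set of futures and yields their outputs as a stream, in completion order.

```rust
// Illustrative FuturesUnordered usage, for comparison with FutureDriver.
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let mut background = FuturesUnordered::new();
    for i in 0..3u64 {
        background.push(async move { i * 2 });
    }
    // Outputs arrive in whatever order the futures complete.
    while let Some(out) = background.next().await {
        println!("finished: {out}");
    }
}
```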
struct DriverSelect<'a, S, F> {
    selector: S,
    driver: &'a mut FutureDriver<F>,
}
I guess this is the bit that is different from FuturesUnordered; it's basically FuturesUnordered + a "foreground" future that you care about the result of (and may need polling forever).
I guess the "obvious" approach is to spawn the background futures into tasks and just await the "selector", and as you've noted, that's more costly.
(Which is sort of weird; the advantage of spawning the background future into a task is that it's only woken up once, when its waker is called.
In this approach, every future is polled every time any of them is woken up to make progress. So I'd have assumed that spawning onto separate tasks would actually be faster when you have a bunch of concurrent requests (and generally scale better), but this approach is faster when there are very few concurrent requests, where the wakeups are less of an issue and the synchronisation cost is relatively greater.
When I was benchmarking telemetry I ran into a similar sort of thing; selecting just the two tasks (read from websocket and write to websocket) actually had a non-negligible cost when one of them was firing a lot and causing the other one to be needlessly polled a bunch as a result.)
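For illustration, a rough sketch of the spawn-based alternative being discussed (the names are made up, not the PR's API): each background future becomes its own task with its own waker, so a wakeup only polls that task, at the cost of per-task spawning and synchronisation overhead.

```rust
// Illustrative sketch of the spawn-based alternative: background futures run
// as independent tokio tasks, while the "foreground" future is awaited
// directly; a wakeup in one task does not cause the others to be polled.
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut handles = Vec::new();
    for i in 0..3u64 {
        handles.push(tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10 * i)).await;
            i
        }));
    }

    // The foreground future we actually care about the result of.
    let foreground = async { "request handled" }.await;
    println!("{foreground}");

    // Wait for the background tasks before exiting.
    for handle in handles {
        let _ = handle.await;
    }
}
```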
while !shutdown.is_closed() {
    data.clear();

    receiver.receive_data(&mut data).await?;
    method_executors.select_with(receiver.receive_data(&mut data)).await?;
If a client has a WS connection open and receive_data here is waiting for incoming data from them, we won't loop around and notice that shutdown.is_closed() is true and that we should finish.
So, will shutdown not complete until all open connections from clients have been voluntarily closed?
Should we select on shutdown + receive_data here, so that we can tell as soon as we've shut down and can end this loop (which I think will also end the task spawned above gracefully)? (receive_data isn't cancel safe, but I don't think it matters if we're shutting down anyway.. at least I made the same assumption in telemetry!)
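A minimal sketch of that suggestion (the types and names here are stand-ins, not the PR's actual API):

```rust
// Illustrative sketch: race a shutdown signal against the receive future so
// the loop exits as soon as shutdown fires, instead of waiting for the client
// to send more data. Cancelling the in-flight receive is accepted here
// because we're tearing the connection down anyway.
use std::time::Duration;
use tokio::sync::oneshot;

// Stand-in for soketto's `receiver.receive_data(&mut data)`.
async fn receive_data(data: &mut Vec<u8>) -> Result<(), ()> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    data.extend_from_slice(b"frame");
    Ok(())
}

async fn connection_loop(mut shutdown: oneshot::Receiver<()>) {
    let mut data = Vec::new();
    loop {
        data.clear();
        tokio::select! {
            // Shutdown wins the race: stop waiting for client data.
            _ = &mut shutdown => break,
            res = receive_data(&mut data) => {
                if res.is_err() {
                    break;
                }
                // ... dispatch the received frame to the method handlers ...
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (shutdown_tx, shutdown_rx) = oneshot::channel();
    let conn = tokio::spawn(connection_loop(shutdown_rx));
    let _ = shutdown_tx.send(()); // request shutdown
    let _ = conn.await;
}
```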
Looks generally good, but I'm a little scared that soketto is not cancel-safe when running select on the futures. We had some issues with it before, such as #154.
I think we could either put the soketto::Receiver in a stream that lives as long as the connection, or do tokio::spawn.
EDIT: Maciej explained to me offline that it's not doing select over the receiver, so ignore this comment; instead, as I understood it, when polling the receiver we at the same time check whether any other futures can be resolved.
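For illustration, a minimal sketch of the "put the receiver in a stream" idea (stand-in types, not soketto's actual API): because the stream owns the receiver and keeps any in-flight receive future inside itself between polls, dropping a single next() call does not lose partially received data.

```rust
// Illustrative sketch: wrap a receiver in a long-lived stream so that
// select-style cancellation only drops the `next()` future, while the
// receive state stays stored inside the stream.
use futures::stream::{unfold, StreamExt};

// Stand-in for a soketto-like receiver.
struct Receiver {
    remaining: u8,
}

impl Receiver {
    async fn receive_data(&mut self) -> Option<Vec<u8>> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        Some(vec![self.remaining])
    }
}

#[tokio::main]
async fn main() {
    let receiver = Receiver { remaining: 3 };
    // The stream owns the receiver for the lifetime of the connection.
    let frames = unfold(receiver, |mut rx| async move {
        rx.receive_data().await.map(|frame| (frame, rx))
    });
    tokio::pin!(frames);

    while let Some(frame) = frames.next().await {
        println!("got frame: {frame:?}");
    }
}
```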
    F: Future + Unpin,
{
    pub(crate) async fn select_with<S: Future>(&mut self, selector: S) -> S::Output {
        tokio::pin!(selector);
tokio is brought in as a dependency just for pin?
Is there any difference between tokio::pin and futures::pin?
I had a gander, and both tokio::pin and futures::pin_mut have identical impls for this usage; the tokio version just supports an alternate usage as well (assign a variable to an async call, then pin it) that isn't useful here.
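For illustration, a minimal sketch of that usage (not PR code): both macros shadow the local binding with a pinned mutable reference so the future can be polled in place.

```rust
// Illustrative: tokio::pin! and futures::pin_mut! are interchangeable here;
// both shadow the variable with a Pin<&mut _> to the original value, which
// stays pinned on the stack.
#[tokio::main]
async fn main() {
    let selector = async { 1u32 };
    tokio::pin!(selector); // `selector` is now a Pin<&mut impl Future>
    assert_eq!(selector.await, 1);

    let other = async { 2u32 };
    futures::pin_mut!(other); // identical effect for this usage
    assert_eq!(other.await, 2);
}
```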
Some questions to resolve, but overall I think this is good even if the logic is a bit more complicated to follow.
I benched with #400 and it was slightly faster than master.
(Apologies for the late response here)
I think the code looks fine modulo the concerns/nits already expressed by others. I don't know that we have enough real-world usage data to actually settle the conversation we had on our call about how the library will be used; the trade-offs of this solution might be exactly what we need, or maybe not. We just don't know.
For the time being I think it addresses a real concern of an actual user and while it complicates the code a bit it's not too bad. So it's a 👍 from me.
It sounds like this benchmarks OK, and while I think it would be really good to have a couple of benchmarks that test this with a large number of concurrent connections/requests (which might be around in another branch; I haven't looked carefully), I'd have no issues with this merging as it is! #424 (comment) could still potentially be an issue (I'd need to look more carefully to see whether those connections are handled elsewhere), but if so, it wasn't caused by the work here anyway.
Fixes #422, alternative to #423.
Tested a couple of different options here and settled on this solution.
Everything on a single connection is still single-threaded, but it uses a generic FutureDriver (logic extracted out from the ConnDriver) so that multiple futures can be polled at the same time. Performance is on par with current master; there are some minor speedups here and there, and some oddball regressions (mostly in HTTP batch requests, but I reckon there is something else happening there). Additionally, it gives us a clear way to count concurrent calls per connection for future throttling.
In case we still want to spawn async methods on tasks, we can use the JoinHandle instead of boxed futures to keep the driver lean, though from what I've benched so far, spawning tasks is a performance hit compared to running on a single thread without synchronization costs.
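For illustration, a rough and simplified sketch of the shape of this idea (not the exact code in this PR; waker handling and error cases are elided):

```rust
// Rough sketch: the driver owns a set of background futures, and while the
// caller awaits a "selector" future, every wakeup also polls the stored
// futures, retaining only the ones that are still pending. All futures share
// the task's waker, which is why any wakeup re-polls everything.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub(crate) struct FutureDriver<F> {
    futures: Vec<F>,
}

impl<F: Future + Unpin> FutureDriver<F> {
    pub(crate) fn new() -> Self {
        Self { futures: Vec::new() }
    }

    pub(crate) fn add(&mut self, fut: F) {
        self.futures.push(fut);
    }

    /// Poll every stored future once, dropping those that have completed.
    fn drive(&mut self, cx: &mut Context<'_>) {
        self.futures
            .retain_mut(|fut| Pin::new(fut).poll(cx).is_pending());
    }

    /// Await `selector` while also driving the stored futures on each wakeup.
    pub(crate) async fn select_with<S: Future>(&mut self, selector: S) -> S::Output {
        tokio::pin!(selector);
        DriverSelect { selector, driver: self }.await
    }
}

struct DriverSelect<'a, S, F> {
    selector: S,
    driver: &'a mut FutureDriver<F>,
}

impl<'a, S, F> Future for DriverSelect<'a, S, F>
where
    S: Future + Unpin,
    F: Future + Unpin,
{
    type Output = S::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        // A wakeup from any background future (or the selector) gives all of
        // them a chance to make progress before the selector is polled.
        this.driver.drive(cx);
        Pin::new(&mut this.selector).poll(cx)
    }
}
```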