sync: Add try_recv back #3639

Conversation
Signed-off-by: Zahari Dichev <[email protected]>
Generally speaking, we try to avoid spinning. It's going to be tricky to avoid that here though. I'm interested in thoughts on that.
@carllerche Thought about it a little bit. The way things are set up at the moment, we need to either spin on
Another option that I have not spent too much time on is to make the receiver be able to "look ahead". So if there is a slot ahead that has been set as
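To make the trade-off concrete, here is a minimal sketch of the "spin while a concurrent write is in flight" approach under discussion. The `Slot` type and its states are hypothetical illustrations, not tokio's actual internals:

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::thread;

// Hypothetical slot states for this sketch: a slot can be unclaimed,
// claimed by a sender but not yet fully written, or fully published.
const EMPTY: u8 = 0;
const NOT_READY: u8 = 1;
const READY: u8 = 2;

struct Slot {
    state: AtomicU8,
}

impl Slot {
    // Spin (yielding to the scheduler) only while a concurrent sender is
    // mid-write; return immediately if the slot is genuinely empty.
    fn try_read(&self) -> Option<()> {
        loop {
            match self.state.load(Ordering::Acquire) {
                READY => return Some(()),
                EMPTY => return None,
                NOT_READY => thread::yield_now(),
                _ => unreachable!(),
            }
        }
    }
}

fn main() {
    let ready = Slot { state: AtomicU8::new(READY) };
    assert_eq!(ready.try_read(), Some(()));

    let empty = Slot { state: AtomicU8::new(EMPTY) };
    assert_eq!(empty.try_read(), None);

    println!("ok");
}
```

The key point is that spinning is bounded by how long a sender can stay between claiming a slot and publishing the value, which is exactly the window the review is worried about.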
tokio/src/sync/mpsc/chan.rs (outdated)

                    return Ok(value);
                }
                Some(Closed) => return Err(TryRecvError::Closed),
                Some(NotReady) => thread::yield_now(),
Should this be using std::hint::spin_loop?
Good question. I borrowed it from https://github.com/tokio-rs/tokio/blob/master/tokio/src/sync/mpsc/block.rs#L348. It has a comment that convinced me this is the right thing to use. But is that comment actually correct? Unclear. Any thoughts?
Also, what about this bit of docs: https://docs.rs/loom/0.4.0/loom/#yielding
The comment you linked appears to be wrong, as the non-loom version just re-exports std::thread. It also has a comment on MSRV.
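For context on the two primitives being compared: `std::hint::spin_loop` is a CPU hint (e.g. `pause` on x86) that keeps the thread on-core and suits very short waits, while `std::thread::yield_now` asks the OS scheduler to run something else. A small sketch (not tokio code) of the spin-hint pattern:

```rust
use std::hint;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Busy-wait until `flag` becomes true, hinting the CPU on each iteration.
// spin_loop() stays on-core, so it is only appropriate when the wait is
// expected to be very short; for longer waits, thread::yield_now() hands
// the core back to the OS scheduler instead.
fn spin_until(flag: &AtomicBool) {
    while !flag.load(Ordering::Acquire) {
        hint::spin_loop();
    }
}

fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    let f2 = Arc::clone(&flag);

    let t = thread::spawn(move || f2.store(true, Ordering::Release));

    spin_until(&flag);
    t.join().unwrap();
    println!("flag observed");
}
```

Note that `std::hint::spin_loop` was stabilized relatively recently (Rust 1.49), which is likely what the MSRV comment is about.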
Signed-off-by: Zahari Dichev <[email protected]>
How does it go?
The PR is relatively complicated, and I haven't found the time to read it thoroughly.
                None => {
                    // The compare-and-swap succeeded and the newly allocated block
                    // is successfully pushed.
                    self.set_has_next();
                    return new_block;
                }
Another thread could see the compare-and-swap before this set_has_next has completed. Is that a problem?
@Darksonn Trying to page all of that back in, as it has been a while. I had a thought about this when I was writing it, and I think it is ok. In the situation you are describing, suppose we have two blocks like this:

B1                             B2
| a | b | c | _ | _ | -------> | _ | _ | _ | _ |

Say we are trying to read the second-to-last slot of B1 while a value is being written to the first slot of B2, and the CAS has succeeded and the thread reading B1 observes that. This means we have a next block, but a value will not be written into it before the current block is marked as having a next block. In that case we will return an Empty error, which is technically correct because the value is still not there. Does that sound right, or am I making things up?
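The ordering argument above can be reduced to a two-flag sketch: one flag standing in for "the next-block CAS succeeded" and one for "the current block was marked HAS_NEXT". These names and the `State` type are illustrative only, not tokio's data structures:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical flags mirroring the discussion: `next_installed` plays the
// role of the successful CAS, `has_next` the later set_has_next call.
struct State {
    next_installed: AtomicBool,
    has_next: AtomicBool,
}

#[derive(Debug, PartialEq)]
enum TryRecv {
    Empty,
    Value,
}

// A reader that requires *both* signals before following the next block.
// If it observes the CAS but not HAS_NEXT yet, it conservatively reports
// Empty, which is correct: the value has not been published either way.
fn try_recv(s: &State) -> TryRecv {
    if s.next_installed.load(Ordering::Acquire) && s.has_next.load(Ordering::Acquire) {
        TryRecv::Value
    } else {
        TryRecv::Empty
    }
}

fn main() {
    let s = State {
        next_installed: AtomicBool::new(true),  // CAS already visible
        has_next: AtomicBool::new(false),       // set_has_next not yet done
    };
    assert_eq!(try_recv(&s), TryRecv::Empty);

    s.has_next.store(true, Ordering::Release);
    assert_eq!(try_recv(&s), TryRecv::Value);

    println!("ok");
}
```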
If the consequence of this is that it reports Empty if a send is happening concurrently, then that's fine.
Yes, that is correct.
@Darksonn Is that PR still relevant? Is there a plan to merge it at some point? If not, we can close it, I guess.
We do want to add back try_recv, but I'm not sufficiently familiar with the internals of the mpsc channel to properly review it.
Got it, makes sense. Thanks for looking at it nevertheless.
        // Notify the rx tasks
        self.rx_waker.wake();
        self.not_ready_waker.wake();
There are other places that wake rx_waker. Should the other places not be updated as well?
        run(&ctx);

        for th in ths {
            th.join().unwrap()
Suggested change:
-            th.join().unwrap()
+            th.join().unwrap();
    /// Marks the block as having a next one
    fn set_has_next(&self) {
        self.ready_slots.fetch_or(HAS_NEXT, Release);
    }
You call this in grow, but not in try_push. That seems incorrect to me.
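The `fetch_or(HAS_NEXT, Release)` pattern in the snippet above sets a flag bit atomically without disturbing the other bits, and the Release ordering publishes the writes that preceded it to any thread that later Acquire-loads the same atomic. A standalone sketch (the flag value here is arbitrary, not tokio's actual constant):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative bit flag mirroring the HAS_NEXT pattern; the bit position
// is chosen arbitrarily for this sketch.
const HAS_NEXT: usize = 1 << 31;

fn main() {
    let ready_slots = AtomicUsize::new(0b101); // some slots already ready

    // fetch_or sets the flag atomically and returns the previous value;
    // the Release ordering makes earlier writes (e.g. the new block's
    // fields) visible to threads that Acquire-load this atomic afterwards.
    let prev = ready_slots.fetch_or(HAS_NEXT, Ordering::Release);

    assert_eq!(prev, 0b101);
    assert_eq!(ready_slots.load(Ordering::Acquire), 0b101 | HAS_NEXT);
    println!("ok");
}
```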
Thank you for doing this PR. We did not end up accepting it because it was complicated and a simpler solution was available, but the solution that just got merged used several ideas from your PR, and reviewing your PR was helpful in coming up with the solution.
Signed-off-by: Zahari Dichev [email protected]
Motivation

Some time ago, Receiver::try_recv was removed due to users experiencing race conditions when consuming from the channel. This problem caused consumers to receive a TryRecvError::Empty even though a write to the channel had been completed.

Solution

The solution in this PR is to distinguish between a slot in the concurrent block list not being ready and the list being empty. When the channel is not empty and a slot is still not ready to be consumed, we spin in try_recv.

Fix: #3350