Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

channel: Remove ptr-to-int casts #764

Merged
merged 1 commit into from
Jan 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ci/miri.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ MIRIFLAGS="-Zmiri-disable-isolation" \
-p crossbeam-utils

# -Zmiri-ignore-leaks is needed because we use detached threads in tests/docs: https://github.com/rust-lang/miri/issues/1371
# When enable -Zmiri-tag-raw-pointers, miri reports stacked borrows violation: https://github.com/crossbeam-rs/crossbeam/issues/762
MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks" \
MIRIFLAGS="-Zmiri-tag-raw-pointers -Zmiri-disable-isolation -Zmiri-ignore-leaks" \
cargo miri test \
-p crossbeam-channel

Expand Down
18 changes: 10 additions & 8 deletions crossbeam-channel/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! Thread-local context used in select.

use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, Thread, ThreadId};
use std::time::Instant;
Expand All @@ -11,6 +12,7 @@ use crossbeam_utils::Backoff;
use crate::select::Selected;

/// Thread-local context used in select.
// This is a private API that is used by the select macro.
#[derive(Debug, Clone)]
pub struct Context {
inner: Arc<Inner>,
Expand All @@ -23,7 +25,7 @@ struct Inner {
select: AtomicUsize,

/// A slot into which another thread may store a pointer to its `Packet`.
packet: AtomicUsize,
packet: AtomicPtr<()>,

/// Thread handle.
thread: Thread,
Expand Down Expand Up @@ -69,7 +71,7 @@ impl Context {
Context {
inner: Arc::new(Inner {
select: AtomicUsize::new(Selected::Waiting.into()),
packet: AtomicUsize::new(0),
packet: AtomicPtr::new(ptr::null_mut()),
thread: thread::current(),
thread_id: thread::current().id(),
}),
Expand All @@ -82,7 +84,7 @@ impl Context {
self.inner
.select
.store(Selected::Waiting.into(), Ordering::Release);
self.inner.packet.store(0, Ordering::Release);
self.inner.packet.store(ptr::null_mut(), Ordering::Release);
}

/// Attempts to select an operation.
Expand Down Expand Up @@ -112,19 +114,19 @@ impl Context {
///
/// This method must be called after `try_select` succeeds and there is a packet to provide.
#[inline]
pub fn store_packet(&self, packet: usize) {
if packet != 0 {
pub fn store_packet(&self, packet: *mut ()) {
if !packet.is_null() {
self.inner.packet.store(packet, Ordering::Release);
}
}

/// Waits until a packet is provided and returns it.
#[inline]
pub fn wait_packet(&self) -> usize {
pub fn wait_packet(&self) -> *mut () {
let backoff = Backoff::new();
loop {
let packet = self.inner.packet.load(Ordering::Acquire);
if packet != 0 {
if !packet.is_null() {
return packet;
}
backoff.snooze();
Expand Down
63 changes: 39 additions & 24 deletions crossbeam-channel/src/flavors/zero.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;
use std::{fmt, ptr};

use crossbeam_utils::Backoff;

Expand All @@ -16,7 +17,19 @@ use crate::utils::Spinlock;
use crate::waker::Waker;

/// A pointer to a packet.
pub(crate) type ZeroToken = usize;
pub struct ZeroToken(*mut ());

impl Default for ZeroToken {
fn default() -> Self {
Self(ptr::null_mut())
}
}

impl fmt::Debug for ZeroToken {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&(self.0 as usize), f)
}
}

/// A slot for passing one message from a sender to a receiver.
struct Packet<T> {
Expand Down Expand Up @@ -117,10 +130,10 @@ impl<T> Channel<T> {

// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
token.zero = operation.packet;
token.zero.0 = operation.packet;
true
} else if inner.is_disconnected {
token.zero = 0;
token.zero.0 = ptr::null_mut();
true
} else {
false
Expand All @@ -130,11 +143,11 @@ impl<T> Channel<T> {
/// Writes a message into the packet.
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
// If there is no packet, the channel is disconnected.
if token.zero == 0 {
if token.zero.0.is_null() {
return Err(msg);
}

let packet = &*(token.zero as *const Packet<T>);
let packet = &*(token.zero.0 as *const Packet<T>);
packet.msg.get().write(Some(msg));
packet.ready.store(true, Ordering::Release);
Ok(())
Expand All @@ -146,10 +159,10 @@ impl<T> Channel<T> {

// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
token.zero = operation.packet;
token.zero.0 = operation.packet;
true
} else if inner.is_disconnected {
token.zero = 0;
token.zero.0 = ptr::null_mut();
true
} else {
false
Expand All @@ -159,11 +172,11 @@ impl<T> Channel<T> {
/// Reads a message from the packet.
pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
// If there is no packet, the channel is disconnected.
if token.zero == 0 {
if token.zero.0.is_null() {
return Err(());
}

let packet = &*(token.zero as *const Packet<T>);
let packet = &*(token.zero.0 as *const Packet<T>);

if packet.on_stack {
// The message has been in the packet from the beginning, so there is no need to wait
Expand All @@ -177,7 +190,7 @@ impl<T> Channel<T> {
// heap-allocated packet.
packet.wait_ready();
let msg = packet.msg.get().replace(None).unwrap();
drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>));
drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
Ok(msg)
Copy link
Member Author

@taiki-e taiki-e Jan 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another stacked borrows violation that was fixed by this PR is:

error: Undefined Behavior: trying to reborrow for Unique at alloc5094412+0x8, but parent tag <14690213> does not have an appropriate item in the borrow stack
    --> /Users/taiki/.rustup/toolchains/nightly-x86_64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:977:9
     |
977  |         Box(unsafe { Unique::new_unchecked(raw) }, alloc)
     |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ trying to reborrow for Unique at alloc5094412+0x8, but parent tag <14690213> does not have an appropriate item in the borrow stack
     |
     = help: this indicates a potential bug in the program: it performed an invalid operation, but the rules it violated are still experimental
     = help: see https://github.com/rust-lang/unsafe-code-guidelines/blob/master/wip/stacked-borrows.md for further information
             
     = note: inside `std::boxed::Box::<crossbeam_channel::flavors::zero::Packet<i32>>::from_raw_in` at /Users/taiki/.rustup/toolchains/nightly-x86_64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:977:9
     = note: inside `std::boxed::Box::<crossbeam_channel::flavors::zero::Packet<i32>>::from_raw` at /Users/taiki/.rustup/toolchains/nightly-x86_64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/boxed.rs:921:18
     = note: inside `crossbeam_channel::flavors::zero::Channel::<i32>::read` at /Users/taiki/projects/crossbeam/crossbeam-channel/src/flavors/zero.rs:193:18
     = note: inside `crossbeam_channel::flavors::zero::Channel::<i32>::recv` at /Users/taiki/projects/crossbeam/crossbeam-channel/src/flavors/zero.rs:302:24
     = note: inside `crossbeam_channel::Receiver::<i32>::recv` at /Users/taiki/projects/crossbeam/crossbeam-channel/src/channel.rs:802:43
note: inside `Chan::<i32>::recv` at crossbeam-channel/tests/golang.rs:64:9
    --> crossbeam-channel/tests/golang.rs:64:9
     |
64   |         r.recv().ok()
     |         ^^^^^^^^
note: inside closure at crossbeam-channel/tests/golang.rs:1420:28
    --> crossbeam-channel/tests/golang.rs:1420:28
     |
1420 |                     l[i] = c.recv().unwrap();
     |                            ^

}
}
Expand All @@ -189,7 +202,7 @@ impl<T> Channel<T> {

// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
token.zero = operation.packet;
token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
Expand All @@ -213,7 +226,7 @@ impl<T> Channel<T> {

// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
token.zero = operation.packet;
token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
Expand All @@ -228,10 +241,10 @@ impl<T> Channel<T> {
Context::with(|cx| {
// Prepare for blocking until a receiver wakes us up.
let oper = Operation::hook(token);
let packet = Packet::<T>::message_on_stack(msg);
let mut packet = Packet::<T>::message_on_stack(msg);
inner
.senders
.register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
.register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
inner.receivers.notify();
drop(inner);

Expand Down Expand Up @@ -266,7 +279,7 @@ impl<T> Channel<T> {

// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
token.zero = operation.packet;
token.zero.0 = operation.packet;
drop(inner);
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
} else if inner.is_disconnected {
Expand All @@ -283,7 +296,7 @@ impl<T> Channel<T> {

// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
token.zero = operation.packet;
token.zero.0 = operation.packet;
drop(inner);
unsafe {
return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
Expand All @@ -297,10 +310,12 @@ impl<T> Channel<T> {
Context::with(|cx| {
// Prepare for blocking until a sender wakes us up.
let oper = Operation::hook(token);
let packet = Packet::<T>::empty_on_stack();
inner
.receivers
.register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
let mut packet = Packet::<T>::empty_on_stack();
inner.receivers.register_with_packet(
oper,
&mut packet as *mut Packet<T> as *mut (),
cx,
);
inner.senders.notify();
drop(inner);

Expand Down Expand Up @@ -385,7 +400,7 @@ impl<T> SelectHandle for Receiver<'_, T> {
let mut inner = self.0.inner.lock();
inner
.receivers
.register_with_packet(oper, packet as usize, cx);
.register_with_packet(oper, packet as *mut (), cx);
inner.senders.notify();
inner.senders.can_select() || inner.is_disconnected
}
Expand All @@ -399,7 +414,7 @@ impl<T> SelectHandle for Receiver<'_, T> {
}

fn accept(&self, token: &mut Token, cx: &Context) -> bool {
token.zero = cx.wait_packet();
token.zero.0 = cx.wait_packet();
true
}

Expand Down Expand Up @@ -435,7 +450,7 @@ impl<T> SelectHandle for Sender<'_, T> {
let mut inner = self.0.inner.lock();
inner
.senders
.register_with_packet(oper, packet as usize, cx);
.register_with_packet(oper, packet as *mut (), cx);
inner.receivers.notify();
inner.receivers.can_select() || inner.is_disconnected
}
Expand All @@ -449,7 +464,7 @@ impl<T> SelectHandle for Sender<'_, T> {
}

fn accept(&self, token: &mut Token, cx: &Context) -> bool {
token.zero = cx.wait_packet();
token.zero.0 = cx.wait_packet();
true
}

Expand Down
5 changes: 5 additions & 0 deletions crossbeam-channel/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::utils;
/// `read` or `write`.
///
/// Each field contains data associated with a specific channel flavor.
// This is a private API that is used by the select macro.
#[derive(Debug, Default)]
pub struct Token {
pub at: flavors::at::AtToken,
Expand Down Expand Up @@ -93,6 +94,7 @@ impl Into<usize> for Selected {
///
/// This is a handle that assists select in executing an operation, registration, deciding on the
/// appropriate deadline for blocking, etc.
// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
pub trait SelectHandle {
/// Attempts to select an operation and returns `true` on success.
fn try_select(&self, token: &mut Token) -> bool;
Expand Down Expand Up @@ -442,6 +444,7 @@ fn run_ready(
}

/// Attempts to select one of the operations without blocking.
// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
#[inline]
pub fn try_select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
Expand All @@ -458,6 +461,7 @@ pub fn try_select<'a>(
}

/// Blocks until one of the operations becomes ready and selects it.
// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
#[inline]
pub fn select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
Expand All @@ -476,6 +480,7 @@ pub fn select<'a>(
}

/// Blocks for a limited time until one of the operations becomes ready and selects it.
// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
#[inline]
pub fn select_timeout<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
Expand Down
9 changes: 5 additions & 4 deletions crossbeam-channel/src/waker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Waking mechanism for threads blocked on channel operations.

use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, ThreadId};

Expand All @@ -13,7 +14,7 @@ pub(crate) struct Entry {
pub(crate) oper: Operation,

/// Optional packet.
pub(crate) packet: usize,
pub(crate) packet: *mut (),

/// Context associated with the thread owning this operation.
pub(crate) cx: Context,
Expand Down Expand Up @@ -44,12 +45,12 @@ impl Waker {
/// Registers a select operation.
#[inline]
pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
self.register_with_packet(oper, 0, cx);
self.register_with_packet(oper, ptr::null_mut(), cx);
}

/// Registers a select operation and a packet.
#[inline]
pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
self.selectors.push(Entry {
oper,
packet,
Expand Down Expand Up @@ -117,7 +118,7 @@ impl Waker {
pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
self.observers.push(Entry {
oper,
packet: 0,
packet: ptr::null_mut(),
cx: cx.clone(),
});
}
Expand Down