diff --git a/Cargo.toml b/Cargo.toml index 907bcfe..a1b647a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,6 @@ codecov = { repository = "jonhoo/bus", branch = "master", service = "github" } maintenance = { status = "passively-maintained" } [dependencies] -atomic-option = "0.1" num_cpus = "1.6.2" parking_lot_core = "0.9" crossbeam-channel = "0.5" diff --git a/src/lib.rs b/src/lib.rs index bd3247b..bf5618d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -100,12 +100,13 @@ #![deny(missing_docs)] #![warn(rust_2018_idioms)] -use atomic_option::AtomicOption; use crossbeam_channel as mpsc; use parking_lot_core::SpinWait; use std::cell::UnsafeCell; +use std::marker::PhantomData; use std::ops::Deref; +use std::ptr; use std::sync::atomic; use std::sync::mpsc as std_mpsc; use std::sync::Arc; @@ -190,7 +191,7 @@ impl Seat { // we're the last reader, so we may need to notify the writer there's space in the buf. // can be relaxed, since the acquire at the top already guarantees that we'll see // updates. - waiting = self.waiting.take(atomic::Ordering::Relaxed); + waiting = self.waiting.take(); // since we're the last reader, no-one else will be cloning this value, so we can // safely take a mutable reference, and just take the val instead of cloning it. @@ -381,7 +382,7 @@ impl Bus { // no, so block by parking and telling readers to notify on last read self.state.ring[fence] .waiting - .replace(Some(Box::new(thread::current())), atomic::Ordering::Relaxed); + .swap(Some(Box::new(thread::current()))); // need the atomic fetch_add to ensure reader threads will see the new .waiting self.state.ring[fence] @@ -419,7 +420,7 @@ impl Bus { let state = unsafe { &mut *next.state.get() }; state.max = readers; state.val = Some(val); - next.waiting.replace(None, atomic::Ordering::Relaxed); + next.waiting.take(); next.read.store(0, atomic::Ordering::Release); } self.rleft[tail] = 0; @@ -820,3 +821,48 @@ impl Iterator for BusIntoIter { self.0.recv().ok() } } + +struct AtomicOption { + ptr: atomic::AtomicPtr, + _marker: PhantomData>>, +} + +unsafe impl Send for AtomicOption {} +unsafe impl Sync for AtomicOption {} + +impl AtomicOption { + fn empty() -> Self { + Self { + ptr: atomic::AtomicPtr::new(ptr::null_mut()), + _marker: PhantomData, + } + } + + fn swap(&self, val: Option>) -> Option> { + let old = match val { + Some(val) => self.ptr.swap(Box::into_raw(val), atomic::Ordering::AcqRel), + // Acquire is needed to synchronize with the store of a non-null ptr, but since a null ptr + // will never be dereferenced, there is no need to synchronize the store of a null ptr. + None => self.ptr.swap(ptr::null_mut(), atomic::Ordering::Acquire), + }; + if old.is_null() { + None + } else { + // SAFETY: + // - AcqRel/Acquire ensures that it does not read a pointer to potentially invalid memory. + // - We've checked that old is not null. + // - We do not store invalid pointers other than null in self.ptr. + Some(unsafe { Box::from_raw(old) }) + } + } + + fn take(&self) -> Option> { + self.swap(None) + } +} + +impl Drop for AtomicOption { + fn drop(&mut self) { + drop(self.take()); + } +}