diff --git a/Cargo.toml b/Cargo.toml index 573d27e..a1b647a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,10 +18,9 @@ codecov = { repository = "jonhoo/bus", branch = "master", service = "github" } maintenance = { status = "passively-maintained" } [dependencies] -atomic-option = "0.1" num_cpus = "1.6.2" -parking_lot_core = "0.7" -crossbeam-channel = "0.4" +parking_lot_core = "0.9" +crossbeam-channel = "0.5" [profile.release] debug = true diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 3a793dd..f457f9c 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -1,24 +1,23 @@ jobs: - template: default.yml@templates parameters: - minrust: 1.36.0 # MaybeUninit + minrust: 1.49.0 # parking_lot_core - job: miri displayName: "Run miri on test suite" pool: - vmImage: ubuntu-16.04 + vmImage: ubuntu-latest steps: - - template: install-rust.yml@templates - parameters: - rust: nightly - components: - - miri + - bash: rustup toolchain install nightly --component miri && rustup default nightly + displayName: install rust # ignore leaks due to https://github.com/crossbeam-rs/crossbeam/issues/464 - - bash: yes | cargo miri -Zmiri-ignore-leaks test + # disable preemption due to https://github.com/rust-lang/rust/issues/55005 + # disable weak memory emulation due to https://github.com/rust-lang/miri/issues/2223 + - bash: MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks -Zmiri-preemption-rate=0 -Zmiri-disable-weak-memory-emulation" cargo miri test displayName: cargo miri test - job: asan displayName: "Run address sanitizer on test suite" pool: - vmImage: ubuntu-16.04 + vmImage: ubuntu-latest steps: - template: install-rust.yml@templates parameters: @@ -33,7 +32,7 @@ jobs: - job: lsan displayName: "Run leak sanitizer on test suite" pool: - vmImage: ubuntu-16.04 + vmImage: ubuntu-latest steps: - template: install-rust.yml@templates parameters: @@ -58,5 +57,5 @@ resources: - repository: templates type: github name: crate-ci/azure-pipelines - ref: refs/heads/v0.3 + ref: refs/heads/v0.4 endpoint: jonhoo diff --git a/src/lib.rs b/src/lib.rs index 3016c71..bf5618d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,6 +35,7 @@ //! Multi-send, multi-consumer example //! //! ```rust +//! # if cfg!(miri) { return } // Miri is too slow //! use bus::Bus; //! use std::thread; //! @@ -99,12 +100,13 @@ #![deny(missing_docs)] #![warn(rust_2018_idioms)] -use atomic_option::AtomicOption; use crossbeam_channel as mpsc; use parking_lot_core::SpinWait; use std::cell::UnsafeCell; +use std::marker::PhantomData; use std::ops::Deref; +use std::ptr; use std::sync::atomic; use std::sync::mpsc as std_mpsc; use std::sync::Arc; @@ -189,22 +191,24 @@ impl Seat { // we're the last reader, so we may need to notify the writer there's space in the buf. // can be relaxed, since the acquire at the top already guarantees that we'll see // updates. - waiting = self.waiting.take(atomic::Ordering::Relaxed); + waiting = self.waiting.take(); // since we're the last reader, no-one else will be cloning this value, so we can // safely take a mutable reference, and just take the val instead of cloning it. unsafe { &mut *self.state.get() }.val.take().unwrap() } else { - state + let v = state .val .clone() - .expect("seat that should be occupied was empty") + .expect("seat that should be occupied was empty"); + + // let writer know that we no longer need this item. + // state is no longer safe to access. + #[allow(clippy::drop_ref)] + drop(state); + v }; - // let writer know that we no longer need this item. - // state is no longer safe to access. - #[allow(clippy::drop_ref)] - drop(state); self.read.fetch_add(1, atomic::Ordering::AcqRel); if let Some(t) = waiting { @@ -378,7 +382,7 @@ impl Bus { // no, so block by parking and telling readers to notify on last read self.state.ring[fence] .waiting - .replace(Some(Box::new(thread::current())), atomic::Ordering::Relaxed); + .swap(Some(Box::new(thread::current()))); // need the atomic fetch_add to ensure reader threads will see the new .waiting self.state.ring[fence] @@ -416,7 +420,7 @@ impl Bus { let state = unsafe { &mut *next.state.get() }; state.max = readers; state.val = Some(val); - next.waiting.replace(None, atomic::Ordering::Relaxed); + next.waiting.take(); next.read.store(0, atomic::Ordering::Release); } self.rleft[tail] = 0; @@ -817,3 +821,48 @@ impl Iterator for BusIntoIter { self.0.recv().ok() } } + +struct AtomicOption { + ptr: atomic::AtomicPtr, + _marker: PhantomData>>, +} + +unsafe impl Send for AtomicOption {} +unsafe impl Sync for AtomicOption {} + +impl AtomicOption { + fn empty() -> Self { + Self { + ptr: atomic::AtomicPtr::new(ptr::null_mut()), + _marker: PhantomData, + } + } + + fn swap(&self, val: Option>) -> Option> { + let old = match val { + Some(val) => self.ptr.swap(Box::into_raw(val), atomic::Ordering::AcqRel), + // Acquire is needed to synchronize with the store of a non-null ptr, but since a null ptr + // will never be dereferenced, there is no need to synchronize the store of a null ptr. + None => self.ptr.swap(ptr::null_mut(), atomic::Ordering::Acquire), + }; + if old.is_null() { + None + } else { + // SAFETY: + // - AcqRel/Acquire ensures that it does not read a pointer to potentially invalid memory. + // - We've checked that old is not null. + // - We do not store invalid pointers other than null in self.ptr. + Some(unsafe { Box::from_raw(old) }) + } + } + + fn take(&self) -> Option> { + self.swap(None) + } +} + +impl Drop for AtomicOption { + fn drop(&mut self) { + drop(self.take()); + } +}