Skip to content

Commit

Permalink
Merge pull request #29 from taiki-e/deps
Browse files Browse the repository at this point in the history
Update dependencies and fix unsoundness and CI failure
  • Loading branch information
jonhoo authored Jun 19, 2022
2 parents ca8b16f + 5d85f31 commit 6e41fb6
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 24 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ codecov = { repository = "jonhoo/bus", branch = "master", service = "github" }
maintenance = { status = "passively-maintained" }

[dependencies]
atomic-option = "0.1"
num_cpus = "1.6.2"
parking_lot_core = "0.7"
crossbeam-channel = "0.4"
parking_lot_core = "0.9"
crossbeam-channel = "0.5"

[profile.release]
debug = true
21 changes: 10 additions & 11 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
jobs:
- template: default.yml@templates
parameters:
minrust: 1.36.0 # MaybeUninit
minrust: 1.49.0 # parking_lot_core
- job: miri
displayName: "Run miri on test suite"
pool:
vmImage: ubuntu-16.04
vmImage: ubuntu-latest
steps:
- template: install-rust.yml@templates
parameters:
rust: nightly
components:
- miri
- bash: rustup toolchain install nightly --component miri && rustup default nightly
displayName: install rust
# ignore leaks due to https://github.com/crossbeam-rs/crossbeam/issues/464
- bash: yes | cargo miri -Zmiri-ignore-leaks test
# disable preemption due to https://github.com/rust-lang/rust/issues/55005
# disable weak memory emulation due to https://github.com/rust-lang/miri/issues/2223
- bash: MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-ignore-leaks -Zmiri-preemption-rate=0 -Zmiri-disable-weak-memory-emulation" cargo miri test
displayName: cargo miri test
- job: asan
displayName: "Run address sanitizer on test suite"
pool:
vmImage: ubuntu-16.04
vmImage: ubuntu-latest
steps:
- template: install-rust.yml@templates
parameters:
Expand All @@ -33,7 +32,7 @@ jobs:
- job: lsan
displayName: "Run leak sanitizer on test suite"
pool:
vmImage: ubuntu-16.04
vmImage: ubuntu-latest
steps:
- template: install-rust.yml@templates
parameters:
Expand All @@ -58,5 +57,5 @@ resources:
- repository: templates
type: github
name: crate-ci/azure-pipelines
ref: refs/heads/v0.3
ref: refs/heads/v0.4
endpoint: jonhoo
69 changes: 59 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
//! Multi-send, multi-consumer example
//!
//! ```rust
//! # if cfg!(miri) { return } // Miri is too slow
//! use bus::Bus;
//! use std::thread;
//!
Expand Down Expand Up @@ -99,12 +100,13 @@
#![deny(missing_docs)]
#![warn(rust_2018_idioms)]

use atomic_option::AtomicOption;
use crossbeam_channel as mpsc;
use parking_lot_core::SpinWait;

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::Deref;
use std::ptr;
use std::sync::atomic;
use std::sync::mpsc as std_mpsc;
use std::sync::Arc;
Expand Down Expand Up @@ -189,22 +191,24 @@ impl<T: Clone + Sync> Seat<T> {
// we're the last reader, so we may need to notify the writer there's space in the buf.
// can be relaxed, since the acquire at the top already guarantees that we'll see
// updates.
waiting = self.waiting.take(atomic::Ordering::Relaxed);
waiting = self.waiting.take();

// since we're the last reader, no-one else will be cloning this value, so we can
// safely take a mutable reference, and just take the val instead of cloning it.
unsafe { &mut *self.state.get() }.val.take().unwrap()
} else {
state
let v = state
.val
.clone()
.expect("seat that should be occupied was empty")
.expect("seat that should be occupied was empty");

// let writer know that we no longer need this item.
// state is no longer safe to access.
#[allow(clippy::drop_ref)]
drop(state);
v
};

// let writer know that we no longer need this item.
// state is no longer safe to access.
#[allow(clippy::drop_ref)]
drop(state);
self.read.fetch_add(1, atomic::Ordering::AcqRel);

if let Some(t) = waiting {
Expand Down Expand Up @@ -378,7 +382,7 @@ impl<T> Bus<T> {
// no, so block by parking and telling readers to notify on last read
self.state.ring[fence]
.waiting
.replace(Some(Box::new(thread::current())), atomic::Ordering::Relaxed);
.swap(Some(Box::new(thread::current())));

// need the atomic fetch_add to ensure reader threads will see the new .waiting
self.state.ring[fence]
Expand Down Expand Up @@ -416,7 +420,7 @@ impl<T> Bus<T> {
let state = unsafe { &mut *next.state.get() };
state.max = readers;
state.val = Some(val);
next.waiting.replace(None, atomic::Ordering::Relaxed);
next.waiting.take();
next.read.store(0, atomic::Ordering::Release);
}
self.rleft[tail] = 0;
Expand Down Expand Up @@ -817,3 +821,48 @@ impl<T: Clone + Sync> Iterator for BusIntoIter<T> {
self.0.recv().ok()
}
}

struct AtomicOption<T> {
ptr: atomic::AtomicPtr<T>,
_marker: PhantomData<Option<Box<T>>>,
}

unsafe impl<T: Send> Send for AtomicOption<T> {}
unsafe impl<T: Send> Sync for AtomicOption<T> {}

impl<T> AtomicOption<T> {
fn empty() -> Self {
Self {
ptr: atomic::AtomicPtr::new(ptr::null_mut()),
_marker: PhantomData,
}
}

fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> {
let old = match val {
Some(val) => self.ptr.swap(Box::into_raw(val), atomic::Ordering::AcqRel),
// Acquire is needed to synchronize with the store of a non-null ptr, but since a null ptr
// will never be dereferenced, there is no need to synchronize the store of a null ptr.
None => self.ptr.swap(ptr::null_mut(), atomic::Ordering::Acquire),
};
if old.is_null() {
None
} else {
// SAFETY:
// - AcqRel/Acquire ensures that it does not read a pointer to potentially invalid memory.
// - We've checked that old is not null.
// - We do not store invalid pointers other than null in self.ptr.
Some(unsafe { Box::from_raw(old) })
}
}

fn take(&self) -> Option<Box<T>> {
self.swap(None)
}
}

impl<T> Drop for AtomicOption<T> {
fn drop(&mut self) {
drop(self.take());
}
}

0 comments on commit 6e41fb6

Please sign in to comment.