Skip to content

Commit

Permalink
Add alternate implementations of synchronization primitives (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
notgull authored Oct 7, 2022
1 parent 06c9953 commit 8c42b8d
Show file tree
Hide file tree
Showing 8 changed files with 551 additions and 95 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ jobs:
if: startsWith(matrix.rust, 'nightly')
run: cargo check -Z features=dev_dep
- run: cargo test
- run: cargo test --features portable-atomic
- name: Install cargo-hack
uses: taiki-e/install-action@cargo-hack
- name: Run with Loom enabled
run: cargo test --test loom --features loom
env:
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg loom
LOOM_MAX_PREEMPTIONS: 2
- run: rustup target add thumbv7m-none-eabi
- run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps

Expand All @@ -46,6 +52,7 @@ jobs:
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- run: cargo build
- run: cargo build --features portable-atomic
- name: Install cargo-hack
uses: taiki-e/install-action@cargo-hack
- run: rustup target add thumbv7m-none-eabi
Expand Down
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ bench = false

[dependencies]
crossbeam-utils = { version = "0.8.11", default-features = false }
portable-atomic = { version = "0.3", default-features = false, optional = true }

# Enables loom testing. This feature is permanently unstable and the API may
# change at any time.
[target.'cfg(loom)'.dependencies]
loom = { version = "0.5", optional = true }

[[bench]]
name = "bench"
Expand Down
97 changes: 60 additions & 37 deletions src/bounded.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use alloc::{boxed::Box, vec::Vec};
use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicUsize, Ordering};

use crossbeam_utils::CachePadded;

use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, PopError, PushError};

/// A slot in a queue.
Expand Down Expand Up @@ -118,9 +120,9 @@ impl<T> Bounded<T> {
) {
Ok(_) => {
// Write the value into the slot and update the stamp.
unsafe {
slot.value.get().write(MaybeUninit::new(value));
}
slot.value.with_mut(|slot| unsafe {
slot.write(MaybeUninit::new(value));
});
slot.stamp.store(tail + 1, Ordering::Release);
return Ok(());
}
Expand All @@ -138,6 +140,10 @@ impl<T> Bounded<T> {
return Err(PushError::Full(value));
}

// Loom complains if there isn't an explicit busy wait here.
#[cfg(loom)]
busy_wait();

tail = self.tail.load(Ordering::Relaxed);
} else {
// Yield because we need to wait for the stamp to get updated.
Expand Down Expand Up @@ -181,7 +187,9 @@ impl<T> Bounded<T> {
) {
Ok(_) => {
// Read the value from the slot and update the stamp.
let value = unsafe { slot.value.get().read().assume_init() };
let value = slot
.value
.with_mut(|slot| unsafe { slot.read().assume_init() });
slot.stamp
.store(head.wrapping_add(self.one_lap), Ordering::Release);
return Ok(value);
Expand All @@ -204,6 +212,10 @@ impl<T> Bounded<T> {
}
}

// Loom complains if there isn't a busy-wait here.
#[cfg(loom)]
busy_wait();

head = self.head.load(Ordering::Relaxed);
} else {
// Yield because we need to wait for the stamp to get updated.
Expand Down Expand Up @@ -284,37 +296,48 @@ impl<T> Bounded<T> {
impl<T> Drop for Bounded<T> {
fn drop(&mut self) {
// Get the index of the head.
let head = *self.head.get_mut();
let tail = *self.tail.get_mut();

let hix = head & (self.mark_bit - 1);
let tix = tail & (self.mark_bit - 1);

let len = if hix < tix {
tix - hix
} else if hix > tix {
self.buffer.len() - hix + tix
} else if (tail & !self.mark_bit) == head {
0
} else {
self.buffer.len()
};

// Loop over all slots that hold a value and drop them.
for i in 0..len {
// Compute the index of the next slot holding a value.
let index = if hix + i < self.buffer.len() {
hix + i
} else {
hix + i - self.buffer.len()
};
let Self {
head,
tail,
buffer,
mark_bit,
..
} = self;

// Drop the value in the slot.
let slot = &self.buffer[index];
unsafe {
let value = &mut *slot.value.get();
value.as_mut_ptr().drop_in_place();
}
}
let mark_bit = *mark_bit;

head.with_mut(|&mut head| {
tail.with_mut(|&mut tail| {
let hix = head & (mark_bit - 1);
let tix = tail & (mark_bit - 1);

let len = if hix < tix {
tix - hix
} else if hix > tix {
buffer.len() - hix + tix
} else if (tail & !mark_bit) == head {
0
} else {
buffer.len()
};

// Loop over all slots that hold a value and drop them.
for i in 0..len {
// Compute the index of the next slot holding a value.
let index = if hix + i < buffer.len() {
hix + i
} else {
hix + i - buffer.len()
};

// Drop the value in the slot.
let slot = &buffer[index];
slot.value.with_mut(|slot| unsafe {
let value = &mut *slot;
value.as_mut_ptr().drop_in_place();
});
}
});
});
}
}
31 changes: 16 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,23 @@
//!
//! # Features
//!
//! `concurrent-queue` used an `std` default feature. With this feature enabled, this crate will
//! `concurrent-queue` uses an `std` default feature. With this feature enabled, this crate will
//! use [`std::thread::yield_now`] to avoid busy waiting in tight loops. However, with this
//! feature disabled, [`core::hint::spin_loop`] will be used instead. Disabling `std` will allow
//! this crate to be used on `no_std` platforms at the potential expense of more busy waiting.
//!
//! There is also a `portable-atomic` feature, which uses a polyfill from the
//! [`portable-atomic`] crate to provide atomic operations on platforms that do not support them.
//! See the [`README`] for the [`portable-atomic`] crate for more information on how to use it on
//! single-threaded targets. Note that even with this feature enabled, `concurrent-queue` still
//! requires a global allocator to be available. See the documentation for the
//! [`std::alloc::GlobalAlloc`] trait for more information.
//!
//! [Bounded]: `ConcurrentQueue::bounded()`
//! [Unbounded]: `ConcurrentQueue::unbounded()`
//! [closed]: `ConcurrentQueue::close()`
//! [`portable-atomic`]: https://crates.io/crates/portable-atomic
//! [`README`]: https://github.com/taiki-e/portable-atomic/blob/main/README.md#optional-cfg
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![no_std]
Expand All @@ -44,7 +53,7 @@ extern crate std;

use alloc::boxed::Box;
use core::fmt;
use core::sync::atomic::{self, AtomicUsize, Ordering};
use sync::atomic::{self, AtomicUsize, Ordering};

#[cfg(feature = "std")]
use std::error;
Expand All @@ -53,12 +62,15 @@ use std::panic::{RefUnwindSafe, UnwindSafe};

use crate::bounded::Bounded;
use crate::single::Single;
use crate::sync::busy_wait;
use crate::unbounded::Unbounded;

mod bounded;
mod single;
mod unbounded;

mod sync;

/// A concurrent queue.
///
/// # Examples
Expand Down Expand Up @@ -463,24 +475,13 @@ impl<T> fmt::Display for PushError<T> {
}
}

/// Notify the CPU that we are currently busy-waiting.
#[inline]
fn busy_wait() {
#[cfg(feature = "std")]
std::thread::yield_now();
// Use the deprecated `spin_loop_hint` here in order to
// avoid bumping the MSRV.
#[allow(deprecated)]
#[cfg(not(feature = "std"))]
core::sync::atomic::spin_loop_hint()
}

/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
#[inline]
fn full_fence() {
if cfg!(all(
any(target_arch = "x86", target_arch = "x86_64"),
not(miri)
not(miri),
not(loom)
)) {
// HACK(stjepang): On x86 architectures there are two different ways of executing
// a `SeqCst` fence.
Expand Down
27 changes: 18 additions & 9 deletions src/single.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicUsize, Ordering};

use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, PopError, PushError};

const LOCKED: usize = 1 << 0;
Expand Down Expand Up @@ -33,7 +35,9 @@ impl<T> Single<T> {

if state == 0 {
// Write the value and unlock.
unsafe { self.slot.get().write(MaybeUninit::new(value)) }
self.slot.with_mut(|slot| unsafe {
slot.write(MaybeUninit::new(value));
});
self.state.fetch_and(!LOCKED, Ordering::Release);
Ok(())
} else if state & CLOSED != 0 {
Expand All @@ -60,7 +64,9 @@ impl<T> Single<T> {

if prev == state {
// Read the value and unlock.
let value = unsafe { self.slot.get().read().assume_init() };
let value = self
.slot
.with_mut(|slot| unsafe { slot.read().assume_init() });
self.state.fetch_and(!LOCKED, Ordering::Release);
return Ok(value);
}
Expand Down Expand Up @@ -118,11 +124,14 @@ impl<T> Single<T> {
impl<T> Drop for Single<T> {
fn drop(&mut self) {
// Drop the value in the slot.
if *self.state.get_mut() & PUSHED != 0 {
unsafe {
let value = &mut *self.slot.get();
value.as_mut_ptr().drop_in_place();
let Self { state, slot } = self;
state.with_mut(|state| {
if *state & PUSHED != 0 {
slot.with_mut(|slot| unsafe {
let value = &mut *slot;
value.as_mut_ptr().drop_in_place();
});
}
}
});
}
}
Loading

0 comments on commit 8c42b8d

Please sign in to comment.