Skip to content

Commit

Permalink
Merge #835
Browse files Browse the repository at this point in the history
835: channel: Replace Spinlock with Mutex r=ibraheemdev a=taiki-e

Addresses one of `@thomcc's` reviews in rust-lang/rust#93563 (comment) (sorry for the late response!)

Historically, the use of spinlock was introduced when the dependency on parking_lot was removed (5208895).
However, given that it is used in Waker, which includes vectors that may be reallocated, using mutex seems to be the right choice here.

r? `@ibraheemdev`
bors d=ibraheemdev 

Co-authored-by: Taiki Endo <[email protected]>
  • Loading branch information
bors[bot] and taiki-e authored May 23, 2022
2 parents 80224bc + 89cf973 commit 78b7ac3
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 88 deletions.
58 changes: 34 additions & 24 deletions crossbeam-channel/src/flavors/zero.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::time::Instant;
use std::{fmt, ptr};

Expand All @@ -13,7 +14,6 @@ use crossbeam_utils::Backoff;
use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::utils::Spinlock;
use crate::waker::Waker;

/// A pointer to a packet.
Expand Down Expand Up @@ -95,7 +95,7 @@ struct Inner {
/// Zero-capacity channel.
pub(crate) struct Channel<T> {
/// Inner representation of the channel.
inner: Spinlock<Inner>,
inner: Mutex<Inner>,

/// Indicates that dropping a `Channel<T>` may drop values of type `T`.
_marker: PhantomData<T>,
Expand All @@ -105,7 +105,7 @@ impl<T> Channel<T> {
/// Constructs a new zero-capacity channel.
pub(crate) fn new() -> Self {
Channel {
inner: Spinlock::new(Inner {
inner: Mutex::new(Inner {
senders: Waker::new(),
receivers: Waker::new(),
is_disconnected: false,
Expand All @@ -126,7 +126,7 @@ impl<T> Channel<T> {

/// Attempts to reserve a slot for sending a message.
fn start_send(&self, token: &mut Token) -> bool {
let mut inner = self.inner.lock();
let mut inner = self.inner.lock().unwrap();

// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
Expand Down Expand Up @@ -155,7 +155,7 @@ impl<T> Channel<T> {

/// Attempts to pair up with a sender.
fn start_recv(&self, token: &mut Token) -> bool {
let mut inner = self.inner.lock();
let mut inner = self.inner.lock().unwrap();

// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
Expand Down Expand Up @@ -198,7 +198,7 @@ impl<T> Channel<T> {
/// Attempts to send a message into the channel.
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let token = &mut Token::default();
let mut inner = self.inner.lock();
let mut inner = self.inner.lock().unwrap();

// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
Expand All @@ -222,7 +222,7 @@ impl<T> Channel<T> {
deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
let mut inner = self.inner.lock();
let mut inner = self.inner.lock().unwrap();

// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
Expand Down Expand Up @@ -254,12 +254,12 @@ impl<T> Channel<T> {
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted => {
self.inner.lock().senders.unregister(oper).unwrap();
self.inner.lock().unwrap().senders.unregister(oper).unwrap();
let msg = unsafe { packet.msg.get().replace(None).unwrap() };
Err(SendTimeoutError::Timeout(msg))
}
Selected::Disconnected => {
self.inner.lock().senders.unregister(oper).unwrap();
self.inner.lock().unwrap().senders.unregister(oper).unwrap();
let msg = unsafe { packet.msg.get().replace(None).unwrap() };
Err(SendTimeoutError::Disconnected(msg))
}
Expand All @@ -275,7 +275,7 @@ impl<T> Channel<T> {
/// Attempts to receive a message without blocking.
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
let mut inner = self.inner.lock();
let mut inner = self.inner.lock().unwrap();

// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
Expand All @@ -292,7 +292,7 @@ impl<T> Channel<T> {
/// Receives a message from the channel.
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
let mut inner = self.inner.lock();
let mut inner = self.inner.lock().unwrap();

// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
Expand Down Expand Up @@ -325,11 +325,21 @@ impl<T> Channel<T> {
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted => {
self.inner.lock().receivers.unregister(oper).unwrap();
self.inner
.lock()
.unwrap()
.receivers
.unregister(oper)
.unwrap();
Err(RecvTimeoutError::Timeout)
}
Selected::Disconnected => {
self.inner.lock().receivers.unregister(oper).unwrap();
self.inner
.lock()
.unwrap()
.receivers
.unregister(oper)
.unwrap();
Err(RecvTimeoutError::Disconnected)
}
Selected::Operation(_) => {
Expand All @@ -345,7 +355,7 @@ impl<T> Channel<T> {
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect(&self) -> bool {
let mut inner = self.inner.lock();
let mut inner = self.inner.lock().unwrap();

if !inner.is_disconnected {
inner.is_disconnected = true;
Expand Down Expand Up @@ -396,7 +406,7 @@ impl<T> SelectHandle for Receiver<'_, T> {
fn register(&self, oper: Operation, cx: &Context) -> bool {
let packet = Box::into_raw(Packet::<T>::empty_on_heap());

let mut inner = self.0.inner.lock();
let mut inner = self.0.inner.lock().unwrap();
inner
.receivers
.register_with_packet(oper, packet as *mut (), cx);
Expand All @@ -405,7 +415,7 @@ impl<T> SelectHandle for Receiver<'_, T> {
}

fn unregister(&self, oper: Operation) {
if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) {
if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) {
unsafe {
drop(Box::from_raw(operation.packet as *mut Packet<T>));
}
Expand All @@ -418,18 +428,18 @@ impl<T> SelectHandle for Receiver<'_, T> {
}

fn is_ready(&self) -> bool {
let inner = self.0.inner.lock();
let inner = self.0.inner.lock().unwrap();
inner.senders.can_select() || inner.is_disconnected
}

fn watch(&self, oper: Operation, cx: &Context) -> bool {
let mut inner = self.0.inner.lock();
let mut inner = self.0.inner.lock().unwrap();
inner.receivers.watch(oper, cx);
inner.senders.can_select() || inner.is_disconnected
}

fn unwatch(&self, oper: Operation) {
let mut inner = self.0.inner.lock();
let mut inner = self.0.inner.lock().unwrap();
inner.receivers.unwatch(oper);
}
}
Expand All @@ -446,7 +456,7 @@ impl<T> SelectHandle for Sender<'_, T> {
fn register(&self, oper: Operation, cx: &Context) -> bool {
let packet = Box::into_raw(Packet::<T>::empty_on_heap());

let mut inner = self.0.inner.lock();
let mut inner = self.0.inner.lock().unwrap();
inner
.senders
.register_with_packet(oper, packet as *mut (), cx);
Expand All @@ -455,7 +465,7 @@ impl<T> SelectHandle for Sender<'_, T> {
}

fn unregister(&self, oper: Operation) {
if let Some(operation) = self.0.inner.lock().senders.unregister(oper) {
if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) {
unsafe {
drop(Box::from_raw(operation.packet as *mut Packet<T>));
}
Expand All @@ -468,18 +478,18 @@ impl<T> SelectHandle for Sender<'_, T> {
}

fn is_ready(&self) -> bool {
let inner = self.0.inner.lock();
let inner = self.0.inner.lock().unwrap();
inner.receivers.can_select() || inner.is_disconnected
}

fn watch(&self, oper: Operation, cx: &Context) -> bool {
let mut inner = self.0.inner.lock();
let mut inner = self.0.inner.lock().unwrap();
inner.senders.watch(oper, cx);
inner.receivers.can_select() || inner.is_disconnected
}

fn unwatch(&self, oper: Operation) {
let mut inner = self.0.inner.lock();
let mut inner = self.0.inner.lock().unwrap();
inner.senders.unwatch(oper);
}
}
56 changes: 1 addition & 55 deletions crossbeam-channel/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
//! Miscellaneous utilities.
use std::cell::{Cell, UnsafeCell};
use std::cell::Cell;
use std::num::Wrapping;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};

use crossbeam_utils::Backoff;

/// Randomly shuffles a slice.
pub(crate) fn shuffle<T>(v: &mut [T]) {
let len = v.len();
Expand Down Expand Up @@ -68,53 +64,3 @@ pub(crate) fn convert_timeout_to_deadline(timeout: Duration) -> Instant {
None => Instant::now() + Duration::from_secs(86400 * 365 * 30),
}
}

/// A simple spinlock.
pub(crate) struct Spinlock<T> {
flag: AtomicBool,
value: UnsafeCell<T>,
}

impl<T> Spinlock<T> {
/// Returns a new spinlock initialized with `value`.
pub(crate) fn new(value: T) -> Spinlock<T> {
Spinlock {
flag: AtomicBool::new(false),
value: UnsafeCell::new(value),
}
}

/// Locks the spinlock.
pub(crate) fn lock(&self) -> SpinlockGuard<'_, T> {
let backoff = Backoff::new();
while self.flag.swap(true, Ordering::Acquire) {
backoff.snooze();
}
SpinlockGuard { parent: self }
}
}

/// A guard holding a spinlock locked.
pub(crate) struct SpinlockGuard<'a, T> {
parent: &'a Spinlock<T>,
}

impl<T> Drop for SpinlockGuard<'_, T> {
fn drop(&mut self) {
self.parent.flag.store(false, Ordering::Release);
}
}

impl<T> Deref for SpinlockGuard<'_, T> {
type Target = T;

fn deref(&self) -> &T {
unsafe { &*self.parent.value.get() }
}
}

impl<T> DerefMut for SpinlockGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.parent.value.get() }
}
}
18 changes: 9 additions & 9 deletions crossbeam-channel/src/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::thread::{self, ThreadId};

use crate::context::Context;
use crate::select::{Operation, Selected};
use crate::utils::Spinlock;

/// Represents a thread blocked on a specific channel operation.
pub(crate) struct Entry {
Expand Down Expand Up @@ -176,7 +176,7 @@ impl Drop for Waker {
/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
pub(crate) struct SyncWaker {
/// The inner `Waker`.
inner: Spinlock<Waker>,
inner: Mutex<Waker>,

/// `true` if the waker is empty.
is_empty: AtomicBool,
Expand All @@ -187,15 +187,15 @@ impl SyncWaker {
#[inline]
pub(crate) fn new() -> Self {
SyncWaker {
inner: Spinlock::new(Waker::new()),
inner: Mutex::new(Waker::new()),
is_empty: AtomicBool::new(true),
}
}

/// Registers the current thread with an operation.
#[inline]
pub(crate) fn register(&self, oper: Operation, cx: &Context) {
let mut inner = self.inner.lock();
let mut inner = self.inner.lock().unwrap();
inner.register(oper, cx);
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
Expand All @@ -206,7 +206,7 @@ impl SyncWaker {
/// Unregisters an operation previously registered by the current thread.
#[inline]
pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
let mut inner = self.inner.lock();
let mut inner = self.inner.lock().unwrap();
let entry = inner.unregister(oper);
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
Expand All @@ -219,7 +219,7 @@ impl SyncWaker {
#[inline]
pub(crate) fn notify(&self) {
if !self.is_empty.load(Ordering::SeqCst) {
let mut inner = self.inner.lock();
let mut inner = self.inner.lock().unwrap();
if !self.is_empty.load(Ordering::SeqCst) {
inner.try_select();
inner.notify();
Expand All @@ -234,7 +234,7 @@ impl SyncWaker {
/// Registers an operation waiting to be ready.
#[inline]
pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
let mut inner = self.inner.lock();
let mut inner = self.inner.lock().unwrap();
inner.watch(oper, cx);
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
Expand All @@ -245,7 +245,7 @@ impl SyncWaker {
/// Unregisters an operation waiting to be ready.
#[inline]
pub(crate) fn unwatch(&self, oper: Operation) {
let mut inner = self.inner.lock();
let mut inner = self.inner.lock().unwrap();
inner.unwatch(oper);
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
Expand All @@ -256,7 +256,7 @@ impl SyncWaker {
/// Notifies all threads that the channel is disconnected.
#[inline]
pub(crate) fn disconnect(&self) {
let mut inner = self.inner.lock();
let mut inner = self.inner.lock().unwrap();
inner.disconnect();
self.is_empty.store(
inner.selectors.is_empty() && inner.observers.is_empty(),
Expand Down

0 comments on commit 78b7ac3

Please sign in to comment.