Skip to content

Commit

Permalink
Ignore poisoning of active
Browse files Browse the repository at this point in the history
Closes smol-rs#135.

This enables the executor to be used in presence of panics in user
callbacks, such as the iterator and `impl Extend` in `spawn_many`.

Mutex poisoning is more of a lint than a safety requirement, as
containers (such as `Slab`) and wakers have to be sound in presence of
panics anyway. In this particular case, the exact behavior of `active`
is not relied upon for soundness.
  • Loading branch information
purplesyringa committed Nov 30, 2024
1 parent 9335b7e commit ea9e6e4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
25 changes: 15 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use std::sync::{Arc, Mutex, RwLock, TryLockError};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
use std::task::{Poll, Waker};

use async_task::{Builder, Runnable};
Expand Down Expand Up @@ -143,7 +143,7 @@ impl<'a> Executor<'a> {
/// assert!(ex.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
self.state().active.lock().unwrap().is_empty()
self.state().active().is_empty()
}

/// Spawns a task onto the executor.
Expand All @@ -160,7 +160,7 @@ impl<'a> Executor<'a> {
/// });
/// ```
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
let mut active = self.state().active.lock().unwrap();
let mut active = self.state().active();

// SAFETY: `T` and the future are `Send`.
unsafe { self.spawn_inner(future, &mut active) }
Expand Down Expand Up @@ -211,7 +211,7 @@ impl<'a> Executor<'a> {
futures: impl IntoIterator<Item = F>,
handles: &mut impl Extend<Task<F::Output>>,
) {
let mut active = Some(self.state().active.lock().unwrap());
let mut active = Some(self.state().active());

// Convert the futures into tasks.
let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
Expand All @@ -221,7 +221,7 @@ impl<'a> Executor<'a> {
// Yield the lock every once in a while to ease contention.
if i.wrapping_sub(1) % 500 == 0 {
drop(active.take());
active = Some(self.state().active.lock().unwrap());
active = Some(self.state().active());
}

task
Expand All @@ -246,7 +246,7 @@ impl<'a> Executor<'a> {
let index = entry.key();
let state = self.state_as_arc();
let future = async move {
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
let _guard = CallOnDrop(move || drop(state.active().try_remove(index)));
future.await
};

Expand Down Expand Up @@ -415,7 +415,7 @@ impl Drop for Executor<'_> {
// via Arc::into_raw in state_ptr.
let state = unsafe { Arc::from_raw(ptr) };

let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner());
let mut active = state.active();
for w in active.drain() {
w.wake();
}
Expand Down Expand Up @@ -517,7 +517,7 @@ impl<'a> LocalExecutor<'a> {
/// });
/// ```
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
let mut active = self.inner().state().active.lock().unwrap();
let mut active = self.inner().state().active();

// SAFETY: This executor is not thread safe, so the future and its result
// cannot be sent to another thread.
Expand Down Expand Up @@ -569,7 +569,7 @@ impl<'a> LocalExecutor<'a> {
futures: impl IntoIterator<Item = F>,
handles: &mut impl Extend<Task<F::Output>>,
) {
let mut active = self.inner().state().active.lock().unwrap();
let mut active = self.inner().state().active();

// Convert all of the futures to tasks.
let tasks = futures.into_iter().map(|future| {
Expand Down Expand Up @@ -694,6 +694,11 @@ impl State {
}
}

/// Returns a reference to currently active tasks.
fn active(&self) -> MutexGuard<'_, Slab<Waker>> {
self.active.lock().unwrap_or_else(|e| e.into_inner())
}

/// Notifies a sleeping ticker.
#[inline]
fn notify(&self) {
Expand Down Expand Up @@ -1099,7 +1104,7 @@ fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Re
match self.0.try_lock() {
Ok(lock) => fmt::Debug::fmt(&lock.len(), f),
Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
Err(TryLockError::Poisoned(err)) => fmt::Debug::fmt(&err.into_inner().len(), f),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions tests/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ fn iterator_panics_mid_run() {
)
});
assert!(panic.is_err());

let task = ex.spawn(future::ready(0));
assert_eq!(future::block_on(ex.run(task)), 0);
}

struct CallOnDrop<F: Fn()>(F);
Expand Down

0 comments on commit ea9e6e4

Please sign in to comment.