Skip to content

Commit

Permalink
Merge pull request #75 from smol-rs/notgull/efficient-rng
Browse files Browse the repository at this point in the history
Add a way to provide your own seed to racey futures
  • Loading branch information
notgull authored Sep 3, 2023
2 parents 7896fcc + b85ecf5 commit 2234456
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
matrix:
# When updating this, the reminder to update the minimum supported
# Rust version in Cargo.toml.
rust: ['1.48']
rust: ['1.61']
steps:
- uses: actions/checkout@v3
- name: Install Rust
Expand Down
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ authors = [
"Contributors to futures-rs",
]
edition = "2018"
rust-version = "1.48"
rust-version = "1.61"
description = "Futures, streams, and async I/O combinators"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/futures-lite"
Expand All @@ -20,12 +20,13 @@ categories = ["asynchronous", "concurrency"]
exclude = ["/.*"]

[features]
default = ["std"]
std = ["alloc", "fastrand", "futures-io", "parking", "memchr", "waker-fn"]
default = ["race", "std"]
std = ["alloc", "fastrand/std", "futures-io", "parking", "memchr", "waker-fn"]
alloc = []
race = ["fastrand"]

[dependencies]
fastrand = { version = "2.0.0", optional = true }
fastrand = { version = "2.0.0", optional = true, default-features = false }
futures-core = { version = "0.3.5", default-features = false }
futures-io = { version = "0.3.5", optional = true }
memchr = { version = "2.3.3", optional = true }
Expand Down
58 changes: 52 additions & 6 deletions src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ use std::{
panic::{catch_unwind, AssertUnwindSafe, UnwindSafe},
};

#[cfg(feature = "race")]
use fastrand::Rng;

#[cfg(feature = "alloc")]
use alloc::boxed::Box;
use core::task::{Context, Poll};
Expand Down Expand Up @@ -474,16 +477,57 @@ where
/// let res = future::race(ready(1), ready(2)).await;
/// # })
/// ```
#[cfg(feature = "std")]
#[cfg(all(feature = "race", feature = "std"))]
pub fn race<T, F1, F2>(future1: F1, future2: F2) -> Race<F1, F2>
where
F1: Future<Output = T>,
F2: Future<Output = T>,
{
Race { future1, future2 }
Race {
future1,
future2,
rng: Rng::new(),
}
}

#[cfg(feature = "std")]
/// Race two futures but with a predefined random seed.
///
/// This function is identical to [`race`], but instead of using a random seed from a thread-local
/// RNG, it allows the user to provide a seed. It is useful for when you already have a source of
/// randomness available, or if you want to use a fixed seed.
///
/// See documentation of the [`race`] function for features and caveats.
///
/// # Examples
///
/// ```
/// use futures_lite::future::{self, pending, ready};
///
/// // A fixed seed is used, so the result is deterministic.
/// const SEED: u64 = 0x42;
///
/// # spin_on::spin_on(async {
/// assert_eq!(future::race_with_seed(ready(1), pending(), SEED).await, 1);
/// assert_eq!(future::race_with_seed(pending(), ready(2), SEED).await, 2);
///
/// // One of the two futures is randomly chosen as the winner.
/// let res = future::race_with_seed(ready(1), ready(2), SEED).await;
/// # })
/// ```
#[cfg(feature = "race")]
pub fn race_with_seed<T, F1, F2>(future1: F1, future2: F2, seed: u64) -> Race<F1, F2>
where
F1: Future<Output = T>,
F2: Future<Output = T>,
{
Race {
future1,
future2,
rng: Rng::with_seed(seed),
}
}

#[cfg(feature = "race")]
pin_project! {
/// Future for the [`race()`] function and the [`FutureExt::race()`] method.
#[derive(Debug)]
Expand All @@ -493,10 +537,11 @@ pin_project! {
future1: F1,
#[pin]
future2: F2,
rng: Rng,
}
}

#[cfg(feature = "std")]
#[cfg(feature = "race")]
impl<T, F1, F2> Future for Race<F1, F2>
where
F1: Future<Output = T>,
Expand All @@ -507,7 +552,7 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

if fastrand::bool() {
if this.rng.bool() {
if let Poll::Ready(t) = this.future1.poll(cx) {
return Poll::Ready(t);
}
Expand Down Expand Up @@ -635,7 +680,7 @@ pub trait FutureExt: Future {
/// let res = ready(1).race(ready(2)).await;
/// # })
/// ```
#[cfg(feature = "std")]
#[cfg(all(feature = "std", feature = "race"))]
fn race<F>(self, other: F) -> Race<Self, F>
where
Self: Sized,
Expand All @@ -644,6 +689,7 @@ pub trait FutureExt: Future {
Race {
future1: self,
future2: other,
rng: Rng::new(),
}
}

Expand Down
53 changes: 47 additions & 6 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll};

#[cfg(feature = "race")]
use fastrand::Rng;

use pin_project_lite::pin_project;

use crate::ready;
Expand Down Expand Up @@ -1747,7 +1750,7 @@ pub trait StreamExt: Stream {
/// let res = once(1).race(once(2)).next().await;
/// # })
/// ```
#[cfg(feature = "std")]
#[cfg(all(feature = "std", feature = "race"))]
fn race<S>(self, other: S) -> Race<Self, S>
where
Self: Sized,
Expand All @@ -1756,6 +1759,7 @@ pub trait StreamExt: Stream {
Race {
stream1: self,
stream2: other,
rng: Rng::new(),
}
}

Expand Down Expand Up @@ -2371,16 +2375,52 @@ where
/// // One of the two stream is randomly chosen as the winner.
/// let res = stream::race(once(1), once(2)).next().await;
/// # })
#[cfg(feature = "std")]
/// ```
#[cfg(all(feature = "std", feature = "race"))]
pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
where
S1: Stream<Item = T>,
S2: Stream<Item = T>,
{
Race { stream1, stream2 }
Race {
stream1,
stream2,
rng: Rng::new(),
}
}

#[cfg(feature = "std")]
/// Races two streams, but with a user-provided seed for randomness.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// // A fixed seed is used for reproducibility.
/// const SEED: u64 = 123;
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::race_with_seed(once(1), pending(), SEED).next().await, Some(1));
/// assert_eq!(stream::race_with_seed(pending(), once(2), SEED).next().await, Some(2));
///
/// // One of the two stream is randomly chosen as the winner.
/// let res = stream::race_with_seed(once(1), once(2), SEED).next().await;
/// # })
/// ```
#[cfg(feature = "race")]
pub fn race_with_seed<T, S1, S2>(stream1: S1, stream2: S2, seed: u64) -> Race<S1, S2>
where
S1: Stream<Item = T>,
S2: Stream<Item = T>,
{
Race {
stream1,
stream2,
rng: Rng::with_seed(seed),
}
}

#[cfg(feature = "race")]
pin_project! {
/// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
#[derive(Clone, Debug)]
Expand All @@ -2390,10 +2430,11 @@ pin_project! {
stream1: S1,
#[pin]
stream2: S2,
rng: Rng,
}
}

#[cfg(feature = "std")]
#[cfg(feature = "race")]
impl<T, S1, S2> Stream for Race<S1, S2>
where
S1: Stream<Item = T>,
Expand All @@ -2404,7 +2445,7 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

if fastrand::bool() {
if this.rng.bool() {
if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
return Poll::Ready(Some(t));
}
Expand Down

0 comments on commit 2234456

Please sign in to comment.