Skip to content

Commit

Permalink
Remove TickedAsyncExecutor::spawn API (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
coder137 authored Aug 24, 2024
1 parent 45c76da commit 51d01f7
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 40 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}
files: target/cobertura.xml

- name: Miri
run: |
rustup toolchain install nightly --component miri
rustup override set nightly
cargo miri setup
cargo miri test
# - name: Miri
# run: |
# rustup toolchain install nightly --component miri
# rustup override set nightly
# cargo miri setup
# cargo miri test
41 changes: 11 additions & 30 deletions src/ticked_async_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Payload = (TaskIdentifier, async_task::Runnable);
pub struct TickedAsyncExecutor<O> {
channel: (mpsc::Sender<Payload>, mpsc::Receiver<Payload>),
num_woken_tasks: Arc<AtomicUsize>,

num_spawned_tasks: Arc<AtomicUsize>,

// TODO, Or we need a Single Producer - Multi Consumer channel i.e Broadcast channel
Expand Down Expand Up @@ -52,22 +53,6 @@ where
}
}

pub fn spawn<T>(
&self,
identifier: impl Into<TaskIdentifier>,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T>
where
T: Send + 'static,
{
let identifier = identifier.into();
let future = self.droppable_future(identifier.clone(), future);
let schedule = self.runnable_schedule_cb(identifier);
let (runnable, task) = async_task::spawn(future, schedule);
runnable.schedule();
task
}

pub fn spawn_local<T>(
&self,
identifier: impl Into<TaskIdentifier>,
Expand Down Expand Up @@ -172,7 +157,7 @@ mod tests {
fn test_multiple_tasks() {
let executor = TickedAsyncExecutor::default();
executor
.spawn("A", async move {
.spawn_local("A", async move {
tokio::task::yield_now().await;
})
.detach();
Expand Down Expand Up @@ -226,15 +211,6 @@ mod tests {
fn test_ticked_timer() {
let executor = TickedAsyncExecutor::default();

for _ in 0..10 {
let timer: TickedTimer = executor.create_timer();
executor
.spawn("ThreadedTimer", async move {
timer.sleep_for(256.0).await;
})
.detach();
}

for _ in 0..10 {
let timer = executor.create_timer();
executor
Expand All @@ -255,25 +231,30 @@ mod tests {
let elapsed = now.elapsed();
println!("Elapsed: {:?}", elapsed);
println!("Total: {:?}", instances);
println!(
"Min: {:?}, Max: {:?}",
instances.iter().min(),
instances.iter().max()
);

// Test Timer cancellation
let timer = executor.create_timer();
executor
.spawn("ThreadedFuture", async move {
.spawn_local("LocalFuture1", async move {
timer.sleep_for(1000.0).await;
})
.detach();

let timer = executor.create_timer();
executor
.spawn_local("LocalFuture", async move {
.spawn_local("LocalFuture2", async move {
timer.sleep_for(1000.0).await;
})
.detach();

let mut tick_event = executor.tick_channel();
executor
.spawn("ThreadedTickFuture", async move {
.spawn_local("LocalTickFuture1", async move {
loop {
let _r = tick_event.changed().await;
if _r.is_err() {
Expand All @@ -285,7 +266,7 @@ mod tests {

let mut tick_event = executor.tick_channel();
executor
.spawn_local("LocalTickFuture", async move {
.spawn_local("LocalTickFuture2", async move {
loop {
let _r = tick_event.changed().await;
if _r.is_err() {
Expand Down
8 changes: 4 additions & 4 deletions tests/tokio_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn test_tokio_join() {
let (tx1, mut rx1) = tokio::sync::mpsc::channel::<usize>(1);
let (tx2, mut rx2) = tokio::sync::mpsc::channel::<usize>(1);
executor
.spawn("ThreadedFuture", async move {
.spawn_local("LocalFuture1", async move {
let (a, b) = tokio::join!(rx1.recv(), rx2.recv());
assert_eq!(a.unwrap(), 10);
assert_eq!(b.unwrap(), 20);
Expand All @@ -19,7 +19,7 @@ fn test_tokio_join() {
let (tx3, mut rx3) = tokio::sync::mpsc::channel::<usize>(1);
let (tx4, mut rx4) = tokio::sync::mpsc::channel::<usize>(1);
executor
.spawn("LocalFuture", async move {
.spawn_local("LocalFuture2", async move {
let (a, b) = tokio::join!(rx3.recv(), rx4.recv());
assert_eq!(a.unwrap(), 10);
assert_eq!(b.unwrap(), 20);
Expand All @@ -46,7 +46,7 @@ fn test_tokio_select() {
let (tx1, mut rx1) = tokio::sync::mpsc::channel::<usize>(1);
let (_tx2, mut rx2) = tokio::sync::mpsc::channel::<usize>(1);
executor
.spawn("ThreadedFuture", async move {
.spawn_local("LocalFuture1", async move {
tokio::select! {
data = rx1.recv() => {
assert_eq!(data.unwrap(), 10);
Expand All @@ -59,7 +59,7 @@ fn test_tokio_select() {
let (tx3, mut rx3) = tokio::sync::mpsc::channel::<usize>(1);
let (_tx4, mut rx4) = tokio::sync::mpsc::channel::<usize>(1);
executor
.spawn("LocalFuture", async move {
.spawn_local("LocalFuture2", async move {
tokio::select! {
data = rx3.recv() => {
assert_eq!(data.unwrap(), 10);
Expand Down

0 comments on commit 51d01f7

Please sign in to comment.