Skip to content

Commit

Permalink
Tick with limit (#10)
Browse files Browse the repository at this point in the history
- Added limit: Option<usize> to TickedAsyncExecutor::tick
- Added example to project README
  • Loading branch information
coder137 authored Sep 12, 2024
1 parent 51d01f7 commit dbb56fd
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 14 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

Rust based Async Executor which executes woken tasks only when it is ticked

# Example

```rust
let executor = TickedAsyncExecutor::default();

executor.spawn_local("MyIdentifier", async move {}).detach();

// Make sure to tick your executor to run the tasks
executor.tick(DELTA, LIMIT);
```

# Limitation

- Does not work with the tokio runtime and async constructs that use the tokio runtime internally
58 changes: 48 additions & 10 deletions src/ticked_async_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,23 @@ where
/// `delta` is used for timing based operations
/// - `TickedTimer` uses this delta value to tick till completion
///
/// `maybe_limit` is used to limit the number of woken tasks run per tick
/// `limit` is used to limit the number of woken tasks run per tick
/// - None would imply that there is no limit (all woken tasks would run)
/// - Some(limit) would imply that [0..limit] woken tasks would run,
/// even if more tasks are woken.
///
/// Tick is !Sync i.e cannot be invoked from multiple threads
///
/// NOTE: Will not run tasks that are woken/scheduled immediately after `Runnable::run`
pub fn tick(&self, delta: f64) {
pub fn tick(&self, delta: f64, limit: Option<usize>) {
let _r = self.tick_event.send(delta);

// Clamp woken tasks to limit
let num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
let mut num_woken_tasks = self.num_woken_tasks.load(Ordering::Relaxed);
if let Some(limit) = limit {
// Woken tasks should not exceed the allowed limit
num_woken_tasks = num_woken_tasks.min(limit);
}

self.channel
.1
.try_iter()
Expand Down Expand Up @@ -153,6 +157,20 @@ mod tests {

const DELTA: f64 = 1000.0 / 60.0;

#[test]
fn test_one_task() {
const DELTA: f64 = 1.0 / 60.0;
const LIMIT: Option<usize> = None;

let executor = TickedAsyncExecutor::default();

executor.spawn_local("MyIdentifier", async move {}).detach();

// Make sure to tick your executor to run the tasks
executor.tick(DELTA, LIMIT);
assert_eq!(executor.num_tasks(), 0);
}

#[test]
fn test_multiple_tasks() {
let executor = TickedAsyncExecutor::default();
Expand All @@ -168,10 +186,10 @@ mod tests {
})
.detach();

executor.tick(DELTA);
executor.tick(DELTA, None);
assert_eq!(executor.num_tasks(), 2);

executor.tick(DELTA);
executor.tick(DELTA, None);
assert_eq!(executor.num_tasks(), 0);
}

Expand All @@ -190,7 +208,7 @@ mod tests {
}
});
assert_eq!(executor.num_tasks(), 2);
executor.tick(DELTA);
executor.tick(DELTA, None);

executor
.spawn_local("CancelTasks", async move {
Expand All @@ -203,7 +221,7 @@ mod tests {

// Since we have cancelled the tasks above, the loops should eventually end
while executor.num_tasks() != 0 {
executor.tick(DELTA);
executor.tick(DELTA, None);
}
}

Expand All @@ -224,7 +242,7 @@ mod tests {
let mut instances = vec![];
while executor.num_tasks() != 0 {
let current = Instant::now();
executor.tick(DELTA);
executor.tick(DELTA, None);
instances.push(current.elapsed());
std::thread::sleep(Duration::from_millis(16));
}
Expand Down Expand Up @@ -276,8 +294,28 @@ mod tests {
})
.detach();

executor.tick(DELTA);
executor.tick(DELTA, None);
assert_eq!(executor.num_tasks(), 4);
drop(executor);
}

#[test]
fn test_limit() {
let executor = TickedAsyncExecutor::default();
for i in 0..10 {
executor
.spawn_local(format!("{i}"), async move {
println!("Finish {i}");
})
.detach();
}

for i in 0..10 {
let woken_tasks = executor.num_woken_tasks.load(Ordering::Relaxed);
assert_eq!(woken_tasks, 10 - i);
executor.tick(0.1, Some(1));
}

assert_eq!(executor.num_tasks(), 0);
}
}
8 changes: 4 additions & 4 deletions tests/tokio_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ fn test_tokio_join() {
tx1.try_send(10).unwrap();
tx3.try_send(10).unwrap();
for _ in 0..10 {
executor.tick(DELTA);
executor.tick(DELTA, None);
}
tx2.try_send(20).unwrap();
tx4.try_send(20).unwrap();

while executor.num_tasks() != 0 {
executor.tick(DELTA);
executor.tick(DELTA, None);
}
}

Expand Down Expand Up @@ -70,12 +70,12 @@ fn test_tokio_select() {
.detach();

for _ in 0..10 {
executor.tick(DELTA);
executor.tick(DELTA, None);
}

tx1.try_send(10).unwrap();
tx3.try_send(10).unwrap();
while executor.num_tasks() != 0 {
executor.tick(DELTA);
executor.tick(DELTA, None);
}
}

0 comments on commit dbb56fd

Please sign in to comment.