Skip to content

Commit

Permalink
Bug
Browse files Browse the repository at this point in the history
In the thread code, `worker.try_tick` was only being called once. This
meant that only a tick's worth of work was being done concurrently on
a thread before the thread was done. This would leave any remaining work
to be done by the awaiter.

I put this in a loop, and now the worker will run as long as three is
a task to churn on.
  • Loading branch information
uberFoo committed Feb 11, 2024
1 parent b7e9e29 commit a1cc9bd
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 17 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- update workflow for manual initiation

### Changed

- Minor logging tweaks.

### Fixed

- There was a bug in the threading code where `worker.try_tick()` was only being called once, instead of in a loop. This resulted in spawned tasks only executing a single tick concurrently, before the `await` code ran it to completion.

## [0.0.3] - 2023-11-28

### Added

- code coverage
- additional tests
- new category to Cargo.toml

### Changed

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "puteketeke"
version = "0.0.3"
authors = ["Keith T. Star <[email protected]>"]
categories = ["asynchronous", "concurrency"]
categories = ["asynchronous", "concurrency", "compilers"]
description = "An asynchronous runtime built on smol."
include = ["src/**/*.rs", "README.md", "LICENSE-APACHE", "LICENSE-MIT"]
keywords = ["executor", "async", "non-blocking", "futures"]
Expand Down
51 changes: 35 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,17 +660,35 @@ impl<'a> UberExecutor<'a> {
self.pool.execute(move || {
let _enter = span.enter();
while let Ok(worker) = receiver.recv() {
tracing::trace!("Executor::run: worker found");
tracing::trace!("Executor::run: worker: {worker:?}");
match worker.try_tick() {
true => {
tracing::trace!("Executor::run: worker ticked");
}
false => {
tracing::trace!("Executor::run: tick failed");
tracing::trace!(
target = "async",
"Executor::run: worker found: {}",
worker.id
);
loop {
match worker.try_tick() {
true => {
tracing::trace!(
target = "async",
"Executor::run: worker ticked: {}",
worker.id
);
}
false => {
tracing::trace!(
target = "async",
"Executor::run: tick failed: {} ",
worker.id
);
break;
}
}
}
tracing::debug!("Executor::run: worker finished");
tracing::debug!(
target = "async",
"Executor::run: worker finished: {}",
worker.id
);
}
});
}
Expand Down Expand Up @@ -774,14 +792,15 @@ impl<'a, T> AsyncTask<'a, T> {
// is how many simultaneous tasks are we likely to have, because that's the size
// of the upper word.
let id = unsafe { TASK_COUNT.fetch_add(1, Ordering::SeqCst) };
tracing::trace!("AsyncTask::new: {id}");
tracing::trace!(target: "async", "AsyncTask::new: {id}");

// spawn a task that spawns a task 🌈
let inner = worker.clone();
let future = async move {
tracing::trace!("AsyncTask::future: spawn inner task: {id}");

inner.spawn(future).await
tracing::trace!(target: "async", "AsyncTask spawning task: {id}");
let result = inner.spawn(future).await;
tracing::trace!(target: "async", "AsyncTask finished task: {id}");
result
};

Self {
Expand Down Expand Up @@ -818,15 +837,15 @@ where

#[tracing::instrument(level = "trace", target = "async")]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
tracing::trace!("AsyncTask::poll {:?}", self);
tracing::trace!(target = "async", "AsyncTask::poll {:?}", self);
let this = std::pin::Pin::into_inner(self);

if this.started.load(Ordering::SeqCst) {
tracing::trace!("AsyncTask::poll: ready: {}", this.id,);
tracing::trace!(target = "async", "AsyncTask::poll: ready: {}", this.id,);
let task = this.inner.take().unwrap();
Poll::Ready(future::block_on(this.worker.resolve_task(task)))
} else {
tracing::trace!("AsyncTask::poll: pending: {}", this.id,);
tracing::trace!(target = "async", "AsyncTask::poll: pending: {}", this.id,);
this.waker = Some(cx.waker().clone());
Poll::Pending
}
Expand Down

0 comments on commit a1cc9bd

Please sign in to comment.