Skip to content

Commit

Permalink
metrics: fixed flaky worker_steal_count test (#6932)
Browse files Browse the repository at this point in the history
  • Loading branch information
jofas authored Oct 28, 2024
1 parent 070a825 commit 4468f27
Showing 1 changed file with 46 additions and 30 deletions.
76 changes: 46 additions & 30 deletions tokio/tests/rt_unstable_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
))]

use std::future::Future;
use std::sync::{Arc, Mutex};
use std::sync::{mpsc, Arc, Mutex};
use std::task::Poll;
use std::thread;
use tokio::macros::support::poll_fn;
Expand Down Expand Up @@ -295,42 +295,34 @@ fn worker_noop_count() {
}

#[test]
#[ignore] // this test is flaky, see https://github.com/tokio-rs/tokio/issues/6470
fn worker_steal_count() {
// This metric only applies to the multi-threaded runtime.
//
// We use a blocking channel to backup one worker thread.
use std::sync::mpsc::channel;

let rt = threaded_no_lifo();
let metrics = rt.metrics();

rt.block_on(async {
let (tx, rx) = channel();
for _ in 0..10 {
let rt = threaded_no_lifo();
let metrics = rt.metrics();

// Move to the runtime.
tokio::spawn(async move {
// Spawn the task that sends to the channel
//
// Since the lifo slot is disabled, this task is stealable.
tokio::spawn(async move {
tx.send(()).unwrap();
});
let successfully_spawned_stealable_task = rt.block_on(async {
// The call to `try_spawn_stealable_task` may time out, which means
// that the sending task couldn't be scheduled due to a deadlock in
// the runtime.
// This is expected behaviour, we just retry until we succeed or
// exhaust all tries, the latter causing this test to fail.
try_spawn_stealable_task().await.is_ok()
});

// Blocking receive on the channel.
rx.recv().unwrap();
})
.await
.unwrap();
});
drop(rt);

drop(rt);
if successfully_spawned_stealable_task {
let n: u64 = (0..metrics.num_workers())
.map(|i| metrics.worker_steal_count(i))
.sum();

let n: u64 = (0..metrics.num_workers())
.map(|i| metrics.worker_steal_count(i))
.sum();
assert_eq!(1, n);
return;
}
}

assert_eq!(1, n);
panic!("exhausted every try to schedule the stealable task");
}

#[test]
Expand Down Expand Up @@ -835,6 +827,30 @@ fn io_driver_ready_count() {
assert_eq!(metrics.io_driver_ready_count(), 1);
}

/// Tries to get a task scheduled that another worker has to steal.
///
/// An outer task is spawned onto the runtime; it spawns an inner task that
/// sends on a blocking `std::sync::mpsc` channel and then blocks on the
/// receiving end of that channel for up to one second.
///
/// Returns `Ok(())` if the message arrived in time, or the
/// `mpsc::RecvTimeoutError` reported by `recv_timeout` otherwise.
///
/// Panics if the outer task fails to join (e.g. because it panicked).
///
/// Note that the runtime needs to have the lifo slot disabled for the inner
/// task to be stealable.
async fn try_spawn_stealable_task() -> Result<(), mpsc::RecvTimeoutError> {
    // A blocking (non-async) channel is what synchronizes the two tasks.
    let (sender, receiver) = mpsc::channel();

    // Spawning the outer task guarantees the inner spawn happens from
    // within the runtime context.
    let outer = tokio::spawn(async move {
        // The inner task only sends a unit value on the channel.
        tokio::spawn(async move {
            sender.send(()).unwrap();
        });

        // Block this task on the receive; give up if the sending task
        // wasn't scheduled in time.
        receiver.recv_timeout(Duration::from_secs(1))
    });

    // Unwrap the join result and hand the receive outcome back unchanged.
    let received = outer.await.unwrap();
    received
}

fn current_thread() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand Down

0 comments on commit 4468f27

Please sign in to comment.