Skip to content

Commit

Permalink
tracing: instrument task wakers
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Jun 3, 2021
1 parent 9d8b37d commit 1d40aee
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 28 deletions.
11 changes: 11 additions & 0 deletions tokio/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,14 @@ cfg_sync! {
mod block_on;
pub(crate) use block_on::block_on;
}

cfg_trace! {
mod trace;
pub(crate) use trace::InstrumentedFuture as Future;
}

cfg_not_trace! {
cfg_rt! {
pub(crate) use std::future::Future;
}
}
11 changes: 11 additions & 0 deletions tokio/src/future/trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use std::future::Future;

pub(crate) trait InstrumentedFuture: Future {
fn id(&self) -> Option<tracing::Id>;
}

impl<F: Future> InstrumentedFuture for tracing::instrument::Instrumented<F> {
fn id(&self) -> Option<tracing::Id> {
self.span().id()
}
}
2 changes: 1 addition & 1 deletion tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ impl Spawner {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
Expand Down
13 changes: 0 additions & 13 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::builder::ThreadNameFn;
use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
Expand Down Expand Up @@ -86,18 +85,6 @@ where
rt.spawn_blocking(func)
}

#[allow(dead_code)]
pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let rt = context::current().expect(CONTEXT_MISSING_ERROR);

let (task, _handle) = task::joinable(BlockingTask::new(func));
rt.blocking_spawner.spawn(task, &rt)
}

// ===== impl BlockingPool =====

impl BlockingPool {
Expand Down
12 changes: 6 additions & 6 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,11 @@ impl Handle {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let fut = BlockingTask::new(func);

#[cfg(all(tokio_unstable, feature = "tracing"))]
let func = {
let fut = {
use tracing::Instrument;
#[cfg(tokio_track_caller)]
let location = std::panic::Location::caller();
#[cfg(tokio_track_caller)]
Expand All @@ -193,12 +196,9 @@ impl Handle {
kind = %"blocking",
function = %std::any::type_name::<F>(),
);
move || {
let _g = span.enter();
func()
}
fut.instrument(span)
};
let (task, handle) = task::joinable(BlockingTask::new(func));
let (task, handle) = task::joinable(fut);
let _ = self.blocking_spawner.spawn(task, &self);
handle
}
Expand Down
3 changes: 1 addition & 2 deletions tokio/src/runtime/spawner.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
cfg_rt! {
use crate::future::Future;
use crate::runtime::basic_scheduler;
use crate::task::JoinHandle;

use std::future::Future;
}

cfg_rt_multi_thread! {
Expand Down
9 changes: 8 additions & 1 deletion tokio/src/runtime/task/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
//! Make sure to consult the relevant safety section of each function before
//! use.
use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::{Notified, Schedule, Task};
use crate::util::linked_list;

use std::future::Future;
use std::pin::Pin;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};
Expand Down Expand Up @@ -80,6 +80,9 @@ unsafe impl Sync for Header {}
pub(super) struct Trailer {
/// Consumer task waiting on completion of this task.
pub(super) waker: UnsafeCell<Option<Waker>>,
/// The tracing ID for this instrumented task.
#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(super) id: Option<tracing::Id>,
}

/// Either the future or the output.
Expand All @@ -93,6 +96,8 @@ impl<T: Future, S: Schedule> Cell<T, S> {
/// Allocates a new task cell, containing the header, trailer, and core
/// structures.
pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let id = future.id();
Box::new(Cell {
header: Header {
state,
Expand All @@ -111,6 +116,8 @@ impl<T: Future, S: Schedule> Cell<T, S> {
},
trailer: Trailer {
waker: UnsafeCell::new(None),
#[cfg(all(tokio_unstable, feature = "tracing"))]
id,
},
})
}
Expand Down
7 changes: 6 additions & 1 deletion tokio/src/runtime/task/harness.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::future::Future;
use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Scheduler, Trailer};
use crate::runtime::task::state::Snapshot;
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{JoinError, Notified, Schedule, Task};

use std::future::Future;
use std::mem;
use std::panic;
use std::ptr::NonNull;
Expand Down Expand Up @@ -146,6 +146,11 @@ where
}
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(super) fn id(&self) -> Option<&tracing::Id> {
self.trailer().id.as_ref()
}

/// Forcibly shutdown the task
///
/// Attempt to transition to `Running` in order to forcibly shutdown the
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ cfg_rt_multi_thread! {
pub(crate) use self::stack::TransferStack;
}

use crate::future::Future;
use crate::util::linked_list;

use std::future::Future;
use std::marker::PhantomData;
use std::ptr::NonNull;
use std::{fmt, mem};
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/raw.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::future::Future;
use crate::runtime::task::{Cell, Harness, Header, Schedule, State};

use std::future::Future;
use std::ptr::NonNull;
use std::task::{Poll, Waker};

Expand Down
31 changes: 30 additions & 1 deletion tokio/src/runtime/task/waker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::future::Future;
use crate::runtime::task::harness::Harness;
use crate::runtime::task::{Header, Schedule};

use std::future::Future;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::ops;
Expand Down Expand Up @@ -44,12 +44,38 @@ impl<S> ops::Deref for WakerRef<'_, S> {
}
}

cfg_trace! {
macro_rules! trace {
($harness:expr, $op:expr) => {
if let Some(id) = $harness.id() {
tracing::trace!(
target: "tokio::task",
op = %$op,
task.id = id.into_u64(),
);
}
}
}
}

cfg_not_trace! {
macro_rules! trace {
($harness:expr, $op:expr) => {
// noop
let _ = &$harness;
}
}
}

unsafe fn clone_waker<T, S>(ptr: *const ()) -> RawWaker
where
T: Future,
S: Schedule,
{
let header = ptr as *const Header;
let ptr = NonNull::new_unchecked(ptr as *mut Header);
let harness = Harness::<T, S>::from_raw(ptr);
trace!(harness, "waker.clone");
(*header).state.ref_inc();
raw_waker::<T, S>(header)
}
Expand All @@ -61,6 +87,7 @@ where
{
let ptr = NonNull::new_unchecked(ptr as *mut Header);
let harness = Harness::<T, S>::from_raw(ptr);
trace!(harness, "waker.drop");
harness.drop_reference();
}

Expand All @@ -71,6 +98,7 @@ where
{
let ptr = NonNull::new_unchecked(ptr as *mut Header);
let harness = Harness::<T, S>::from_raw(ptr);
trace!(harness, "waker.wake");
harness.wake_by_val();
}

Expand All @@ -82,6 +110,7 @@ where
{
let ptr = NonNull::new_unchecked(ptr as *mut Header);
let harness = Harness::<T, S>::from_raw(ptr);
trace!(harness, "waker.wake_by_ref");
harness.wake_by_ref();
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Spawner {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
Expand Down

0 comments on commit 1d40aee

Please sign in to comment.