Fix thread panic when "unreachable" SpawnedTask code is reachable. #12086
The code change itself is good, but the test needs some work.
// capture the panic message
let panic_msg = Arc::new(Mutex::new(None));
let captured_panic_msg = Arc::clone(&panic_msg);
std::panic::set_hook(Box::new(move |e| {
Can we remove this hook? IIRC it is global process state (similar to env variables) and is shared with all the other tests that run in the same process (so all unit tests in `common-runtime`). This will be a pain to debug for others.
See my comment below on an alternative.
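For readers unfamiliar with the concern: `std::panic::set_hook` replaces a single hook for the whole process, so one test that installs a hook changes what every other test in the same binary observes. As a rough std-only illustration (not the alternative proposed in this review, and not code from this PR), a synchronous test can capture a panic message locally with `std::panic::catch_unwind`. The helper name `panic_message` is hypothetical.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Hypothetical helper: runs `f` and, if it panicked with a string payload,
/// returns the message. No process-global hook is installed or replaced.
fn panic_message<F: FnOnce()>(f: F) -> Option<String> {
    // `catch_unwind` returns `Err(payload)` if `f` panicked.
    let payload = panic::catch_unwind(AssertUnwindSafe(f)).err()?;
    // Panic payloads are usually `&'static str` or `String`.
    if let Some(s) = payload.downcast_ref::<&str>() {
        Some((*s).to_string())
    } else {
        payload.downcast_ref::<String>().cloned()
    }
}

#[test]
fn captures_message_without_a_hook() {
    let msg = panic_message(|| unreachable!("should never return"));
    assert!(msg.unwrap().contains("should never return"));
    // A closure that does not panic yields no message.
    assert_eq!(panic_message(|| {}), None);
}
```

The default hook still prints the panic to stderr here; the point is only that the capture is scoped to the call rather than to the process.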
let join = rt.spawn(async {
    let task = SpawnedTask::spawn(async {
        let fut: Pending<()> = pending();
        fut.await;
        unreachable!("should never return");
    });
    let _ = task.join_unwind().await;
});

// caller shutdown their DF runtime (e.g. timeout, error in caller, etc)
rt.shutdown_background();

// race condition
// poll occurs during shutdown (buffered stream poll calls, etc)
let _ = join.await;
Suggested change (replacing the snippet above):
let task = rt
    .spawn(async {
        SpawnedTask::spawn(async {
            let fut: Pending<()> = pending();
            fut.await;
            unreachable!("should never return");
        })
    })
    .await
    .unwrap(); // the outer `JoinHandle` yields a `Result`; unwrap it to get the `SpawnedTask`

// caller shutdown their DF runtime (e.g. timeout, error in caller, etc)
rt.shutdown_background();

// race condition
// poll occurs during shutdown (buffered stream poll calls, etc)
let _ = task.join_unwind().await;
I think the current setup is a bit flaky:
- it depends on your panic hook, see my concerns above
- it depends on the fact that your sub-runtime makes progress in `spawn`. This is also why you need to loop a bunch of times.
I think this change also allows you to remove the loop and the panic hook.
Test change made. Confirmed that it does properly find the panic (without the hook, without looping) if we have not made the code changes. Thank you. ❤️
if e.is_panic() {
    std::panic::resume_unwind(e.into_panic());
} else if e.is_cancelled() {
    log::warn!("SpawnedTask was polled during shutdown");
    e
} else {
    // Cancellation may be caused by two reasons:
    // 1. Abort is called, but since we consumed `self`, it's not our case (`JoinHandle` not accessible outside).
    // 2. The runtime is shutting down.
    // So we consider this branch as unreachable.
    unreachable!("SpawnedTask was cancelled unexpectedly");
}
A `JoinError` is either `is_panic` or `is_cancelled`. However, these checks are not exhaustive because they don't use pattern matching, so the final branch may become reachable if something changes.
The logic is in general: `resume_unwind` on panic, or else return an error.
I'd suggest simplifying this and keeping the comment:
// `JoinError` can be caused either by panic or cancellation. We have to handle panics:
if e.is_panic() {
    std::panic::resume_unwind(e.into_panic());
} else {
    // Cancellation may be caused by two reasons:
    // 1. Abort is called, but since we consumed `self`, it's not our case (`JoinHandle` not accessible outside).
    // 2. The runtime is shutting down.
    log::warn!("SpawnedTask was polled during shutdown");
    e
}
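To illustrate the exhaustiveness point, here is a minimal std-only sketch of the same "resume the panic, otherwise return an error" shape, written with pattern matching. The `JoinFailure` enum and the free function `join_unwind` are hypothetical stand-ins for illustration; they are not tokio's `JoinError` or the `SpawnedTask` method from this PR.

```rust
use std::any::Any;
use std::panic;

/// Hypothetical stand-in for a join error (NOT tokio's `JoinError`).
enum JoinFailure {
    /// The task panicked; carries the panic payload.
    Panicked(Box<dyn Any + Send + 'static>),
    /// The task was cancelled, e.g. because the runtime shut down.
    Cancelled,
}

/// Resumes a panic on the caller's thread; returns cancellation as an error.
fn join_unwind<T>(res: Result<T, JoinFailure>) -> Result<T, String> {
    match res {
        Ok(value) => Ok(value),
        // Re-raise the original panic with its original payload.
        Err(JoinFailure::Panicked(payload)) => panic::resume_unwind(payload),
        // Cancellation is an ordinary error, not a reason to panic.
        Err(JoinFailure::Cancelled) => Err("task was cancelled".to_string()),
    }
}

#[test]
fn cancellation_is_an_error_not_a_panic() {
    let res = join_unwind::<i32>(Err(JoinFailure::Cancelled));
    assert_eq!(res, Err("task was cancelled".to_string()));
}
```

With a `match`, adding a new variant to the enum becomes a compile error at this site instead of a branch that silently turns reachable at runtime.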
use std::future::{pending, Pending};

use tokio::runtime::Runtime;
This idea came a bit late, so this is optional:
Could you add a test that shows that the panic is propagated, e.g.:
#[tokio::test]
#[should_panic(expected = "foo")]
async fn panic_resume() {
// this should panic w/o an `unwrap`
SpawnedTask::spawn(async {panic!("foo")}).join_unwind().await.ok();
}
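The same propagation idea can be sketched without tokio, using an OS thread in place of a spawned task. This is only an analogy under that assumption: `join_and_resume` is a hypothetical helper, and `std::thread::JoinHandle::join` plays the role of awaiting the task.

```rust
use std::panic;
use std::thread;

/// Hypothetical helper: joins a thread and re-raises its panic, if any,
/// on the calling thread with the original payload.
fn join_and_resume<T: Send + 'static>(handle: thread::JoinHandle<T>) -> T {
    match handle.join() {
        Ok(value) => value,
        // `join` returns the panic payload as the error value.
        Err(payload) => panic::resume_unwind(payload),
    }
}

#[test]
#[should_panic(expected = "foo")]
fn thread_panic_resume() {
    let handle: thread::JoinHandle<()> = thread::spawn(|| panic!("foo"));
    // This should panic with the spawned thread's message.
    join_and_resume(handle);
}
```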
This is a good idea -- I'll make a follow-on PR to do so (as it adds additional test coverage for an existing feature rather than something that was changed in this PR).
Thank you @wiedld and @crepererum and @DDtKey
…pache#12086)
* test: demonstrate that the unreachable in SpawnedTask is reachable
* chore: use workspace tokio and add feature
* fix(12089): SpawnedTask will no longer panic during shutdown
* chore(12089): add new error type for JoinError
* refactor(12089): handle join error when using SpawnedTask::join_unwind
* Revert "chore: use workspace tokio and add feature" This reverts commit 3010288.
* refactor(12089): update test to avoid the looping and global (to package tests) panic hook manipulation
* refactor(12089): make single conditional for unwind vs no-unwind, and update test for cancellation error
Which issue does this PR close?
Closes #12089
We are occasionally seeing a thread panic "entered unreachable code: SpawnedTask was cancelled unexpectedly" in our server logs. This is because the "unreachable" is actually reachable.
Rationale for this change
The existing `unreachable!` assumes that no polling will occur after the runtime begins a shutdown. However, we found that while running DataFusion in its own runtime (own thread pool) we can actually hit this unreachable code: the executor starts shutting down and an internal poll still occurs.
We think the way we execute our DataFusion queries is not uncommon, and therefore the shutdown behavior should not cause a thread panic in DataFusion.
What changes are included in this PR?
- `SpawnedTask::join_unwind` is now fallible
- New error variant `DataFusionError::ExecutionJoin(JoinError)`
Are these changes tested?
Yes.
Are there any user-facing changes?
Not substantially.
There will be a new enum option for `DataFusionError`, and any external caller of `SpawnedTask::join_unwind` will need to handle the error.