From 099f18e89bb309b6d60c4b891de137bb5b94a56c Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 9 May 2020 05:49:34 +0200 Subject: [PATCH] src: distinguish refed/unrefed threadsafe Immediates In some situations, it can be useful to use threadsafe callbacks on an `Environment` to perform cleanup operations that should run even when the process would otherwise be ending. PR-URL: https://github.com/nodejs/node/pull/33320 Reviewed-By: James M Snell --- src/env-inl.h | 4 ++-- src/env.cc | 29 +++++++++++++++++------------ src/env.h | 2 +- src/node_worker.cc | 2 +- 4 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/env-inl.h b/src/env-inl.h index 3d7bd79852c028..3971f479aa1e23 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -727,9 +727,9 @@ void Environment::SetUnrefImmediate(Fn&& cb) { } template -void Environment::SetImmediateThreadsafe(Fn&& cb) { +void Environment::SetImmediateThreadsafe(Fn&& cb, bool refed) { auto callback = - native_immediates_threadsafe_.CreateCallback(std::move(cb), false); + native_immediates_threadsafe_.CreateCallback(std::move(cb), refed); { Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); native_immediates_threadsafe_.Push(std::move(callback)); diff --git a/src/env.cc b/src/env.cc index b83bd62835128c..feb648a2b66aaf 100644 --- a/src/env.cc +++ b/src/env.cc @@ -741,19 +741,10 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { // exceptions, so we do not need to handle that. RunAndClearInterrupts(); - // It is safe to check .size() first, because there is a causal relationship - // between pushes to the threadsafe and this function being called. - // For the common case, it's worth checking the size first before establishing - // a mutex lock. - if (native_immediates_threadsafe_.size() > 0) { - Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); - native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_)); - } - - auto drain_list = [&]() { + auto drain_list = [&](NativeImmediateQueue* queue) { TryCatchScope try_catch(this); DebugSealHandleScope seal_handle_scope(isolate()); - while (auto head = native_immediates_.Shift()) { + while (auto head = queue->Shift()) { if (head->is_refed()) ref_count++; @@ -771,12 +762,26 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { } return false; }; - while (drain_list()) {} + while (drain_list(&native_immediates_)) {} immediate_info()->ref_count_dec(ref_count); if (immediate_info()->ref_count() == 0) ToggleImmediateRef(false); + + // It is safe to check .size() first, because there is a causal relationship + // between pushes to the threadsafe immediate list and this function being + // called. For the common case, it's worth checking the size first before + // establishing a mutex lock. + // This is intentionally placed after the `ref_count` handling, because when + // refed threadsafe immediates are created, they are not counted towards the + // count in immediate_info() either. + NativeImmediateQueue threadsafe_immediates; + if (native_immediates_threadsafe_.size() > 0) { + Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); + threadsafe_immediates.ConcatMove(std::move(native_immediates_threadsafe_)); + } + while (drain_list(&threadsafe_immediates)) {} } void Environment::RequestInterruptFromV8() { diff --git a/src/env.h b/src/env.h index b1a2c97352c306..72ca9449236ae3 100644 --- a/src/env.h +++ b/src/env.h @@ -1167,7 +1167,7 @@ class Environment : public MemoryRetainer { inline void SetUnrefImmediate(Fn&& cb); template // This behaves like SetImmediate() but can be called from any thread. - inline void SetImmediateThreadsafe(Fn&& cb); + inline void SetImmediateThreadsafe(Fn&& cb, bool refed = true); // This behaves like V8's Isolate::RequestInterrupt(), but also accounts for // the event loop (i.e. combines the V8 function with SetImmediate()). // The passed callback may not throw exceptions. diff --git a/src/node_worker.cc b/src/node_worker.cc index ee48461136b782..8ba250577354d9 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -759,7 +759,7 @@ void Worker::TakeHeapSnapshot(const FunctionCallbackInfo& args) { env, std::move(snapshot)); Local args[] = { stream->object() }; taker->MakeCallback(env->ondone_string(), arraysize(args), args); - }); + }, /* refed */ false); }); args.GetReturnValue().Set(scheduled ? taker->object() : Local()); }