From 548cedd870fb9f66394fb2af0877f9564db2615c Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 6 May 2020 23:36:10 +0200 Subject: [PATCH] src: split out callback queue implementation from Environment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This isn’t conceptually tied to anything Node.js-specific at all. PR-URL: https://github.com/nodejs/node/pull/33272 Reviewed-By: James M Snell Reviewed-By: Colin Ihrig --- node.gyp | 2 + src/callback_queue-inl.h | 97 ++++++++++++++++++++++++++++++++++++++++ src/callback_queue.h | 70 +++++++++++++++++++++++++++++ src/env-inl.h | 80 +++------------------------------ src/env.cc | 5 +-- src/env.h | 45 +------------------ 6 files changed, 179 insertions(+), 120 deletions(-) create mode 100644 src/callback_queue-inl.h create mode 100644 src/callback_queue.h diff --git a/node.gyp b/node.gyp index 3dadad15c9e193..961164ac320f07 100644 --- a/node.gyp +++ b/node.gyp @@ -642,6 +642,8 @@ 'src/base_object.h', 'src/base_object-inl.h', 'src/base64.h', + 'src/callback_queue.h', + 'src/callback_queue-inl.h', 'src/connect_wrap.h', 'src/connection_wrap.h', 'src/debug_utils.h', diff --git a/src/callback_queue-inl.h b/src/callback_queue-inl.h new file mode 100644 index 00000000000000..e83c81cd0dd802 --- /dev/null +++ b/src/callback_queue-inl.h @@ -0,0 +1,97 @@ +#ifndef SRC_CALLBACK_QUEUE_INL_H_ +#define SRC_CALLBACK_QUEUE_INL_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include "callback_queue.h" + +namespace node { + +template +template +std::unique_ptr::Callback> +CallbackQueue::CreateCallback(Fn&& fn, bool refed) { + return std::make_unique>(std::move(fn), refed); +} + +template +std::unique_ptr::Callback> +CallbackQueue::Shift() { + std::unique_ptr ret = std::move(head_); + if (ret) { + head_ = ret->get_next(); + if (!head_) + tail_ = nullptr; // The queue is now empty. + } + size_--; + return ret; +} + +template +void CallbackQueue::Push(std::unique_ptr cb) { + Callback* prev_tail = tail_; + + size_++; + tail_ = cb.get(); + if (prev_tail != nullptr) + prev_tail->set_next(std::move(cb)); + else + head_ = std::move(cb); +} + +template +void CallbackQueue::ConcatMove(CallbackQueue&& other) { + size_ += other.size_; + if (tail_ != nullptr) + tail_->set_next(std::move(other.head_)); + else + head_ = std::move(other.head_); + tail_ = other.tail_; + other.tail_ = nullptr; + other.size_ = 0; +} + +template +size_t CallbackQueue::size() const { + return size_.load(); +} + +template +CallbackQueue::Callback::Callback(bool refed) + : refed_(refed) {} + +template +bool CallbackQueue::Callback::is_refed() const { + return refed_; +} + +template +std::unique_ptr::Callback> +CallbackQueue::Callback::get_next() { + return std::move(next_); +} + +template +void CallbackQueue::Callback::set_next( + std::unique_ptr next) { + next_ = std::move(next); +} + +template +template +CallbackQueue::CallbackImpl::CallbackImpl( + Fn&& callback, bool refed) + : Callback(refed), + callback_(std::move(callback)) {} + +template +template +R CallbackQueue::CallbackImpl::Call(Args... args) { + return callback_(std::forward(args)...); +} + +} // namespace node + +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#endif // SRC_CALLBACK_QUEUE_INL_H_ diff --git a/src/callback_queue.h b/src/callback_queue.h new file mode 100644 index 00000000000000..ebf975e6391d13 --- /dev/null +++ b/src/callback_queue.h @@ -0,0 +1,70 @@ +#ifndef SRC_CALLBACK_QUEUE_H_ +#define SRC_CALLBACK_QUEUE_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include + +namespace node { + +// A queue of C++ functions that take Args... as arguments and return R +// (this is similar to the signature of std::function). +// New entries are added using `CreateCallback()`/`Push()`, and removed using +// `Shift()`. +// The `refed` flag is left for easier use in situations in which some of these +// should be run even if nothing else is keeping the event loop alive. +template +class CallbackQueue { + public: + class Callback { + public: + explicit inline Callback(bool refed); + + virtual ~Callback() = default; + virtual R Call(Args... args) = 0; + + inline bool is_refed() const; + + private: + inline std::unique_ptr get_next(); + inline void set_next(std::unique_ptr next); + + bool refed_; + std::unique_ptr next_; + + friend class CallbackQueue; + }; + + template + inline std::unique_ptr CreateCallback(Fn&& fn, bool refed); + + inline std::unique_ptr Shift(); + inline void Push(std::unique_ptr cb); + // ConcatMove adds elements from 'other' to the end of this list, and clears + // 'other' afterwards. + inline void ConcatMove(CallbackQueue&& other); + + // size() is atomic and may be called from any thread. + inline size_t size() const; + + private: + template + class CallbackImpl final : public Callback { + public: + CallbackImpl(Fn&& callback, bool refed); + R Call(Args... args) override; + + private: + Fn callback_; + }; + + std::atomic size_ {0}; + std::unique_ptr head_; + Callback* tail_ = nullptr; +}; + +} // namespace node + +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#endif // SRC_CALLBACK_QUEUE_H_ diff --git a/src/env-inl.h b/src/env-inl.h index 237a46ca84e33c..5e8f3b5b02db54 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -25,6 +25,7 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #include "aliased_buffer.h" +#include "callback_queue-inl.h" #include "env.h" #include "node.h" #include "util-inl.h" @@ -705,50 +706,9 @@ inline void IsolateData::set_options( options_ = std::move(options); } -std::unique_ptr -Environment::NativeImmediateQueue::Shift() { - std::unique_ptr ret = std::move(head_); - if (ret) { - head_ = ret->get_next(); - if (!head_) - tail_ = nullptr; // The queue is now empty. - } - size_--; - return ret; -} - -void Environment::NativeImmediateQueue::Push( - std::unique_ptr cb) { - NativeImmediateCallback* prev_tail = tail_; - - size_++; - tail_ = cb.get(); - if (prev_tail != nullptr) - prev_tail->set_next(std::move(cb)); - else - head_ = std::move(cb); -} - -void Environment::NativeImmediateQueue::ConcatMove( - NativeImmediateQueue&& other) { - size_ += other.size_; - if (tail_ != nullptr) - tail_->set_next(std::move(other.head_)); - else - head_ = std::move(other.head_); - tail_ = other.tail_; - other.tail_ = nullptr; - other.size_ = 0; -} - -size_t Environment::NativeImmediateQueue::size() const { - return size_.load(); -} - template void Environment::CreateImmediate(Fn&& cb, bool ref) { - auto callback = std::make_unique>( - std::move(cb), ref); + auto callback = native_immediates_.CreateCallback(std::move(cb), ref); native_immediates_.Push(std::move(callback)); } @@ -768,8 +728,8 @@ void Environment::SetUnrefImmediate(Fn&& cb) { template void Environment::SetImmediateThreadsafe(Fn&& cb) { - auto callback = std::make_unique>( - std::move(cb), false); + auto callback = + native_immediates_threadsafe_.CreateCallback(std::move(cb), false); { Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); native_immediates_threadsafe_.Push(std::move(callback)); @@ -780,8 +740,8 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) { template void Environment::RequestInterrupt(Fn&& cb) { - auto callback = std::make_unique>( - std::move(cb), false); + auto callback = + native_immediates_interrupts_.CreateCallback(std::move(cb), false); { Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); native_immediates_interrupts_.Push(std::move(callback)); @@ -791,34 +751,6 @@ void Environment::RequestInterrupt(Fn&& cb) { RequestInterruptFromV8(); } -Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed) - : refed_(refed) {} - -bool Environment::NativeImmediateCallback::is_refed() const { - return refed_; -} - -std::unique_ptr -Environment::NativeImmediateCallback::get_next() { - return std::move(next_); -} - -void Environment::NativeImmediateCallback::set_next( - std::unique_ptr next) { - next_ = std::move(next); -} - -template -Environment::NativeImmediateCallbackImpl::NativeImmediateCallbackImpl( - Fn&& callback, bool refed) - : NativeImmediateCallback(refed), - callback_(std::move(callback)) {} - -template -void Environment::NativeImmediateCallbackImpl::Call(Environment* env) { - callback_(env); -} - inline bool Environment::can_call_into_js() const { return can_call_into_js_ && !is_stopping(); } diff --git a/src/env.cc b/src/env.cc index 3efa5c3b9c98ab..06e6fe6f793536 100644 --- a/src/env.cc +++ b/src/env.cc @@ -729,7 +729,7 @@ void Environment::RunAndClearInterrupts() { } DebugSealHandleScope seal_handle_scope(isolate()); - while (std::unique_ptr head = queue.Shift()) + while (auto head = queue.Shift()) head->Call(this); } } @@ -755,8 +755,7 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { auto drain_list = [&]() { TryCatchScope try_catch(this); DebugSealHandleScope seal_handle_scope(isolate()); - while (std::unique_ptr head = - native_immediates_.Shift()) { + while (auto head = native_immediates_.Shift()) { if (head->is_refed()) ref_count++; diff --git a/src/env.h b/src/env.h index 8047ac0fce5a2c..1db3ea0400e73e 100644 --- a/src/env.h +++ b/src/env.h @@ -29,6 +29,7 @@ #include "inspector_agent.h" #include "inspector_profiler.h" #endif +#include "callback_queue.h" #include "debug_utils.h" #include "handle_wrap.h" #include "node.h" @@ -1365,49 +1366,7 @@ class Environment : public MemoryRetainer { std::list at_exit_functions_; - class NativeImmediateCallback { - public: - explicit inline NativeImmediateCallback(bool refed); - - virtual ~NativeImmediateCallback() = default; - virtual void Call(Environment* env) = 0; - - inline bool is_refed() const; - inline std::unique_ptr get_next(); - inline void set_next(std::unique_ptr next); - - private: - bool refed_; - std::unique_ptr next_; - }; - - template - class NativeImmediateCallbackImpl final : public NativeImmediateCallback { - public: - NativeImmediateCallbackImpl(Fn&& callback, bool refed); - void Call(Environment* env) override; - - private: - Fn callback_; - }; - - class NativeImmediateQueue { - public: - inline std::unique_ptr Shift(); - inline void Push(std::unique_ptr cb); - // ConcatMove adds elements from 'other' to the end of this list, and clears - // 'other' afterwards. - inline void ConcatMove(NativeImmediateQueue&& other); - - // size() is atomic and may be called from any thread. - inline size_t size() const; - - private: - std::atomic size_ {0}; - std::unique_ptr head_; - NativeImmediateCallback* tail_ = nullptr; - }; - + typedef CallbackQueue NativeImmediateQueue; NativeImmediateQueue native_immediates_; Mutex native_immediates_threadsafe_mutex_; NativeImmediateQueue native_immediates_threadsafe_;