Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: split out callback queue implementation from Environment #33272

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,8 @@
'src/base_object.h',
'src/base_object-inl.h',
'src/base64.h',
'src/callback_queue.h',
'src/callback_queue-inl.h',
'src/connect_wrap.h',
'src/connection_wrap.h',
'src/debug_utils.h',
Expand Down
97 changes: 97 additions & 0 deletions src/callback_queue-inl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#ifndef SRC_CALLBACK_QUEUE_INL_H_
#define SRC_CALLBACK_QUEUE_INL_H_

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "callback_queue.h"

namespace node {

template <typename R, typename... Args>
template <typename Fn>
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
CallbackQueue<R, Args...>::CreateCallback(Fn&& fn, bool refed) {
return std::make_unique<CallbackImpl<Fn>>(std::move(fn), refed);
}

template <typename R, typename... Args>
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
CallbackQueue<R, Args...>::Shift() {
std::unique_ptr<Callback> ret = std::move(head_);
if (ret) {
head_ = ret->get_next();
if (!head_)
tail_ = nullptr; // The queue is now empty.
}
size_--;
return ret;
}

template <typename R, typename... Args>
void CallbackQueue<R, Args...>::Push(std::unique_ptr<Callback> cb) {
Callback* prev_tail = tail_;

size_++;
tail_ = cb.get();
if (prev_tail != nullptr)
prev_tail->set_next(std::move(cb));
else
head_ = std::move(cb);
}

template <typename R, typename... Args>
void CallbackQueue<R, Args...>::ConcatMove(CallbackQueue<R, Args...>&& other) {
size_ += other.size_;
if (tail_ != nullptr)
tail_->set_next(std::move(other.head_));
else
head_ = std::move(other.head_);
tail_ = other.tail_;
other.tail_ = nullptr;
other.size_ = 0;
}

template <typename R, typename... Args>
size_t CallbackQueue<R, Args...>::size() const {
return size_.load();
}

template <typename R, typename... Args>
CallbackQueue<R, Args...>::Callback::Callback(bool refed)
: refed_(refed) {}

template <typename R, typename... Args>
bool CallbackQueue<R, Args...>::Callback::is_refed() const {
return refed_;
}

template <typename R, typename... Args>
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
CallbackQueue<R, Args...>::Callback::get_next() {
return std::move(next_);
}

template <typename R, typename... Args>
void CallbackQueue<R, Args...>::Callback::set_next(
std::unique_ptr<Callback> next) {
next_ = std::move(next);
}

template <typename R, typename... Args>
template <typename Fn>
CallbackQueue<R, Args...>::CallbackImpl<Fn>::CallbackImpl(
Fn&& callback, bool refed)
: Callback(refed),
callback_(std::move(callback)) {}

template <typename R, typename... Args>
template <typename Fn>
R CallbackQueue<R, Args...>::CallbackImpl<Fn>::Call(Args... args) {
return callback_(std::forward<Args>(args)...);
}

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#endif // SRC_CALLBACK_QUEUE_INL_H_
70 changes: 70 additions & 0 deletions src/callback_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#ifndef SRC_CALLBACK_QUEUE_H_
#define SRC_CALLBACK_QUEUE_H_

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include <atomic>

namespace node {

// A queue of C++ functions that take Args... as arguments and return R
// (this is similar to the signature of std::function).
// New entries are added using `CreateCallback()`/`Push()`, and removed using
// `Shift()`.
// The `refed` flag is left for easier use in situations in which some of these
// should be run even if nothing else is keeping the event loop alive.
template <typename R, typename... Args>
addaleax marked this conversation as resolved.
Show resolved Hide resolved
class CallbackQueue {
public:
class Callback {
public:
explicit inline Callback(bool refed);

virtual ~Callback() = default;
virtual R Call(Args... args) = 0;

inline bool is_refed() const;

private:
inline std::unique_ptr<Callback> get_next();
inline void set_next(std::unique_ptr<Callback> next);

bool refed_;
std::unique_ptr<Callback> next_;

friend class CallbackQueue;
};

template <typename Fn>
inline std::unique_ptr<Callback> CreateCallback(Fn&& fn, bool refed);

inline std::unique_ptr<Callback> Shift();
inline void Push(std::unique_ptr<Callback> cb);
// ConcatMove adds elements from 'other' to the end of this list, and clears
// 'other' afterwards.
inline void ConcatMove(CallbackQueue&& other);

// size() is atomic and may be called from any thread.
inline size_t size() const;

private:
template <typename Fn>
class CallbackImpl final : public Callback {
public:
CallbackImpl(Fn&& callback, bool refed);
R Call(Args... args) override;

private:
Fn callback_;
};

std::atomic<size_t> size_ {0};
std::unique_ptr<Callback> head_;
Callback* tail_ = nullptr;
};

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#endif // SRC_CALLBACK_QUEUE_H_
80 changes: 6 additions & 74 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "aliased_buffer.h"
#include "callback_queue-inl.h"
#include "env.h"
#include "node.h"
#include "util-inl.h"
Expand Down Expand Up @@ -705,50 +706,9 @@ inline void IsolateData::set_options(
options_ = std::move(options);
}

std::unique_ptr<Environment::NativeImmediateCallback>
Environment::NativeImmediateQueue::Shift() {
std::unique_ptr<Environment::NativeImmediateCallback> ret = std::move(head_);
if (ret) {
head_ = ret->get_next();
if (!head_)
tail_ = nullptr; // The queue is now empty.
}
size_--;
return ret;
}

void Environment::NativeImmediateQueue::Push(
std::unique_ptr<Environment::NativeImmediateCallback> cb) {
NativeImmediateCallback* prev_tail = tail_;

size_++;
tail_ = cb.get();
if (prev_tail != nullptr)
prev_tail->set_next(std::move(cb));
else
head_ = std::move(cb);
}

void Environment::NativeImmediateQueue::ConcatMove(
NativeImmediateQueue&& other) {
size_ += other.size_;
if (tail_ != nullptr)
tail_->set_next(std::move(other.head_));
else
head_ = std::move(other.head_);
tail_ = other.tail_;
other.tail_ = nullptr;
other.size_ = 0;
}

size_t Environment::NativeImmediateQueue::size() const {
return size_.load();
}

template <typename Fn>
void Environment::CreateImmediate(Fn&& cb, bool ref) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), ref);
auto callback = native_immediates_.CreateCallback(std::move(cb), ref);
native_immediates_.Push(std::move(callback));
}

Expand All @@ -768,8 +728,8 @@ void Environment::SetUnrefImmediate(Fn&& cb) {

template <typename Fn>
void Environment::SetImmediateThreadsafe(Fn&& cb) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), false);
auto callback =
native_immediates_threadsafe_.CreateCallback(std::move(cb), false);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_threadsafe_.Push(std::move(callback));
Expand All @@ -780,8 +740,8 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) {

template <typename Fn>
void Environment::RequestInterrupt(Fn&& cb) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), false);
auto callback =
native_immediates_interrupts_.CreateCallback(std::move(cb), false);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_interrupts_.Push(std::move(callback));
Expand All @@ -791,34 +751,6 @@ void Environment::RequestInterrupt(Fn&& cb) {
RequestInterruptFromV8();
}

Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
: refed_(refed) {}

bool Environment::NativeImmediateCallback::is_refed() const {
return refed_;
}

std::unique_ptr<Environment::NativeImmediateCallback>
Environment::NativeImmediateCallback::get_next() {
return std::move(next_);
}

void Environment::NativeImmediateCallback::set_next(
std::unique_ptr<NativeImmediateCallback> next) {
next_ = std::move(next);
}

template <typename Fn>
Environment::NativeImmediateCallbackImpl<Fn>::NativeImmediateCallbackImpl(
Fn&& callback, bool refed)
: NativeImmediateCallback(refed),
callback_(std::move(callback)) {}

template <typename Fn>
void Environment::NativeImmediateCallbackImpl<Fn>::Call(Environment* env) {
callback_(env);
}

inline bool Environment::can_call_into_js() const {
return can_call_into_js_ && !is_stopping();
}
Expand Down
5 changes: 2 additions & 3 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ void Environment::RunAndClearInterrupts() {
}
DebugSealHandleScope seal_handle_scope(isolate());

while (std::unique_ptr<NativeImmediateCallback> head = queue.Shift())
while (auto head = queue.Shift())
head->Call(this);
}
}
Expand All @@ -755,8 +755,7 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
auto drain_list = [&]() {
TryCatchScope try_catch(this);
DebugSealHandleScope seal_handle_scope(isolate());
while (std::unique_ptr<NativeImmediateCallback> head =
native_immediates_.Shift()) {
while (auto head = native_immediates_.Shift()) {
if (head->is_refed())
ref_count++;

Expand Down
45 changes: 2 additions & 43 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "inspector_agent.h"
#include "inspector_profiler.h"
#endif
#include "callback_queue.h"
#include "debug_utils.h"
#include "handle_wrap.h"
#include "node.h"
Expand Down Expand Up @@ -1368,49 +1369,7 @@ class Environment : public MemoryRetainer {

std::list<ExitCallback> at_exit_functions_;

class NativeImmediateCallback {
public:
explicit inline NativeImmediateCallback(bool refed);

virtual ~NativeImmediateCallback() = default;
virtual void Call(Environment* env) = 0;

inline bool is_refed() const;
inline std::unique_ptr<NativeImmediateCallback> get_next();
inline void set_next(std::unique_ptr<NativeImmediateCallback> next);

private:
bool refed_;
std::unique_ptr<NativeImmediateCallback> next_;
};

template <typename Fn>
class NativeImmediateCallbackImpl final : public NativeImmediateCallback {
public:
NativeImmediateCallbackImpl(Fn&& callback, bool refed);
void Call(Environment* env) override;

private:
Fn callback_;
};

class NativeImmediateQueue {
public:
inline std::unique_ptr<NativeImmediateCallback> Shift();
inline void Push(std::unique_ptr<NativeImmediateCallback> cb);
// ConcatMove adds elements from 'other' to the end of this list, and clears
// 'other' afterwards.
inline void ConcatMove(NativeImmediateQueue&& other);

// size() is atomic and may be called from any thread.
inline size_t size() const;

private:
std::atomic<size_t> size_ {0};
std::unique_ptr<NativeImmediateCallback> head_;
NativeImmediateCallback* tail_ = nullptr;
};

typedef CallbackQueue<void, Environment*> NativeImmediateQueue;
NativeImmediateQueue native_immediates_;
Mutex native_immediates_threadsafe_mutex_;
NativeImmediateQueue native_immediates_threadsafe_;
Expand Down