From 5bc6e493c0642dd65e458026b8145f266d7d5417 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Fri, 8 Feb 2019 19:23:09 +0100 Subject: [PATCH] worker: set up child Isolate inside Worker thread Refs: https://github.com/nodejs/node/issues/24016 PR-URL: https://github.com/nodejs/node/pull/26011 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Ben Noordhuis Reviewed-By: Gireesh Punathil Reviewed-By: Joyee Cheung --- src/inspector_agent.cc | 14 +- src/inspector_agent.h | 4 +- src/node_worker.cc | 333 ++++++++++++++++++++++------------------- src/node_worker.h | 22 ++- 4 files changed, 201 insertions(+), 172 deletions(-) diff --git a/src/inspector_agent.cc b/src/inspector_agent.cc index 272f1a986da71f..8d7aad70e600e4 100644 --- a/src/inspector_agent.cc +++ b/src/inspector_agent.cc @@ -885,12 +885,14 @@ bool Agent::IsActive() { return io_ != nullptr || client_->IsActive(); } -void Agent::AddWorkerInspector(int thread_id, - const std::string& url, - Agent* agent) { - CHECK_NOT_NULL(client_); - agent->parent_handle_ = - client_->getWorkerManager()->NewParentHandle(thread_id, url); +void Agent::SetParentHandle( + std::unique_ptr parent_handle) { + parent_handle_ = std::move(parent_handle); +} + +std::unique_ptr Agent::GetParentHandle( + int thread_id, const std::string& url) { + return client_->getWorkerManager()->NewParentHandle(thread_id, url); } void Agent::WaitForConnect() { diff --git a/src/inspector_agent.h b/src/inspector_agent.h index 5e599a6339e903..905b1e2841ebc8 100644 --- a/src/inspector_agent.h +++ b/src/inspector_agent.h @@ -85,7 +85,9 @@ class Agent { void EnableAsyncHook(); void DisableAsyncHook(); - void AddWorkerInspector(int thread_id, const std::string& url, Agent* agent); + void SetParentHandle(std::unique_ptr parent_handle); + std::unique_ptr GetParentHandle( + int thread_id, const std::string& url); // Called to create inspector sessions that can be used from the main thread. // The inspector responds by using the delegate to send messages back. diff --git a/src/node_worker.cc b/src/node_worker.cc index 36b4106d137eb5..ebd1924b8f2479 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -8,6 +8,10 @@ #include "async_wrap.h" #include "async_wrap-inl.h" +#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR +#include "inspector/worker_inspector.h" // ParentInspectorHandle +#endif + #include #include @@ -35,34 +39,21 @@ namespace worker { namespace { #if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR -void StartWorkerInspector(Environment* child, const std::string& url) { +void StartWorkerInspector( + Environment* child, + std::unique_ptr parent_handle, + const std::string& url) { + child->inspector_agent()->SetParentHandle(std::move(parent_handle)); child->inspector_agent()->Start(url, child->options()->debug_options(), child->inspector_host_port(), false); } -void AddWorkerInspector(Environment* parent, - Environment* child, - int id, - const std::string& url) { - parent->inspector_agent()->AddWorkerInspector(id, url, - child->inspector_agent()); -} - void WaitForWorkerInspectorToStop(Environment* child) { child->inspector_agent()->WaitForDisconnect(); child->inspector_agent()->Stop(); } - -#else -// No-ops -void StartWorkerInspector(Environment* child, const std::string& url) {} -void AddWorkerInspector(Environment* parent, - Environment* child, - int id, - const std::string& url) {} -void WaitForWorkerInspectorToStop(Environment* child) {} #endif } // anonymous namespace @@ -71,9 +62,13 @@ Worker::Worker(Environment* env, Local wrap, const std::string& url, std::shared_ptr per_isolate_opts) - : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER), url_(url), + : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER), + url_(url), + per_isolate_opts_(per_isolate_opts), + platform_(env->isolate_data()->platform()), + profiler_idle_notifier_started_(env->profiler_idle_notifier_started()), thread_id_(Environment::AllocateThreadId()) { - Debug(this, "Creating new worker instance at %p", static_cast(this)); + Debug(this, "Creating new worker instance with thread id %llu", thread_id_); // Set up everything that needs to be set up in the parent environment. parent_port_ = MessagePort::New(env, env->context()); @@ -89,57 +84,17 @@ Worker::Worker(Environment* env, env->message_port_string(), parent_port_->object()).FromJust(); - array_buffer_allocator_.reset(CreateArrayBufferAllocator()); - - CHECK_EQ(uv_loop_init(&loop_), 0); - isolate_ = NewIsolate(array_buffer_allocator_.get(), &loop_); - CHECK_NOT_NULL(isolate_); - - { - // Enter an environment capable of executing code in the child Isolate - // (and only in it). - Locker locker(isolate_); - Isolate::Scope isolate_scope(isolate_); - HandleScope handle_scope(isolate_); - - isolate_data_.reset(CreateIsolateData(isolate_, - &loop_, - env->isolate_data()->platform(), - array_buffer_allocator_.get())); - if (per_isolate_opts != nullptr) { - isolate_data_->set_options(per_isolate_opts); - } - CHECK(isolate_data_); - - Local context = NewContext(isolate_); - Context::Scope context_scope(context); - - // TODO(addaleax): Use CreateEnvironment(), or generally another public API. - env_.reset(new Environment( - isolate_data_.get(), context, Flags::kNoFlags, thread_id_)); - CHECK_NOT_NULL(env_); - env_->set_abort_on_uncaught_exception(false); - env_->set_worker_context(this); - - env_->Start(env->profiler_idle_notifier_started()); - env_->ProcessCliArgs(std::vector{}, - std::vector{}); - // Done while on the parent thread - AddWorkerInspector(env, env_.get(), thread_id_, url_); - } - - // The new isolate won't be bothered on this thread again. - isolate_->DiscardThreadSpecificMetadata(); - - wrap->Set(env->context(), - env->thread_id_string(), - Number::New(env->isolate(), static_cast(thread_id_))) + object()->Set(env->context(), + env->thread_id_string(), + Number::New(env->isolate(), static_cast(thread_id_))) .FromJust(); - Debug(this, - "Set up worker at %p with id %llu", - static_cast(this), - thread_id_); +#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR + inspector_parent_handle_ = + env->inspector_agent()->GetParentHandle(thread_id_, url); +#endif + + Debug(this, "Preparation for worker %llu finished", thread_id_); } bool Worker::is_stopped() const { @@ -147,14 +102,79 @@ bool Worker::is_stopped() const { return stopped_; } +// This class contains data that is only relevant to the child thread itself, +// and only while it is running. +// (Eventually, the Environment instance should probably also be moved here.) +class WorkerThreadData { + public: + explicit WorkerThreadData(Worker* w) + : w_(w), + array_buffer_allocator_(CreateArrayBufferAllocator()) { + CHECK_EQ(uv_loop_init(&loop_), 0); + + Isolate* isolate = NewIsolate(array_buffer_allocator_.get(), &loop_); + CHECK_NOT_NULL(isolate); + + { + Locker locker(isolate); + Isolate::Scope isolate_scope(isolate); + HandleScope handle_scope(isolate); + isolate_data_.reset(CreateIsolateData(isolate, + &loop_, + w_->platform_, + array_buffer_allocator_.get())); + CHECK(isolate_data_); + if (w_->per_isolate_opts_) + isolate_data_->set_options(std::move(w_->per_isolate_opts_)); + } + + Mutex::ScopedLock lock(w_->mutex_); + w_->isolate_ = isolate; + } + + ~WorkerThreadData() { + Debug(w_, "Worker %llu dispose isolate", w_->thread_id_); + Isolate* isolate; + { + Mutex::ScopedLock lock(w_->mutex_); + isolate = w_->isolate_; + w_->isolate_ = nullptr; + } + + w_->platform_->CancelPendingDelayedTasks(isolate); + + isolate_data_.reset(); + w_->platform_->UnregisterIsolate(isolate); + + isolate->Dispose(); + + // Need to run the loop one more time to close the platform's uv_async_t + uv_run(&loop_, UV_RUN_ONCE); + + CheckedUvLoopClose(&loop_); + } + + private: + Worker* const w_; + uv_loop_t loop_; + DeleteFnPtr + array_buffer_allocator_; + DeleteFnPtr isolate_data_; + + friend class Worker; +}; + void Worker::Run() { std::string name = "WorkerThread "; name += std::to_string(thread_id_); TRACE_EVENT_METADATA1( "__metadata", "thread_name", "name", TRACE_STR_COPY(name.c_str())); - MultiIsolatePlatform* platform = isolate_data_->platform(); - CHECK_NOT_NULL(platform); + CHECK_NOT_NULL(platform_); + + Debug(this, "Creating isolate for worker with id %llu", thread_id_); + + WorkerThreadData data(this); Debug(this, "Starting worker with id %llu", thread_id_); { @@ -163,10 +183,73 @@ void Worker::Run() { SealHandleScope outer_seal(isolate_); bool inspector_started = false; + DeleteFnPtr env_; + OnScopeLeave cleanup_env([&]() { + if (!env_) return; + env_->set_can_call_into_js(false); + Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_, + Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE); + + // Grab the parent-to-child channel and render is unusable. + MessagePort* child_port; + { + Mutex::ScopedLock lock(mutex_); + child_port = child_port_; + child_port_ = nullptr; + } + + { + Context::Scope context_scope(env_->context()); + if (child_port != nullptr) + child_port->Close(); + env_->stop_sub_worker_contexts(); + env_->RunCleanup(); + RunAtExit(env_.get()); +#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR + if (inspector_started) + WaitForWorkerInspectorToStop(env_.get()); +#endif + + { + Mutex::ScopedLock stopped_lock(stopped_mutex_); + stopped_ = true; + } + + env_->RunCleanup(); + + // This call needs to be made while the `Environment` is still alive + // because we assume that it is available for async tracking in the + // NodePlatform implementation. + platform_->DrainTasks(isolate_); + } + }); + { - Context::Scope context_scope(env_->context()); HandleScope handle_scope(isolate_); + Local context = NewContext(isolate_); + if (is_stopped()) return; + + CHECK(!context.IsEmpty()); + Context::Scope context_scope(context); + { + // TODO(addaleax): Use CreateEnvironment(), or generally another + // public API. + env_.reset(new Environment(data.isolate_data_.get(), + context, + Environment::kNoFlags, + thread_id_)); + CHECK_NOT_NULL(env_); + env_->set_abort_on_uncaught_exception(false); + env_->set_worker_context(this); + + env_->Start(profiler_idle_notifier_started_); + env_->ProcessCliArgs(std::vector{}, + std::vector{}); + } + + Debug(this, "Created Environment for worker with id %llu", thread_id_); + if (is_stopped()) return; { HandleScope handle_scope(isolate_); Mutex::ScopedLock lock(mutex_); @@ -182,8 +265,13 @@ void Worker::Run() { Debug(this, "Created message port for worker %llu", thread_id_); } - if (!is_stopped()) { - StartWorkerInspector(env_.get(), url_); + if (is_stopped()) return; + { +#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR + StartWorkerInspector(env_.get(), + std::move(inspector_parent_handle_), + url_); +#endif inspector_started = true; HandleScope handle_scope(isolate_); @@ -198,6 +286,7 @@ void Worker::Run() { Debug(this, "Loaded environment for worker %llu", thread_id_); } + if (is_stopped()) return; { SealHandleScope seal(isolate_); bool more; @@ -205,12 +294,12 @@ void Worker::Run() { node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START); do { if (is_stopped()) break; - uv_run(&loop_, UV_RUN_DEFAULT); + uv_run(&data.loop_, UV_RUN_DEFAULT); if (is_stopped()) break; - platform->DrainTasks(isolate_); + platform_->DrainTasks(isolate_); - more = uv_loop_alive(&loop_); + more = uv_loop_alive(&data.loop_); if (more && !is_stopped()) continue; @@ -218,7 +307,7 @@ void Worker::Run() { // Emit `beforeExit` if the loop became alive either after emitting // event, or after running some callbacks. - more = uv_loop_alive(&loop_); + more = uv_loop_alive(&data.loop_); } while (more == true); env_->performance_state()->Mark( node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT); @@ -237,79 +326,11 @@ void Worker::Run() { Debug(this, "Exiting thread for worker %llu with exit code %d", thread_id_, exit_code_); } - - env_->set_can_call_into_js(false); - Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_, - Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE); - - // Grab the parent-to-child channel and render is unusable. - MessagePort* child_port; - { - Mutex::ScopedLock lock(mutex_); - child_port = child_port_; - child_port_ = nullptr; - } - - { - Context::Scope context_scope(env_->context()); - child_port->Close(); - env_->stop_sub_worker_contexts(); - env_->RunCleanup(); - RunAtExit(env_.get()); - if (inspector_started) - WaitForWorkerInspectorToStop(env_.get()); - - { - Mutex::ScopedLock stopped_lock(stopped_mutex_); - stopped_ = true; - } - - env_->RunCleanup(); - - // This call needs to be made while the `Environment` is still alive - // because we assume that it is available for async tracking in the - // NodePlatform implementation. - platform->DrainTasks(isolate_); - } - - env_.reset(); - } - - DisposeIsolate(); - - { - Mutex::ScopedLock lock(mutex_); - CHECK(thread_exit_async_); - scheduled_on_thread_stopped_ = true; - uv_async_send(thread_exit_async_.get()); } Debug(this, "Worker %llu thread stops", thread_id_); } -void Worker::DisposeIsolate() { - if (env_) { - CHECK_NOT_NULL(isolate_); - Locker locker(isolate_); - Isolate::Scope isolate_scope(isolate_); - env_.reset(); - } - - if (isolate_ == nullptr) - return; - - Debug(this, "Worker %llu dispose isolate", thread_id_); - CHECK(isolate_data_); - MultiIsolatePlatform* platform = isolate_data_->platform(); - platform->CancelPendingDelayedTasks(isolate_); - - isolate_data_.reset(); - platform->UnregisterIsolate(isolate_); - - isolate_->Dispose(); - isolate_ = nullptr; -} - void Worker::JoinThread() { if (thread_joined_) return; @@ -340,7 +361,6 @@ void Worker::OnThreadStopped() { CHECK(stopped_); } - CHECK_NULL(child_port_); parent_port_ = nullptr; } @@ -370,16 +390,9 @@ Worker::~Worker() { CHECK(stopped_); CHECK(thread_joined_); - CHECK_NULL(child_port_); // This has most likely already happened within the worker thread -- this // is just in case Worker creation failed early. - DisposeIsolate(); - - // Need to run the loop one more time to close the platform's uv_async_t - uv_run(&loop_, UV_RUN_ONCE); - - CheckedUvLoopClose(&loop_); Debug(this, "Worker %llu destroyed", thread_id_); } @@ -476,7 +489,13 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { }), 0); CHECK_EQ(uv_thread_create(&w->tid_, [](void* arg) { - static_cast(arg)->Run(); + Worker* w = static_cast(arg); + w->Run(); + + Mutex::ScopedLock lock(w->mutex_); + CHECK(w->thread_exit_async_); + w->scheduled_on_thread_stopped_ = true; + uv_async_send(w->thread_exit_async_.get()); }, static_cast(w)), 0); } @@ -510,12 +529,12 @@ void Worker::Exit(int code) { Debug(this, "Worker %llu called Exit(%d)", thread_id_, code); if (!stopped_) { - CHECK_NOT_NULL(env_); stopped_ = true; exit_code_ = code; if (child_port_ != nullptr) child_port_->StopEventLoop(); - isolate_->TerminateExecution(); + if (isolate_ != nullptr) + isolate_->TerminateExecution(); } } diff --git a/src/node_worker.h b/src/node_worker.h index 5ba9ceade3dc6b..4d7a7335ca6d63 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -9,6 +9,8 @@ namespace node { namespace worker { +class WorkerThreadData; + // A worker thread, as represented in its parent thread. class Worker : public AsyncWrap { public: @@ -49,17 +51,19 @@ class Worker : public AsyncWrap { private: void OnThreadStopped(); - void DisposeIsolate(); - uv_loop_t loop_; - DeleteFnPtr isolate_data_; - DeleteFnPtr env_; const std::string url_; + + std::shared_ptr per_isolate_opts_; + MultiIsolatePlatform* platform_; v8::Isolate* isolate_ = nullptr; - DeleteFnPtr - array_buffer_allocator_; + bool profiler_idle_notifier_started_; uv_thread_t tid_; +#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR + std::unique_ptr inspector_parent_handle_; +#endif + // This mutex protects access to all variables listed below it. mutable Mutex mutex_; @@ -79,12 +83,14 @@ class Worker : public AsyncWrap { std::unique_ptr child_port_data_; - // The child port is always kept alive by the child Environment's persistent - // handle to it. + // The child port is kept alive by the child Environment's persistent + // handle to it, as long as that child Environment exists. MessagePort* child_port_ = nullptr; // This is always kept alive because the JS object associated with the Worker // instance refers to it via its [kPort] property. MessagePort* parent_port_ = nullptr; + + friend class WorkerThreadData; }; } // namespace worker