diff --git a/node.gyp b/node.gyp index fb1c674e8e8c1a..d52748ec2bc562 100644 --- a/node.gyp +++ b/node.gyp @@ -509,14 +509,18 @@ 'src/inspector_socket.cc', 'src/inspector_socket_server.cc', 'src/inspector/main_thread_interface.cc', + 'src/inspector/worker_inspector.cc', 'src/inspector/node_string.cc', + 'src/inspector/worker_agent.cc', 'src/inspector/tracing_agent.cc', 'src/inspector_agent.h', 'src/inspector_io.h', 'src/inspector_socket.h', 'src/inspector_socket_server.h', 'src/inspector/main_thread_interface.h', + 'src/inspector/worker_inspector.h', 'src/inspector/node_string.h', + 'src/inspector/worker_agent.h', 'src/inspector/tracing_agent.h', '<@(node_inspector_generated_sources)' ], @@ -1066,6 +1070,8 @@ '<(SHARED_INTERMEDIATE_DIR)/src/node/inspector/protocol/Forward.h', '<(SHARED_INTERMEDIATE_DIR)/src/node/inspector/protocol/Protocol.cpp', '<(SHARED_INTERMEDIATE_DIR)/src/node/inspector/protocol/Protocol.h', + '<(SHARED_INTERMEDIATE_DIR)/src/node/inspector/protocol/NodeWorker.cpp', + '<(SHARED_INTERMEDIATE_DIR)/src/node/inspector/protocol/NodeWorker.h', '<(SHARED_INTERMEDIATE_DIR)/src/node/inspector/protocol/NodeTracing.cpp', '<(SHARED_INTERMEDIATE_DIR)/src/node/inspector/protocol/NodeTracing.h', ], diff --git a/src/inspector/node_protocol.pdl b/src/inspector/node_protocol.pdl index 27b3d814c88d0a..9fb9f1c55fa191 100644 --- a/src/inspector/node_protocol.pdl +++ b/src/inspector/node_protocol.pdl @@ -37,3 +37,58 @@ experimental domain NodeTracing # Signals that tracing is stopped and there is no trace buffers pending flush, all data were # delivered via dataCollected events. event tracingComplete + +# Support for sending messages to Node worker Inspector instances. +experimental domain NodeWorker + + type WorkerID extends string + + # Unique identifier of attached debugging session. + type SessionID extends string + + type WorkerInfo extends object + properties + WorkerID workerId + string type + string title + string url + + # Sends protocol message over session with given id. + command sendMessageToWorker + parameters + string message + # Identifier of the session. + SessionID sessionId + + # Instructs the inspector to attach to running workers. Will also attach to new workers + # as they start + command enable + parameters + # Whether to new workers should be paused until the frontend sends `Runtime.runIfWaitingForDebugger` + # message to run them. + boolean waitForDebuggerOnStart + + # Detaches from all running workers and disables attaching to new workers as they are started. + command disable + + # Issued when attached to a worker. + event attachedToWorker + parameters + # Identifier assigned to the session used to send/receive messages. + SessionID sessionId + WorkerInfo workerInfo + boolean waitingForDebugger + + # Issued when detached from the worker. + event detachedFromWorker + parameters + # Detached session identifier. + SessionID sessionId + + # Notifies about a new protocol message received from the session + # (session ID is provided in attachedToWorker notification). + event receivedMessageFromWorker + parameters + # Identifier of a session which sends a message. + SessionID sessionId + string message diff --git a/src/inspector/node_protocol_config.json b/src/inspector/node_protocol_config.json index 7cea20ae937616..4ef37856068093 100644 --- a/src/inspector/node_protocol_config.json +++ b/src/inspector/node_protocol_config.json @@ -3,12 +3,7 @@ "path": "node_protocol.json", "package": "src/node/inspector/protocol", "output": "node/inspector/protocol", - "namespace": ["node", "inspector", "protocol"], - "options": [ - { - "domain": "NodeTracing" - } - ] + "namespace": ["node", "inspector", "protocol"] }, "exported": { "package": "include/inspector", diff --git a/src/inspector/worker_agent.cc b/src/inspector/worker_agent.cc new file mode 100644 index 00000000000000..fccd6d57a53c2a --- /dev/null +++ b/src/inspector/worker_agent.cc @@ -0,0 +1,154 @@ +#include "worker_agent.h" + +#include "main_thread_interface.h" +#include "worker_inspector.h" + +namespace node { +namespace inspector { +namespace protocol { + +class NodeWorkers + : public std::enable_shared_from_this { + public: + explicit NodeWorkers(std::weak_ptr frontend, + std::shared_ptr thread) + : frontend_(frontend), thread_(thread) {} + void WorkerCreated(const std::string& title, + const std::string& url, + bool waiting, + std::shared_ptr target); + void Receive(const std::string& id, const std::string& message); + void Send(const std::string& id, const std::string& message); + void Detached(const std::string& id); + + private: + std::weak_ptr frontend_; + std::shared_ptr thread_; + std::unordered_map> sessions_; + int next_target_id_ = 0; +}; + +namespace { +class AgentWorkerInspectorDelegate : public WorkerDelegate { + public: + explicit AgentWorkerInspectorDelegate(std::shared_ptr workers) + : workers_(workers) {} + + void WorkerCreated(const std::string& title, + const std::string& url, + bool waiting, + std::shared_ptr target) override { + workers_->WorkerCreated(title, url, waiting, target); + } + + private: + std::shared_ptr workers_; +}; + +class ParentInspectorSessionDelegate : public InspectorSessionDelegate { + public: + ParentInspectorSessionDelegate(const std::string& id, + std::shared_ptr workers) + : id_(id), workers_(workers) {} + + ~ParentInspectorSessionDelegate() override { + workers_->Detached(id_); + } + + void SendMessageToFrontend(const v8_inspector::StringView& msg) override { + std::string message = protocol::StringUtil::StringViewToUtf8(msg); + workers_->Send(id_, message); + } + + private: + std::string id_; + std::shared_ptr workers_; +}; + +std::unique_ptr WorkerInfo(const std::string& id, + const std::string& title, + const std::string& url) { + return NodeWorker::WorkerInfo::create() + .setWorkerId(id) + .setTitle(title) + .setUrl(url) + .setType("worker").build(); +} +} // namespace + +WorkerAgent::WorkerAgent(std::weak_ptr manager) + : manager_(manager) {} + + +void WorkerAgent::Wire(UberDispatcher* dispatcher) { + frontend_.reset(new NodeWorker::Frontend(dispatcher->channel())); + NodeWorker::Dispatcher::wire(dispatcher, this); + auto manager = manager_.lock(); + CHECK_NOT_NULL(manager); + workers_ = + std::make_shared(frontend_, manager->MainThread()); +} + +DispatchResponse WorkerAgent::sendMessageToWorker(const String& message, + const String& sessionId) { + workers_->Receive(sessionId, message); + return DispatchResponse::OK(); +} + +DispatchResponse WorkerAgent::enable(bool waitForDebuggerOnStart) { + auto manager = manager_.lock(); + if (!manager) { + return DispatchResponse::OK(); + } + if (!event_handle_) { + std::unique_ptr delegate( + new AgentWorkerInspectorDelegate(workers_)); + event_handle_ = manager->SetAutoAttach(std::move(delegate)); + } + event_handle_->SetWaitOnStart(waitForDebuggerOnStart); + return DispatchResponse::OK(); +} + +DispatchResponse WorkerAgent::disable() { + event_handle_.reset(); + return DispatchResponse::OK(); +} + +void NodeWorkers::WorkerCreated(const std::string& title, + const std::string& url, + bool waiting, + std::shared_ptr target) { + auto frontend = frontend_.lock(); + if (!frontend) + return; + std::string id = std::to_string(++next_target_id_); + auto delegate = thread_->MakeDelegateThreadSafe( + std::unique_ptr( + new ParentInspectorSessionDelegate(id, shared_from_this()))); + sessions_[id] = target->Connect(std::move(delegate), true); + frontend->attachedToWorker(id, WorkerInfo(id, title, url), waiting); +} + +void NodeWorkers::Send(const std::string& id, const std::string& message) { + auto frontend = frontend_.lock(); + if (frontend) + frontend->receivedMessageFromWorker(id, message); +} + +void NodeWorkers::Receive(const std::string& id, const std::string& message) { + auto it = sessions_.find(id); + if (it != sessions_.end()) + it->second->Dispatch(Utf8ToStringView(message)->string()); +} + +void NodeWorkers::Detached(const std::string& id) { + if (sessions_.erase(id) == 0) + return; + auto frontend = frontend_.lock(); + if (frontend) { + frontend->detachedFromWorker(id); + } +} +} // namespace protocol +} // namespace inspector +} // namespace node diff --git a/src/inspector/worker_agent.h b/src/inspector/worker_agent.h new file mode 100644 index 00000000000000..402c7194163967 --- /dev/null +++ b/src/inspector/worker_agent.h @@ -0,0 +1,39 @@ +#ifndef SRC_INSPECTOR_WORKER_AGENT_H_ +#define SRC_INSPECTOR_WORKER_AGENT_H_ + +#include "node/inspector/protocol/NodeWorker.h" +#include "v8.h" + + +namespace node { +namespace inspector { +class WorkerManagerEventHandle; +class WorkerManager; + +namespace protocol { +class NodeWorkers; + +class WorkerAgent : public NodeWorker::Backend { + public: + explicit WorkerAgent(std::weak_ptr manager); + ~WorkerAgent() override = default; + + void Wire(UberDispatcher* dispatcher); + + DispatchResponse sendMessageToWorker(const String& message, + const String& sessionId) override; + + DispatchResponse enable(bool waitForDebuggerOnStart) override; + DispatchResponse disable() override; + + private: + std::shared_ptr frontend_; + std::weak_ptr manager_; + std::unique_ptr event_handle_; + std::shared_ptr workers_; +}; +} // namespace protocol +} // namespace inspector +} // namespace node + +#endif // SRC_INSPECTOR_WORKER_AGENT_H_ diff --git a/src/inspector/worker_inspector.cc b/src/inspector/worker_inspector.cc new file mode 100644 index 00000000000000..52e71a562daeb6 --- /dev/null +++ b/src/inspector/worker_inspector.cc @@ -0,0 +1,128 @@ +#include "worker_inspector.h" + +#include "main_thread_interface.h" + +namespace node { +namespace inspector { +namespace { + +class WorkerStartedRequest : public Request { + public: + WorkerStartedRequest( + int id, + const std::string& url, + std::shared_ptr worker_thread, + bool waiting) + : id_(id), + info_(BuildWorkerTitle(id), url, worker_thread), + waiting_(waiting) {} + void Call(MainThreadInterface* thread) override { + auto manager = thread->inspector_agent()->GetWorkerManager(); + manager->WorkerStarted(id_, info_, waiting_); + } + + private: + static std::string BuildWorkerTitle(int id) { + return "Worker " + std::to_string(id); + } + + int id_; + WorkerInfo info_; + bool waiting_; +}; + + +void Report(const std::unique_ptr& delegate, + const WorkerInfo& info, bool waiting) { + if (info.worker_thread) + delegate->WorkerCreated(info.title, info.url, waiting, info.worker_thread); +} + +class WorkerFinishedRequest : public Request { + public: + explicit WorkerFinishedRequest(int worker_id) : worker_id_(worker_id) {} + + void Call(MainThreadInterface* thread) override { + thread->inspector_agent()->GetWorkerManager()->WorkerFinished(worker_id_); + } + + private: + int worker_id_; +}; +} // namespace + + +ParentInspectorHandle::ParentInspectorHandle( + int id, const std::string& url, + std::shared_ptr parent_thread, bool wait_for_connect) + : id_(id), url_(url), parent_thread_(parent_thread), + wait_(wait_for_connect) {} + +ParentInspectorHandle::~ParentInspectorHandle() { + parent_thread_->Post( + std::unique_ptr(new WorkerFinishedRequest(id_))); +} + +void ParentInspectorHandle::WorkerStarted( + std::shared_ptr worker_thread, bool waiting) { + std::unique_ptr request( + new WorkerStartedRequest(id_, url_, worker_thread, waiting)); + parent_thread_->Post(std::move(request)); +} + +void WorkerManager::WorkerFinished(int session_id) { + children_.erase(session_id); +} + +void WorkerManager::WorkerStarted(int session_id, + const WorkerInfo& info, + bool waiting) { + if (info.worker_thread->Expired()) + return; + children_.emplace(session_id, info); + for (const auto& delegate : delegates_) { + Report(delegate.second, info, waiting); + } +} + +std::unique_ptr +WorkerManager::NewParentHandle(int thread_id, const std::string& url) { + bool wait = !delegates_waiting_on_start_.empty(); + return std::unique_ptr( + new ParentInspectorHandle(thread_id, url, thread_, wait)); +} + +void WorkerManager::RemoveAttachDelegate(int id) { + delegates_.erase(id); + delegates_waiting_on_start_.erase(id); +} + +std::unique_ptr WorkerManager::SetAutoAttach( + std::unique_ptr attach_delegate) { + int id = ++next_delegate_id_; + delegates_[id] = std::move(attach_delegate); + const auto& delegate = delegates_[id]; + for (const auto& worker : children_) { + // Waiting is only reported when a worker is started, same as browser + Report(delegate, worker.second, false); + } + return std::unique_ptr( + new WorkerManagerEventHandle(shared_from_this(), id)); +} + +void WorkerManager::SetWaitOnStartForDelegate(int id, bool wait) { + if (wait) + delegates_waiting_on_start_.insert(id); + else + delegates_waiting_on_start_.erase(id); +} + +void WorkerManagerEventHandle::SetWaitOnStart(bool wait_on_start) { + manager_->SetWaitOnStartForDelegate(id_, wait_on_start); +} + +WorkerManagerEventHandle::~WorkerManagerEventHandle() { + manager_->RemoveAttachDelegate(id_); +} +} // namespace inspector +} // namespace node diff --git a/src/inspector/worker_inspector.h b/src/inspector/worker_inspector.h new file mode 100644 index 00000000000000..e3c96cf62f01b0 --- /dev/null +++ b/src/inspector/worker_inspector.h @@ -0,0 +1,98 @@ +#ifndef SRC_INSPECTOR_WORKER_INSPECTOR_H_ +#define SRC_INSPECTOR_WORKER_INSPECTOR_H_ + +#if !HAVE_INSPECTOR +#error("This header can only be used when inspector is enabled") +#endif + +#include +#include +#include +#include + +namespace node { +namespace inspector { +class MainThreadHandle; +class WorkerManager; + +class WorkerDelegate { + public: + virtual void WorkerCreated(const std::string& title, + const std::string& url, + bool waiting, + std::shared_ptr worker) = 0; +}; + +class WorkerManagerEventHandle { + public: + explicit WorkerManagerEventHandle(std::shared_ptr manager, + int id) + : manager_(manager), id_(id) {} + void SetWaitOnStart(bool wait_on_start); + ~WorkerManagerEventHandle(); + + private: + std::shared_ptr manager_; + int id_; +}; + +struct WorkerInfo { + WorkerInfo(const std::string& target_title, + const std::string& target_url, + std::shared_ptr worker_thread) + : title(target_title), + url(target_url), + worker_thread(worker_thread) {} + std::string title; + std::string url; + std::shared_ptr worker_thread; +}; + +class ParentInspectorHandle { + public: + ParentInspectorHandle(int id, const std::string& url, + std::shared_ptr parent_thread, + bool wait_for_connect); + ~ParentInspectorHandle(); + void WorkerStarted(std::shared_ptr worker_thread, + bool waiting); + bool WaitForConnect() { + return wait_; + } + + private: + int id_; + std::string url_; + std::shared_ptr parent_thread_; + bool wait_; +}; + +class WorkerManager : public std::enable_shared_from_this { + public: + explicit WorkerManager(std::shared_ptr thread) + : thread_(thread) {} + + std::unique_ptr NewParentHandle( + int thread_id, const std::string& url); + void WorkerStarted(int session_id, const WorkerInfo& info, bool waiting); + void WorkerFinished(int session_id); + std::unique_ptr SetAutoAttach( + std::unique_ptr attach_delegate); + void SetWaitOnStartForDelegate(int id, bool wait); + void RemoveAttachDelegate(int id); + std::shared_ptr MainThread() { + return thread_; + } + + private: + std::shared_ptr thread_; + std::unordered_map children_; + std::unordered_map> delegates_; + // If any one needs it, workers stop for all + std::unordered_set delegates_waiting_on_start_; + int next_delegate_id_ = 0; +}; +} // namespace inspector +} // namespace node + +#endif // SRC_INSPECTOR_WORKER_INSPECTOR_H_ diff --git a/src/inspector_agent.cc b/src/inspector_agent.cc index 63b92663532e06..ebb7b7d5bc3e72 100644 --- a/src/inspector_agent.cc +++ b/src/inspector_agent.cc @@ -4,6 +4,8 @@ #include "inspector/main_thread_interface.h" #include "inspector/node_string.h" #include "inspector/tracing_agent.h" +#include "inspector/worker_agent.h" +#include "inspector/worker_inspector.h" #include "node/inspector/protocol/Protocol.h" #include "node_internals.h" #include "node_url.h" @@ -201,6 +203,7 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel, public: explicit ChannelImpl(Environment* env, const std::unique_ptr& inspector, + std::shared_ptr worker_manager, std::unique_ptr delegate, bool prevent_shutdown) : delegate_(std::move(delegate)), @@ -209,11 +212,15 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel, node_dispatcher_.reset(new protocol::UberDispatcher(this)); tracing_agent_.reset(new protocol::TracingAgent(env)); tracing_agent_->Wire(node_dispatcher_.get()); + worker_agent_.reset(new protocol::WorkerAgent(worker_manager)); + worker_agent_->Wire(node_dispatcher_.get()); } virtual ~ChannelImpl() { tracing_agent_->disable(); tracing_agent_.reset(); // Dispose before the dispatchers + worker_agent_->disable(); + worker_agent_.reset(); // Dispose before the dispatchers } std::string dispatchProtocolMessage(const StringView& message) { @@ -273,6 +280,7 @@ class ChannelImpl final : public v8_inspector::V8Inspector::Channel, } std::unique_ptr tracing_agent_; + std::unique_ptr worker_agent_; std::unique_ptr delegate_; std::unique_ptr session_; std::unique_ptr node_dispatcher_; @@ -469,7 +477,8 @@ class NodeInspectorClient : public V8InspectorClient { // TODO(addaleax): Revert back to using make_unique once we get issues // with CI resolved (i.e. revert the patch that added this comment). channels_[session_id].reset( - new ChannelImpl(env_, client_, std::move(delegate), prevent_shutdown)); + new ChannelImpl(env_, client_, getWorkerManager(), + std::move(delegate), prevent_shutdown)); return session_id; } @@ -589,6 +598,14 @@ class NodeInspectorClient : public V8InspectorClient { return interface_->GetHandle(); } + std::shared_ptr getWorkerManager() { + if (worker_manager_ == nullptr) { + worker_manager_ = + std::make_shared(getThreadHandle()); + } + return worker_manager_; + } + bool IsActive() { return !channels_.empty(); } @@ -646,6 +663,7 @@ class NodeInspectorClient : public V8InspectorClient { bool waiting_for_io_shutdown_ = false; // Allows accessing Inspector from non-main threads std::unique_ptr interface_; + std::shared_ptr worker_manager_; }; Agent::Agent(Environment* env) @@ -680,7 +698,10 @@ bool Agent::Start(const std::string& path, } bool wait_for_connect = options->wait_for_connect(); - if (!options->inspector_enabled || !StartIoThread()) { + if (parent_handle_) { + wait_for_connect = parent_handle_->WaitForConnect(); + parent_handle_->WorkerStarted(client_->getThreadHandle(), wait_for_connect); + } else if (!options->inspector_enabled || !StartIoThread()) { return false; } if (wait_for_connect) { @@ -727,7 +748,9 @@ std::unique_ptr Agent::Connect( void Agent::WaitForDisconnect() { CHECK_NOT_NULL(client_); - if (client_->hasConnectedSessions()) { + bool is_worker = parent_handle_ != nullptr; + parent_handle_.reset(); + if (client_->hasConnectedSessions() && !is_worker) { fprintf(stderr, "Waiting for the debugger to disconnect...\n"); fflush(stderr); } @@ -842,7 +865,11 @@ void Agent::ContextCreated(Local context, const ContextInfo& info) { } bool Agent::WillWaitForConnect() { - return debug_options_->wait_for_connect(); + if (debug_options_->wait_for_connect()) + return true; + if (parent_handle_) + return parent_handle_->WaitForConnect(); + return false; } bool Agent::IsActive() { @@ -851,11 +878,24 @@ bool Agent::IsActive() { return io_ != nullptr || client_->IsActive(); } +void Agent::AddWorkerInspector(int thread_id, + const std::string& url, + Agent* agent) { + CHECK_NOT_NULL(client_); + agent->parent_handle_ = + client_->getWorkerManager()->NewParentHandle(thread_id, url); +} + void Agent::WaitForConnect() { CHECK_NOT_NULL(client_); client_->waitForFrontend(); } +std::shared_ptr Agent::GetWorkerManager() { + CHECK_NOT_NULL(client_); + return client_->getWorkerManager(); +} + SameThreadInspectorSession::~SameThreadInspectorSession() { auto client = client_.lock(); if (client) diff --git a/src/inspector_agent.h b/src/inspector_agent.h index 79ae8d4cd99404..e926ccaa926dab 100644 --- a/src/inspector_agent.h +++ b/src/inspector_agent.h @@ -26,7 +26,9 @@ struct ContextInfo; namespace inspector { class InspectorIo; +class ParentInspectorHandle; class NodeInspectorClient; +class WorkerManager; class InspectorSession { public: @@ -82,6 +84,8 @@ class Agent { void EnableAsyncHook(); void DisableAsyncHook(); + void AddWorkerInspector(int thread_id, const std::string& url, Agent* agent); + // Called to create inspector sessions that can be used from the main thread. // The inspector responds by using the delegate to send messages back. std::unique_ptr Connect( @@ -103,6 +107,9 @@ class Agent { std::shared_ptr options() { return debug_options_; } void ContextCreated(v8::Local context, const ContextInfo& info); + // Interface for interacting with inspectors in worker threads + std::shared_ptr GetWorkerManager(); + private: void ToggleAsyncHook(v8::Isolate* isolate, const node::Persistent& fn); @@ -112,6 +119,7 @@ class Agent { std::shared_ptr client_; // Interface for transports, e.g. WebSocket server std::unique_ptr io_; + std::unique_ptr parent_handle_; std::string path_; std::shared_ptr debug_options_; diff --git a/src/node_worker.cc b/src/node_worker.cc index 1a90e3a64fd843..209b7a4d091490 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -40,6 +40,14 @@ void StartWorkerInspector(Environment* child, const std::string& url) { child->inspector_agent()->Start(url, nullptr, false); } +void AddWorkerInspector(Environment* parent, + Environment* child, + int id, + const std::string& url) { + parent->inspector_agent()->AddWorkerInspector(id, url, + child->inspector_agent()); +} + void WaitForWorkerInspectorToStop(Environment* child) { child->inspector_agent()->WaitForDisconnect(); child->inspector_agent()->Stop(); @@ -48,6 +56,10 @@ void WaitForWorkerInspectorToStop(Environment* child) { #else // No-ops void StartWorkerInspector(Environment* child, const std::string& url) {} +void AddWorkerInspector(Environment* parent, + Environment* child, + int id, + const std::string& url) {} void WaitForWorkerInspectorToStop(Environment* child) {} #endif @@ -115,6 +127,8 @@ Worker::Worker(Environment* env, Local wrap, const std::string& url) env_->Start(std::vector{}, std::vector{}, env->profiler_idle_notifier_started()); + // Done while on the parent thread + AddWorkerInspector(env, env_.get(), thread_id_, url_); } // The new isolate won't be bothered on this thread again. diff --git a/test/parallel/test-worker-debug.js b/test/parallel/test-worker-debug.js new file mode 100644 index 00000000000000..8691879f14e78f --- /dev/null +++ b/test/parallel/test-worker-debug.js @@ -0,0 +1,228 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); + +common.skipIfInspectorDisabled(); + +const assert = require('assert'); +const EventEmitter = require('events'); +const { Session } = require('inspector'); +const { pathToFileURL } = require('url'); +const { + Worker, isMainThread, parentPort, workerData +} = require('worker_threads'); + + +const workerMessage = 'This is a message from a worker'; + +function waitForMessage() { + return new Promise((resolve) => { + parentPort.once('message', resolve); + }); +} + +// This is at the top so line numbers change less often +if (!isMainThread) { + if (workerData === 1) { + console.log(workerMessage); + debugger; // eslint-disable-line no-debugger + } else if (workerData === 2) { + parentPort.postMessage('running'); + waitForMessage(); + } + return; +} + +function doPost(session, method, params) { + return new Promise((resolve, reject) => { + session.post(method, params, (error, result) => { + if (error) + reject(JSON.stringify(error)); + else + resolve(result); + }); + }); +} + +function waitForEvent(emitter, event) { + return new Promise((resolve) => emitter.once(event, resolve)); +} + +function waitForWorkerAttach(session) { + return waitForEvent(session, 'NodeWorker.attachedToWorker') + .then(({ params }) => params); +} + +async function waitForWorkerDetach(session, id) { + let sessionId; + do { + const { params } = + await waitForEvent(session, 'NodeWorker.detachedFromWorker'); + sessionId = params.sessionId; + } while (sessionId !== id); +} + +function runWorker(id, workerCallback = () => {}) { + return new Promise((resolve, reject) => { + const worker = new Worker(__filename, { workerData: id }); + workerCallback(worker); + worker.on('error', reject); + worker.on('exit', resolve); + }); +} + +class WorkerSession extends EventEmitter { + constructor(parentSession, id) { + super(); + this._parentSession = parentSession; + this._id = id; + this._requestCallbacks = new Map(); + this._nextCommandId = 1; + this._parentSession.on('NodeWorker.receivedMessageFromWorker', + ({ params }) => { + if (params.sessionId === this._id) + this._processMessage(JSON.parse(params.message)); + }); + } + + _processMessage(message) { + if (message.id === undefined) { + // console.log(JSON.stringify(message)); + this.emit('inspectorNotification', message); + this.emit(message.method, message); + return; + } + const callback = this._requestCallbacks.get(message.id); + if (callback) { + this._requestCallbacks.delete(message.id); + if (message.error) + callback[1](message.error.message); + else + callback[0](message.result); + } + } + + async waitForBreakAfterCommand(command, script, line) { + const notificationPromise = waitForEvent(this, 'Debugger.paused'); + this.post(command); + const notification = await notificationPromise; + const callFrame = notification.params.callFrames[0]; + assert.strictEqual(callFrame.url, pathToFileURL(script).toString()); + assert.strictEqual(callFrame.location.lineNumber, line); + } + + post(method, parameters) { + const msg = { + id: this._nextCommandId++, + method + }; + if (parameters) + msg.params = parameters; + + return new Promise((resolve, reject) => { + this._requestCallbacks.set(msg.id, [resolve, reject]); + this._parentSession.post('NodeWorker.sendMessageToWorker', { + sessionId: this._id, message: JSON.stringify(msg) + }); + }); + } +} + +async function testBasicWorkerDebug(session, post) { + /* + 1. Do 'enble' with waitForDebuggerOnStart = true + 2. Run worker. It should break on start. + 3. Enable Runtime (to get console message) and Debugger. Resume. + 4. Breaks on the 'debugger' statement. Resume. + 5. Console message recieved, worker runs to a completion. + 6. contextCreated/contextDestroyed had been properly dispatched + */ + console.log('Test basic debug scenario'); + await post('NodeWorker.enable', { waitForDebuggerOnStart: true }); + const attached = waitForWorkerAttach(session); + const worker = runWorker(1); + const { sessionId, waitingForDebugger } = await attached; + assert.strictEqual(waitingForDebugger, true); + const detached = waitForWorkerDetach(session, sessionId); + const workerSession = new WorkerSession(session, sessionId); + const contextEvents = Promise.all([ + waitForEvent(workerSession, 'Runtime.executionContextCreated'), + waitForEvent(workerSession, 'Runtime.executionContextDestroyed') + ]); + const consolePromise = waitForEvent(workerSession, 'Runtime.consoleAPICalled') + .then((notification) => notification.params.args[0].value); + await workerSession.post('Debugger.enable'); + await workerSession.post('Runtime.enable'); + await workerSession.waitForBreakAfterCommand( + 'Runtime.runIfWaitingForDebugger', __filename, 2); + await workerSession.waitForBreakAfterCommand( + 'Debugger.resume', __filename, 27); // V8 line number is zero-based + assert.strictEqual(await consolePromise, workerMessage); + workerSession.post('Debugger.resume'); + await Promise.all([worker, detached, contextEvents]); +} + +async function testNoWaitOnStart(session, post) { + console.log('Test disabled waitForDebuggerOnStart'); + await post('NodeWorker.enable', { waitForDebuggerOnStart: false }); + let worker; + const promise = waitForWorkerAttach(session); + const exitPromise = runWorker(2, (w) => { worker = w; }); + const { waitingForDebugger } = await promise; + assert.strictEqual(waitingForDebugger, false); + worker.postMessage('resume'); + await exitPromise; +} + +async function testTwoWorkers(session, post) { + console.log('Test attach to a running worker and then start a new one'); + await post('NodeWorker.disable'); + let okToAttach = false; + const worker1attached = waitForWorkerAttach(session).then((notification) => { + assert.strictEqual(okToAttach, true); + return notification; + }); + + let worker1Exited; + const worker = await new Promise((resolve, reject) => { + worker1Exited = runWorker(2, resolve); + }).then((worker) => new Promise( + (resolve) => worker.once('message', () => resolve(worker)))); + okToAttach = true; + await post('NodeWorker.enable', { waitForDebuggerOnStart: true }); + const { waitingForDebugger: worker1Waiting } = await worker1attached; + assert.strictEqual(worker1Waiting, false); + + const worker2Attached = waitForWorkerAttach(session); + let worker2Done = false; + const worker2Exited = runWorker(1) + .then(() => assert.strictEqual(worker2Done, true)); + const worker2AttachInfo = await worker2Attached; + assert.strictEqual(worker2AttachInfo.waitingForDebugger, true); + worker2Done = true; + + const workerSession = new WorkerSession(session, worker2AttachInfo.sessionId); + workerSession.post('Runtime.runIfWaitingForDebugger'); + worker.postMessage('resume'); + await Promise.all([worker1Exited, worker2Exited]); +} + +async function test() { + const session = new Session(); + session.connect(); + const post = doPost.bind(null, session); + + await testBasicWorkerDebug(session, post); + + console.log('Test disabling attach to workers'); + await post('NodeWorker.disable'); + await runWorker(1); + + await testNoWaitOnStart(session, post); + await testTwoWorkers(session, post); + + session.disconnect(); + console.log('Test done'); +} + +test();