From eaccf842eb8268e7e758fc9064e209ad621af50b Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Thu, 9 Apr 2020 18:45:15 +0200 Subject: [PATCH] worker: allow transferring/cloning generic BaseObjects MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extend support for transferring objects à la `MessagePort` to other types of `BaseObject` subclasses, as well as implement cloning support for cases in which destructive transferring is not needed or optional. PR-URL: https://github.com/nodejs/node/pull/33772 Backport-PR-URL: https://github.com/nodejs/node/pull/33965 Reviewed-By: Benjamin Gruenbaum --- doc/api/errors.md | 5 +- src/base_object.h | 38 +++++++++- src/node_messaging.cc | 164 ++++++++++++++++++++++++++++++------------ src/node_messaging.h | 37 ++++++++-- src/util.h | 6 ++ 5 files changed, 194 insertions(+), 56 deletions(-) diff --git a/doc/api/errors.md b/doc/api/errors.md index eb7a573bd5ac88..8ed4b31f99aa68 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -1584,8 +1584,9 @@ is thrown if a required option is missing. ### `ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST` -A `MessagePort` was found in the object passed to a `postMessage()` call, -but not provided in the `transferList` for that call. +An object that needs to be explicitly listed in the `transferList` argument +was found in the object passed to a `postMessage()` call, but not provided in +the `transferList` for that call. Usually, this is a `MessagePort`. ### `ERR_MISSING_PASSPHRASE` diff --git a/src/base_object.h b/src/base_object.h index e02fd8d209f30a..39450540323a6c 100644 --- a/src/base_object.h +++ b/src/base_object.h @@ -34,6 +34,10 @@ class Environment; template class BaseObjectPtrImpl; +namespace worker { +class TransferData; +} + class BaseObject : public MemoryRetainer { public: enum InternalFields { kSlot, kInternalFieldCount }; @@ -101,7 +105,39 @@ class BaseObject : public MemoryRetainer { static v8::Local GetConstructorTemplate( Environment* env); - protected: + // Interface for transferring BaseObject instances using the .postMessage() + // method of MessagePorts (and, by extension, Workers). + // GetTransferMode() returns a transfer mode that indicates how to deal with + // the current object: + // - kUntransferable: + // No transfer is possible, either because this type of BaseObject does + // not know how to be transfered, or because it is not in a state in + // which it is possible to do so (e.g. because it has already been + // transfered). + // - kTransferable: + // This object can be transfered in a destructive fashion, i.e. will be + // rendered unusable on the sending side of the channel in the process + // of being transfered. (In C++ this would be referred to as movable but + // not copyable.) Objects of this type need to be listed in the + // `transferList` argument of the relevant postMessage() call in order to + // make sure that they are not accidentally destroyed on the sending side. + // TransferForMessaging() will be called to get a representation of the + // object that is used for subsequent deserialization. + // - kCloneable: + // This object can be cloned without being modified. + // CloneForMessaging() will be called to get a representation of the + // object that is used for subsequent deserialization, unless the + // object is listed in transferList, in which case TransferForMessaging() + // is attempted first. + enum class TransferMode { + kUntransferable, + kTransferable, + kCloneable + }; + virtual TransferMode GetTransferMode() const; + virtual std::unique_ptr TransferForMessaging(); + virtual std::unique_ptr CloneForMessaging() const; + virtual inline void OnGCCollect(); private: diff --git a/src/node_messaging.cc b/src/node_messaging.cc index ffe29a5dea8c8a..ae7a0fc1750d04 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -37,6 +37,19 @@ using v8::ValueSerializer; using v8::WasmModuleObject; namespace node { + +BaseObject::TransferMode BaseObject::GetTransferMode() const { + return BaseObject::TransferMode::kUntransferable; +} + +std::unique_ptr BaseObject::TransferForMessaging() { + return CloneForMessaging(); +} + +std::unique_ptr BaseObject::CloneForMessaging() const { + return {}; +} + namespace worker { Message::Message(MallocedBuffer&& buffer) @@ -55,21 +68,20 @@ class DeserializerDelegate : public ValueDeserializer::Delegate { DeserializerDelegate( Message* m, Environment* env, - const std::vector& message_ports, + const std::vector>& host_objects, const std::vector>& shared_array_buffers, const std::vector& wasm_modules) - : message_ports_(message_ports), + : host_objects_(host_objects), shared_array_buffers_(shared_array_buffers), wasm_modules_(wasm_modules) {} MaybeLocal ReadHostObject(Isolate* isolate) override { - // Currently, only MessagePort hosts objects are supported, so identifying - // by the index in the message's MessagePort array is sufficient. + // Identifying the index in the message's BaseObject array is sufficient. uint32_t id; if (!deserializer->ReadUint32(&id)) return MaybeLocal(); - CHECK_LE(id, message_ports_.size()); - return message_ports_[id]->object(isolate); + CHECK_LE(id, host_objects_.size()); + return host_objects_[id]->object(isolate); } MaybeLocal GetSharedArrayBufferFromId( @@ -88,7 +100,7 @@ class DeserializerDelegate : public ValueDeserializer::Delegate { ValueDeserializer* deserializer = nullptr; private: - const std::vector& message_ports_; + const std::vector>& host_objects_; const std::vector>& shared_array_buffers_; const std::vector& wasm_modules_; }; @@ -102,22 +114,25 @@ MaybeLocal Message::Deserialize(Environment* env, EscapableHandleScope handle_scope(env->isolate()); Context::Scope context_scope(context); - // Create all necessary MessagePort handles. - std::vector ports(message_ports_.size()); - for (uint32_t i = 0; i < message_ports_.size(); ++i) { - ports[i] = MessagePort::New(env, - context, - std::move(message_ports_[i])); - if (ports[i] == nullptr) { - for (MessagePort* port : ports) { - // This will eventually release the MessagePort object itself. - if (port != nullptr) - port->Close(); + // Create all necessary objects for transferables, e.g. MessagePort handles. + std::vector> host_objects(transferables_.size()); + for (uint32_t i = 0; i < transferables_.size(); ++i) { + TransferData* data = transferables_[i].get(); + host_objects[i] = data->Deserialize( + env, context, std::move(transferables_[i])); + if (!host_objects[i]) { + for (BaseObjectPtr object : host_objects) { + if (!object) continue; + + // Since creating one of the objects failed, we don't want to have the + // other objects lying around in memory. We act as if the object has + // been garbage-collected. + object->Detach(); } return MaybeLocal(); } } - message_ports_.clear(); + transferables_.clear(); std::vector> shared_array_buffers; // Attach all transferred SharedArrayBuffers to their new Isolate. @@ -130,7 +145,7 @@ MaybeLocal Message::Deserialize(Environment* env, shared_array_buffers_.clear(); DeserializerDelegate delegate( - this, env, ports, shared_array_buffers, wasm_modules_); + this, env, host_objects, shared_array_buffers, wasm_modules_); ValueDeserializer deserializer( env->isolate(), reinterpret_cast(main_message_buf_.data), @@ -157,8 +172,8 @@ void Message::AddSharedArrayBuffer( shared_array_buffers_.emplace_back(std::move(backing_store)); } -void Message::AddMessagePort(std::unique_ptr&& data) { - message_ports_.emplace_back(std::move(data)); +void Message::AddTransferable(std::unique_ptr&& data) { + transferables_.emplace_back(std::move(data)); } uint32_t Message::AddWASMModule(CompiledWasmModule&& mod) { @@ -224,8 +239,8 @@ class SerializerDelegate : public ValueSerializer::Delegate { } Maybe WriteHostObject(Isolate* isolate, Local object) override { - if (env_->message_port_constructor_template()->HasInstance(object)) { - return WriteMessagePort(Unwrap(object)); + if (env_->base_object_ctor_template()->HasInstance(object)) { + return WriteHostObject(Unwrap(object)); } ThrowDataCloneError(env_->clone_unsupported_type_str()); @@ -257,32 +272,61 @@ class SerializerDelegate : public ValueSerializer::Delegate { void Finish() { // Only close the MessagePort handles and actually transfer them // once we know that serialization succeeded. - for (MessagePort* port : ports_) { - port->Close(); - msg_->AddMessagePort(port->Detach()); + for (uint32_t i = 0; i < host_objects_.size(); i++) { + BaseObject* host_object = host_objects_[i]; + std::unique_ptr data; + if (i < first_cloned_object_index_) + data = host_object->TransferForMessaging(); + if (!data) + data = host_object->CloneForMessaging(); + CHECK(data); + msg_->AddTransferable(std::move(data)); } } + inline void AddHostObject(BaseObject* host_object) { + // Make sure we have not started serializing the value itself yet. + CHECK_EQ(first_cloned_object_index_, SIZE_MAX); + host_objects_.push_back(host_object); + } + ValueSerializer* serializer = nullptr; private: - Maybe WriteMessagePort(MessagePort* port) { - for (uint32_t i = 0; i < ports_.size(); i++) { - if (ports_[i] == port) { + Maybe WriteHostObject(BaseObject* host_object) { + for (uint32_t i = 0; i < host_objects_.size(); i++) { + if (host_objects_[i] == host_object) { serializer->WriteUint32(i); return Just(true); } } - THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_); - return Nothing(); + BaseObject::TransferMode mode = host_object->GetTransferMode(); + if (mode == BaseObject::TransferMode::kUntransferable) { + ThrowDataCloneError(env_->clone_unsupported_type_str()); + return Nothing(); + } else if (mode == BaseObject::TransferMode::kTransferable) { + // TODO(addaleax): This message code is too specific. Fix that in a + // semver-major follow-up. + THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_); + return Nothing(); + } + + CHECK_EQ(mode, BaseObject::TransferMode::kCloneable); + uint32_t index = host_objects_.size(); + if (first_cloned_object_index_ == SIZE_MAX) + first_cloned_object_index_ = index; + serializer->WriteUint32(index); + host_objects_.push_back(host_object); + return Just(true); } Environment* env_; Local context_; Message* msg_; std::vector> seen_shared_array_buffers_; - std::vector ports_; + std::vector host_objects_; + size_t first_cloned_object_index_ = SIZE_MAX; friend class worker::Message; }; @@ -344,8 +388,7 @@ Maybe Message::Serialize(Environment* env, array_buffers.push_back(ab); serializer.TransferArrayBuffer(id, ab); continue; - } else if (env->message_port_constructor_template() - ->HasInstance(entry)) { + } else if (env->base_object_ctor_template()->HasInstance(entry)) { // Check if the source MessagePort is being transferred. if (!source_port.IsEmpty() && entry == source_port) { ThrowDataCloneException( @@ -354,8 +397,10 @@ Maybe Message::Serialize(Environment* env, "Transfer list contains source port")); return Nothing(); } - MessagePort* port = Unwrap(entry.As()); - if (port == nullptr || port->IsDetached()) { + BaseObject* host_object = Unwrap(entry.As()); + if (env->message_port_constructor_template()->HasInstance(entry) && + (host_object == nullptr || + static_cast(host_object)->IsDetached())) { ThrowDataCloneException( context, FIXED_ONE_BYTE_STRING( @@ -363,17 +408,23 @@ Maybe Message::Serialize(Environment* env, "MessagePort in transfer list is already detached")); return Nothing(); } - if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) != - delegate.ports_.end()) { + if (std::find(delegate.host_objects_.begin(), + delegate.host_objects_.end(), + host_object) != delegate.host_objects_.end()) { ThrowDataCloneException( context, - FIXED_ONE_BYTE_STRING( - env->isolate(), - "Transfer list contains duplicate MessagePort")); + String::Concat(env->isolate(), + FIXED_ONE_BYTE_STRING( + env->isolate(), + "Transfer list contains duplicate "), + entry.As()->GetConstructorName())); return Nothing(); } - delegate.ports_.push_back(port); - continue; + if (host_object != nullptr && host_object->GetTransferMode() != + BaseObject::TransferMode::kUntransferable) { + delegate.AddHostObject(host_object); + continue; + } } THROW_ERR_INVALID_TRANSFER_OBJECT(env); @@ -406,7 +457,7 @@ Maybe Message::Serialize(Environment* env, void Message::MemoryInfo(MemoryTracker* tracker) const { tracker->TrackField("array_buffers_", array_buffers_); tracker->TrackField("shared_array_buffers", shared_array_buffers_); - tracker->TrackField("message_ports", message_ports_); + tracker->TrackField("transferables", transferables_); } MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { } @@ -672,6 +723,25 @@ std::unique_ptr MessagePort::Detach() { return std::move(data_); } +BaseObject::TransferMode MessagePort::GetTransferMode() const { + if (IsDetached()) + return BaseObject::TransferMode::kUntransferable; + return BaseObject::TransferMode::kTransferable; +} + +std::unique_ptr MessagePort::TransferForMessaging() { + Close(); + return Detach(); +} + +BaseObjectPtr MessagePortData::Deserialize( + Environment* env, + Local context, + std::unique_ptr self) { + return BaseObjectPtr { MessagePort::New( + env, context, + static_unique_pointer_cast(std::move(self))) }; +} Maybe MessagePort::PostMessage(Environment* env, Local message_v, @@ -699,8 +769,8 @@ Maybe MessagePort::PostMessage(Environment* env, // Check if the target port is posted to itself. if (data_->sibling_ != nullptr) { - for (const auto& port_data : msg.message_ports()) { - if (data_->sibling_ == port_data.get()) { + for (const auto& transferable : msg.transferables()) { + if (data_->sibling_ == transferable.get()) { doomed = true; ProcessEmitWarning(env, "The target port was posted to itself, and " "the communication channel was lost"); diff --git a/src/node_messaging.h b/src/node_messaging.h index d687e7549d51e9..649ee201045428 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -15,6 +15,23 @@ class MessagePort; typedef MaybeStackBuffer, 8> TransferList; +// Used to represent the in-flight structure of an object that is being +// transfered or cloned using postMessage(). +class TransferData : public MemoryRetainer { + public: + // Deserialize this object on the receiving end after a .postMessage() call. + // - `context` may not be the same as `env->context()`. This method should + // not produce JS objects coming from Contexts other than `context`. + // - `self` is a unique_ptr for the object that this is being called on. + // - The return value is treated like a `Maybe`, i.e. if `nullptr` is + // returned, any further deserialization of the message is stopped and + // control is returned to the event loop or JS as soon as possible. + virtual BaseObjectPtr Deserialize( + Environment* env, + v8::Local context, + std::unique_ptr self) = 0; +}; + // Represents a single communication message. class Message : public MemoryRetainer { public: @@ -54,16 +71,17 @@ class Message : public MemoryRetainer { void AddSharedArrayBuffer(std::shared_ptr backing_store); // Internal method of Message that is called once serialization finishes // and that transfers ownership of `data` to this message. - void AddMessagePort(std::unique_ptr&& data); + void AddTransferable(std::unique_ptr&& data); // Internal method of Message that is called when a new WebAssembly.Module // object is encountered in the incoming value's structure. uint32_t AddWASMModule(v8::CompiledWasmModule&& mod); - // The MessagePorts that will be transferred, as recorded by Serialize(). + // The host objects that will be transferred, as recorded by Serialize() + // (e.g. MessagePorts). // Used for warning user about posting the target MessagePort to itself, // which will as a side effect destroy the communication channel. - const std::vector>& message_ports() const { - return message_ports_; + const std::vector>& transferables() const { + return transferables_; } void MemoryInfo(MemoryTracker* tracker) const override; @@ -75,7 +93,7 @@ class Message : public MemoryRetainer { MallocedBuffer main_message_buf_; std::vector> array_buffers_; std::vector> shared_array_buffers_; - std::vector> message_ports_; + std::vector> transferables_; std::vector wasm_modules_; friend class MessagePort; @@ -83,7 +101,7 @@ class Message : public MemoryRetainer { // This contains all data for a `MessagePort` instance that is not tied to // a specific Environment/Isolate/event loop, for easier transfer between those. -class MessagePortData : public MemoryRetainer { +class MessagePortData : public TransferData { public: explicit MessagePortData(MessagePort* owner); ~MessagePortData() override; @@ -108,6 +126,10 @@ class MessagePortData : public MemoryRetainer { void Disentangle(); void MemoryInfo(MemoryTracker* tracker) const override; + BaseObjectPtr Deserialize( + Environment* env, + v8::Local context, + std::unique_ptr self) override; SET_MEMORY_INFO_NAME(MessagePortData) SET_SELF_SIZE(MessagePortData) @@ -195,6 +217,9 @@ class MessagePort : public HandleWrap { // NULL pointer to the C++ MessagePort object is also detached. inline bool IsDetached() const; + TransferMode GetTransferMode() const override; + std::unique_ptr TransferForMessaging() override; + void MemoryInfo(MemoryTracker* tracker) const override; SET_MEMORY_INFO_NAME(MessagePort) SET_SELF_SIZE(MessagePort) diff --git a/src/util.h b/src/util.h index b5b6cb9226f420..f817eac1298d61 100644 --- a/src/util.h +++ b/src/util.h @@ -791,6 +791,12 @@ class FastStringKey { size_t cached_hash_; }; +// Like std::static_pointer_cast but for unique_ptr with the default deleter. +template +std::unique_ptr static_unique_pointer_cast(std::unique_ptr&& ptr) { + return std::unique_ptr(static_cast(ptr.release())); +} + } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS