Skip to content

Commit

Permalink
worker: make transfer list behave like web MessagePort
Browse files Browse the repository at this point in the history
Allow generic iterables as transfer list arguments, as well
as an options object with a `transfer` option, for web compatibility.

Refs: #28033 (comment)
  • Loading branch information
addaleax committed Aug 25, 2019
1 parent 66043e1 commit 440f2b2
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 77 deletions.
3 changes: 3 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ constexpr size_t kFsStatsBufferLength =
V(dns_soa_string, "SOA") \
V(dns_srv_string, "SRV") \
V(dns_txt_string, "TXT") \
V(done_string, "done") \
V(duration_string, "duration") \
V(emit_warning_string, "emitWarning") \
V(encoding_string, "encoding") \
Expand Down Expand Up @@ -272,6 +273,7 @@ constexpr size_t kFsStatsBufferLength =
V(modulus_string, "modulus") \
V(name_string, "name") \
V(netmask_string, "netmask") \
V(next_string, "next") \
V(nistcurve_string, "nistCurve") \
V(nsname_string, "nsname") \
V(ocsp_request_string, "OCSPRequest") \
Expand Down Expand Up @@ -353,6 +355,7 @@ constexpr size_t kFsStatsBufferLength =
V(ticketkeycallback_string, "onticketkeycallback") \
V(timeout_string, "timeout") \
V(tls_ticket_string, "tlsTicket") \
V(transfer_string, "transfer") \
V(ttl_string, "ttl") \
V(type_string, "type") \
V(uid_string, "uid") \
Expand Down
217 changes: 144 additions & 73 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ using v8::Object;
using v8::ObjectTemplate;
using v8::SharedArrayBuffer;
using v8::String;
using v8::Symbol;
using v8::Value;
using v8::ValueDeserializer;
using v8::ValueSerializer;
Expand Down Expand Up @@ -304,7 +305,7 @@ class SerializerDelegate : public ValueSerializer::Delegate {
Maybe<bool> Message::Serialize(Environment* env,
Local<Context> context,
Local<Value> input,
Local<Value> transfer_list_v,
const TransferList& transfer_list_v,
Local<Object> source_port) {
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);
Expand All @@ -317,72 +318,66 @@ Maybe<bool> Message::Serialize(Environment* env,
delegate.serializer = &serializer;

std::vector<Local<ArrayBuffer>> array_buffers;
if (transfer_list_v->IsArray()) {
Local<Array> transfer_list = transfer_list_v.As<Array>();
uint32_t length = transfer_list->Length();
for (uint32_t i = 0; i < length; ++i) {
Local<Value> entry;
if (!transfer_list->Get(context, i).ToLocal(&entry))
return Nothing<bool>();
// Currently, we support ArrayBuffers and MessagePorts.
if (entry->IsArrayBuffer()) {
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
// If we cannot render the ArrayBuffer unusable in this Isolate and
// take ownership of its memory, copying the buffer will have to do.
if (!ab->IsDetachable() || ab->IsExternal() ||
!env->isolate_data()->uses_node_allocator()) {
continue;
}
if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
array_buffers.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate ArrayBuffer"));
return Nothing<bool>();
}
// We simply use the array index in the `array_buffers` list as the
// ID that we write into the serialized buffer.
uint32_t id = array_buffers.size();
array_buffers.push_back(ab);
serializer.TransferArrayBuffer(id, ab);
continue;
} else if (env->message_port_constructor_template()
->HasInstance(entry)) {
// Check if the source MessagePort is being transferred.
if (!source_port.IsEmpty() && entry == source_port) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(env->isolate(),
"Transfer list contains source port"));
return Nothing<bool>();
}
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
if (port == nullptr || port->IsDetached()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"MessagePort in transfer list is already detached"));
return Nothing<bool>();
}
if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
delegate.ports_.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate MessagePort"));
return Nothing<bool>();
}
delegate.ports_.push_back(port);
for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
Local<Value> entry = transfer_list_v[i];
// Currently, we support ArrayBuffers and MessagePorts.
if (entry->IsArrayBuffer()) {
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
// If we cannot render the ArrayBuffer unusable in this Isolate and
// take ownership of its memory, copying the buffer will have to do.
if (!ab->IsDetachable() || ab->IsExternal() ||
!env->isolate_data()->uses_node_allocator()) {
continue;
}

THROW_ERR_INVALID_TRANSFER_OBJECT(env);
return Nothing<bool>();
if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
array_buffers.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate ArrayBuffer"));
return Nothing<bool>();
}
// We simply use the array index in the `array_buffers` list as the
// ID that we write into the serialized buffer.
uint32_t id = array_buffers.size();
array_buffers.push_back(ab);
serializer.TransferArrayBuffer(id, ab);
continue;
} else if (env->message_port_constructor_template()
->HasInstance(entry)) {
// Check if the source MessagePort is being transferred.
if (!source_port.IsEmpty() && entry == source_port) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(env->isolate(),
"Transfer list contains source port"));
return Nothing<bool>();
}
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
if (port == nullptr || port->IsDetached()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"MessagePort in transfer list is already detached"));
return Nothing<bool>();
}
if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
delegate.ports_.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate MessagePort"));
return Nothing<bool>();
}
delegate.ports_.push_back(port);
continue;
}

THROW_ERR_INVALID_TRANSFER_OBJECT(env);
return Nothing<bool>();
}

serializer.WriteHeader();
Expand Down Expand Up @@ -664,7 +659,7 @@ std::unique_ptr<MessagePortData> MessagePort::Detach() {

Maybe<bool> MessagePort::PostMessage(Environment* env,
Local<Value> message_v,
Local<Value> transfer_v) {
const TransferList& transfer_v) {
Isolate* isolate = env->isolate();
Local<Object> obj = object(isolate);
Local<Context> context = obj->CreationContext();
Expand Down Expand Up @@ -705,20 +700,98 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
return Just(true);
}

static Maybe<bool> ReadIterable(Environment* env,
Local<Context> context,
// NOLINTNEXTLINE(runtime/references)
TransferList& transfer_list,
Local<Value> object) {
if (!object->IsObject()) return Just(false);

if (object->IsArray()) {
Local<Array> arr = object.As<Array>();
size_t length = arr->Length();
transfer_list.AllocateSufficientStorage(length);
for (size_t i = 0; i < length; i++) {
if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
return Nothing<bool>();
}
return Just(true);
}

Isolate* isolate = env->isolate();
Local<Value> iterator_method;
if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
.ToLocal(&iterator_method)) return Nothing<bool>();
if (!iterator_method->IsFunction()) return Just(false);

Local<Value> iterator;
if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
.ToLocal(&iterator)) return Nothing<bool>();
if (!iterator->IsObject()) return Just(false);

Local<Value> next;
if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
return Nothing<bool>();
if (!next->IsFunction()) return Just(false);

std::vector<Local<Value>> entries;
while (env->can_call_into_js()) {
Local<Value> result;
if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
.ToLocal(&result)) return Nothing<bool>();
if (!result->IsObject()) return Just(false);

Local<Value> done;
if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
return Nothing<bool>();
if (done->BooleanValue(isolate)) break;

Local<Value> val;
if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
return Nothing<bool>();
entries.push_back(val);
}

transfer_list.AllocateSufficientStorage(entries.size());
std::copy(entries.begin(), entries.end(), &transfer_list[0]);
return Just(true);
}

void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Local<Object> obj = args.This();
Local<Context> context = obj->CreationContext();

if (args.Length() == 0) {
return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
"MessagePort.postMessage");
}

if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
// Browsers ignore null or undefined, and otherwise accept an array or an
// options object.
// TODO(addaleax): Add support for an options object and generic sequence
// support.
// Refs: https://github.com/nodejs/node/pull/28033#discussion_r289964991
return THROW_ERR_INVALID_ARG_TYPE(env,
"Optional transferList argument must be an array");
"Optional transferList argument must be an iterable");
}

TransferList transfer_list;
if (args[1]->IsObject()) {
bool was_iterable;
if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
return;
if (!was_iterable) {
Local<Value> transfer_option;
if (!args[1].As<Object>()->Get(context, env->transfer_string())
.ToLocal(&transfer_option)) return;
if (!transfer_option->IsUndefined()) {
if (!ReadIterable(env, context, transfer_list, transfer_option)
.To(&was_iterable)) return;
if (!was_iterable) {
return THROW_ERR_INVALID_ARG_TYPE(env,
"Optional options.transfer argument must be an iterable");
}
}
}
}

MessagePort* port = Unwrap<MessagePort>(args.This());
Expand All @@ -727,13 +800,11 @@ void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
// transfers.
if (port == nullptr) {
Message msg;
Local<Object> obj = args.This();
Local<Context> context = obj->CreationContext();
USE(msg.Serialize(env, context, args[0], args[1], obj));
USE(msg.Serialize(env, context, args[0], transfer_list, obj));
return;
}

port->PostMessage(env, args[0], args[1]);
port->PostMessage(env, args[0], transfer_list);
}

void MessagePort::Start() {
Expand Down
6 changes: 4 additions & 2 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ namespace worker {
class MessagePortData;
class MessagePort;

typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList;

// Represents a single communication message.
class Message : public MemoryRetainer {
public:
Expand Down Expand Up @@ -44,7 +46,7 @@ class Message : public MemoryRetainer {
v8::Maybe<bool> Serialize(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Value> input,
v8::Local<v8::Value> transfer_list,
const TransferList& transfer_list,
v8::Local<v8::Object> source_port =
v8::Local<v8::Object>());

Expand Down Expand Up @@ -149,7 +151,7 @@ class MessagePort : public HandleWrap {
// serialized with transfers, then silently discarded.
v8::Maybe<bool> PostMessage(Environment* env,
v8::Local<v8::Value> message,
v8::Local<v8::Value> transfer);
const TransferList& transfer);

// Start processing messages on this port as a receiving end.
void Start();
Expand Down
26 changes: 26 additions & 0 deletions test/parallel/test-worker-message-port-terminate-transfer-list.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict';
const common = require('../common');

const { parentPort, MessageChannel, Worker } = require('worker_threads');

// Do not use isMainThread so that this test itself can be run inside a Worker.
if (!process.env.HAS_STARTED_WORKER) {
process.env.HAS_STARTED_WORKER = 1;
const w = new Worker(__filename);
w.once('message', common.mustCall(() => {
w.once('message', common.mustNotCall());
setTimeout(() => w.terminate(), 100);
}));
} else {
const { port1 } = new MessageChannel();

parentPort.postMessage('ready');

// Make sure we don’t end up running JS after the infinite loop is broken.
port1.postMessage({}, {
transfer: (function*() { while (true); })()
});

parentPort.postMessage('UNREACHABLE');
process.kill(process.pid, 'SIGINT');
}
Loading

0 comments on commit 440f2b2

Please sign in to comment.