worker: make transfer list behave like web MessagePort

Allow generic iterables as transfer list arguments, as well
as an options object with a `transfer` option, for web compatibility.

PR-URL: https://github.com/nodejs/node/pull/29319
Refs: https://github.com/nodejs/node/pull/28033#discussion_r289964991
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Anna Henningsen 2019-08-25 21:48:58 +02:00 committed by Daniel Bevenius
parent 754d5a9375
commit 72650bcf72
5 changed files with 238 additions and 77 deletions

View File

@ -211,6 +211,7 @@ constexpr size_t kFsStatsBufferLength =
V(dns_soa_string, "SOA") \
V(dns_srv_string, "SRV") \
V(dns_txt_string, "TXT") \
V(done_string, "done") \
V(duration_string, "duration") \
V(emit_warning_string, "emitWarning") \
V(encoding_string, "encoding") \
@ -272,6 +273,7 @@ constexpr size_t kFsStatsBufferLength =
V(modulus_string, "modulus") \
V(name_string, "name") \
V(netmask_string, "netmask") \
V(next_string, "next") \
V(nistcurve_string, "nistCurve") \
V(nsname_string, "nsname") \
V(ocsp_request_string, "OCSPRequest") \
@ -353,6 +355,7 @@ constexpr size_t kFsStatsBufferLength =
V(ticketkeycallback_string, "onticketkeycallback") \
V(timeout_string, "timeout") \
V(tls_ticket_string, "tlsTicket") \
V(transfer_string, "transfer") \
V(ttl_string, "ttl") \
V(type_string, "type") \
V(uid_string, "uid") \

View File

@ -31,6 +31,7 @@ using v8::Object;
using v8::ObjectTemplate;
using v8::SharedArrayBuffer;
using v8::String;
using v8::Symbol;
using v8::Value;
using v8::ValueDeserializer;
using v8::ValueSerializer;
@ -304,7 +305,7 @@ class SerializerDelegate : public ValueSerializer::Delegate {
Maybe<bool> Message::Serialize(Environment* env,
Local<Context> context,
Local<Value> input,
Local<Value> transfer_list_v,
const TransferList& transfer_list_v,
Local<Object> source_port) {
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);
@ -317,72 +318,66 @@ Maybe<bool> Message::Serialize(Environment* env,
delegate.serializer = &serializer;
std::vector<Local<ArrayBuffer>> array_buffers;
if (transfer_list_v->IsArray()) {
Local<Array> transfer_list = transfer_list_v.As<Array>();
uint32_t length = transfer_list->Length();
for (uint32_t i = 0; i < length; ++i) {
Local<Value> entry;
if (!transfer_list->Get(context, i).ToLocal(&entry))
return Nothing<bool>();
// Currently, we support ArrayBuffers and MessagePorts.
if (entry->IsArrayBuffer()) {
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
// If we cannot render the ArrayBuffer unusable in this Isolate and
// take ownership of its memory, copying the buffer will have to do.
if (!ab->IsDetachable() || ab->IsExternal() ||
!env->isolate_data()->uses_node_allocator()) {
continue;
}
if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
array_buffers.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate ArrayBuffer"));
return Nothing<bool>();
}
// We simply use the array index in the `array_buffers` list as the
// ID that we write into the serialized buffer.
uint32_t id = array_buffers.size();
array_buffers.push_back(ab);
serializer.TransferArrayBuffer(id, ab);
continue;
} else if (env->message_port_constructor_template()
->HasInstance(entry)) {
// Check if the source MessagePort is being transferred.
if (!source_port.IsEmpty() && entry == source_port) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(env->isolate(),
"Transfer list contains source port"));
return Nothing<bool>();
}
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
if (port == nullptr || port->IsDetached()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"MessagePort in transfer list is already detached"));
return Nothing<bool>();
}
if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
delegate.ports_.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate MessagePort"));
return Nothing<bool>();
}
delegate.ports_.push_back(port);
for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
Local<Value> entry = transfer_list_v[i];
// Currently, we support ArrayBuffers and MessagePorts.
if (entry->IsArrayBuffer()) {
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
// If we cannot render the ArrayBuffer unusable in this Isolate and
// take ownership of its memory, copying the buffer will have to do.
if (!ab->IsDetachable() || ab->IsExternal() ||
!env->isolate_data()->uses_node_allocator()) {
continue;
}
THROW_ERR_INVALID_TRANSFER_OBJECT(env);
return Nothing<bool>();
if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
array_buffers.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate ArrayBuffer"));
return Nothing<bool>();
}
// We simply use the array index in the `array_buffers` list as the
// ID that we write into the serialized buffer.
uint32_t id = array_buffers.size();
array_buffers.push_back(ab);
serializer.TransferArrayBuffer(id, ab);
continue;
} else if (env->message_port_constructor_template()
->HasInstance(entry)) {
// Check if the source MessagePort is being transferred.
if (!source_port.IsEmpty() && entry == source_port) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(env->isolate(),
"Transfer list contains source port"));
return Nothing<bool>();
}
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
if (port == nullptr || port->IsDetached()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"MessagePort in transfer list is already detached"));
return Nothing<bool>();
}
if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) !=
delegate.ports_.end()) {
ThrowDataCloneException(
context,
FIXED_ONE_BYTE_STRING(
env->isolate(),
"Transfer list contains duplicate MessagePort"));
return Nothing<bool>();
}
delegate.ports_.push_back(port);
continue;
}
THROW_ERR_INVALID_TRANSFER_OBJECT(env);
return Nothing<bool>();
}
serializer.WriteHeader();
@ -664,7 +659,7 @@ std::unique_ptr<MessagePortData> MessagePort::Detach() {
Maybe<bool> MessagePort::PostMessage(Environment* env,
Local<Value> message_v,
Local<Value> transfer_v) {
const TransferList& transfer_v) {
Isolate* isolate = env->isolate();
Local<Object> obj = object(isolate);
Local<Context> context = obj->CreationContext();
@ -705,20 +700,98 @@ Maybe<bool> MessagePort::PostMessage(Environment* env,
return Just(true);
}
static Maybe<bool> ReadIterable(Environment* env,
Local<Context> context,
// NOLINTNEXTLINE(runtime/references)
TransferList& transfer_list,
Local<Value> object) {
if (!object->IsObject()) return Just(false);
if (object->IsArray()) {
Local<Array> arr = object.As<Array>();
size_t length = arr->Length();
transfer_list.AllocateSufficientStorage(length);
for (size_t i = 0; i < length; i++) {
if (!arr->Get(context, i).ToLocal(&transfer_list[i]))
return Nothing<bool>();
}
return Just(true);
}
Isolate* isolate = env->isolate();
Local<Value> iterator_method;
if (!object.As<Object>()->Get(context, Symbol::GetIterator(isolate))
.ToLocal(&iterator_method)) return Nothing<bool>();
if (!iterator_method->IsFunction()) return Just(false);
Local<Value> iterator;
if (!iterator_method.As<Function>()->Call(context, object, 0, nullptr)
.ToLocal(&iterator)) return Nothing<bool>();
if (!iterator->IsObject()) return Just(false);
Local<Value> next;
if (!iterator.As<Object>()->Get(context, env->next_string()).ToLocal(&next))
return Nothing<bool>();
if (!next->IsFunction()) return Just(false);
std::vector<Local<Value>> entries;
while (env->can_call_into_js()) {
Local<Value> result;
if (!next.As<Function>()->Call(context, iterator, 0, nullptr)
.ToLocal(&result)) return Nothing<bool>();
if (!result->IsObject()) return Just(false);
Local<Value> done;
if (!result.As<Object>()->Get(context, env->done_string()).ToLocal(&done))
return Nothing<bool>();
if (done->BooleanValue(isolate)) break;
Local<Value> val;
if (!result.As<Object>()->Get(context, env->value_string()).ToLocal(&val))
return Nothing<bool>();
entries.push_back(val);
}
transfer_list.AllocateSufficientStorage(entries.size());
std::copy(entries.begin(), entries.end(), &transfer_list[0]);
return Just(true);
}
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Local<Object> obj = args.This();
Local<Context> context = obj->CreationContext();
if (args.Length() == 0) {
return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to "
"MessagePort.postMessage");
}
if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) {
// Browsers ignore null or undefined, and otherwise accept an array or an
// options object.
// TODO(addaleax): Add support for an options object and generic sequence
// support.
// Refs: https://github.com/nodejs/node/pull/28033#discussion_r289964991
return THROW_ERR_INVALID_ARG_TYPE(env,
"Optional transferList argument must be an array");
"Optional transferList argument must be an iterable");
}
TransferList transfer_list;
if (args[1]->IsObject()) {
bool was_iterable;
if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable))
return;
if (!was_iterable) {
Local<Value> transfer_option;
if (!args[1].As<Object>()->Get(context, env->transfer_string())
.ToLocal(&transfer_option)) return;
if (!transfer_option->IsUndefined()) {
if (!ReadIterable(env, context, transfer_list, transfer_option)
.To(&was_iterable)) return;
if (!was_iterable) {
return THROW_ERR_INVALID_ARG_TYPE(env,
"Optional options.transfer argument must be an iterable");
}
}
}
}
MessagePort* port = Unwrap<MessagePort>(args.This());
@ -727,13 +800,11 @@ void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
// transfers.
if (port == nullptr) {
Message msg;
Local<Object> obj = args.This();
Local<Context> context = obj->CreationContext();
USE(msg.Serialize(env, context, args[0], args[1], obj));
USE(msg.Serialize(env, context, args[0], transfer_list, obj));
return;
}
port->PostMessage(env, args[0], args[1]);
port->PostMessage(env, args[0], transfer_list);
}
void MessagePort::Start() {

View File

@ -14,6 +14,8 @@ namespace worker {
class MessagePortData;
class MessagePort;
typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList;
// Represents a single communication message.
class Message : public MemoryRetainer {
public:
@ -44,7 +46,7 @@ class Message : public MemoryRetainer {
v8::Maybe<bool> Serialize(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Value> input,
v8::Local<v8::Value> transfer_list,
const TransferList& transfer_list,
v8::Local<v8::Object> source_port =
v8::Local<v8::Object>());
@ -149,7 +151,7 @@ class MessagePort : public HandleWrap {
// serialized with transfers, then silently discarded.
v8::Maybe<bool> PostMessage(Environment* env,
v8::Local<v8::Value> message,
v8::Local<v8::Value> transfer);
const TransferList& transfer);
// Start processing messages on this port as a receiving end.
void Start();

View File

@ -0,0 +1,26 @@
'use strict';
const common = require('../common');
const { parentPort, MessageChannel, Worker } = require('worker_threads');
// Do not use isMainThread so that this test itself can be run inside a Worker.
if (!process.env.HAS_STARTED_WORKER) {
process.env.HAS_STARTED_WORKER = 1;
const w = new Worker(__filename);
w.once('message', common.mustCall(() => {
w.once('message', common.mustNotCall());
setTimeout(() => w.terminate(), 100);
}));
} else {
const { port1 } = new MessageChannel();
parentPort.postMessage('ready');
// Make sure we dont end up running JS after the infinite loop is broken.
port1.postMessage({}, {
transfer: (function*() { while (true); })()
});
parentPort.postMessage('UNREACHABLE');
process.kill(process.pid, 'SIGINT');
}

View File

@ -72,22 +72,81 @@ const { MessageChannel, MessagePort } = require('worker_threads');
{
const { port1, port2 } = new MessageChannel();
port2.on('message', common.mustCall(4));
port2.on('message', common.mustCall(6));
port1.postMessage(1, null);
port1.postMessage(2, undefined);
port1.postMessage(3, []);
port1.postMessage(4, {});
port1.postMessage(5, { transfer: undefined });
port1.postMessage(6, { transfer: [] });
const err = {
constructor: TypeError,
code: 'ERR_INVALID_ARG_TYPE',
message: 'Optional transferList argument must be an array'
message: 'Optional transferList argument must be an iterable'
};
assert.throws(() => port1.postMessage(5, 0), err);
assert.throws(() => port1.postMessage(5, false), err);
assert.throws(() => port1.postMessage(5, 'X'), err);
assert.throws(() => port1.postMessage(5, Symbol('X')), err);
const err2 = {
constructor: TypeError,
code: 'ERR_INVALID_ARG_TYPE',
message: 'Optional options.transfer argument must be an iterable'
};
assert.throws(() => port1.postMessage(5, { transfer: null }), err2);
assert.throws(() => port1.postMessage(5, { transfer: 0 }), err2);
assert.throws(() => port1.postMessage(5, { transfer: false }), err2);
assert.throws(() => port1.postMessage(5, { transfer: {} }), err2);
assert.throws(() => port1.postMessage(5, {
transfer: { [Symbol.iterator]() { return {}; } }
}), err2);
assert.throws(() => port1.postMessage(5, {
transfer: { [Symbol.iterator]() { return { next: 42 }; } }
}), err2);
assert.throws(() => port1.postMessage(5, {
transfer: { [Symbol.iterator]() { return { next: null }; } }
}), err2);
port1.close();
}
{
// Make sure these ArrayBuffers end up detached, i.e. are actually being
// transferred because the transfer list provides them.
const { port1, port2 } = new MessageChannel();
port2.on('message', common.mustCall((msg) => {
assert.strictEqual(msg.ab.byteLength, 10);
}, 4));
{
const ab = new ArrayBuffer(10);
port1.postMessage({ ab }, [ ab ]);
assert.strictEqual(ab.byteLength, 0);
}
{
const ab = new ArrayBuffer(10);
port1.postMessage({ ab }, { transfer: [ ab ] });
assert.strictEqual(ab.byteLength, 0);
}
{
const ab = new ArrayBuffer(10);
port1.postMessage({ ab }, (function*() { yield ab; })());
assert.strictEqual(ab.byteLength, 0);
}
{
const ab = new ArrayBuffer(10);
port1.postMessage({ ab }, {
transfer: (function*() { yield ab; })()
});
assert.strictEqual(ab.byteLength, 0);
}
port1.close();
}