worker: support MessagePort passing in messages

Support passing `MessagePort` instances through other `MessagePort`s,
as expected by the `MessagePort` spec.

Thanks to Stephen Belanger for reviewing this change in its original PR.

Refs: https://github.com/ayojs/ayo/pull/106

PR-URL: https://github.com/nodejs/node/pull/20876
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Shingo Inoue <leko.noor@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com>
Reviewed-By: John-David Dalton <john.david.dalton@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
This commit is contained in:
Anna Henningsen 2017-10-07 14:39:02 -07:00
parent e7a2367471
commit 749a13b76c
No known key found for this signature in database
GPG Key ID: 9C63F3A6CD2AD8F9
6 changed files with 122 additions and 5 deletions

View File

@ -629,6 +629,12 @@ An operation outside the bounds of a `Buffer` was attempted.
An attempt has been made to create a `Buffer` larger than the maximum allowed An attempt has been made to create a `Buffer` larger than the maximum allowed
size. size.
<a id="ERR_CANNOT_TRANSFER_OBJECT"></a>
### ERR_CANNOT_TRANSFER_OBJECT
The value passed to `postMessage()` contained an object that is not supported
for transferring.
<a id="ERR_CANNOT_WATCH_SIGINT"></a> <a id="ERR_CANNOT_WATCH_SIGINT"></a>
### ERR_CANNOT_WATCH_SIGINT ### ERR_CANNOT_WATCH_SIGINT
@ -1294,6 +1300,12 @@ strict compliance with the API specification (which in some cases may accept
`func(undefined)` and `func()` are treated identically, and the `func(undefined)` and `func()` are treated identically, and the
[`ERR_INVALID_ARG_TYPE`][] error code may be used instead. [`ERR_INVALID_ARG_TYPE`][] error code may be used instead.
<a id="ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST"></a>
### ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST
A `MessagePort` was found in the object passed to a `postMessage()` call,
but not provided in the `transferList` for that call.
<a id="ERR_MISSING_MODULE"></a> <a id="ERR_MISSING_MODULE"></a>
### ERR_MISSING_MODULE ### ERR_MISSING_MODULE

View File

@ -83,7 +83,7 @@ the [HTML structured clone algorithm][]. In particular, it may contain circular
references and objects like typed arrays that the `JSON` API is not able references and objects like typed arrays that the `JSON` API is not able
to stringify. to stringify.
`transferList` may be a list of `ArrayBuffer` objects. `transferList` may be a list of `ArrayBuffer` and `MessagePort` objects.
After transferring, they will not be usable on the sending side of the channel After transferring, they will not be usable on the sending side of the channel
anymore (even if they are not contained in `value`). anymore (even if they are not contained in `value`).

View File

@ -23,6 +23,7 @@ namespace node {
#define ERRORS_WITH_CODE(V) \ #define ERRORS_WITH_CODE(V) \
V(ERR_BUFFER_OUT_OF_BOUNDS, RangeError) \ V(ERR_BUFFER_OUT_OF_BOUNDS, RangeError) \
V(ERR_BUFFER_TOO_LARGE, Error) \ V(ERR_BUFFER_TOO_LARGE, Error) \
V(ERR_CANNOT_TRANSFER_OBJECT, TypeError) \
V(ERR_CLOSED_MESSAGE_PORT, Error) \ V(ERR_CLOSED_MESSAGE_PORT, Error) \
V(ERR_CONSTRUCT_CALL_REQUIRED, Error) \ V(ERR_CONSTRUCT_CALL_REQUIRED, Error) \
V(ERR_INDEX_OUT_OF_RANGE, RangeError) \ V(ERR_INDEX_OUT_OF_RANGE, RangeError) \
@ -31,6 +32,7 @@ namespace node {
V(ERR_INVALID_TRANSFER_OBJECT, TypeError) \ V(ERR_INVALID_TRANSFER_OBJECT, TypeError) \
V(ERR_MEMORY_ALLOCATION_FAILED, Error) \ V(ERR_MEMORY_ALLOCATION_FAILED, Error) \
V(ERR_MISSING_ARGS, TypeError) \ V(ERR_MISSING_ARGS, TypeError) \
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, TypeError) \
V(ERR_MISSING_MODULE, Error) \ V(ERR_MISSING_MODULE, Error) \
V(ERR_SCRIPT_EXECUTION_INTERRUPTED, Error) \ V(ERR_SCRIPT_EXECUTION_INTERRUPTED, Error) \
V(ERR_SCRIPT_EXECUTION_TIMEOUT, Error) \ V(ERR_SCRIPT_EXECUTION_TIMEOUT, Error) \
@ -57,11 +59,14 @@ namespace node {
// Errors with predefined static messages // Errors with predefined static messages
#define PREDEFINED_ERROR_MESSAGES(V) \ #define PREDEFINED_ERROR_MESSAGES(V) \
V(ERR_CANNOT_TRANSFER_OBJECT, "Cannot transfer object of unsupported type")\
V(ERR_CLOSED_MESSAGE_PORT, "Cannot send data on closed MessagePort") \ V(ERR_CLOSED_MESSAGE_PORT, "Cannot send data on closed MessagePort") \
V(ERR_CONSTRUCT_CALL_REQUIRED, "Cannot call constructor without `new`") \ V(ERR_CONSTRUCT_CALL_REQUIRED, "Cannot call constructor without `new`") \
V(ERR_INDEX_OUT_OF_RANGE, "Index out of range") \ V(ERR_INDEX_OUT_OF_RANGE, "Index out of range") \
V(ERR_INVALID_TRANSFER_OBJECT, "Found invalid object in transferList") \ V(ERR_INVALID_TRANSFER_OBJECT, "Found invalid object in transferList") \
V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \ V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, \
"MessagePort was found in message but not listed in transferList") \
V(ERR_SCRIPT_EXECUTION_INTERRUPTED, \ V(ERR_SCRIPT_EXECUTION_INTERRUPTED, \
"Script execution was interrupted by `SIGINT`") "Script execution was interrupted by `SIGINT`")

View File

@ -41,14 +41,27 @@ namespace {
// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them. // `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them.
class DeserializerDelegate : public ValueDeserializer::Delegate { class DeserializerDelegate : public ValueDeserializer::Delegate {
public: public:
DeserializerDelegate(Message* m, Environment* env) DeserializerDelegate(Message* m,
: env_(env), msg_(m) {} Environment* env,
const std::vector<MessagePort*>& message_ports)
: env_(env), msg_(m), message_ports_(message_ports) {}
MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
// Currently, only MessagePort hosts objects are supported, so identifying
// by the index in the message's MessagePort array is sufficient.
uint32_t id;
if (!deserializer->ReadUint32(&id))
return MaybeLocal<Object>();
CHECK_LE(id, message_ports_.size());
return message_ports_[id]->object();
};
ValueDeserializer* deserializer = nullptr; ValueDeserializer* deserializer = nullptr;
private: private:
Environment* env_; Environment* env_;
Message* msg_; Message* msg_;
const std::vector<MessagePort*>& message_ports_;
}; };
} // anonymous namespace } // anonymous namespace
@ -58,7 +71,23 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
EscapableHandleScope handle_scope(env->isolate()); EscapableHandleScope handle_scope(env->isolate());
Context::Scope context_scope(context); Context::Scope context_scope(context);
DeserializerDelegate delegate(this, env); // Create all necessary MessagePort handles.
std::vector<MessagePort*> ports(message_ports_.size());
for (uint32_t i = 0; i < message_ports_.size(); ++i) {
ports[i] = MessagePort::New(env,
context,
std::move(message_ports_[i]));
if (ports[i] == nullptr) {
for (MessagePort* port : ports) {
// This will eventually release the MessagePort object itself.
port->Close();
}
return MaybeLocal<Value>();
}
}
message_ports_.clear();
DeserializerDelegate delegate(this, env, ports);
ValueDeserializer deserializer( ValueDeserializer deserializer(
env->isolate(), env->isolate(),
reinterpret_cast<const uint8_t*>(main_message_buf_.data), reinterpret_cast<const uint8_t*>(main_message_buf_.data),
@ -83,6 +112,10 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
deserializer.ReadValue(context).FromMaybe(Local<Value>())); deserializer.ReadValue(context).FromMaybe(Local<Value>()));
} }
void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
message_ports_.emplace_back(std::move(data));
}
namespace { namespace {
// This tells V8 how to serialize objects that it does not understand // This tells V8 how to serialize objects that it does not understand
@ -97,12 +130,43 @@ class SerializerDelegate : public ValueSerializer::Delegate {
env_->isolate()->ThrowException(Exception::Error(message)); env_->isolate()->ThrowException(Exception::Error(message));
} }
Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
if (env_->message_port_constructor_template()->HasInstance(object)) {
return WriteMessagePort(Unwrap<MessagePort>(object));
}
THROW_ERR_CANNOT_TRANSFER_OBJECT(env_);
return Nothing<bool>();
}
void Finish() {
// Only close the MessagePort handles and actually transfer them
// once we know that serialization succeeded.
for (MessagePort* port : ports_) {
port->Close();
msg_->AddMessagePort(port->Detach());
}
}
ValueSerializer* serializer = nullptr; ValueSerializer* serializer = nullptr;
private: private:
Maybe<bool> WriteMessagePort(MessagePort* port) {
for (uint32_t i = 0; i < ports_.size(); i++) {
if (ports_[i] == port) {
serializer->WriteUint32(i);
return Just(true);
}
}
THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_);
return Nothing<bool>();
}
Environment* env_; Environment* env_;
Local<Context> context_; Local<Context> context_;
Message* msg_; Message* msg_;
std::vector<MessagePort*> ports_;
friend class worker::Message; friend class worker::Message;
}; };
@ -131,7 +195,7 @@ Maybe<bool> Message::Serialize(Environment* env,
Local<Value> entry; Local<Value> entry;
if (!transfer_list->Get(context, i).ToLocal(&entry)) if (!transfer_list->Get(context, i).ToLocal(&entry))
return Nothing<bool>(); return Nothing<bool>();
// Currently, we support ArrayBuffers. // Currently, we support ArrayBuffers and MessagePorts.
if (entry->IsArrayBuffer()) { if (entry->IsArrayBuffer()) {
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>(); Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
// If we cannot render the ArrayBuffer unusable in this Isolate and // If we cannot render the ArrayBuffer unusable in this Isolate and
@ -144,6 +208,12 @@ Maybe<bool> Message::Serialize(Environment* env,
array_buffers.push_back(ab); array_buffers.push_back(ab);
serializer.TransferArrayBuffer(id, ab); serializer.TransferArrayBuffer(id, ab);
continue; continue;
} else if (env->message_port_constructor_template()
->HasInstance(entry)) {
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
CHECK_NE(port, nullptr);
delegate.ports_.push_back(port);
continue;
} }
THROW_ERR_INVALID_TRANSFER_OBJECT(env); THROW_ERR_INVALID_TRANSFER_OBJECT(env);
@ -167,6 +237,8 @@ Maybe<bool> Message::Serialize(Environment* env,
contents.ByteLength() }); contents.ByteLength() });
} }
delegate.Finish();
// The serializer gave us a buffer allocated using `malloc()`. // The serializer gave us a buffer allocated using `malloc()`.
std::pair<uint8_t*, size_t> data = serializer.Release(); std::pair<uint8_t*, size_t> data = serializer.Release();
main_message_buf_ = main_message_buf_ =

View File

@ -37,9 +37,14 @@ class Message {
v8::Local<v8::Value> input, v8::Local<v8::Value> input,
v8::Local<v8::Value> transfer_list); v8::Local<v8::Value> transfer_list);
// Internal method of Message that is called once serialization finishes
// and that transfers ownership of `data` to this message.
void AddMessagePort(std::unique_ptr<MessagePortData>&& data);
private: private:
MallocedBuffer<char> main_message_buf_; MallocedBuffer<char> main_message_buf_;
std::vector<MallocedBuffer<char>> array_buffer_contents_; std::vector<MallocedBuffer<char>> array_buffer_contents_;
std::vector<std::unique_ptr<MessagePortData>> message_ports_;
friend class MessagePort; friend class MessagePort;
}; };

View File

@ -0,0 +1,23 @@
// Flags: --experimental-worker
'use strict';
const common = require('../common');
const assert = require('assert');
const { MessageChannel } = require('worker');
{
const { port1: basePort1, port2: basePort2 } = new MessageChannel();
const {
port1: transferredPort1, port2: transferredPort2
} = new MessageChannel();
basePort1.postMessage({ transferredPort1 }, [ transferredPort1 ]);
basePort2.on('message', common.mustCall(({ transferredPort1 }) => {
transferredPort1.postMessage('foobar');
transferredPort2.on('message', common.mustCall((msg) => {
assert.strictEqual(msg, 'foobar');
transferredPort1.close(common.mustCall());
basePort1.close(common.mustCall());
}));
}));
}