worker: implement worker.moveMessagePortToContext()
This enables using `MessagePort`s in different `vm.Context`s, aiding with the isolation that the `vm` module seeks to provide. Refs: https://github.com/ayojs/ayo/pull/111 PR-URL: https://github.com/nodejs/node/pull/26497 Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
parent
b0de48e854
commit
fe8972a4ff
@ -70,6 +70,30 @@ if (isMainThread) {
|
||||
}
|
||||
```
|
||||
|
||||
## worker.moveMessagePortToContext(port, contextifiedSandbox)
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
* `port` {MessagePort} The message port which will be transferred.
|
||||
* `contextifiedSandbox` {Object} A [contextified][] object as returned by the
|
||||
`vm.createContext()` method.
|
||||
|
||||
* Returns: {MessagePort}
|
||||
|
||||
Transfer a `MessagePort` to a different [`vm`][] Context. The original `port`
|
||||
object will be rendered unusable, and the returned `MessagePort` instance will
|
||||
take its place.
|
||||
|
||||
The returned `MessagePort` will be an object in the target context, and will
|
||||
inherit from its global `Object` class. Objects passed to the
|
||||
[`port.onmessage()`][] listener will also be created in the target context
|
||||
and inherit from its global `Object` class.
|
||||
|
||||
However, the created `MessagePort` will no longer inherit from
|
||||
[`EventEmitter`][], and only [`port.onmessage()`][] can be used to receive
|
||||
events using it.
|
||||
|
||||
## worker.parentPort
|
||||
<!-- YAML
|
||||
added: v10.5.0
|
||||
@ -583,6 +607,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
|
||||
[`Worker`]: #worker_threads_class_worker
|
||||
[`cluster` module]: cluster.html
|
||||
[`port.on('message')`]: #worker_threads_event_message
|
||||
[`port.onmessage()`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/onmessage
|
||||
[`port.postMessage()`]: #worker_threads_port_postmessage_value_transferlist
|
||||
[`process.abort()`]: process.html#process_process_abort
|
||||
[`process.chdir()`]: process.html#process_process_chdir_directory
|
||||
@ -600,6 +625,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
|
||||
[`require('worker_threads').threadId`]: #worker_threads_worker_threadid
|
||||
[`require('worker_threads').workerData`]: #worker_threads_worker_workerdata
|
||||
[`trace_events`]: tracing.html
|
||||
[`vm`]: vm.html
|
||||
[`worker.on('message')`]: #worker_threads_event_message_1
|
||||
[`worker.postMessage()`]: #worker_threads_worker_postmessage_value_transferlist
|
||||
[`worker.terminate()`]: #worker_threads_worker_terminate_callback
|
||||
@ -610,4 +636,5 @@ active handle in the event system. If the worker is already `unref()`ed calling
|
||||
[Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API
|
||||
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
|
||||
[child processes]: child_process.html
|
||||
[contextified]: vm.html#vm_what_does_it_mean_to_contextify_an_object
|
||||
[v8.serdes]: v8.html#v8_serialization_api
|
||||
|
@ -8,6 +8,7 @@ const {
|
||||
MessagePort,
|
||||
MessageChannel,
|
||||
drainMessagePort,
|
||||
moveMessagePortToContext,
|
||||
stopMessagePort
|
||||
} = internalBinding('messaging');
|
||||
const { threadId } = internalBinding('worker');
|
||||
@ -233,6 +234,7 @@ module.exports = {
|
||||
kIncrementsPortRef,
|
||||
kWaitingStreams,
|
||||
kStdioWantsMoreDataCallback,
|
||||
moveMessagePortToContext,
|
||||
MessagePort,
|
||||
MessageChannel,
|
||||
setupPortReferencing,
|
||||
|
@ -8,13 +8,15 @@ const {
|
||||
|
||||
const {
|
||||
MessagePort,
|
||||
MessageChannel
|
||||
MessageChannel,
|
||||
moveMessagePortToContext,
|
||||
} = require('internal/worker/io');
|
||||
|
||||
module.exports = {
|
||||
isMainThread,
|
||||
MessagePort,
|
||||
MessageChannel,
|
||||
moveMessagePortToContext,
|
||||
threadId,
|
||||
Worker,
|
||||
parentPort: null,
|
||||
|
@ -2,11 +2,13 @@
|
||||
|
||||
#include "async_wrap-inl.h"
|
||||
#include "debug_utils.h"
|
||||
#include "node_contextify.h"
|
||||
#include "node_buffer.h"
|
||||
#include "node_errors.h"
|
||||
#include "node_process.h"
|
||||
#include "util.h"
|
||||
|
||||
using node::contextify::ContextifyContext;
|
||||
using v8::Array;
|
||||
using v8::ArrayBuffer;
|
||||
using v8::ArrayBufferCreationMode;
|
||||
@ -760,6 +762,35 @@ void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
|
||||
port->OnMessage();
|
||||
}
|
||||
|
||||
void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
|
||||
Environment* env = Environment::GetCurrent(args);
|
||||
if (!args[0]->IsObject() ||
|
||||
!env->message_port_constructor_template()->HasInstance(args[0])) {
|
||||
return THROW_ERR_INVALID_ARG_TYPE(env,
|
||||
"First argument needs to be a MessagePort instance");
|
||||
}
|
||||
MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
|
||||
CHECK_NOT_NULL(port);
|
||||
|
||||
Local<Value> context_arg = args[1];
|
||||
ContextifyContext* context_wrapper;
|
||||
if (!context_arg->IsObject() ||
|
||||
(context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
|
||||
env, context_arg.As<Object>())) == nullptr) {
|
||||
return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
|
||||
}
|
||||
|
||||
std::unique_ptr<MessagePortData> data;
|
||||
if (!port->IsDetached())
|
||||
data = port->Detach();
|
||||
|
||||
Context::Scope context_scope(context_wrapper->context());
|
||||
MessagePort* target =
|
||||
MessagePort::New(env, context_wrapper->context(), std::move(data));
|
||||
if (target != nullptr)
|
||||
args.GetReturnValue().Set(target->object());
|
||||
}
|
||||
|
||||
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
|
||||
Entangle(a, b->data_.get());
|
||||
}
|
||||
@ -816,9 +847,9 @@ static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
|
||||
MessagePort* port2 = MessagePort::New(env, context);
|
||||
MessagePort::Entangle(port1, port2);
|
||||
|
||||
args.This()->Set(env->context(), env->port1_string(), port1->object())
|
||||
args.This()->Set(context, env->port1_string(), port1->object())
|
||||
.FromJust();
|
||||
args.This()->Set(env->context(), env->port2_string(), port2->object())
|
||||
args.This()->Set(context, env->port2_string(), port2->object())
|
||||
.FromJust();
|
||||
}
|
||||
|
||||
@ -833,7 +864,7 @@ static void InitMessaging(Local<Object> target,
|
||||
FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
|
||||
Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
|
||||
templ->SetClassName(message_channel_string);
|
||||
target->Set(env->context(),
|
||||
target->Set(context,
|
||||
message_channel_string,
|
||||
templ->GetFunction(context).ToLocalChecked()).FromJust();
|
||||
}
|
||||
@ -847,6 +878,8 @@ static void InitMessaging(Local<Object> target,
|
||||
// the browser equivalents do not provide them.
|
||||
env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
|
||||
env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
|
||||
env->SetMethod(target, "moveMessagePortToContext",
|
||||
MessagePort::MoveToContext);
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
@ -158,12 +158,17 @@ class MessagePort : public HandleWrap {
|
||||
// Stop processing messages on this port as a receiving end.
|
||||
void Stop();
|
||||
|
||||
/* constructor */
|
||||
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
/* prototype methods */
|
||||
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
|
||||
/* static */
|
||||
static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
|
||||
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
|
||||
// to the receiving side of the other. This is not thread-safe.
|
||||
static void Entangle(MessagePort* a, MessagePort* b);
|
||||
|
69
test/parallel/test-worker-message-port-move.js
Normal file
69
test/parallel/test-worker-message-port-move.js
Normal file
@ -0,0 +1,69 @@
|
||||
/* global port */
|
||||
'use strict';
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const vm = require('vm');
|
||||
const {
|
||||
MessagePort, MessageChannel, moveMessagePortToContext
|
||||
} = require('worker_threads');
|
||||
|
||||
const context = vm.createContext();
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
context.port = moveMessagePortToContext(port1, context);
|
||||
context.global = context;
|
||||
Object.assign(context, {
|
||||
global: context,
|
||||
assert,
|
||||
MessagePort,
|
||||
MessageChannel
|
||||
});
|
||||
|
||||
vm.runInContext('(' + function() {
|
||||
{
|
||||
assert(port.postMessage instanceof Function);
|
||||
assert(port.constructor instanceof Function);
|
||||
for (let obj = port; obj !== null; obj = Object.getPrototypeOf(obj)) {
|
||||
for (const key of Object.getOwnPropertyNames(obj)) {
|
||||
if (typeof obj[key] === 'object' && obj[key] !== null) {
|
||||
assert(obj[key] instanceof Object);
|
||||
} else if (typeof obj[key] === 'function') {
|
||||
assert(obj[key] instanceof Function);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert(!(port instanceof MessagePort));
|
||||
assert.strictEqual(port.onmessage, undefined);
|
||||
port.onmessage = function({ data }) {
|
||||
assert(data instanceof Object);
|
||||
port.postMessage(data);
|
||||
};
|
||||
port.start();
|
||||
}
|
||||
|
||||
{
|
||||
let threw = false;
|
||||
try {
|
||||
port.postMessage(global);
|
||||
} catch (e) {
|
||||
assert.strictEqual(e.constructor.name, 'DOMException');
|
||||
assert(e instanceof Object);
|
||||
assert(e instanceof Error);
|
||||
threw = true;
|
||||
}
|
||||
assert(threw);
|
||||
}
|
||||
|
||||
{
|
||||
const newDummyPort = new (port.constructor)();
|
||||
assert(!(newDummyPort instanceof MessagePort));
|
||||
assert(newDummyPort.close instanceof Function);
|
||||
newDummyPort.close();
|
||||
}
|
||||
} + ')()', context);
|
||||
|
||||
port2.on('message', common.mustCall((msg) => {
|
||||
assert(msg instanceof Object);
|
||||
port2.close();
|
||||
}));
|
||||
port2.postMessage({});
|
Loading…
x
Reference in New Issue
Block a user