worker: use fake MessageEvent for port.onmessage
Instead of passing the payload for Workers directly to `.onmessage`, perform something more similar to what the browser API provides, namely create an event object with a `.data` property. This does not make `MessagePort` implement the `EventTarget` API, nor does it implement the full `MessageEvent` API, but it would make such extensions non-breaking changes if we desire them at some point in the future. (This would be a breaking change if Workers were not experimental. Currently, this method is also undocumented and only exists with the idea of enabling some degree of Web compatibility.) PR-URL: https://github.com/nodejs/node/pull/26082 Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Denys Otrishko <shishugi@gmail.com>
This commit is contained in:
parent
a02e3e2d5f
commit
5adda2c447
@ -61,11 +61,11 @@ MessagePort.prototype.unref = MessagePortPrototype.unref;
|
||||
// uv_async_t) which can receive information from other threads and emits
|
||||
// .onmessage events, and a function used for sending data to a MessagePort
|
||||
// in some other thread.
|
||||
MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
|
||||
if (payload.type !== messageTypes.STDIO_WANTS_MORE_DATA)
|
||||
debug(`[${threadId}] received message`, payload);
|
||||
MessagePort.prototype[kOnMessageListener] = function onmessage(event) {
|
||||
if (event.data && event.data.type !== messageTypes.STDIO_WANTS_MORE_DATA)
|
||||
debug(`[${threadId}] received message`, event);
|
||||
// Emit the deserialized object to userland.
|
||||
this.emit('message', payload);
|
||||
this.emit('message', event.data);
|
||||
};
|
||||
|
||||
// This is for compatibility with the Web's MessagePort API. It makes sense to
|
||||
|
@ -146,6 +146,7 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2;
|
||||
V(crypto_ec_string, "ec") \
|
||||
V(crypto_rsa_string, "rsa") \
|
||||
V(cwd_string, "cwd") \
|
||||
V(data_string, "data") \
|
||||
V(dest_string, "dest") \
|
||||
V(destroyed_string, "destroyed") \
|
||||
V(detached_string, "detached") \
|
||||
@ -293,6 +294,7 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2;
|
||||
V(subject_string, "subject") \
|
||||
V(subjectaltname_string, "subjectaltname") \
|
||||
V(syscall_string, "syscall") \
|
||||
V(target_string, "target") \
|
||||
V(thread_id_string, "threadId") \
|
||||
V(ticketkeycallback_string, "onticketkeycallback") \
|
||||
V(timeout_string, "timeout") \
|
||||
@ -361,6 +363,7 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2;
|
||||
V(inspector_console_extension_installer, v8::Function) \
|
||||
V(libuv_stream_wrap_ctor_template, v8::FunctionTemplate) \
|
||||
V(message_port, v8::Object) \
|
||||
V(message_event_object_template, v8::ObjectTemplate) \
|
||||
V(message_port_constructor_template, v8::FunctionTemplate) \
|
||||
V(native_module_require, v8::Function) \
|
||||
V(performance_entry_callback, v8::Function) \
|
||||
|
@ -25,6 +25,7 @@ using v8::Maybe;
|
||||
using v8::MaybeLocal;
|
||||
using v8::Nothing;
|
||||
using v8::Object;
|
||||
using v8::ObjectTemplate;
|
||||
using v8::SharedArrayBuffer;
|
||||
using v8::String;
|
||||
using v8::Value;
|
||||
@ -589,12 +590,19 @@ void MessagePort::OnMessage() {
|
||||
// Call the JS .onmessage() callback.
|
||||
HandleScope handle_scope(env()->isolate());
|
||||
Context::Scope context_scope(context);
|
||||
Local<Value> args[] = {
|
||||
received.Deserialize(env(), context).FromMaybe(Local<Value>())
|
||||
};
|
||||
|
||||
if (args[0].IsEmpty() ||
|
||||
MakeCallback(env()->onmessage_string(), 1, args).IsEmpty()) {
|
||||
Local<Object> event;
|
||||
Local<Value> payload;
|
||||
Local<Value> cb_args[1];
|
||||
if (!received.Deserialize(env(), context).ToLocal(&payload) ||
|
||||
!env()->message_event_object_template()->NewInstance(context)
|
||||
.ToLocal(&event) ||
|
||||
event->Set(context, env()->data_string(), payload).IsNothing() ||
|
||||
event->Set(context, env()->target_string(), object()).IsNothing() ||
|
||||
(cb_args[0] = event, false) ||
|
||||
MakeCallback(env()->onmessage_string(),
|
||||
arraysize(cb_args),
|
||||
cb_args).IsEmpty()) {
|
||||
// Re-schedule OnMessage() execution in case of failure.
|
||||
if (data_)
|
||||
TriggerAsync();
|
||||
@ -763,6 +771,8 @@ MaybeLocal<Function> GetMessagePortConstructor(
|
||||
if (!templ.IsEmpty())
|
||||
return templ->GetFunction(context);
|
||||
|
||||
Isolate* isolate = env->isolate();
|
||||
|
||||
{
|
||||
Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
|
||||
m->SetClassName(env->message_port_constructor_string());
|
||||
@ -775,6 +785,13 @@ MaybeLocal<Function> GetMessagePortConstructor(
|
||||
env->SetProtoMethod(m, "drain", MessagePort::Drain);
|
||||
|
||||
env->set_message_port_constructor_template(m);
|
||||
|
||||
Local<FunctionTemplate> event_ctor = FunctionTemplate::New(isolate);
|
||||
event_ctor->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "MessageEvent"));
|
||||
Local<ObjectTemplate> e = event_ctor->InstanceTemplate();
|
||||
e->Set(env->data_string(), Null(isolate));
|
||||
e->Set(env->target_string(), Null(isolate));
|
||||
env->set_message_event_object_template(e);
|
||||
}
|
||||
|
||||
return GetMessagePortConstructor(env, context);
|
||||
|
@ -25,7 +25,7 @@ assert.throws(common.mustCall(() => {
|
||||
|
||||
// The failed transfer should not affect the ports in anyway.
|
||||
port2.onmessage = common.mustCall((message) => {
|
||||
assert.strictEqual(message, 2);
|
||||
assert.strictEqual(message.data, 2);
|
||||
|
||||
const inspectedPort1 = util.inspect(port1);
|
||||
const inspectedPort2 = util.inspect(port2);
|
||||
|
@ -21,14 +21,15 @@ const { MessageChannel, MessagePort } = require('worker_threads');
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
|
||||
port1.onmessage = common.mustCall((message) => {
|
||||
assert.strictEqual(message, 4);
|
||||
assert.strictEqual(message.data, 4);
|
||||
assert.strictEqual(message.target, port1);
|
||||
port2.close(common.mustCall());
|
||||
});
|
||||
|
||||
port1.postMessage(2);
|
||||
|
||||
port2.onmessage = common.mustCall((message) => {
|
||||
port2.postMessage(message * 2);
|
||||
port2.postMessage(message.data * 2);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,6 @@ if (!process.env.HAS_STARTED_WORKER) {
|
||||
w.postMessage(2);
|
||||
} else {
|
||||
parentPort.onmessage = common.mustCall((message) => {
|
||||
parentPort.postMessage(message * 2);
|
||||
parentPort.postMessage(message.data * 2);
|
||||
});
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user