worker: reduce MessagePort
prototype to documented API
`MessagePort` is special because it has to be a C++ API that is exposed to userland. Therefore, there is a number of internal methods on its native prototype; this commit reduces this set of methods to only what is documented in the API. PR-URL: https://github.com/nodejs/node/pull/23037 Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: Michaël Zasso <targos@protonmail.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de> Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
parent
2d65e67305
commit
0434d270c1
@ -14,18 +14,18 @@ const {
|
|||||||
|
|
||||||
const { internalBinding } = require('internal/bootstrap/loaders');
|
const { internalBinding } = require('internal/bootstrap/loaders');
|
||||||
const { MessagePort, MessageChannel } = internalBinding('messaging');
|
const { MessagePort, MessageChannel } = internalBinding('messaging');
|
||||||
const { handle_onclose } = internalBinding('symbols');
|
const {
|
||||||
|
handle_onclose: handleOnCloseSymbol,
|
||||||
|
oninit: onInitSymbol
|
||||||
|
} = internalBinding('symbols');
|
||||||
const { clearAsyncIdStack } = require('internal/async_hooks');
|
const { clearAsyncIdStack } = require('internal/async_hooks');
|
||||||
const { serializeError, deserializeError } = require('internal/error-serdes');
|
const { serializeError, deserializeError } = require('internal/error-serdes');
|
||||||
const { pathToFileURL } = require('url');
|
const { pathToFileURL } = require('url');
|
||||||
|
|
||||||
util.inherits(MessagePort, EventEmitter);
|
|
||||||
|
|
||||||
const {
|
const {
|
||||||
Worker: WorkerImpl,
|
Worker: WorkerImpl,
|
||||||
getEnvMessagePort,
|
getEnvMessagePort,
|
||||||
threadId,
|
threadId
|
||||||
oninit: oninit_symbol
|
|
||||||
} = internalBinding('worker');
|
} = internalBinding('worker');
|
||||||
|
|
||||||
const isMainThread = threadId === 0;
|
const isMainThread = threadId === 0;
|
||||||
@ -58,6 +58,23 @@ const messageTypes = {
|
|||||||
LOAD_SCRIPT: 'loadScript'
|
LOAD_SCRIPT: 'loadScript'
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// We have to mess with the MessagePort prototype a bit, so that a) we can make
|
||||||
|
// it inherit from EventEmitter, even though it is a C++ class, and b) we do
|
||||||
|
// not provide methods that are not present in the Browser and not documented
|
||||||
|
// on our side (e.g. hasRef).
|
||||||
|
// Save a copy of the original set of methods as a shallow clone.
|
||||||
|
const MessagePortPrototype = Object.create(
|
||||||
|
Object.getPrototypeOf(MessagePort.prototype),
|
||||||
|
Object.getOwnPropertyDescriptors(MessagePort.prototype));
|
||||||
|
// Set up the new inheritance chain.
|
||||||
|
Object.setPrototypeOf(MessagePort, EventEmitter);
|
||||||
|
Object.setPrototypeOf(MessagePort.prototype, EventEmitter.prototype);
|
||||||
|
// Finally, purge methods we don't want to be public.
|
||||||
|
delete MessagePort.prototype.stop;
|
||||||
|
delete MessagePort.prototype.drain;
|
||||||
|
delete MessagePort.prototype.hasRef;
|
||||||
|
delete MessagePort.prototype.getAsyncId;
|
||||||
|
|
||||||
// A communication channel consisting of a handle (that wraps around an
|
// A communication channel consisting of a handle (that wraps around an
|
||||||
// uv_async_t) which can receive information from other threads and emits
|
// uv_async_t) which can receive information from other threads and emits
|
||||||
// .onmessage events, and a function used for sending data to a MessagePort
|
// .onmessage events, and a function used for sending data to a MessagePort
|
||||||
@ -81,10 +98,10 @@ Object.defineProperty(MessagePort.prototype, 'onmessage', {
|
|||||||
this[kOnMessageListener] = value;
|
this[kOnMessageListener] = value;
|
||||||
if (typeof value === 'function') {
|
if (typeof value === 'function') {
|
||||||
this.ref();
|
this.ref();
|
||||||
this.start();
|
MessagePortPrototype.start.call(this);
|
||||||
} else {
|
} else {
|
||||||
this.unref();
|
this.unref();
|
||||||
this.stop();
|
MessagePortPrototype.stop.call(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -94,7 +111,7 @@ function oninit() {
|
|||||||
setupPortReferencing(this, this, 'message');
|
setupPortReferencing(this, this, 'message');
|
||||||
}
|
}
|
||||||
|
|
||||||
Object.defineProperty(MessagePort.prototype, oninit_symbol, {
|
Object.defineProperty(MessagePort.prototype, onInitSymbol, {
|
||||||
enumerable: true,
|
enumerable: true,
|
||||||
writable: false,
|
writable: false,
|
||||||
value: oninit
|
value: oninit
|
||||||
@ -111,22 +128,18 @@ function onclose() {
|
|||||||
this.emit('close');
|
this.emit('close');
|
||||||
}
|
}
|
||||||
|
|
||||||
Object.defineProperty(MessagePort.prototype, handle_onclose, {
|
Object.defineProperty(MessagePort.prototype, handleOnCloseSymbol, {
|
||||||
enumerable: false,
|
enumerable: false,
|
||||||
writable: false,
|
writable: false,
|
||||||
value: onclose
|
value: onclose
|
||||||
});
|
});
|
||||||
|
|
||||||
const originalClose = MessagePort.prototype.close;
|
|
||||||
MessagePort.prototype.close = function(cb) {
|
MessagePort.prototype.close = function(cb) {
|
||||||
if (typeof cb === 'function')
|
if (typeof cb === 'function')
|
||||||
this.once('close', cb);
|
this.once('close', cb);
|
||||||
originalClose.call(this);
|
MessagePortPrototype.close.call(this);
|
||||||
};
|
};
|
||||||
|
|
||||||
const drainMessagePort = MessagePort.prototype.drain;
|
|
||||||
delete MessagePort.prototype.drain;
|
|
||||||
|
|
||||||
Object.defineProperty(MessagePort.prototype, util.inspect.custom, {
|
Object.defineProperty(MessagePort.prototype, util.inspect.custom, {
|
||||||
enumerable: false,
|
enumerable: false,
|
||||||
writable: false,
|
writable: false,
|
||||||
@ -135,7 +148,7 @@ Object.defineProperty(MessagePort.prototype, util.inspect.custom, {
|
|||||||
try {
|
try {
|
||||||
// This may throw when `this` does not refer to a native object,
|
// This may throw when `this` does not refer to a native object,
|
||||||
// e.g. when accessing the prototype directly.
|
// e.g. when accessing the prototype directly.
|
||||||
ref = this.hasRef();
|
ref = MessagePortPrototype.hasRef.call(this);
|
||||||
} catch { return this; }
|
} catch { return this; }
|
||||||
return Object.assign(Object.create(MessagePort.prototype),
|
return Object.assign(Object.create(MessagePort.prototype),
|
||||||
ref === undefined ? {
|
ref === undefined ? {
|
||||||
@ -157,12 +170,12 @@ function setupPortReferencing(port, eventEmitter, eventName) {
|
|||||||
eventEmitter.on('newListener', (name) => {
|
eventEmitter.on('newListener', (name) => {
|
||||||
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
|
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
|
||||||
port.ref();
|
port.ref();
|
||||||
port.start();
|
MessagePortPrototype.start.call(port);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
eventEmitter.on('removeListener', (name) => {
|
eventEmitter.on('removeListener', (name) => {
|
||||||
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
|
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
|
||||||
port.stop();
|
MessagePortPrototype.stop.call(port);
|
||||||
port.unref();
|
port.unref();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -304,7 +317,7 @@ class Worker extends EventEmitter {
|
|||||||
|
|
||||||
[kOnExit](code) {
|
[kOnExit](code) {
|
||||||
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
|
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
|
||||||
drainMessagePort.call(this[kPublicPort]);
|
MessagePortPrototype.drain.call(this[kPublicPort]);
|
||||||
this[kDispose]();
|
this[kDispose]();
|
||||||
this.emit('exit', code);
|
this.emit('exit', code);
|
||||||
this.removeAllListeners();
|
this.removeAllListeners();
|
||||||
|
@ -502,10 +502,6 @@ void InitWorker(Local<Object> target,
|
|||||||
thread_id_string,
|
thread_id_string,
|
||||||
Number::New(env->isolate(),
|
Number::New(env->isolate(),
|
||||||
static_cast<double>(env->thread_id()))).FromJust();
|
static_cast<double>(env->thread_id()))).FromJust();
|
||||||
Local<String> oninit_string = FIXED_ONE_BYTE_STRING(env->isolate(), "oninit");
|
|
||||||
target->Set(env->context(),
|
|
||||||
oninit_string,
|
|
||||||
env->oninit_symbol()).FromJust();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // anonymous namespace
|
} // anonymous namespace
|
||||||
|
@ -19,8 +19,7 @@ validateSnapshotNodes('Worker', [
|
|||||||
validateSnapshotNodes('MessagePort', [
|
validateSnapshotNodes('MessagePort', [
|
||||||
{
|
{
|
||||||
children: [
|
children: [
|
||||||
{ name: 'MessagePortData' },
|
{ name: 'MessagePortData' }
|
||||||
{ name: 'MessagePort' }
|
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
], { loose: true });
|
], { loose: true });
|
||||||
|
@ -69,3 +69,12 @@ const { MessageChannel, MessagePort } = require('worker_threads');
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
assert.deepStrictEqual(
|
||||||
|
Object.getOwnPropertyNames(MessagePort.prototype).sort(),
|
||||||
|
[
|
||||||
|
'close', 'constructor', 'onmessage', 'postMessage', 'ref', 'start',
|
||||||
|
'unref'
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user