process: split worker IO into internal/worker/io.js

- Move `setupProcessStdio` which contains write access to
  the process object into `bootstrap/node.js`
- Move `MessagePort`, `MessageChannel`, `ReadableWorkerStdio`,
  and `WritableWorkerStdio` into `internal/worker/io.js`
- Move more worker-specific bootstrap code into
  `internal/process/worker_thread_only` from `setupChild`
  in `internal/worker.js`, and move the `process._fatalException`
  overwrite into `bootstrap/node.js` for clarity.

PR-URL: https://github.com/nodejs/node/pull/25199
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Joyee Cheung 2018-12-23 11:53:05 +08:00
parent 7163fbf066
commit af237152cd
No known key found for this signature in database
GPG Key ID: 92B78A53C8303B8D
8 changed files with 359 additions and 300 deletions

View File

@ -140,9 +140,13 @@ function startup() {
}
if (isMainThread) {
mainThreadSetup.setupStdio();
const { getStdout, getStdin, getStderr } =
NativeModule.require('internal/process/stdio').getMainThreadStdio();
setupProcessStdio(getStdout, getStdin, getStderr);
} else {
workerThreadSetup.setupStdio();
const { getStdout, getStdin, getStderr } =
workerThreadSetup.initializeWorkerStdio();
setupProcessStdio(getStdout, getStdin, getStderr);
}
if (global.__coverage__)
@ -312,8 +316,14 @@ function startup() {
function startExecution() {
// This means we are in a Worker context, and any script execution
// will be directed by the worker module.
if (internalBinding('worker').getEnvMessagePort() !== undefined) {
NativeModule.require('internal/worker').setupChild();
if (!isMainThread) {
const workerThreadSetup = NativeModule.require(
'internal/process/worker_thread_only'
);
// Set up the message port and start listening
const { workerFatalExeception } = workerThreadSetup.setup();
// Overwrite fatalException
process._fatalException = workerFatalExeception;
return;
}
@ -505,6 +515,31 @@ function setupProcessObject() {
EventEmitter.call(process);
}
function setupProcessStdio(getStdout, getStdin, getStderr) {
Object.defineProperty(process, 'stdout', {
configurable: true,
enumerable: true,
get: getStdout
});
Object.defineProperty(process, 'stderr', {
configurable: true,
enumerable: true,
get: getStderr
});
Object.defineProperty(process, 'stdin', {
configurable: true,
enumerable: true,
get: getStdin
});
process.openStdin = function() {
process.stdin.resume();
return process.stdin;
};
}
function setupGlobalVariables() {
Object.defineProperty(global, Symbol.toStringTag, {
value: 'global',

View File

@ -16,15 +16,6 @@ const {
validateString
} = require('internal/validators');
const {
setupProcessStdio,
getMainThreadStdio
} = require('internal/process/stdio');
function setupStdio() {
setupProcessStdio(getMainThreadStdio());
}
// The execution of this function itself should not cause any side effects.
function wrapProcessMethods(binding) {
function chdir(directory) {
@ -174,7 +165,6 @@ function setupChildProcessIpcChannel() {
}
module.exports = {
setupStdio,
wrapProcessMethods,
setupSignalHandlers,
setupChildProcessIpcChannel,

View File

@ -1,6 +1,5 @@
'use strict';
exports.setupProcessStdio = setupProcessStdio;
exports.getMainThreadStdio = getMainThreadStdio;
function dummyDestroy(err, cb) { cb(err); }
@ -134,31 +133,6 @@ function getMainThreadStdio() {
};
}
function setupProcessStdio({ getStdout, getStdin, getStderr }) {
Object.defineProperty(process, 'stdout', {
configurable: true,
enumerable: true,
get: getStdout
});
Object.defineProperty(process, 'stderr', {
configurable: true,
enumerable: true,
get: getStderr
});
Object.defineProperty(process, 'stdin', {
configurable: true,
enumerable: true,
get: getStdin
});
process.openStdin = function() {
process.stdin.resume();
return process.stdin;
};
}
function createWritableStdioStream(fd) {
var stream;
const tty_wrap = internalBinding('tty_wrap');

View File

@ -2,23 +2,54 @@
// This file contains process bootstrappers that can only be
// run in the worker thread.
const {
getEnvMessagePort,
threadId
} = internalBinding('worker');
const debug = require('util').debuglog('worker');
const {
setupProcessStdio
} = require('internal/process/stdio');
kWaitingStreams,
ReadableWorkerStdio,
WritableWorkerStdio
} = require('internal/worker/io');
const {
workerStdio
createMessageHandler,
createWorkerFatalExeception
} = require('internal/worker');
function setupStdio() {
setupProcessStdio({
getStdout: () => workerStdio.stdout,
getStderr: () => workerStdio.stderr,
getStdin: () => workerStdio.stdin
});
const workerStdio = {};
function initializeWorkerStdio() {
const port = getEnvMessagePort();
port[kWaitingStreams] = 0;
workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');
return {
getStdout() { return workerStdio.stdout; },
getStderr() { return workerStdio.stderr; },
getStdin() { return workerStdio.stdin; }
};
}
function setup() {
debug(`[${threadId}] is setting up worker child environment`);
const port = getEnvMessagePort();
const publicWorker = require('worker_threads');
port.on('message', createMessageHandler(publicWorker, port, workerStdio));
port.start();
return {
workerFatalExeception: createWorkerFatalExeception(port)
};
}
module.exports = {
setupStdio
initializeWorkerStdio,
setup
};

View File

@ -4,35 +4,37 @@ const EventEmitter = require('events');
const assert = require('assert');
const path = require('path');
const util = require('util');
const { Readable, Writable } = require('stream');
const {
ERR_WORKER_PATH,
ERR_WORKER_UNSERIALIZABLE_ERROR,
ERR_WORKER_UNSUPPORTED_EXTENSION,
} = require('internal/errors').codes;
const { validateString } = require('internal/validators');
const { MessagePort, MessageChannel } = internalBinding('messaging');
const {
handle_onclose: handleOnCloseSymbol,
oninit: onInitSymbol
} = internalBinding('symbols');
const { clearAsyncIdStack } = require('internal/async_hooks');
const {
drainMessagePort,
MessageChannel,
messageTypes,
kPort,
kIncrementsPortRef,
kWaitingStreams,
kStdioWantsMoreDataCallback,
setupPortReferencing,
ReadableWorkerStdio,
WritableWorkerStdio,
} = require('internal/worker/io');
const { serializeError, deserializeError } = require('internal/error-serdes');
const { pathToFileURL } = require('url');
const {
Worker: WorkerImpl,
getEnvMessagePort,
threadId
} = internalBinding('worker');
const isMainThread = threadId === 0;
const kOnMessageListener = Symbol('kOnMessageListener');
const kHandle = Symbol('kHandle');
const kName = Symbol('kName');
const kPort = Symbol('kPort');
const kPublicPort = Symbol('kPublicPort');
const kDispose = Symbol('kDispose');
const kOnExit = Symbol('kOnExit');
@ -40,213 +42,9 @@ const kOnMessage = Symbol('kOnMessage');
const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
const kOnErrorMessage = Symbol('kOnErrorMessage');
const kParentSideStdio = Symbol('kParentSideStdio');
const kWritableCallbacks = Symbol('kWritableCallbacks');
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
const kStartedReading = Symbol('kStartedReading');
const kWaitingStreams = Symbol('kWaitingStreams');
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const debug = util.debuglog('worker');
const messageTypes = {
UP_AND_RUNNING: 'upAndRunning',
COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError',
ERROR_MESSAGE: 'errorMessage',
STDIO_PAYLOAD: 'stdioPayload',
STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
LOAD_SCRIPT: 'loadScript'
};
// We have to mess with the MessagePort prototype a bit, so that a) we can make
// it inherit from EventEmitter, even though it is a C++ class, and b) we do
// not provide methods that are not present in the Browser and not documented
// on our side (e.g. hasRef).
// Save a copy of the original set of methods as a shallow clone.
const MessagePortPrototype = Object.create(
Object.getPrototypeOf(MessagePort.prototype),
Object.getOwnPropertyDescriptors(MessagePort.prototype));
// Set up the new inheritance chain.
Object.setPrototypeOf(MessagePort, EventEmitter);
Object.setPrototypeOf(MessagePort.prototype, EventEmitter.prototype);
// Finally, purge methods we don't want to be public.
delete MessagePort.prototype.stop;
delete MessagePort.prototype.drain;
MessagePort.prototype.ref = MessagePortPrototype.ref;
MessagePort.prototype.unref = MessagePortPrototype.unref;
// A communication channel consisting of a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
debug(`[${threadId}] received message`, payload);
// Emit the deserialized object to userland.
this.emit('message', payload);
};
// This is for compatibility with the Web's MessagePort API. It makes sense to
// provide it as an `EventEmitter` in Node.js, but if somebody overrides
// `onmessage`, we'll switch over to the Web API model.
Object.defineProperty(MessagePort.prototype, 'onmessage', {
enumerable: true,
configurable: true,
get() {
return this[kOnMessageListener];
},
set(value) {
this[kOnMessageListener] = value;
if (typeof value === 'function') {
this.ref();
MessagePortPrototype.start.call(this);
} else {
this.unref();
MessagePortPrototype.stop.call(this);
}
}
});
// This is called from inside the `MessagePort` constructor.
function oninit() {
setupPortReferencing(this, this, 'message');
}
Object.defineProperty(MessagePort.prototype, onInitSymbol, {
enumerable: true,
writable: false,
value: oninit
});
// This is called after the underlying `uv_async_t` has been closed.
function onclose() {
if (typeof this.onclose === 'function') {
// Not part of the Web standard yet, but there aren't many reasonable
// alternatives in a non-EventEmitter usage setting.
// Refs: https://github.com/whatwg/html/issues/1766
this.onclose();
}
this.emit('close');
}
Object.defineProperty(MessagePort.prototype, handleOnCloseSymbol, {
enumerable: false,
writable: false,
value: onclose
});
MessagePort.prototype.close = function(cb) {
if (typeof cb === 'function')
this.once('close', cb);
MessagePortPrototype.close.call(this);
};
Object.defineProperty(MessagePort.prototype, util.inspect.custom, {
enumerable: false,
writable: false,
value: function inspect() { // eslint-disable-line func-name-matching
let ref;
try {
// This may throw when `this` does not refer to a native object,
// e.g. when accessing the prototype directly.
ref = MessagePortPrototype.hasRef.call(this);
} catch { return this; }
return Object.assign(Object.create(MessagePort.prototype),
ref === undefined ? {
active: false,
} : {
active: true,
refed: ref
},
this);
}
});
function setupPortReferencing(port, eventEmitter, eventName) {
// Keep track of whether there are any workerMessage listeners:
// If there are some, ref() the channel so it keeps the event loop alive.
// If there are none or all are removed, unref() the channel so the worker
// can shutdown gracefully.
port.unref();
eventEmitter.on('newListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.ref();
MessagePortPrototype.start.call(port);
}
});
eventEmitter.on('removeListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
MessagePortPrototype.stop.call(port);
port.unref();
}
});
}
class ReadableWorkerStdio extends Readable {
constructor(port, name) {
super();
this[kPort] = port;
this[kName] = name;
this[kIncrementsPortRef] = true;
this[kStartedReading] = false;
this.on('end', () => {
if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0)
this[kPort].unref();
});
}
_read() {
if (!this[kStartedReading] && this[kIncrementsPortRef]) {
this[kStartedReading] = true;
if (this[kPort][kWaitingStreams]++ === 0)
this[kPort].ref();
}
this[kPort].postMessage({
type: messageTypes.STDIO_WANTS_MORE_DATA,
stream: this[kName]
});
}
}
class WritableWorkerStdio extends Writable {
constructor(port, name) {
super({ decodeStrings: false });
this[kPort] = port;
this[kName] = name;
this[kWritableCallbacks] = [];
}
_write(chunk, encoding, cb) {
this[kPort].postMessage({
type: messageTypes.STDIO_PAYLOAD,
stream: this[kName],
chunk,
encoding
});
this[kWritableCallbacks].push(cb);
if (this[kPort][kWaitingStreams]++ === 0)
this[kPort].ref();
}
_final(cb) {
this[kPort].postMessage({
type: messageTypes.STDIO_PAYLOAD,
stream: this[kName],
chunk: null
});
cb();
}
[kStdioWantsMoreDataCallback]() {
const cbs = this[kWritableCallbacks];
this[kWritableCallbacks] = [];
for (const cb of cbs)
cb();
if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
this[kPort].unref();
}
}
class Worker extends EventEmitter {
constructor(filename, options = {}) {
super();
@ -314,8 +112,8 @@ class Worker extends EventEmitter {
[kOnExit](code) {
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
MessagePortPrototype.drain.call(this[kPublicPort]);
MessagePortPrototype.drain.call(this[kPort]);
drainMessagePort(this[kPublicPort]);
drainMessagePort(this[kPort]);
this[kDispose]();
this.emit('exit', code);
this.removeAllListeners();
@ -421,25 +219,8 @@ class Worker extends EventEmitter {
}
}
const workerStdio = {};
if (!isMainThread) {
const port = getEnvMessagePort();
port[kWaitingStreams] = 0;
workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');
}
let originalFatalException;
function setupChild() {
// Called during bootstrap to set up worker script execution.
debug(`[${threadId}] is setting up worker child environment`);
const port = getEnvMessagePort();
const publicWorker = require('worker_threads');
port.on('message', (message) => {
function createMessageHandler(publicWorker, port, workerStdio) {
return function(message) {
if (message.type === messageTypes.LOAD_SCRIPT) {
const { filename, doEval, workerData, publicPort, hasStdin } = message;
publicWorker.parentPort = publicPort;
@ -471,14 +252,15 @@ function setupChild() {
}
assert.fail(`Unknown worker message type ${message.type}`);
});
};
}
port.start();
function createWorkerFatalExeception(port) {
const {
fatalException: originalFatalException
} = require('internal/process/execution');
originalFatalException = process._fatalException;
process._fatalException = fatalException;
function fatalException(error) {
return function(error) {
debug(`[${threadId}] gets fatal exception`);
let caught = false;
try {
@ -505,7 +287,7 @@ function setupChild() {
process.exit();
}
}
};
}
function pipeWithoutWarning(source, dest) {
@ -521,11 +303,9 @@ function pipeWithoutWarning(source, dest) {
}
module.exports = {
MessagePort,
MessageChannel,
createMessageHandler,
createWorkerFatalExeception,
threadId,
Worker,
setupChild,
isMainThread,
workerStdio
isMainThread
};

245
lib/internal/worker/io.js Normal file
View File

@ -0,0 +1,245 @@
'use strict';
const {
handle_onclose: handleOnCloseSymbol,
oninit: onInitSymbol
} = internalBinding('symbols');
const {
MessagePort,
MessageChannel
} = internalBinding('messaging');
const { threadId } = internalBinding('worker');
const { Readable, Writable } = require('stream');
const EventEmitter = require('events');
const util = require('util');
const debug = util.debuglog('worker');
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const kName = Symbol('kName');
const kOnMessageListener = Symbol('kOnMessageListener');
const kPort = Symbol('kPort');
const kWaitingStreams = Symbol('kWaitingStreams');
const kWritableCallbacks = Symbol('kWritableCallbacks');
const kStartedReading = Symbol('kStartedReading');
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
const messageTypes = {
UP_AND_RUNNING: 'upAndRunning',
COULD_NOT_SERIALIZE_ERROR: 'couldNotSerializeError',
ERROR_MESSAGE: 'errorMessage',
STDIO_PAYLOAD: 'stdioPayload',
STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
LOAD_SCRIPT: 'loadScript'
};
// Original drain from C++
const originalDrain = MessagePort.prototype.drain;
function drainMessagePort(port) {
return originalDrain.call(port);
}
// We have to mess with the MessagePort prototype a bit, so that a) we can make
// it inherit from EventEmitter, even though it is a C++ class, and b) we do
// not provide methods that are not present in the Browser and not documented
// on our side (e.g. hasRef).
// Save a copy of the original set of methods as a shallow clone.
const MessagePortPrototype = Object.create(
Object.getPrototypeOf(MessagePort.prototype),
Object.getOwnPropertyDescriptors(MessagePort.prototype));
// Set up the new inheritance chain.
Object.setPrototypeOf(MessagePort, EventEmitter);
Object.setPrototypeOf(MessagePort.prototype, EventEmitter.prototype);
// Finally, purge methods we don't want to be public.
delete MessagePort.prototype.stop;
delete MessagePort.prototype.drain;
MessagePort.prototype.ref = MessagePortPrototype.ref;
MessagePort.prototype.unref = MessagePortPrototype.unref;
// A communication channel consisting of a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
debug(`[${threadId}] received message`, payload);
// Emit the deserialized object to userland.
this.emit('message', payload);
};
// This is for compatibility with the Web's MessagePort API. It makes sense to
// provide it as an `EventEmitter` in Node.js, but if somebody overrides
// `onmessage`, we'll switch over to the Web API model.
Object.defineProperty(MessagePort.prototype, 'onmessage', {
enumerable: true,
configurable: true,
get() {
return this[kOnMessageListener];
},
set(value) {
this[kOnMessageListener] = value;
if (typeof value === 'function') {
this.ref();
MessagePortPrototype.start.call(this);
} else {
this.unref();
MessagePortPrototype.stop.call(this);
}
}
});
// This is called from inside the `MessagePort` constructor.
function oninit() {
setupPortReferencing(this, this, 'message');
}
Object.defineProperty(MessagePort.prototype, onInitSymbol, {
enumerable: true,
writable: false,
value: oninit
});
// This is called after the underlying `uv_async_t` has been closed.
function onclose() {
if (typeof this.onclose === 'function') {
// Not part of the Web standard yet, but there aren't many reasonable
// alternatives in a non-EventEmitter usage setting.
// Refs: https://github.com/whatwg/html/issues/1766
this.onclose();
}
this.emit('close');
}
Object.defineProperty(MessagePort.prototype, handleOnCloseSymbol, {
enumerable: false,
writable: false,
value: onclose
});
MessagePort.prototype.close = function(cb) {
if (typeof cb === 'function')
this.once('close', cb);
MessagePortPrototype.close.call(this);
};
Object.defineProperty(MessagePort.prototype, util.inspect.custom, {
enumerable: false,
writable: false,
value: function inspect() { // eslint-disable-line func-name-matching
let ref;
try {
// This may throw when `this` does not refer to a native object,
// e.g. when accessing the prototype directly.
ref = MessagePortPrototype.hasRef.call(this);
} catch { return this; }
return Object.assign(Object.create(MessagePort.prototype),
ref === undefined ? {
active: false,
} : {
active: true,
refed: ref
},
this);
}
});
function setupPortReferencing(port, eventEmitter, eventName) {
// Keep track of whether there are any workerMessage listeners:
// If there are some, ref() the channel so it keeps the event loop alive.
// If there are none or all are removed, unref() the channel so the worker
// can shutdown gracefully.
port.unref();
eventEmitter.on('newListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.ref();
MessagePortPrototype.start.call(port);
}
});
eventEmitter.on('removeListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
MessagePortPrototype.stop.call(port);
port.unref();
}
});
}
class ReadableWorkerStdio extends Readable {
constructor(port, name) {
super();
this[kPort] = port;
this[kName] = name;
this[kIncrementsPortRef] = true;
this[kStartedReading] = false;
this.on('end', () => {
if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0)
this[kPort].unref();
});
}
_read() {
if (!this[kStartedReading] && this[kIncrementsPortRef]) {
this[kStartedReading] = true;
if (this[kPort][kWaitingStreams]++ === 0)
this[kPort].ref();
}
this[kPort].postMessage({
type: messageTypes.STDIO_WANTS_MORE_DATA,
stream: this[kName]
});
}
}
class WritableWorkerStdio extends Writable {
constructor(port, name) {
super({ decodeStrings: false });
this[kPort] = port;
this[kName] = name;
this[kWritableCallbacks] = [];
}
_write(chunk, encoding, cb) {
this[kPort].postMessage({
type: messageTypes.STDIO_PAYLOAD,
stream: this[kName],
chunk,
encoding
});
this[kWritableCallbacks].push(cb);
if (this[kPort][kWaitingStreams]++ === 0)
this[kPort].ref();
}
_final(cb) {
this[kPort].postMessage({
type: messageTypes.STDIO_PAYLOAD,
stream: this[kName],
chunk: null
});
cb();
}
[kStdioWantsMoreDataCallback]() {
const cbs = this[kWritableCallbacks];
this[kWritableCallbacks] = [];
for (const cb of cbs)
cb();
if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
this[kPort].unref();
}
}
module.exports = {
drainMessagePort,
messageTypes,
kPort,
kIncrementsPortRef,
kWaitingStreams,
kStdioWantsMoreDataCallback,
MessagePort,
MessageChannel,
setupPortReferencing,
ReadableWorkerStdio,
WritableWorkerStdio
};

View File

@ -2,12 +2,15 @@
const {
isMainThread,
MessagePort,
MessageChannel,
threadId,
Worker
} = require('internal/worker');
const {
MessagePort,
MessageChannel
} = require('internal/worker/io');
module.exports = {
isMainThread,
MessagePort,

View File

@ -181,6 +181,7 @@
'lib/internal/stream_base_commons.js',
'lib/internal/vm/source_text_module.js',
'lib/internal/worker.js',
'lib/internal/worker/io.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/buffer_list.js',