worker: move worker thread setup code into the main script
This patch directly inlines `createMessageHandler()` and `createWorkerFatalExeception()` in the new `lib/internal/main/worker_thread.js` since the implementation of the two methods are related to the execution flow of workers. PR-URL: https://github.com/nodejs/node/pull/25667 Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
parent
6967f91368
commit
5e1d4462d2
@ -10,30 +10,118 @@ const {
|
|||||||
} = require('internal/bootstrap/pre_execution');
|
} = require('internal/bootstrap/pre_execution');
|
||||||
|
|
||||||
const {
|
const {
|
||||||
getEnvMessagePort,
|
threadId,
|
||||||
threadId
|
getEnvMessagePort
|
||||||
} = internalBinding('worker');
|
} = internalBinding('worker');
|
||||||
|
|
||||||
const {
|
const {
|
||||||
createMessageHandler,
|
messageTypes: {
|
||||||
createWorkerFatalExeception
|
// Messages that may be received by workers
|
||||||
} = require('internal/process/worker_thread_only');
|
LOAD_SCRIPT,
|
||||||
|
// Messages that may be posted from workers
|
||||||
|
UP_AND_RUNNING,
|
||||||
|
ERROR_MESSAGE,
|
||||||
|
COULD_NOT_SERIALIZE_ERROR,
|
||||||
|
// Messages that may be either received or posted
|
||||||
|
STDIO_PAYLOAD,
|
||||||
|
STDIO_WANTS_MORE_DATA,
|
||||||
|
},
|
||||||
|
kStdioWantsMoreDataCallback
|
||||||
|
} = require('internal/worker/io');
|
||||||
|
|
||||||
|
const {
|
||||||
|
fatalException: originalFatalException
|
||||||
|
} = require('internal/process/execution');
|
||||||
|
|
||||||
|
const publicWorker = require('worker_threads');
|
||||||
const debug = require('util').debuglog('worker');
|
const debug = require('util').debuglog('worker');
|
||||||
debug(`[${threadId}] is setting up worker child environment`);
|
|
||||||
|
|
||||||
function prepareUserCodeExecution() {
|
debug(`[${threadId}] is setting up worker child environment`);
|
||||||
initializeClusterIPC();
|
|
||||||
initializeESMLoader();
|
|
||||||
loadPreloadModules();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set up the message port and start listening
|
// Set up the message port and start listening
|
||||||
const port = getEnvMessagePort();
|
const port = getEnvMessagePort();
|
||||||
port.on('message', createMessageHandler(port, prepareUserCodeExecution));
|
|
||||||
port.start();
|
port.on('message', (message) => {
|
||||||
|
if (message.type === LOAD_SCRIPT) {
|
||||||
|
const {
|
||||||
|
filename,
|
||||||
|
doEval,
|
||||||
|
workerData,
|
||||||
|
publicPort,
|
||||||
|
manifestSrc,
|
||||||
|
manifestURL,
|
||||||
|
hasStdin
|
||||||
|
} = message;
|
||||||
|
if (manifestSrc) {
|
||||||
|
require('internal/process/policy').setup(manifestSrc, manifestURL);
|
||||||
|
}
|
||||||
|
initializeClusterIPC();
|
||||||
|
initializeESMLoader();
|
||||||
|
loadPreloadModules();
|
||||||
|
publicWorker.parentPort = publicPort;
|
||||||
|
publicWorker.workerData = workerData;
|
||||||
|
|
||||||
|
if (!hasStdin)
|
||||||
|
process.stdin.push(null);
|
||||||
|
|
||||||
|
debug(`[${threadId}] starts worker script ${filename} ` +
|
||||||
|
`(eval = ${eval}) at cwd = ${process.cwd()}`);
|
||||||
|
port.unref();
|
||||||
|
port.postMessage({ type: UP_AND_RUNNING });
|
||||||
|
if (doEval) {
|
||||||
|
const { evalScript } = require('internal/process/execution');
|
||||||
|
evalScript('[worker eval]', filename);
|
||||||
|
} else {
|
||||||
|
process.argv[1] = filename; // script filename
|
||||||
|
require('module').runMain();
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
} else if (message.type === STDIO_PAYLOAD) {
|
||||||
|
const { stream, chunk, encoding } = message;
|
||||||
|
process[stream].push(chunk, encoding);
|
||||||
|
return;
|
||||||
|
} else if (message.type === STDIO_WANTS_MORE_DATA) {
|
||||||
|
const { stream } = message;
|
||||||
|
process[stream][kStdioWantsMoreDataCallback]();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
require('assert').fail(`Unknown worker message type ${message.type}`);
|
||||||
|
});
|
||||||
|
|
||||||
// Overwrite fatalException
|
// Overwrite fatalException
|
||||||
process._fatalException = createWorkerFatalExeception(port);
|
process._fatalException = (error) => {
|
||||||
|
debug(`[${threadId}] gets fatal exception`);
|
||||||
|
let caught = false;
|
||||||
|
try {
|
||||||
|
caught = originalFatalException.call(this, error);
|
||||||
|
} catch (e) {
|
||||||
|
error = e;
|
||||||
|
}
|
||||||
|
debug(`[${threadId}] fatal exception caught = ${caught}`);
|
||||||
|
|
||||||
|
if (!caught) {
|
||||||
|
let serialized;
|
||||||
|
try {
|
||||||
|
const { serializeError } = require('internal/error-serdes');
|
||||||
|
serialized = serializeError(error);
|
||||||
|
} catch {}
|
||||||
|
debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
|
||||||
|
if (serialized)
|
||||||
|
port.postMessage({
|
||||||
|
type: ERROR_MESSAGE,
|
||||||
|
error: serialized
|
||||||
|
});
|
||||||
|
else
|
||||||
|
port.postMessage({ type: COULD_NOT_SERIALIZE_ERROR });
|
||||||
|
|
||||||
|
const { clearAsyncIdStack } = require('internal/async_hooks');
|
||||||
|
clearAsyncIdStack();
|
||||||
|
|
||||||
|
process.exit();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
markBootstrapComplete();
|
markBootstrapComplete();
|
||||||
|
|
||||||
|
port.start();
|
||||||
|
@ -3,13 +3,10 @@
|
|||||||
// This file contains process bootstrappers that can only be
|
// This file contains process bootstrappers that can only be
|
||||||
// run in the worker thread.
|
// run in the worker thread.
|
||||||
const {
|
const {
|
||||||
getEnvMessagePort,
|
getEnvMessagePort
|
||||||
threadId
|
|
||||||
} = internalBinding('worker');
|
} = internalBinding('worker');
|
||||||
|
|
||||||
const {
|
const {
|
||||||
messageTypes,
|
|
||||||
kStdioWantsMoreDataCallback,
|
|
||||||
kWaitingStreams,
|
kWaitingStreams,
|
||||||
ReadableWorkerStdio,
|
ReadableWorkerStdio,
|
||||||
WritableWorkerStdio
|
WritableWorkerStdio
|
||||||
@ -18,15 +15,6 @@ const {
|
|||||||
const {
|
const {
|
||||||
codes: { ERR_WORKER_UNSUPPORTED_OPERATION }
|
codes: { ERR_WORKER_UNSUPPORTED_OPERATION }
|
||||||
} = require('internal/errors');
|
} = require('internal/errors');
|
||||||
|
|
||||||
let debuglog;
|
|
||||||
function debug(...args) {
|
|
||||||
if (!debuglog) {
|
|
||||||
debuglog = require('util').debuglog('worker');
|
|
||||||
}
|
|
||||||
return debuglog(...args);
|
|
||||||
}
|
|
||||||
|
|
||||||
const workerStdio = {};
|
const workerStdio = {};
|
||||||
|
|
||||||
function initializeWorkerStdio() {
|
function initializeWorkerStdio() {
|
||||||
@ -43,97 +31,6 @@ function initializeWorkerStdio() {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
function createMessageHandler(port, prepareUserCodeExecution) {
|
|
||||||
const publicWorker = require('worker_threads');
|
|
||||||
|
|
||||||
return function(message) {
|
|
||||||
if (message.type === messageTypes.LOAD_SCRIPT) {
|
|
||||||
const {
|
|
||||||
filename,
|
|
||||||
doEval,
|
|
||||||
workerData,
|
|
||||||
publicPort,
|
|
||||||
manifestSrc,
|
|
||||||
manifestURL,
|
|
||||||
hasStdin
|
|
||||||
} = message;
|
|
||||||
if (manifestSrc) {
|
|
||||||
require('internal/process/policy').setup(manifestSrc, manifestURL);
|
|
||||||
}
|
|
||||||
prepareUserCodeExecution();
|
|
||||||
publicWorker.parentPort = publicPort;
|
|
||||||
publicWorker.workerData = workerData;
|
|
||||||
|
|
||||||
if (!hasStdin)
|
|
||||||
workerStdio.stdin.push(null);
|
|
||||||
|
|
||||||
debug(`[${threadId}] starts worker script ${filename} ` +
|
|
||||||
`(eval = ${eval}) at cwd = ${process.cwd()}`);
|
|
||||||
port.unref();
|
|
||||||
port.postMessage({ type: messageTypes.UP_AND_RUNNING });
|
|
||||||
if (doEval) {
|
|
||||||
const { evalScript } = require('internal/process/execution');
|
|
||||||
evalScript('[worker eval]', filename);
|
|
||||||
} else {
|
|
||||||
process.argv[1] = filename; // script filename
|
|
||||||
require('module').runMain();
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
} else if (message.type === messageTypes.STDIO_PAYLOAD) {
|
|
||||||
const { stream, chunk, encoding } = message;
|
|
||||||
workerStdio[stream].push(chunk, encoding);
|
|
||||||
return;
|
|
||||||
} else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) {
|
|
||||||
const { stream } = message;
|
|
||||||
workerStdio[stream][kStdioWantsMoreDataCallback]();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
require('assert').fail(`Unknown worker message type ${message.type}`);
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
// XXX(joyeecheung): this has to be returned as an anonymous function
|
|
||||||
// wrapped in a closure, see the comment of the original
|
|
||||||
// process._fatalException in lib/internal/process/execution.js
|
|
||||||
function createWorkerFatalExeception(port) {
|
|
||||||
const {
|
|
||||||
fatalException: originalFatalException
|
|
||||||
} = require('internal/process/execution');
|
|
||||||
|
|
||||||
return (error) => {
|
|
||||||
debug(`[${threadId}] gets fatal exception`);
|
|
||||||
let caught = false;
|
|
||||||
try {
|
|
||||||
caught = originalFatalException.call(this, error);
|
|
||||||
} catch (e) {
|
|
||||||
error = e;
|
|
||||||
}
|
|
||||||
debug(`[${threadId}] fatal exception caught = ${caught}`);
|
|
||||||
|
|
||||||
if (!caught) {
|
|
||||||
let serialized;
|
|
||||||
try {
|
|
||||||
const { serializeError } = require('internal/error-serdes');
|
|
||||||
serialized = serializeError(error);
|
|
||||||
} catch {}
|
|
||||||
debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
|
|
||||||
if (serialized)
|
|
||||||
port.postMessage({
|
|
||||||
type: messageTypes.ERROR_MESSAGE,
|
|
||||||
error: serialized
|
|
||||||
});
|
|
||||||
else
|
|
||||||
port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR });
|
|
||||||
|
|
||||||
const { clearAsyncIdStack } = require('internal/async_hooks');
|
|
||||||
clearAsyncIdStack();
|
|
||||||
|
|
||||||
process.exit();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
// The execution of this function itself should not cause any side effects.
|
// The execution of this function itself should not cause any side effects.
|
||||||
function wrapProcessMethods(binding) {
|
function wrapProcessMethods(binding) {
|
||||||
function umask(mask) {
|
function umask(mask) {
|
||||||
@ -150,7 +47,5 @@ function wrapProcessMethods(binding) {
|
|||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
initializeWorkerStdio,
|
initializeWorkerStdio,
|
||||||
createMessageHandler,
|
|
||||||
createWorkerFatalExeception,
|
|
||||||
wrapProcessMethods
|
wrapProcessMethods
|
||||||
};
|
};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user