process: move worker bootstrap code into worker_thread_only.js
Move worker bootstrap code into worker_thread_only.js from internal/worker.js since they are only run once during bootstrap. PR-URL: https://github.com/nodejs/node/pull/25199 Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
parent
af237152cd
commit
00babd38f3
@ -7,18 +7,21 @@ const {
|
||||
threadId
|
||||
} = internalBinding('worker');
|
||||
|
||||
const debug = require('util').debuglog('worker');
|
||||
|
||||
const {
|
||||
messageTypes,
|
||||
kStdioWantsMoreDataCallback,
|
||||
kWaitingStreams,
|
||||
ReadableWorkerStdio,
|
||||
WritableWorkerStdio
|
||||
} = require('internal/worker/io');
|
||||
|
||||
const {
|
||||
createMessageHandler,
|
||||
createWorkerFatalExeception
|
||||
} = require('internal/worker');
|
||||
let debuglog;
|
||||
function debug(...args) {
|
||||
if (!debuglog) {
|
||||
debuglog = require('util').debuglog('worker');
|
||||
}
|
||||
return debuglog(...args);
|
||||
}
|
||||
|
||||
const workerStdio = {};
|
||||
|
||||
@ -36,12 +39,90 @@ function initializeWorkerStdio() {
|
||||
};
|
||||
}
|
||||
|
||||
function createMessageHandler(port) {
|
||||
const publicWorker = require('worker_threads');
|
||||
|
||||
return function(message) {
|
||||
if (message.type === messageTypes.LOAD_SCRIPT) {
|
||||
const { filename, doEval, workerData, publicPort, hasStdin } = message;
|
||||
publicWorker.parentPort = publicPort;
|
||||
publicWorker.workerData = workerData;
|
||||
|
||||
if (!hasStdin)
|
||||
workerStdio.stdin.push(null);
|
||||
|
||||
debug(`[${threadId}] starts worker script ${filename} ` +
|
||||
`(eval = ${eval}) at cwd = ${process.cwd()}`);
|
||||
port.unref();
|
||||
port.postMessage({ type: messageTypes.UP_AND_RUNNING });
|
||||
if (doEval) {
|
||||
const { evalScript } = require('internal/process/execution');
|
||||
evalScript('[worker eval]', filename);
|
||||
} else {
|
||||
process.argv[1] = filename; // script filename
|
||||
require('module').runMain();
|
||||
}
|
||||
return;
|
||||
} else if (message.type === messageTypes.STDIO_PAYLOAD) {
|
||||
const { stream, chunk, encoding } = message;
|
||||
workerStdio[stream].push(chunk, encoding);
|
||||
return;
|
||||
} else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) {
|
||||
const { stream } = message;
|
||||
workerStdio[stream][kStdioWantsMoreDataCallback]();
|
||||
return;
|
||||
}
|
||||
|
||||
require('assert').fail(`Unknown worker message type ${message.type}`);
|
||||
};
|
||||
}
|
||||
|
||||
// XXX(joyeecheung): this has to be returned as an anonymous function
|
||||
// wrapped in a closure, see the comment of the original
|
||||
// process._fatalException in lib/internal/process/execution.js
|
||||
function createWorkerFatalExeception(port) {
|
||||
const {
|
||||
fatalException: originalFatalException
|
||||
} = require('internal/process/execution');
|
||||
|
||||
return (error) => {
|
||||
debug(`[${threadId}] gets fatal exception`);
|
||||
let caught = false;
|
||||
try {
|
||||
caught = originalFatalException.call(this, error);
|
||||
} catch (e) {
|
||||
error = e;
|
||||
}
|
||||
debug(`[${threadId}] fatal exception caught = ${caught}`);
|
||||
|
||||
if (!caught) {
|
||||
let serialized;
|
||||
try {
|
||||
const { serializeError } = require('internal/error-serdes');
|
||||
serialized = serializeError(error);
|
||||
} catch {}
|
||||
debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
|
||||
if (serialized)
|
||||
port.postMessage({
|
||||
type: messageTypes.ERROR_MESSAGE,
|
||||
error: serialized
|
||||
});
|
||||
else
|
||||
port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR });
|
||||
|
||||
const { clearAsyncIdStack } = require('internal/async_hooks');
|
||||
clearAsyncIdStack();
|
||||
|
||||
process.exit();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
function setup() {
|
||||
debug(`[${threadId}] is setting up worker child environment`);
|
||||
|
||||
const port = getEnvMessagePort();
|
||||
const publicWorker = require('worker_threads');
|
||||
port.on('message', createMessageHandler(publicWorker, port, workerStdio));
|
||||
port.on('message', createMessageHandler(port));
|
||||
port.start();
|
||||
|
||||
return {
|
||||
|
@ -10,7 +10,6 @@ const {
|
||||
ERR_WORKER_UNSUPPORTED_EXTENSION,
|
||||
} = require('internal/errors').codes;
|
||||
const { validateString } = require('internal/validators');
|
||||
const { clearAsyncIdStack } = require('internal/async_hooks');
|
||||
|
||||
const {
|
||||
drainMessagePort,
|
||||
@ -24,7 +23,7 @@ const {
|
||||
ReadableWorkerStdio,
|
||||
WritableWorkerStdio,
|
||||
} = require('internal/worker/io');
|
||||
const { serializeError, deserializeError } = require('internal/error-serdes');
|
||||
const { deserializeError } = require('internal/error-serdes');
|
||||
const { pathToFileURL } = require('url');
|
||||
|
||||
const {
|
||||
@ -219,77 +218,6 @@ class Worker extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
function createMessageHandler(publicWorker, port, workerStdio) {
|
||||
return function(message) {
|
||||
if (message.type === messageTypes.LOAD_SCRIPT) {
|
||||
const { filename, doEval, workerData, publicPort, hasStdin } = message;
|
||||
publicWorker.parentPort = publicPort;
|
||||
publicWorker.workerData = workerData;
|
||||
|
||||
if (!hasStdin)
|
||||
workerStdio.stdin.push(null);
|
||||
|
||||
debug(`[${threadId}] starts worker script ${filename} ` +
|
||||
`(eval = ${eval}) at cwd = ${process.cwd()}`);
|
||||
port.unref();
|
||||
port.postMessage({ type: messageTypes.UP_AND_RUNNING });
|
||||
if (doEval) {
|
||||
const { evalScript } = require('internal/process/execution');
|
||||
evalScript('[worker eval]', filename);
|
||||
} else {
|
||||
process.argv[1] = filename; // script filename
|
||||
require('module').runMain();
|
||||
}
|
||||
return;
|
||||
} else if (message.type === messageTypes.STDIO_PAYLOAD) {
|
||||
const { stream, chunk, encoding } = message;
|
||||
workerStdio[stream].push(chunk, encoding);
|
||||
return;
|
||||
} else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) {
|
||||
const { stream } = message;
|
||||
workerStdio[stream][kStdioWantsMoreDataCallback]();
|
||||
return;
|
||||
}
|
||||
|
||||
assert.fail(`Unknown worker message type ${message.type}`);
|
||||
};
|
||||
}
|
||||
|
||||
function createWorkerFatalExeception(port) {
|
||||
const {
|
||||
fatalException: originalFatalException
|
||||
} = require('internal/process/execution');
|
||||
|
||||
return function(error) {
|
||||
debug(`[${threadId}] gets fatal exception`);
|
||||
let caught = false;
|
||||
try {
|
||||
caught = originalFatalException.call(this, error);
|
||||
} catch (e) {
|
||||
error = e;
|
||||
}
|
||||
debug(`[${threadId}] fatal exception caught = ${caught}`);
|
||||
|
||||
if (!caught) {
|
||||
let serialized;
|
||||
try {
|
||||
serialized = serializeError(error);
|
||||
} catch {}
|
||||
debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
|
||||
if (serialized)
|
||||
port.postMessage({
|
||||
type: messageTypes.ERROR_MESSAGE,
|
||||
error: serialized
|
||||
});
|
||||
else
|
||||
port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR });
|
||||
clearAsyncIdStack();
|
||||
|
||||
process.exit();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
function pipeWithoutWarning(source, dest) {
|
||||
const sourceMaxListeners = source._maxListeners;
|
||||
const destMaxListeners = dest._maxListeners;
|
||||
@ -303,8 +231,6 @@ function pipeWithoutWarning(source, dest) {
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
createMessageHandler,
|
||||
createWorkerFatalExeception,
|
||||
threadId,
|
||||
Worker,
|
||||
isMainThread
|
||||
|
Loading…
x
Reference in New Issue
Block a user