worker: enable stdio

Provide `stdin`, `stdout` and `stderr` options for the `Worker`
constructor, and make these available to the worker thread
under their usual names.

The default for `stdin` is an empty stream, the default for
`stdout` and `stderr` is redirecting to the parent thread’s
corresponding stdio streams.

PR-URL: https://github.com/nodejs/node/pull/20876
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Shingo Inoue <leko.noor@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com>
Reviewed-By: John-David Dalton <john.david.dalton@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
This commit is contained in:
Anna Henningsen 2018-05-13 23:25:14 +02:00
parent 147ea5e3d7
commit ddefa0f2c5
No known key found for this signature in database
GPG Key ID: 9C63F3A6CD2AD8F9
4 changed files with 256 additions and 11 deletions

View File

@ -240,7 +240,7 @@ Most Node.js APIs are available inside of it.
Notable differences inside a Worker environment are:
- The [`process.stdin`][], [`process.stdout`][] and [`process.stderr`][]
properties are set to `null`.
may be redirected by the parent thread.
- The [`require('worker').isMainThread`][] property is set to `false`.
- The [`require('worker').parentPort`][] message port is available,
- [`process.exit()`][] does not stop the whole program, just the single thread,
@ -313,6 +313,13 @@ if (isMainThread) {
described in the [HTML structured clone algorithm][], and an error will be
thrown if the object cannot be cloned (e.g. because it contains
`function`s).
* stdin {boolean} If this is set to `true`, then `worker.stdin` will
provide a writable stream whose contents will appear as `process.stdin`
inside the Worker. By default, no data is provided.
* stdout {boolean} If this is set to `true`, then `worker.stdout` will
not automatically be piped through to `process.stdout` in the parent.
* stderr {boolean} If this is set to `true`, then `worker.stderr` will
not automatically be piped through to `process.stderr` in the parent.
### Event: 'error'
<!-- YAML
@ -377,6 +384,41 @@ Opposite of `unref()`, calling `ref()` on a previously `unref()`ed worker will
behavior). If the worker is `ref()`ed, calling `ref()` again will have
no effect.
### worker.stderr
<!-- YAML
added: REPLACEME
-->
* {stream.Readable}
This is a readable stream which contains data written to [`process.stderr`][]
inside the worker thread. If `stderr: true` was not passed to the
[`Worker`][] constructor, then data will be piped to the parent thread's
[`process.stderr`][] stream.
### worker.stdin
<!-- YAML
added: REPLACEME
-->
* {null|stream.Writable}
If `stdin: true` was passed to the [`Worker`][] constructor, this is a
writable stream. The data written to this stream will be made available in
the worker thread as [`process.stdin`][].
### worker.stdout
<!-- YAML
added: REPLACEME
-->
* {stream.Readable}
This is a readable stream which contains data written to [`process.stdout`][]
inside the worker thread. If `stdout: true` was not passed to the
[`Worker`][] constructor, then data will be piped to the parent thread's
[`process.stdout`][] stream.
### worker.terminate([callback])
<!-- YAML
added: REPLACEME

View File

@ -6,7 +6,10 @@ const {
ERR_UNKNOWN_STDIN_TYPE,
ERR_UNKNOWN_STREAM_TYPE
} = require('internal/errors').codes;
const { isMainThread } = require('internal/worker');
const {
isMainThread,
workerStdio
} = require('internal/worker');
exports.setup = setupStdio;
@ -17,8 +20,7 @@ function setupStdio() {
function getStdout() {
if (stdout) return stdout;
if (!isMainThread)
return new (require('stream').Writable)({ write(b, e, cb) { cb(); } });
if (!isMainThread) return workerStdio.stdout;
stdout = createWritableStdioStream(1);
stdout.destroySoon = stdout.destroy;
stdout._destroy = function(er, cb) {
@ -34,8 +36,7 @@ function setupStdio() {
function getStderr() {
if (stderr) return stderr;
if (!isMainThread)
return new (require('stream').Writable)({ write(b, e, cb) { cb(); } });
if (!isMainThread) return workerStdio.stderr;
stderr = createWritableStdioStream(2);
stderr.destroySoon = stderr.destroy;
stderr._destroy = function(er, cb) {
@ -51,8 +52,7 @@ function setupStdio() {
function getStdin() {
if (stdin) return stdin;
if (!isMainThread)
return new (require('stream').Readable)({ read() { this.push(null); } });
if (!isMainThread) return workerStdio.stdin;
const tty_wrap = process.binding('tty_wrap');
const fd = 0;

View File

@ -5,6 +5,7 @@ const EventEmitter = require('events');
const assert = require('assert');
const path = require('path');
const util = require('util');
const { Readable, Writable } = require('stream');
const {
ERR_INVALID_ARG_TYPE,
ERR_WORKER_NEED_ABSOLUTE_PATH,
@ -29,6 +30,7 @@ const isMainThread = threadId === 0;
const kOnMessageListener = Symbol('kOnMessageListener');
const kHandle = Symbol('kHandle');
const kName = Symbol('kName');
const kPort = Symbol('kPort');
const kPublicPort = Symbol('kPublicPort');
const kDispose = Symbol('kDispose');
@ -36,6 +38,12 @@ const kOnExit = Symbol('kOnExit');
const kOnMessage = Symbol('kOnMessage');
const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
const kOnErrorMessage = Symbol('kOnErrorMessage');
const kParentSideStdio = Symbol('kParentSideStdio');
const kWritableCallbacks = Symbol('kWritableCallbacks');
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
const kStartedReading = Symbol('kStartedReading');
const kWaitingStreams = Symbol('kWaitingStreams');
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const debug = util.debuglog('worker');
@ -129,6 +137,72 @@ function setupPortReferencing(port, eventEmitter, eventName) {
}
class ReadableWorkerStdio extends Readable {
constructor(port, name) {
super();
this[kPort] = port;
this[kName] = name;
this[kIncrementsPortRef] = true;
this[kStartedReading] = false;
this.on('end', () => {
if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0)
this[kPort].unref();
});
}
_read() {
if (!this[kStartedReading] && this[kIncrementsPortRef]) {
this[kStartedReading] = true;
if (this[kPort][kWaitingStreams]++ === 0)
this[kPort].ref();
}
this[kPort].postMessage({
type: 'stdioWantsMoreData',
stream: this[kName]
});
}
}
class WritableWorkerStdio extends Writable {
constructor(port, name) {
super({ decodeStrings: false });
this[kPort] = port;
this[kName] = name;
this[kWritableCallbacks] = [];
}
_write(chunk, encoding, cb) {
this[kPort].postMessage({
type: 'stdioPayload',
stream: this[kName],
chunk,
encoding
});
this[kWritableCallbacks].push(cb);
if (this[kPort][kWaitingStreams]++ === 0)
this[kPort].ref();
}
_final(cb) {
this[kPort].postMessage({
type: 'stdioPayload',
stream: this[kName],
chunk: null
});
cb();
}
[kStdioWantsMoreDataCallback]() {
const cbs = this[kWritableCallbacks];
this[kWritableCallbacks] = [];
for (const cb of cbs)
cb();
if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
this[kPort].unref();
}
}
class Worker extends EventEmitter {
constructor(filename, options = {}) {
super();
@ -154,8 +228,25 @@ class Worker extends EventEmitter {
this[kPort].on('message', (data) => this[kOnMessage](data));
this[kPort].start();
this[kPort].unref();
this[kPort][kWaitingStreams] = 0;
debug(`[${threadId}] created Worker with ID ${this.threadId}`);
let stdin = null;
if (options.stdin)
stdin = new WritableWorkerStdio(this[kPort], 'stdin');
const stdout = new ReadableWorkerStdio(this[kPort], 'stdout');
if (!options.stdout) {
stdout[kIncrementsPortRef] = false;
pipeWithoutWarning(stdout, process.stdout);
}
const stderr = new ReadableWorkerStdio(this[kPort], 'stderr');
if (!options.stderr) {
stderr[kIncrementsPortRef] = false;
pipeWithoutWarning(stderr, process.stderr);
}
this[kParentSideStdio] = { stdin, stdout, stderr };
const { port1, port2 } = new MessageChannel();
this[kPublicPort] = port1;
this[kPublicPort].on('message', (message) => this.emit('message', message));
@ -165,7 +256,8 @@ class Worker extends EventEmitter {
filename,
doEval: !!options.eval,
workerData: options.workerData,
publicPort: port2
publicPort: port2,
hasStdin: !!options.stdin
}, [port2]);
// Actually start the new thread now that everything is in place.
this[kHandle].startThread();
@ -197,6 +289,16 @@ class Worker extends EventEmitter {
return this[kOnCouldNotSerializeErr]();
case 'errorMessage':
return this[kOnErrorMessage](message.error);
case 'stdioPayload':
{
const { stream, chunk, encoding } = message;
return this[kParentSideStdio][stream].push(chunk, encoding);
}
case 'stdioWantsMoreData':
{
const { stream } = message;
return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback]();
}
}
assert.fail(`Unknown worker message type ${message.type}`);
@ -207,6 +309,18 @@ class Worker extends EventEmitter {
this[kHandle] = null;
this[kPort] = null;
this[kPublicPort] = null;
const { stdout, stderr } = this[kParentSideStdio];
this[kParentSideStdio] = null;
if (!stdout._readableState.ended) {
debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`);
stdout.push(null);
}
if (!stderr._readableState.ended) {
debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`);
stderr.push(null);
}
}
postMessage(...args) {
@ -243,6 +357,27 @@ class Worker extends EventEmitter {
return this[kHandle].threadId;
}
get stdin() {
return this[kParentSideStdio].stdin;
}
get stdout() {
return this[kParentSideStdio].stdout;
}
get stderr() {
return this[kParentSideStdio].stderr;
}
}
const workerStdio = {};
if (!isMainThread) {
const port = getEnvMessagePort();
port[kWaitingStreams] = 0;
workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');
}
let originalFatalException;
@ -256,10 +391,14 @@ function setupChild(evalScript) {
port.on('message', (message) => {
if (message.type === 'loadScript') {
const { filename, doEval, workerData, publicPort } = message;
const { filename, doEval, workerData, publicPort, hasStdin } = message;
publicWorker.parentPort = publicPort;
setupPortReferencing(publicPort, publicPort, 'message');
publicWorker.workerData = workerData;
if (!hasStdin)
workerStdio.stdin.push(null);
debug(`[${threadId}] starts worker script ${filename} ` +
`(eval = ${eval}) at cwd = ${process.cwd()}`);
port.unref();
@ -271,6 +410,14 @@ function setupChild(evalScript) {
require('module').runMain();
}
return;
} else if (message.type === 'stdioPayload') {
const { stream, chunk, encoding } = message;
workerStdio[stream].push(chunk, encoding);
return;
} else if (message.type === 'stdioWantsMoreData') {
const { stream } = message;
workerStdio[stream][kStdioWantsMoreDataCallback]();
return;
}
assert.fail(`Unknown worker message type ${message.type}`);
@ -317,11 +464,24 @@ function deserializeError(error) {
error.byteLength).toString('utf8');
}
function pipeWithoutWarning(source, dest) {
const sourceMaxListeners = source._maxListeners;
const destMaxListeners = dest._maxListeners;
source.setMaxListeners(Infinity);
dest.setMaxListeners(Infinity);
source.pipe(dest);
source._maxListeners = sourceMaxListeners;
dest._maxListeners = destMaxListeners;
}
module.exports = {
MessagePort,
MessageChannel,
threadId,
Worker,
setupChild,
isMainThread
isMainThread,
workerStdio
};

View File

@ -0,0 +1,43 @@
// Flags: --experimental-worker
'use strict';
const common = require('../common');
const assert = require('assert');
const fs = require('fs');
const util = require('util');
const { Writable } = require('stream');
const { Worker, isMainThread } = require('worker');
class BufferingWritable extends Writable {
constructor() {
super();
this.chunks = [];
}
_write(chunk, enc, cb) {
this.chunks.push(chunk);
cb();
}
get buffer() {
return Buffer.concat(this.chunks);
}
}
if (isMainThread) {
const original = new BufferingWritable();
const passed = new BufferingWritable();
const w = new Worker(__filename, { stdin: true, stdout: true });
const source = fs.createReadStream(process.execPath);
source.pipe(w.stdin);
source.pipe(original);
w.stdout.pipe(passed);
passed.on('finish', common.mustCall(() => {
assert.strictEqual(original.buffer.compare(passed.buffer), 0,
`Original: ${util.inspect(original.buffer)}, ` +
`Actual: ${util.inspect(passed.buffer)}`);
}));
} else {
process.stdin.pipe(process.stdout);
}