cluster: support options in Worker constructor

This commit moves some common Worker code into the constructor
via support for an options argument.

Reviewed-By: Fedor Indutny <fedor@indutny.com>
This commit is contained in:
cjihrig 2014-07-30 22:20:52 -04:00 committed by Fedor Indutny
parent 4b59db008c
commit d287b8e58a
2 changed files with 103 additions and 40 deletions

View File

@ -35,12 +35,24 @@ cluster.isWorker = ('NODE_UNIQUE_ID' in process.env);
cluster.isMaster = (cluster.isWorker === false);
function Worker() {
if (!(this instanceof Worker)) return new Worker;
function Worker(options) {
if (!(this instanceof Worker))
return new Worker(options);
EventEmitter.call(this);
if (!util.isObject(options))
options = {};
this.suicide = undefined;
this.state = 'none';
this.id = 0;
this.state = options.state || 'none';
this.id = options.id | 0;
if (options.process) {
this.process = options.process;
this.process.on('error', this.emit.bind(this, 'error'));
this.process.on('message', this.emit.bind(this, 'message'));
}
}
util.inherits(Worker, EventEmitter);
@ -190,26 +202,6 @@ if (cluster.isMaster)
else
workerInit();
function createWorkerExecArgv(masterExecArgv, worker) {
var args = masterExecArgv.slice();
var debugPort = process.debugPort + worker.id;
var hasDebugArg = false;
for (var i = 0; i < args.length; i++) {
var match = args[i].match(/^(--debug|--debug-brk)(=\d+)?$/);
if (!match) continue;
args[i] = match[1] + '=' + debugPort;
hasDebugArg = true;
}
if (!hasDebugArg)
args = ['--debug-port=' + debugPort].concat(args);
return args;
}
function masterInit() {
cluster.workers = {};
@ -278,21 +270,46 @@ function masterInit() {
});
};
var ids = 0;
cluster.fork = function(env) {
cluster.setupMaster();
var worker = new Worker;
worker.id = ++ids;
function createWorkerProcess(id, env) {
var workerEnv = util._extend({}, process.env);
var execArgv = cluster.settings.execArgv.slice();
var debugPort = process.debugPort + id;
var hasDebugArg = false;
workerEnv = util._extend(workerEnv, env);
workerEnv.NODE_UNIQUE_ID = '' + worker.id;
worker.process = fork(cluster.settings.exec, cluster.settings.args, {
workerEnv.NODE_UNIQUE_ID = '' + id;
for (var i = 0; i < execArgv.length; i++) {
var match = execArgv[i].match(/^(--debug|--debug-brk)(=\d+)?$/);
if (match) {
execArgv[i] = match[1] + '=' + debugPort;
hasDebugArg = true;
}
}
if (!hasDebugArg)
execArgv = ['--debug-port=' + debugPort].concat(execArgv);
return fork(cluster.settings.exec, cluster.settings.args, {
env: workerEnv,
silent: cluster.settings.silent,
execArgv: createWorkerExecArgv(cluster.settings.execArgv, worker),
execArgv: execArgv,
gid: cluster.settings.gid,
uid: cluster.settings.uid
});
}
var ids = 0;
cluster.fork = function(env) {
cluster.setupMaster();
var id = ++ids;
var workerProcess = createWorkerProcess(id, env);
var worker = new Worker({
id: id,
process: workerProcess
});
worker.process.once('exit', function(exitCode, signalCode) {
worker.suicide = !!worker.suicide;
worker.state = 'dead';
@ -307,8 +324,6 @@ function masterInit() {
cluster.emit('disconnect', worker);
delete cluster.workers[worker.id];
});
worker.process.on('error', worker.emit.bind(worker, 'error'));
worker.process.on('message', worker.emit.bind(worker, 'message'));
worker.process.on('internalMessage', internal(worker, onmessage));
process.nextTick(function() {
cluster.emit('fork', worker);
@ -447,11 +462,12 @@ function workerInit() {
// Called from src/node.js
cluster._setupWorker = function() {
var worker = new Worker;
var worker = new Worker({
id: +process.env.NODE_UNIQUE_ID | 0,
process: process,
state: 'online'
});
cluster.worker = worker;
worker.id = +process.env.NODE_UNIQUE_ID | 0;
worker.state = 'online';
worker.process = process;
process.once('disconnect', function() {
if (!worker.suicide) {
// Unexpected disconnect, master exited, or some such nastiness, so
@ -459,8 +475,6 @@ function workerInit() {
process.exit(0);
}
});
worker.process.on('error', worker.emit.bind(worker, 'error'));
worker.process.on('message', worker.emit.bind(worker, 'message'));
process.on('internalMessage', internal(worker, onmessage));
send({ act: 'online' });
function onmessage(message, handle) {

View File

@ -0,0 +1,49 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
// test-cluster-worker-constructor.js
// validates correct behavior of the cluster.Worker constructor
var common = require('../common');
var assert = require('assert');
var cluster = require('cluster');
var worker;
worker = new cluster.Worker();
assert.equal(worker.suicide, undefined);
assert.equal(worker.state, 'none');
assert.equal(worker.id, 0);
assert.equal(worker.process, undefined);
worker = new cluster.Worker({
id: 3,
state: 'online',
process: process
});
assert.equal(worker.suicide, undefined);
assert.equal(worker.state, 'online');
assert.equal(worker.id, 3);
assert.equal(worker.process, process);
worker = cluster.Worker.call({}, {id: 5});
assert(worker instanceof cluster.Worker);
assert.equal(worker.id, 5);