diff --git a/doc/api/cluster.markdown b/doc/api/cluster.markdown
index 809efa5ed9b..c8bccec217a 100644
--- a/doc/api/cluster.markdown
+++ b/doc/api/cluster.markdown
@@ -53,14 +53,28 @@
 The worker processes are spawned using the `child_process.fork` method,
 so that they can communicate with the parent via IPC and pass server
 handles back and forth.
 
-When you call `server.listen(...)` in a worker, it serializes the
-arguments and passes the request to the master process. If the master
-process already has a listening server matching the worker's
-requirements, then it passes the handle to the worker. If it does not
-already have a listening server matching that requirement, then it will
-create one, and pass the handle to the child.
+The cluster module supports two methods of distributing incoming
+connections.
 
-This causes potentially surprising behavior in three edge cases:
+The first one (and the default one on all platforms except Windows)
+is the round-robin approach, where the master process listens on a
+port, accepts new connections and distributes them across the workers
+in a round-robin fashion, with some built-in smarts to avoid
+overloading a worker process.
+
+The second approach is where the master process creates the listen
+socket and sends it to interested workers. The workers then accept
+incoming connections directly.
+
+The second approach should, in theory, give the best performance.
+In practice, however, distribution tends to be very unbalanced due
+to operating system scheduler vagaries. Loads have been observed
+where over 70% of all connections ended up in just two processes,
+out of a total of eight.
+
+Because `server.listen()` hands off most of the work to the master
+process, there are three cases where the behavior between a normal
+Node.js process and a cluster worker differs:
 
 1. `server.listen({fd: 7})` Because the message is passed to the master,
    file descriptor 7 **in the parent** will be listened on, and the
@@ -77,12 +91,10 @@ This causes potentially surprising behavior in three edge cases:
    want to listen on a unique port, generate a port number based on the
    cluster worker ID.
 
-When multiple processes are all `accept()`ing on the same underlying
-resource, the operating system load-balances across them very
-efficiently. There is no routing logic in Node.js, or in your program,
-and no shared state between the workers. Therefore, it is important to
-design your program such that it does not rely too heavily on in-memory
-data objects for things like sessions and login.
+There is no routing logic in Node.js, or in your program, and no shared
+state between the workers. Therefore, it is important to design your
+program such that it does not rely too heavily on in-memory data objects
+for things like sessions and login.
 
 Because workers are all separate processes, they can be killed or
 re-spawned depending on your program's needs, without affecting other
@@ -91,6 +103,21 @@ continue to accept connections. Node does not automatically manage the
 number of workers for you, however. It is your responsibility to manage
 the worker pool for your application's needs.
 
+## cluster.schedulingPolicy
+
+The scheduling policy, either `cluster.SCHED_RR` for round-robin or
+`cluster.SCHED_NONE` to leave it to the operating system. This is a
+global setting and effectively frozen once you spawn the first worker
+or call `cluster.setupMaster()`, whichever comes first.
+
+`SCHED_RR` is the default on all operating systems except Windows.
+Windows will change to `SCHED_RR` once libuv is able to effectively
+distribute IOCP handles without incurring a large performance hit.
+
+`cluster.schedulingPolicy` can also be set through the
+`NODE_CLUSTER_SCHED_POLICY` environment variable. Valid
+values are `"rr"` and `"none"`.
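+
+For example, to opt out of round-robin balancing and leave the
+distribution of connections to the operating system (a minimal sketch;
+the policy must be set before the first worker is spawned):
+
+    var cluster = require('cluster');
+
+    if (cluster.isMaster) {
+      // Must come before the first cluster.fork() or cluster.setupMaster().
+      cluster.schedulingPolicy = cluster.SCHED_NONE;
+      cluster.fork();
+    }
+
+Or, equivalently, from the environment:
+
+    $ NODE_CLUSTER_SCHED_POLICY=none node server.js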
+
 ## cluster.settings
 
 * {Object}
diff --git a/lib/cluster.js b/lib/cluster.js
index 45b1224eb08..11ccbac5cdc 100644
--- a/lib/cluster.js
+++ b/lib/cluster.js
@@ -25,6 +25,8 @@ var dgram = require('dgram');
 var fork = require('child_process').fork;
 var net = require('net');
 var util = require('util');
+var SCHED_NONE = 1;
+var SCHED_RR = 2;
 
 var cluster = new EventEmitter;
 module.exports = cluster;
@@ -52,6 +54,121 @@ Worker.prototype.send = function() {
 
 // Master/worker specific methods are defined in the *Init() functions.
 
+function SharedHandle(key, address, port, addressType, backlog, fd) {
+  this.key = key;
+  this.errno = '';
+  this.workers = [];
+
+  if (addressType === 'udp4' || addressType === 'udp6')
+    this.handle = dgram._createSocketHandle(address, port, addressType, fd);
+  else
+    this.handle = net._createServerHandle(address, port, addressType, fd);
+
+  this.errno = this.handle ? '' : process._errno;
+}
+
+SharedHandle.prototype.add = function(worker, send) {
+  assert(this.workers.indexOf(worker) === -1);
+  this.workers.push(worker);
+  send(this.errno, null, this.handle);
+};
+
+SharedHandle.prototype.remove = function(worker) {
+  var index = this.workers.indexOf(worker);
+  assert(index !== -1);
+  this.workers.splice(index, 1);
+  if (this.workers.length !== 0) return false;
+  this.handle.close();
+  this.handle = null;
+  return true;
+};
+
+
+// Start a round-robin server. Master accepts connections and distributes
+// them over the workers.
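+//
+// In rough outline (a summary of the implementation that follows, not
+// additional behavior): the master binds and listens itself, queues every
+// accepted connection in distribute(), and hands connections to idle
+// workers one at a time via handoff(), which sends a 'newconn' internal
+// message. The worker either acks with accepted=true or the connection
+// is re-queued for another worker (see the reply callback in handoff()).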
+function RoundRobinHandle(key, address, port, addressType, backlog, fd) {
+  this.key = key;
+  this.all = {};
+  this.free = [];
+  this.handles = [];
+  this.handle = null;
+  this.server = net.createServer(assert.fail);
+
+  if (fd >= 0)
+    this.server.listen({ fd: fd });
+  else if (port >= 0)
+    this.server.listen(port, address);
+  else
+    this.server.listen(address);  // UNIX socket path.
+
+  var self = this;
+  this.server.once('listening', function() {
+    self.handle = self.server._handle;
+    self.handle.onconnection = self.distribute.bind(self);
+    self.server._handle = null;
+    self.server = null;
+  });
+}
+
+RoundRobinHandle.prototype.add = function(worker, send) {
+  assert(worker.id in this.all === false);
+  this.all[worker.id] = worker;
+
+  var self = this;
+  function done() {
+    if (self.handle.getsockname)
+      send(null, { sockname: self.handle.getsockname() }, null);
+    else
+      send(null, null, null);  // UNIX socket.
+    self.handoff(worker);  // In case there are connections pending.
+  }
+
+  if (this.server === null) return done();
+  // Still busy binding.
+  this.server.once('listening', done);
+  this.server.once('error', function(err) {
+    send(err.errno, null);
+  });
+};
+
+RoundRobinHandle.prototype.remove = function(worker) {
+  if (worker.id in this.all === false) return false;
+  delete this.all[worker.id];
+  var index = this.free.indexOf(worker);
+  if (index !== -1) this.free.splice(index, 1);
+  if (Object.getOwnPropertyNames(this.all).length !== 0) return false;
+  for (var handle; handle = this.handles.shift(); handle.close());
+  this.handle.close();
+  this.handle = null;
+  return true;
+};
+
+RoundRobinHandle.prototype.distribute = function(handle) {
+  this.handles.push(handle);
+  var worker = this.free.shift();
+  if (worker) this.handoff(worker);
+};
+
+RoundRobinHandle.prototype.handoff = function(worker) {
+  if (worker.id in this.all === false) {
+    return;  // Worker is closing (or has closed) the server.
+  }
+  var handle = this.handles.shift();
+  if (typeof handle === 'undefined') {
+    this.free.push(worker);  // Add to ready queue again.
+    return;
+  }
+  var message = { act: 'newconn', key: this.key };
+  var self = this;
+  sendHelper(worker.process, message, handle, function(reply) {
+    if (reply.accepted)
+      handle.close();
+    else
+      self.distribute(handle);  // Worker is shutting down. Send to another.
+    self.handoff(worker);
+  });
+};
+
 
 if (cluster.isMaster)
   masterInit();
@@ -90,11 +207,25 @@ function masterInit() {
   };
   cluster.settings = settings;
 
-  // Indexed by address:port:etc key. Its entries are dicts with handle and
-  // workers keys. That second one is a list of workers that hold a reference
-  // to the handle. When a worker dies, we scan the dicts and close the handle
-  // when its reference count drops to zero. Yes, that means we're doing an
-  // O(n*m) scan but n and m are small and worker deaths are rare events anyway.
+  // XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
+  var schedulingPolicy = {
+    'none': SCHED_NONE,
+    'rr': SCHED_RR
+  }[process.env.NODE_CLUSTER_SCHED_POLICY];
+
+  if (typeof schedulingPolicy === 'undefined') {
+    // FIXME Round-robin doesn't perform well on Windows right now due to the
+    // way IOCP is wired up. Bert is going to fix that, eventually.
+    schedulingPolicy = (process.platform === 'win32') ? SCHED_NONE : SCHED_RR;
+  }
+
+  cluster.schedulingPolicy = schedulingPolicy;
+  cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
+  cluster.SCHED_RR = SCHED_RR;      // Master distributes connections.
+
+  // Keyed on address:port:etc. When a worker dies, we walk over the handles
+  // and remove() the worker from each one. remove() may do a linear scan
+  // itself so we might end up with an O(n*m) operation. Ergo, FIXME.
   var handles = {};
 
   var initialized = false;
@@ -111,6 +242,9 @@ function masterInit() {
     {
       settings.execArgv = settings.execArgv.concat(['--logfile=v8-%p.log']);
    }
+    schedulingPolicy = cluster.schedulingPolicy;  // Freeze policy.
+    assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
+           'Bad cluster.schedulingPolicy: ' + schedulingPolicy);
     cluster.settings = settings;
 
     process.on('internalMessage', function(message) {
@@ -169,15 +303,9 @@ function masterInit() {
 
   cluster.on('disconnect', function(worker) {
     delete cluster.workers[worker.id];
-    // O(n*m) scan but for small values of n and m.
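+    // Remove the worker from every handle's bookkeeping; when remove()
+    // reports that a handle has no workers left, drop the handle itself.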
     for (var key in handles) {
-      var e = handles[key];
-      var i = e.workers.indexOf(worker);
-      if (i === -1) continue;
-      e.workers.splice(i, 1);
-      if (e.workers.length !== 0) continue;
-      e.handle.close();
-      delete handles[key];
+      var handle = handles[key];
+      if (handle.remove(worker)) delete handles[key];
     }
     if (Object.keys(handles).length === 0) {
       intercom.emit('disconnect');
@@ -210,6 +338,8 @@ function masterInit() {
       listening(worker, message);
     else if (message.act === 'suicide')
       worker.suicide = true;
+    else if (message.act === 'close')
+      close(worker, message);
   }
 
   function online(worker) {
@@ -224,17 +354,32 @@ function masterInit() {
                 message.addressType,
                 message.fd];
     var key = args.join(':');
-    var e = handles[key];
-    if (typeof e === 'undefined') {
-      e = { workers: [] };
-      if (message.addressType === 'udp4' || message.addressType === 'udp6')
-        e.handle = dgram._createSocketHandle.apply(null, args);
-      else
-        e.handle = net._createServerHandle.apply(null, args);
-      handles[key] = e;
+    var handle = handles[key];
+    if (typeof handle === 'undefined') {
+      var constructor = RoundRobinHandle;
+      // UDP is exempt from round-robin connection balancing for what should
+      // be obvious reasons: it's connectionless. There is nothing to send to
+      // the workers except raw datagrams and that's pointless.
+      if (schedulingPolicy !== SCHED_RR ||
+          message.addressType === 'udp4' ||
+          message.addressType === 'udp6') {
+        constructor = SharedHandle;
+      }
+      handles[key] = handle = new constructor(key,
                                              message.address,
+                                              message.port,
+                                              message.addressType,
+                                              message.backlog,
+                                              message.fd);
     }
-    e.workers.push(worker);
-    send(worker, { ack: message.seq }, e.handle);
+    handle.add(worker, function(errno, reply, handle) {
+      reply = util._extend({ ack: message.seq, key: key }, reply);
+      if (errno) {
+        reply.errno = errno;
+        delete handles[key];  // Gives other workers a chance to retry.
+      }
+      send(worker, reply, handle);
+    });
   }
 
   function listening(worker, message) {
@@ -249,6 +394,13 @@ function masterInit() {
     cluster.emit('listening', worker, info);
   }
 
+  // Round-robin only. Server in worker is closing, remove from list.
+  function close(worker, message) {
+    var key = message.key;
+    var handle = handles[key];
+    if (handle.remove(worker)) delete handles[key];
+  }
+
   function send(worker, message, handle, cb) {
     sendHelper(worker.process, message, handle, cb);
   }
@@ -256,7 +408,7 @@
 
 function workerInit() {
-  var handles = [];
+  var handles = {};
 
   // Called from src/node.js
   cluster._setupWorker = function() {
@@ -269,7 +421,10 @@ function workerInit() {
     process.on('internalMessage', internal(worker, onmessage));
     send({ act: 'online' });
     function onmessage(message, handle) {
-      if (message.act === 'disconnect') worker.disconnect();
+      if (message.act === 'newconn')
+        onconnection(message, handle);
+      else if (message.act === 'disconnect')
+        worker.disconnect();
     }
   };
 
@@ -282,28 +437,108 @@ function workerInit() {
       act: 'queryServer',
       fd: fd
     };
-    send(message, function(_, handle) {
-      // Monkey-patch the close() method so we can keep track of when it's
-      // closed. Avoids resource leaks when the handle is short-lived.
-      var close = handle.close;
-      handle.close = function() {
-        var index = handles.indexOf(handle);
-        if (index !== -1) handles.splice(index, 1);
-        return close.apply(this, arguments);
-      };
-      handles.push(handle);
-      cb(handle);
+    send(message, function(reply, handle) {
+      if (handle)
+        shared(reply, handle, cb);  // Shared listen socket.
+      else
+        rr(reply, cb);  // Round-robin.
     });
     obj.once('listening', function() {
       cluster.worker.state = 'listening';
+      var address = obj.address();
       message.act = 'listening';
-      message.port = obj.address().port || port;
+      message.port = address && address.port || port;
       send(message);
     });
   };
 
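+  // What follows are the two reply handlers for 'queryServer'. In short
+  // (a summary of the code below, not extra behavior): shared() receives
+  // a real handle from the master and the worker listens on it directly;
+  // rr() receives no handle and fabricates a faux one so that net.Server
+  // thinks it is listening, while connections actually arrive as
+  // 'newconn' messages handled by onconnection().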
+  // Shared listen socket.
+  function shared(message, handle, cb) {
+    var key = message.key;
+    // Monkey-patch the close() method so we can keep track of when it's
+    // closed. Avoids resource leaks when the handle is short-lived.
+    var close = handle.close;
+    handle.close = function() {
+      delete handles[key];
+      return close.apply(this, arguments);
+    };
+    assert(typeof handles[key] === 'undefined');
+    handles[key] = handle;
+    cb(handle);
+  }
+
+  // Round-robin. Master distributes handles across workers.
+  function rr(message, cb) {
+    if (message.errno)
+      onerror(message, cb);
+    else
+      onsuccess(message, cb);
+
+    function onerror(message, cb) {
+      function listen(backlog) {
+        process._errno = message.errno;
+        return -1;
+      }
+      function close() {
+      }
+      cb({ close: close, listen: listen });
+    }
+
+    function onsuccess(message, cb) {
+      var key = message.key;
+      function listen(backlog) {
+        // TODO(bnoordhuis) Send a message to the master that tells it to
+        // update the backlog size. The actual backlog should probably be
+        // the largest requested size by any worker.
+        return 0;
+      }
+      function close() {
+        // lib/net.js treats server._handle.close() as effectively synchronous.
+        // That means there is a time window between the call to close() and
+        // the ack by the master process in which we can still receive handles.
+        // onconnection() below handles that by sending those handles back to
+        // the master.
+        if (typeof key === 'undefined') return;
+        send({ act: 'close', key: key });
+        delete handles[key];
+        key = undefined;
+      }
+      function getsockname() {
+        var rv = {};
+        if (key) return util._extend(rv, message.sockname);
+        return rv;
+      }
+      // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
+      // with it. Fools net.Server into thinking that it's backed by a real
+      // handle.
+      var handle = {
+        close: close,
+        listen: listen
+      };
+      if (message.sockname) {
+        handle.getsockname = getsockname;  // TCP handles only.
+      }
+      assert(typeof handles[key] === 'undefined');
+      handles[key] = handle;
+      cb(handle);
+    }
+  }
+
+  // Round-robin connection.
+  function onconnection(message, handle) {
+    var key = message.key;
+    var server = handles[key];
+    var accepted = (typeof server !== 'undefined');
+    send({ ack: message.seq, accepted: accepted });
+    if (accepted) server.onconnection(handle);
+  }
+
   Worker.prototype.disconnect = function() {
-    for (var handle; handle = handles.shift(); handle.close());
+    for (var key in handles) {
+      var handle = handles[key];
+      delete handles[key];
+      handle.close();
+    }
     process.disconnect();
   };