cluster: allow shared reused dgram sockets
Allow listening on reused dgram ports in cluster workers. Fix: https://github.com/joyent/node/issues/9261 PR-URL: https://github.com/nodejs/node/pull/2548 Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
This commit is contained in:
parent
154d3f5865
commit
c7be08cec1
@ -57,7 +57,7 @@ Worker.prototype.isConnected = function isConnected() {
|
|||||||
|
|
||||||
// Master/worker specific methods are defined in the *Init() functions.
|
// Master/worker specific methods are defined in the *Init() functions.
|
||||||
|
|
||||||
function SharedHandle(key, address, port, addressType, backlog, fd) {
|
function SharedHandle(key, address, port, addressType, backlog, fd, flags) {
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.workers = [];
|
this.workers = [];
|
||||||
this.handle = null;
|
this.handle = null;
|
||||||
@ -66,7 +66,7 @@ function SharedHandle(key, address, port, addressType, backlog, fd) {
|
|||||||
// FIXME(bnoordhuis) Polymorphic return type for lack of a better solution.
|
// FIXME(bnoordhuis) Polymorphic return type for lack of a better solution.
|
||||||
var rval;
|
var rval;
|
||||||
if (addressType === 'udp4' || addressType === 'udp6')
|
if (addressType === 'udp4' || addressType === 'udp6')
|
||||||
rval = dgram._createSocketHandle(address, port, addressType, fd);
|
rval = dgram._createSocketHandle(address, port, addressType, fd, flags);
|
||||||
else
|
else
|
||||||
rval = net._createServerHandle(address, port, addressType, fd);
|
rval = net._createServerHandle(address, port, addressType, fd);
|
||||||
|
|
||||||
@ -438,7 +438,8 @@ function masterInit() {
|
|||||||
var args = [message.address,
|
var args = [message.address,
|
||||||
message.port,
|
message.port,
|
||||||
message.addressType,
|
message.addressType,
|
||||||
message.fd];
|
message.fd,
|
||||||
|
message.index];
|
||||||
var key = args.join(':');
|
var key = args.join(':');
|
||||||
var handle = handles[key];
|
var handle = handles[key];
|
||||||
if (handle === undefined) {
|
if (handle === undefined) {
|
||||||
@ -456,7 +457,8 @@ function masterInit() {
|
|||||||
message.port,
|
message.port,
|
||||||
message.addressType,
|
message.addressType,
|
||||||
message.backlog,
|
message.backlog,
|
||||||
message.fd);
|
message.fd,
|
||||||
|
message.flags);
|
||||||
}
|
}
|
||||||
if (!handle.data) handle.data = message.data;
|
if (!handle.data) handle.data = message.data;
|
||||||
|
|
||||||
@ -485,7 +487,7 @@ function masterInit() {
|
|||||||
cluster.emit('listening', worker, info);
|
cluster.emit('listening', worker, info);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Round-robin only. Server in worker is closing, remove from list.
|
// Server in worker is closing, remove from list.
|
||||||
function close(worker, message) {
|
function close(worker, message) {
|
||||||
var key = message.key;
|
var key = message.key;
|
||||||
var handle = handles[key];
|
var handle = handles[key];
|
||||||
@ -500,6 +502,7 @@ function masterInit() {
|
|||||||
|
|
||||||
function workerInit() {
|
function workerInit() {
|
||||||
var handles = {};
|
var handles = {};
|
||||||
|
var indexes = {};
|
||||||
|
|
||||||
// Called from src/node.js
|
// Called from src/node.js
|
||||||
cluster._setupWorker = function() {
|
cluster._setupWorker = function() {
|
||||||
@ -528,15 +531,22 @@ function workerInit() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// obj is a net#Server or a dgram#Socket object.
|
// obj is a net#Server or a dgram#Socket object.
|
||||||
cluster._getServer = function(obj, address, port, addressType, fd, cb) {
|
cluster._getServer = function(obj, options, cb) {
|
||||||
var message = {
|
const key = [ options.address,
|
||||||
addressType: addressType,
|
options.port,
|
||||||
address: address,
|
options.addressType,
|
||||||
port: port,
|
options.fd ].join(':');
|
||||||
|
if (indexes[key] === undefined)
|
||||||
|
indexes[key] = 0;
|
||||||
|
else
|
||||||
|
indexes[key]++;
|
||||||
|
|
||||||
|
const message = util._extend({
|
||||||
act: 'queryServer',
|
act: 'queryServer',
|
||||||
fd: fd,
|
index: indexes[key],
|
||||||
data: null
|
data: null
|
||||||
};
|
}, options);
|
||||||
|
|
||||||
// Set custom data on handle (i.e. tls tickets key)
|
// Set custom data on handle (i.e. tls tickets key)
|
||||||
if (obj._getServerData) message.data = obj._getServerData();
|
if (obj._getServerData) message.data = obj._getServerData();
|
||||||
send(message, function(reply, handle) {
|
send(message, function(reply, handle) {
|
||||||
@ -549,9 +559,9 @@ function workerInit() {
|
|||||||
});
|
});
|
||||||
obj.once('listening', function() {
|
obj.once('listening', function() {
|
||||||
cluster.worker.state = 'listening';
|
cluster.worker.state = 'listening';
|
||||||
var address = obj.address();
|
const address = obj.address();
|
||||||
message.act = 'listening';
|
message.act = 'listening';
|
||||||
message.port = address && address.port || port;
|
message.port = address && address.port || options.port;
|
||||||
send(message);
|
send(message);
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
@ -563,6 +573,7 @@ function workerInit() {
|
|||||||
// closed. Avoids resource leaks when the handle is short-lived.
|
// closed. Avoids resource leaks when the handle is short-lived.
|
||||||
var close = handle.close;
|
var close = handle.close;
|
||||||
handle.close = function() {
|
handle.close = function() {
|
||||||
|
send({ act: 'close', key: key });
|
||||||
delete handles[key];
|
delete handles[key];
|
||||||
return close.apply(this, arguments);
|
return close.apply(this, arguments);
|
||||||
};
|
};
|
||||||
|
23
lib/dgram.js
23
lib/dgram.js
@ -60,14 +60,14 @@ function newHandle(type) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
exports._createSocketHandle = function(address, port, addressType, fd) {
|
exports._createSocketHandle = function(address, port, addressType, fd, flags) {
|
||||||
// Opening an existing fd is not supported for UDP handles.
|
// Opening an existing fd is not supported for UDP handles.
|
||||||
assert(typeof fd !== 'number' || fd < 0);
|
assert(typeof fd !== 'number' || fd < 0);
|
||||||
|
|
||||||
var handle = newHandle(addressType);
|
var handle = newHandle(addressType);
|
||||||
|
|
||||||
if (port || address) {
|
if (port || address) {
|
||||||
var err = handle.bind(address, port || 0, 0);
|
var err = handle.bind(address, port || 0, flags);
|
||||||
if (err) {
|
if (err) {
|
||||||
handle.close();
|
handle.close();
|
||||||
return err;
|
return err;
|
||||||
@ -176,8 +176,12 @@ Socket.prototype.bind = function(port /*, address, callback*/) {
|
|||||||
if (!cluster)
|
if (!cluster)
|
||||||
cluster = require('cluster');
|
cluster = require('cluster');
|
||||||
|
|
||||||
|
var flags = 0;
|
||||||
|
if (self._reuseAddr)
|
||||||
|
flags |= constants.UV_UDP_REUSEADDR;
|
||||||
|
|
||||||
if (cluster.isWorker && !exclusive) {
|
if (cluster.isWorker && !exclusive) {
|
||||||
cluster._getServer(self, ip, port, self.type, -1, function(err, handle) {
|
function onHandle(err, handle) {
|
||||||
if (err) {
|
if (err) {
|
||||||
var ex = exceptionWithHostPort(err, 'bind', ip, port);
|
var ex = exceptionWithHostPort(err, 'bind', ip, port);
|
||||||
self.emit('error', ex);
|
self.emit('error', ex);
|
||||||
@ -191,16 +195,19 @@ Socket.prototype.bind = function(port /*, address, callback*/) {
|
|||||||
|
|
||||||
replaceHandle(self, handle);
|
replaceHandle(self, handle);
|
||||||
startListening(self);
|
startListening(self);
|
||||||
});
|
}
|
||||||
|
cluster._getServer(self, {
|
||||||
|
address: ip,
|
||||||
|
port: port,
|
||||||
|
addressType: self.type,
|
||||||
|
fd: -1,
|
||||||
|
flags: flags
|
||||||
|
}, onHandle);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if (!self._handle)
|
if (!self._handle)
|
||||||
return; // handle has been closed in the mean time
|
return; // handle has been closed in the mean time
|
||||||
|
|
||||||
var flags = 0;
|
|
||||||
if (self._reuseAddr)
|
|
||||||
flags |= constants.UV_UDP_REUSEADDR;
|
|
||||||
|
|
||||||
var err = self._handle.bind(ip, port || 0, flags);
|
var err = self._handle.bind(ip, port || 0, flags);
|
||||||
if (err) {
|
if (err) {
|
||||||
var ex = exceptionWithHostPort(err, 'bind', ip, port);
|
var ex = exceptionWithHostPort(err, 'bind', ip, port);
|
||||||
|
@ -1268,7 +1268,13 @@ function listen(self, address, port, addressType, backlog, fd, exclusive) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
cluster._getServer(self, address, port, addressType, fd, cb);
|
cluster._getServer(self, {
|
||||||
|
address: address,
|
||||||
|
port: port,
|
||||||
|
addressType: addressType,
|
||||||
|
fd: fd,
|
||||||
|
flags: 0
|
||||||
|
}, cb);
|
||||||
|
|
||||||
function cb(err, handle) {
|
function cb(err, handle) {
|
||||||
// EADDRINUSE may not be reported until we call listen(). To complicate
|
// EADDRINUSE may not be reported until we call listen(). To complicate
|
||||||
|
40
test/parallel/test-cluster-dgram-reuse.js
Normal file
40
test/parallel/test-cluster-dgram-reuse.js
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
'use strict';
|
||||||
|
const common = require('../common');
|
||||||
|
const assert = require('assert');
|
||||||
|
const cluster = require('cluster');
|
||||||
|
const dgram = require('dgram');
|
||||||
|
|
||||||
|
if (common.isWindows) {
|
||||||
|
console.log('1..0 # Skipped: dgram clustering is currently not supported ' +
|
||||||
|
'on windows.');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cluster.isMaster) {
|
||||||
|
cluster.fork().on('exit', function(code) {
|
||||||
|
assert.equal(code, 0);
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const sockets = [];
|
||||||
|
function next() {
|
||||||
|
sockets.push(this);
|
||||||
|
if (sockets.length !== 2)
|
||||||
|
return;
|
||||||
|
|
||||||
|
// Work around health check issue
|
||||||
|
process.nextTick(function() {
|
||||||
|
for (var i = 0; i < sockets.length; i++)
|
||||||
|
sockets[i].close(close);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
var waiting = 2;
|
||||||
|
function close() {
|
||||||
|
if (--waiting === 0)
|
||||||
|
cluster.worker.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (var i = 0; i < 2; i++)
|
||||||
|
dgram.createSocket({ type: 'udp4', reuseAddr: true }).bind(common.PORT, next);
|
Loading…
x
Reference in New Issue
Block a user