From dceebbfa31a2b82bb534ba120ef9354a7d348f65 Mon Sep 17 00:00:00 2001 From: Andreas Madsen Date: Thu, 12 Apr 2012 09:23:07 +0200 Subject: [PATCH] child_process: allow sending a net Socket and Server object using child.send child_process.fork() support sending native hander object, this patch add support for sending net.Server and net.Socket object by converting the object to a native handle object and back to a useful object again. Note when sending a Socket there was emitted by a net Server object, the server.connections property becomes null, because it is no longer possible to known when it is destroyed. --- doc/api/child_process.markdown | 76 +++++- doc/api/net.markdown | 4 + lib/child_process.js | 272 ++++++++++++++++++-- lib/net.js | 46 +++- test/simple/test-child-process-fork-net.js | 198 ++++++++++++++ test/simple/test-child-process-fork-net2.js | 131 ++++++++++ 6 files changed, 696 insertions(+), 31 deletions(-) create mode 100644 test/simple/test-child-process-fork-net.js create mode 100644 test/simple/test-child-process-fork-net2.js diff --git a/doc/api/child_process.markdown b/doc/api/child_process.markdown index ade33bc6386..18be2f66c2e 100644 --- a/doc/api/child_process.markdown +++ b/doc/api/child_process.markdown @@ -55,7 +55,7 @@ An alternative way to check if you can send messages is to see if the ### Event: 'message' * `message` {Object} a parsed JSON object or primitive value -* `sendHandle` {Handle object} a handle object +* `sendHandle` {Handle object} a Socket or Server object Messages send by `.send(message, [sendHandle])` are obtained using the `message` event. @@ -129,7 +129,7 @@ See `kill(2)` * `message` {Object} * `sendHandle` {Handle object} -When useing `child_process.fork()` an you can write to the child using +When using `child_process.fork()` you can write to the child using `child.send(message, [sendHandle])` and messages are received by a `'message'` event on the child. @@ -162,9 +162,73 @@ the `message` event, since they are internal messages used by node core. Messages containing the prefix are emitted in the `internalMessage` event, you should by all means avoid using this feature, it is subject to change without notice. -The `sendHandle` option to `child.send()` is for sending a handle object to -another process. The child will receive the object as its second argument to -the `message` event. +The `sendHandle` option to `child.send()` is for sending a TCP server or +socket object to another process. The child will receive the object as its +second argument to the `message` event. + +**send server object** + +Here is an example of sending a server: + + var child = require('child_process').fork('child.js'); + + // Open up the server object and send the handle. + var server = require('net').createServer(); + server.on('connection', function (socket) { + socket.end('handled by parent'); + }); + server.listen(1337, function() { + child.send('server', server); + }); + +And the child would the recive the server object as: + + process.on('message', function(m, server) { + if (m === 'server') { + server.on('connection', function (socket) { + socket.end('handled by child'); + }); + } + }); + +Note that the server is now shared between the parent and child, this means +that some connections will be handled by the parent and some by the child. + +**send socket object** + +Here is an example of sending a socket. It will spawn two childs and handle +connections with the remote address `74.125.127.100` as VIP by sending the +socket to a "special" child process. Other sockets will go to a "normal" process. + + var normal = require('child_process').fork('child.js', ['normal']); + var special = require('child_process').fork('child.js', ['special']); + + // Open up the server and send sockets to child + var server = require('net').createServer(); + server.on('connection', function (socket) { + + // if this is a VIP + if (socket.remoteAddress === '74.125.127.100') { + special.send('socket', socket); + return; + } + // just the usual dudes + normal.send('socket', socket); + }); + server.listen(1337); + +The `child.js` could look like this: + + process.on('message', function(m, socket) { + if (m === 'socket') { + socket.end('You where handled as a ' + process.argv[2] + ' person'); + } + }); + +Note that once a single socket has been sent to a child the parent can no +longer keep track of when the socket is destroyed. To indicate this condition +the `.connections` property becomes `null`. +It is also recomended not to use `.maxConnections` in this condition. ### child.disconnect() @@ -382,7 +446,7 @@ leaner than `child_process.exec`. It has the same options. This is a special case of the `spawn()` functionality for spawning Node processes. In addition to having all the methods in a normal ChildProcess -instance, the returned object has a communication channel built-in. Se +instance, the returned object has a communication channel built-in. See `child.send(message, [sendHandle])` for details. By default the spawned Node process will have the stdout, stderr associated diff --git a/doc/api/net.markdown b/doc/api/net.markdown index 9ef058cb80f..7fb07df9558 100644 --- a/doc/api/net.markdown +++ b/doc/api/net.markdown @@ -198,10 +198,14 @@ Don't call `server.address()` until the `'listening'` event has been emitted. Set this property to reject connections when the server's connection count gets high. +It is not recommended to use this option once a socket has been sent to a child +with `child_process.fork()`. + ### server.connections The number of concurrent connections on the server. +This becomes `null` when sending a socket to a child with `child_process.fork()`. `net.Server` is an `EventEmitter` with the following events: diff --git a/lib/child_process.js b/lib/child_process.js index 63a0856bf3b..37937012424 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -54,15 +54,207 @@ function createSocket(pipe, readable) { } +// this object contain function to convert TCP objects to native handle objects +// and back again. +var handleConversion = { + 'net.Native': { + simultaneousAccepts: true, + + send: function(message, handle) { + return handle; + }, + + got: function(message, handle, emit) { + emit(handle); + } + }, + + 'net.Server': { + simultaneousAccepts: true, + + send: function(message, server) { + return server._handle; + }, + + got: function(message, handle, emit) { + var self = this; + + var server = new net.Server(); + server.listen(handle, function() { + emit(server); + }); + } + }, + + 'net.Socket': { + send: function(message, socket) { + // pause socket so no data is lost, will be resumed later + socket.pause(); + + // if the socket wsa created by net.Server + if (socket.server) { + // the slave should keep track of the socket + message.key = socket.server._connectionKey; + + var firstTime = !this._channel.sockets.send[message.key]; + + // add socket to connections list + var socketList = getSocketList('send', this, message.key); + socketList.add(socket); + + // the server should no longer expose a .connection property + // and when asked to close it should query the socket status from slaves + if (firstTime) { + socket.server._setupSlave(socketList); + } + } + + // remove handle from socket object, it will be closed when the socket + // has been send + var handle = socket._handle; + handle.onread = function() {}; + socket._handle = null; + + return handle; + }, + + got: function(message, handle, emit) { + var socket = new net.Socket({handle: handle}); + socket.readable = socket.writable = true; + socket.pause(); + + // if the socket was created by net.Server we will track the socket + if (message.key) { + + // add socket to connections list + var socketList = getSocketList('got', this, message.key); + socketList.add(socket); + } + + emit(socket); + socket.resume(); + } + } +}; + +// This object keep track of the socket there are sended +function SocketListSend(slave, key) { + var self = this; + + this.key = key; + this.list = []; + this.slave = slave; + + slave.once('disconnect', function() { + self.flush(); + }); + + this.slave.on('internalMessage', function(msg) { + if (msg.cmd !== 'NODE_SOCKET_CLOSED' || msg.key !== self.key) return; + self.flush(); + }); +} +util.inherits(SocketListSend, EventEmitter); + +SocketListSend.prototype.add = function(socket) { + this.list.push(socket); +}; + +SocketListSend.prototype.flush = function() { + var list = this.list; + this.list = []; + + list.forEach(function(socket) { + socket.destroy(); + }); +}; + +SocketListSend.prototype.update = function() { + if (this.slave.connected === false) return; + + this.slave.send({ + cmd: 'NODE_SOCKET_FETCH', + key: this.key + }); +}; + +// This object keep track of the socket there are received +function SocketListReceive(slave, key) { + var self = this; + + this.key = key; + this.list = []; + this.slave = slave; + + slave.on('internalMessage', function(msg) { + if (msg.cmd !== 'NODE_SOCKET_FETCH' || msg.key !== self.key) return; + + if (self.list.length === 0) { + self.flush(); + return; + } + + self.on('itemRemoved', function removeMe() { + if (self.list.length !== 0) return; + self.removeListener('itemRemoved', removeMe); + self.flush(); + }); + }); +} +util.inherits(SocketListReceive, EventEmitter); + +SocketListReceive.prototype.flush = function() { + this.list = []; + + if (this.slave.connected) { + this.slave.send({ + cmd: 'NODE_SOCKET_CLOSED', + key: this.key + }); + } +}; + +SocketListReceive.prototype.add = function(socket) { + var self = this; + this.list.push(socket); + + socket.on('close', function() { + self.list.splice(self.list.indexOf(socket), 1); + self.emit('itemRemoved'); + }); +}; + +function getSocketList(type, slave, key) { + var sockets = slave._channel.sockets[type]; + var socketList = sockets[key]; + if (!socketList) { + var Construct = type === 'send' ? SocketListSend : SocketListReceive; + socketList = sockets[key] = new Construct(slave, key); + } + return socketList; +} + +function handleMessage(target, message, handle) { + //Filter out internal messages + //if cmd property begin with "_NODE" + if (message !== null && + typeof message === 'object' && + typeof message.cmd === 'string' && + message.cmd.indexOf('NODE_') === 0) { + target.emit('internalMessage', message, handle); + } + //Non-internal message + else { + target.emit('message', message, handle); + } +} + function setupChannel(target, channel) { target._channel = channel; var jsonBuffer = ''; channel.buffering = false; channel.onread = function(pool, offset, length, recvHandle) { - // Update simultaneous accepts on Windows - net._setSimultaneousAccepts(recvHandle); - if (pool) { jsonBuffer += pool.toString('ascii', offset, offset + length); @@ -73,18 +265,7 @@ function setupChannel(target, channel) { var json = jsonBuffer.slice(start, i); var message = JSON.parse(json); - //Filter out internal messages - //if cmd property begin with "_NODE" - if (message !== null && - typeof message === 'object' && - typeof message.cmd === 'string' && - message.cmd.indexOf('NODE_') === 0) { - target.emit('internalMessage', message, recvHandle); - } - //Non-internal message - else { - target.emit('message', message, recvHandle); - } + handleMessage(target, message, recvHandle); start = i + 1; } @@ -97,7 +278,27 @@ function setupChannel(target, channel) { } }; - target.send = function(message, sendHandle) { + // object where socket lists will live + channel.sockets = { got: {}, send: {} }; + + // handlers will go through this + target.on('internalMessage', function(message, handle) { + if (message.cmd !== 'NODE_HANDLE') return; + + var obj = handleConversion[message.type]; + + // Update simultaneous accepts on Windows + if (obj.simultaneousAccepts) { + net._setSimultaneousAccepts(handle); + } + + // Convert handle object + obj.got.call(this, message, handle, function(handle) { + handleMessage(target, message.msg, handle); + }); + }); + + target.send = function(message, handle) { if (typeof message === 'undefined') { throw new TypeError('message cannot be undefined'); } @@ -112,12 +313,43 @@ function setupChannel(target, channel) { return false; } + // package messages with a handle object + if (handle) { + // this message will be handled by an internalMessage event handler + message = { + cmd: 'NODE_HANDLE', + type: 'net.', + msg: message + }; + + switch (handle.constructor.name) { + case 'Socket': + message.type += 'Socket'; break; + case 'Server': + message.type += 'Server'; break; + case 'Pipe': + case 'TCP': + message.type += 'Native'; break; + } + + var obj = handleConversion[message.type]; + + // convert TCP object to native handle object + handle = handleConversion[message.type].send.apply(target, arguments); + + // Update simultaneous accepts on Windows + if (obj.simultaneousAccepts) { + net._setSimultaneousAccepts(handle); + } + } + var string = JSON.stringify(message) + '\n'; + var writeReq = channel.writeUtf8String(string, handle); - // Update simultaneous accepts on Windows - net._setSimultaneousAccepts(sendHandle); - - var writeReq = channel.writeUtf8String(string, sendHandle); + // Close the Socket handle after sending it + if (message && message.type === 'net.Socket') { + handle.close(); + } if (!writeReq) { var er = errnoException(errno, 'write', 'cannot write to IPC channel.'); diff --git a/lib/net.js b/lib/net.js index 9705f557a73..49faf59db6d 100644 --- a/lib/net.js +++ b/lib/net.js @@ -352,7 +352,7 @@ Socket.prototype._destroy = function(exception, cb) { timers.unenroll(this); if (this.server) { - this.server.connections--; + this.server._connections--; this.server._emitCloseIfDrained(); } @@ -800,7 +800,23 @@ function Server(/* [ options, ] listener */) { } } - this.connections = 0; + this._connections = 0; + + // when server is using slaves .connections is not reliable + // so null will be return if thats the case + Object.defineProperty(this, 'connections', { + get: function() { + if (self._usingSlaves) { + return null; + } + return self._connections; + }, + set: function(val) { + return (self._connections = val); + }, + configurable: true, enumerable: true + }); + this.allowHalfOpen = options.allowHalfOpen || false; this._handle = null; @@ -881,6 +897,9 @@ Server.prototype._listen2 = function(address, port, addressType, backlog) { return; } + // generate connection key, this should be unique to the connection + this._connectionKey = addressType + ':' + address + ':' + port; + process.nextTick(function() { self.emit('listening'); }); @@ -970,7 +989,7 @@ function onconnection(clientHandle) { return; } - if (self.maxConnections && self.connections >= self.maxConnections) { + if (self.maxConnections && self._connections >= self.maxConnections) { clientHandle.close(); return; } @@ -983,7 +1002,7 @@ function onconnection(clientHandle) { socket.resume(); - self.connections++; + self._connections++; socket.server = self; DTRACE_NET_SERVER_CONNECTION(socket); @@ -1005,13 +1024,21 @@ Server.prototype.close = function(cb) { this._handle = null; this._emitCloseIfDrained(); + // fetch new socket lists + if (this._usingSlaves) { + this._slaves.forEach(function(socketList) { + if (socketList.list.length === 0) return; + socketList.update(); + }); + } + return this; }; Server.prototype._emitCloseIfDrained = function() { var self = this; - if (self._handle || self.connections) return; + if (self._handle || self._connections) return; process.nextTick(function() { self.emit('close'); @@ -1023,6 +1050,15 @@ Server.prototype.listenFD = function(fd, type) { throw new Error('This API is no longer supported. See child_process.fork'); }; +// when sending a socket using fork IPC this function is executed +Server.prototype._setupSlave = function(socketList) { + if (!this._usingSlaves) { + this._usingSlaves = true; + this._slaves = []; + } + this._slaves.push(socketList); +}; + // TODO: isIP should be moved to the DNS code. Putting it here now because // this is what the legacy system did. diff --git a/test/simple/test-child-process-fork-net.js b/test/simple/test-child-process-fork-net.js new file mode 100644 index 00000000000..6dd0e5fde38 --- /dev/null +++ b/test/simple/test-child-process-fork-net.js @@ -0,0 +1,198 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var assert = require('assert'); +var common = require('../common'); +var fork = require('child_process').fork; +var net = require('net'); + +// progress tracker +function ProgressTracker(missing, callback) { + this.missing = missing; + this.callback = callback; +} +ProgressTracker.prototype.done = function() { + this.missing -= 1; + this.check(); +}; +ProgressTracker.prototype.check = function() { + if (this.missing === 0) this.callback(); +}; + +if (process.argv[2] === 'child') { + + var serverScope; + + process.on('message', function onServer(msg, server) { + if (msg.what !== 'server') return; + process.removeListener('message', onServer); + + serverScope = server; + + server.on('connection', function(socket) { + console.log('CHILD: got connection'); + process.send({what: 'connection'}); + socket.destroy(); + }); + + // start making connection from parent + console.log('CHILD: server listening'); + process.send({what: 'listening'}); + }); + + process.on('message', function onClose(msg) { + if (msg.what !== 'close') return; + process.removeListener('message', onClose); + + serverScope.on('close', function() { + process.send({what: 'close'}); + }); + serverScope.close(); + }); + + process.on('message', function onSocket(msg, socket) { + if (msg.what !== 'socket') return; + process.removeListener('message', onSocket); + socket.end('echo'); + console.log('CHILD: got socket'); + }); + + process.send({what: 'ready'}); +} else { + + var child = fork(process.argv[1], ['child']); + + child.on('exit', function() { + console.log('CHILD: died'); + }); + + // send net.Server to child and test by connecting + var testServer = function(callback) { + + // destroy server execute callback when done + var progress = new ProgressTracker(2, function() { + server.on('close', function() { + console.log('PARENT: server closed'); + child.send({what: 'close'}); + }); + server.close(); + }); + + // we expect 10 connections and close events + var connections = new ProgressTracker(10, progress.done.bind(progress)); + var closed = new ProgressTracker(10, progress.done.bind(progress)); + + // create server and send it to child + var server = net.createServer(); + server.on('connection', function(socket) { + console.log('PARENT: got connection'); + socket.destroy(); + connections.done(); + }); + server.on('listening', function() { + console.log('PARENT: server listening'); + child.send({what: 'server'}, server); + }); + server.listen(common.PORT); + + // handle client messages + var messageHandlers = function(msg) { + + if (msg.what === 'listening') { + // make connections + var socket; + for (var i = 0; i < 10; i++) { + socket = net.connect(common.PORT, function() { + console.log('CLIENT: connected'); + }); + socket.on('close', function() { + closed.done(); + console.log('CLIENT: closed'); + }); + } + + } else if (msg.what === 'connection') { + // child got connection + connections.done(); + } else if (msg.what === 'close') { + child.removeListener('message', messageHandlers); + callback(); + } + }; + + child.on('message', messageHandlers); + }; + + // send net.Socket to child + var testSocket = function(callback) { + + // create a new server and connect to it, + // but the socket will be handled by the child + var server = net.createServer(); + server.on('connection', function(socket) { + socket.on('close', function() { + console.log('CLIENT: socket closed'); + }); + child.send({what: 'socket'}, socket); + }); + server.on('close', function() { + console.log('PARENT: server closed'); + callback(); + }); + server.listen(common.PORT, function() { + var connect = net.connect(common.PORT); + var store = ''; + connect.on('data', function(chunk) { + store += chunk; + console.log('CLIENT: got data'); + }); + connect.on('close', function() { + console.log('CLIENT: closed'); + assert.equal(store, 'echo'); + server.close(); + }); + }); + }; + + // create server and send it to child + var serverSucess = false; + var socketSucess = false; + child.on('message', function onReady(msg) { + if (msg.what !== 'ready') return; + child.removeListener('message', onReady); + + testServer(function() { + serverSucess = true; + + testSocket(function() { + socketSucess = true; + child.kill(); + }); + }); + + }); + + process.on('exit', function() { + assert.ok(serverSucess); + assert.ok(socketSucess); + }); + +} diff --git a/test/simple/test-child-process-fork-net2.js b/test/simple/test-child-process-fork-net2.js new file mode 100644 index 00000000000..48713566a62 --- /dev/null +++ b/test/simple/test-child-process-fork-net2.js @@ -0,0 +1,131 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var assert = require('assert'); +var common = require('../common'); +var fork = require('child_process').fork; +var net = require('net'); + +if (process.argv[2] === 'child') { + + var endMe = null; + + process.on('message', function(m, socket) { + if (!socket) return; + + // will call .end('end') or .write('write'); + socket[m](m); + + // store the unfinished socket + if (m === 'write') { + endMe = socket; + } + }); + + process.on('message', function(m) { + if (m !== 'close') return; + endMe.end('end'); + endMe = null; + }); + + process.on('disconnect', function() { + endMe.end('end'); + endMe = null; + }); + +} else { + + var child1 = fork(process.argv[1], ['child']); + var child2 = fork(process.argv[1], ['child']); + var child3 = fork(process.argv[1], ['child']); + + var server = net.createServer(); + + var connected = 0; + server.on('connection', function(socket) { + switch (connected) { + case 0: + child1.send('end', socket); break; + case 1: + child1.send('write', socket); break; + case 2: + child2.send('end', socket); break; + case 3: + child2.send('write', socket); break; + case 4: + child3.send('end', socket); break; + case 5: + child3.send('write', socket); break; + } + connected += 1; + + if (connected === 6) { + closeServer(); + } + }); + + var disconnected = 0; + server.on('listening', function() { + + var j = 6, client; + while (j--) { + client = net.connect(common.PORT, '127.0.0.1'); + client.on('close', function() { + disconnected += 1; + }); + } + }); + + var closeEmitted = false; + server.on('close', function() { + closeEmitted = true; + + child1.kill(); + child2.kill(); + child3.kill(); + }); + + server.listen(common.PORT, '127.0.0.1'); + + var timeElasped = 0; + var closeServer = function() { + var startTime = Date.now(); + server.on('close', function() { + timeElasped = Date.now() - startTime; + }); + + server.close(); + + setTimeout(function() { + child1.send('close'); + child2.send('close'); + child3.disconnect(); + }, 200); + }; + + process.on('exit', function() { + assert.equal(disconnected, 6); + assert.equal(connected, 6); + assert.ok(closeEmitted); + assert.ok(timeElasped >= 190 && timeElasped <= 1000, + 'timeElasped was not between 190 and 1000 ms'); + }); +}