diff --git a/lib/dgram.js b/lib/dgram.js index 26e0a2a3e81..42473b40e77 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -243,16 +243,26 @@ Socket.prototype.setMulticastLoopback = function(arg) { Socket.prototype.addMembership = function(multicastAddress, - multicastInterface) { - // are we ever going to support this in libuv? - throw new Error('not yet implemented'); + interfaceAddress) { + this._healthCheck(); + + if (!multicastAddress) { + throw new Error('multicast address must be specified'); + } + + return this._handle.addMembership(multicastAddress, interfaceAddress); }; Socket.prototype.dropMembership = function(multicastAddress, - multicastInterface) { - // are we ever going to support this in libuv? - throw new Error('not yet implemented'); + interfaceAddress) { + this._healthCheck(); + + if (!multicastAddress) { + throw new Error('multicast address must be specified'); + } + + return this._handle.dropMembership(multicastAddress, interfaceAddress); }; diff --git a/src/udp_wrap.cc b/src/udp_wrap.cc index aa5160cebbc..5b2fb52923c 100644 --- a/src/udp_wrap.cc +++ b/src/udp_wrap.cc @@ -91,6 +91,8 @@ public: static Handle RecvStart(const Arguments& args); static Handle RecvStop(const Arguments& args); static Handle GetSockName(const Arguments& args); + static Handle AddMembership(const Arguments& args); + static Handle DropMembership(const Arguments& args); private: static inline char* NewSlab(v8::Handle global, v8::Handle wrap_obj); @@ -100,6 +102,8 @@ private: static Handle DoBind(const Arguments& args, int family); static Handle DoSend(const Arguments& args, int family); + static Handle SetMembership(const Arguments& args, + uv_membership membership); static uv_buf_t OnAlloc(uv_handle_t* handle, size_t suggested_size); static void OnSend(uv_udp_send_t* req, int status); @@ -147,6 +151,8 @@ void UDPWrap::Initialize(Handle target) { NODE_SET_PROTOTYPE_METHOD(t, "recvStart", RecvStart); NODE_SET_PROTOTYPE_METHOD(t, "recvStop", RecvStop); NODE_SET_PROTOTYPE_METHOD(t, "getsockname", GetSockName); + NODE_SET_PROTOTYPE_METHOD(t, "addMembership", AddMembership); + NODE_SET_PROTOTYPE_METHOD(t, "dropMembership", DropMembership); target->Set(String::NewSymbol("UDP"), Persistent::New(t)->GetFunction()); @@ -204,6 +210,41 @@ Handle UDPWrap::Bind6(const Arguments& args) { } +Handle UDPWrap::SetMembership(const Arguments& args, + uv_membership membership) { + HandleScope scope; + UNWRAP + + assert(args.Length() == 2); + + String::Utf8Value address(args[0]->ToString()); + String::Utf8Value interface(args[1]->ToString()); + + const char* interface_cstr = *interface; + if (args[1]->IsUndefined() || args[1]->IsNull()) { + interface_cstr = NULL; + } + + int r = uv_udp_set_membership(&wrap->handle_, *address, interface_cstr, + membership); + + if (r) + SetErrno(uv_last_error(uv_default_loop())); + + return scope.Close(Integer::New(r)); +} + + +Handle UDPWrap::AddMembership(const Arguments& args) { + return SetMembership(args, UV_JOIN_GROUP); +} + + +Handle UDPWrap::DropMembership(const Arguments& args) { + return SetMembership(args, UV_LEAVE_GROUP); +} + + Handle UDPWrap::DoSend(const Arguments& args, int family) { HandleScope scope; int r; diff --git a/test/simple/test-dgram-multicast-multi-process.js b/test/simple/test-dgram-multicast-multi-process.js new file mode 100644 index 00000000000..784bb6c0943 --- /dev/null +++ b/test/simple/test-dgram-multicast-multi-process.js @@ -0,0 +1,160 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +var common = require('../common'), + assert = require('assert'), + cluster = require('cluster'), + dgram = require('dgram'), + util = require('util'), + assert = require('assert'), + Buffer = require('buffer').Buffer, + LOCAL_BROADCAST_HOST = '224.0.0.1', + messages = [ + new Buffer('First message to send'), + new Buffer('Second message to send'), + new Buffer('Third message to send'), + new Buffer('Fourth message to send') + ]; + +if (cluster.isMaster) { + var workers = {}, + listeners = 3, + listening = 0, + i = 0, + done = 0; + + //launch child processes + for (var x = 0; x < listeners; x++) { + (function () { + var worker = cluster.fork(); + workers[worker.pid] = worker; + + worker.messagesReceived = []; + + worker.on('message', function (msg) { + if (msg.listening) { + listening += 1; + + if (listening === listeners) { + //all child process are listening, so start sending + sendSocket.sendNext(); + } + } + else if (msg.message) { + worker.messagesReceived.push(msg.message); + + if (worker.messagesReceived.length === messages.length) { + done += 1; + console.error('%d received %d messages total.', worker.pid, + worker.messagesReceived.length); + } + + if (done === listeners) { + console.error('All workers have received the required number of' + + 'messages. Will now compare.'); + + Object.keys(workers).forEach(function (pid) { + var worker = workers[pid]; + + var count = 0; + + worker.messagesReceived.forEach(function(buf) { + for (var i = 0; i < messages.length; ++i) { + if (buf.toString() === messages[i].toString()) { + count++; + break; + } + } + }); + + console.error('%d received %d matching messges.', worker.pid + , count); + + assert.equal(count, messages.length + ,'A worker received an invalid multicast message'); + }); + } + } + }); + })(x); + } + + var sendSocket = dgram.createSocket('udp4'); + + //sendSocket.setBroadcast(true); + //sendSocket.setMulticastTTL(1); + //sendSocket.setMulticastLoopback(true); + + sendSocket.on('close', function() { + console.error('sendSocket closed'); + }); + + sendSocket.sendNext = function() { + var buf = messages[i++]; + + if (!buf) { + try { sendSocket.close(); } catch (e) {} + return; + } + + sendSocket.send(buf, 0, buf.length, + common.PORT, LOCAL_BROADCAST_HOST, function(err) { + if (err) throw err; + console.error('sent %s to %s', util.inspect(buf.toString()), + LOCAL_BROADCAST_HOST + common.PORT); + process.nextTick(sendSocket.sendNext); + }); + }; +} + +if (!cluster.isMaster) { + var receivedMessages = []; + var listenSocket = dgram.createSocket('udp4'); + + listenSocket.addMembership(LOCAL_BROADCAST_HOST); + + listenSocket.on('message', function(buf, rinfo) { + console.error('%s received %s from %j', process.pid + ,util.inspect(buf.toString()), rinfo); + + receivedMessages.push(buf); + + process.send({ message : buf.toString() }); + + if (receivedMessages.length == messages.length) { + listenSocket.dropMembership(LOCAL_BROADCAST_HOST); + process.nextTick(function() { // TODO should be changed to below. + // listenSocket.dropMembership(LOCAL_BROADCAST_HOST, function() { + listenSocket.close(); + }); + } + }); + + listenSocket.on('close', function() { + process.exit(); + }); + + listenSocket.on('listening', function() { + process.send({ listening : true }); + }); + + listenSocket.bind(common.PORT); +}