cluster: support datagram sockets
This commit is contained in:
parent
c13354e339
commit
5e7e51c2fe
@ -155,6 +155,18 @@ var handleConversion = {
|
|||||||
|
|
||||||
emit(socket);
|
emit(socket);
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
'dgram.Native': {
|
||||||
|
simultaneousAccepts: false,
|
||||||
|
|
||||||
|
send: function(message, handle) {
|
||||||
|
return handle;
|
||||||
|
},
|
||||||
|
|
||||||
|
got: function(message, handle, emit) {
|
||||||
|
emit(handle);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -355,18 +367,20 @@ function setupChannel(target, channel) {
|
|||||||
// this message will be handled by an internalMessage event handler
|
// this message will be handled by an internalMessage event handler
|
||||||
message = {
|
message = {
|
||||||
cmd: 'NODE_HANDLE',
|
cmd: 'NODE_HANDLE',
|
||||||
type: 'net.',
|
|
||||||
msg: message
|
msg: message
|
||||||
};
|
};
|
||||||
|
|
||||||
switch (handle.constructor.name) {
|
if (handle instanceof net.Socket) {
|
||||||
case 'Socket':
|
message.type = 'net.Socket';
|
||||||
message.type += 'Socket'; break;
|
} else if (handle instanceof net.Server) {
|
||||||
case 'Server':
|
message.type = 'net.Server';
|
||||||
message.type += 'Server'; break;
|
} else if (handle instanceof process.binding('tcp_wrap').TCP ||
|
||||||
case 'Pipe':
|
handle instanceof process.binding('pipe_wrap').Pipe) {
|
||||||
case 'TCP':
|
message.type = 'net.Native';
|
||||||
message.type += 'Native'; break;
|
} else if (handle instanceof process.binding('udp_wrap').UDP) {
|
||||||
|
message.type = 'dgram.Native';
|
||||||
|
} else {
|
||||||
|
throw new TypeError("This handle type can't be sent");
|
||||||
}
|
}
|
||||||
|
|
||||||
var obj = handleConversion[message.type];
|
var obj = handleConversion[message.type];
|
||||||
|
@ -227,8 +227,14 @@ if (cluster.isMaster) {
|
|||||||
|
|
||||||
if (serverHandlers.hasOwnProperty(key)) {
|
if (serverHandlers.hasOwnProperty(key)) {
|
||||||
handler = serverHandlers[key];
|
handler = serverHandlers[key];
|
||||||
|
} else if (message.addressType === 'udp4' ||
|
||||||
|
message.addressType === 'udp6') {
|
||||||
|
var dgram = require('dgram');
|
||||||
|
handler = dgram._createSocketHandle.apply(net, args);
|
||||||
|
serverHandlers[key] = handler;
|
||||||
} else {
|
} else {
|
||||||
handler = serverHandlers[key] = net._createServerHandle.apply(net, args);
|
handler = net._createServerHandle.apply(net, args);
|
||||||
|
serverHandlers[key] = handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
// echo callback with the fd handler associated with it
|
// echo callback with the fd handler associated with it
|
||||||
@ -259,9 +265,9 @@ if (cluster.isMaster) {
|
|||||||
messageHandler.suicide = function(message, worker) {
|
messageHandler.suicide = function(message, worker) {
|
||||||
worker.suicide = true;
|
worker.suicide = true;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Messages to a worker will be handled using these methods
|
// Messages to a worker will be handled using these methods
|
||||||
else if (cluster.isWorker) {
|
else if (cluster.isWorker) {
|
||||||
|
|
||||||
@ -541,7 +547,8 @@ cluster._setupWorker = function() {
|
|||||||
sendInternalMessage(worker, { cmd: 'online' });
|
sendInternalMessage(worker, { cmd: 'online' });
|
||||||
};
|
};
|
||||||
|
|
||||||
// Internal function. Called by lib/net.js when attempting to bind a server.
|
// Internal function. Called by net.js and dgram.js when attempting to bind a
|
||||||
|
// TCP server or UDP socket.
|
||||||
cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) {
|
cluster._getServer = function(tcpSelf, address, port, addressType, fd, cb) {
|
||||||
// This can only be called from a worker.
|
// This can only be called from a worker.
|
||||||
assert(cluster.isWorker);
|
assert(cluster.isWorker);
|
||||||
|
90
lib/dgram.js
90
lib/dgram.js
@ -19,6 +19,7 @@
|
|||||||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||||
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
var assert = require('assert');
|
||||||
var util = require('util');
|
var util = require('util');
|
||||||
var events = require('events');
|
var events = require('events');
|
||||||
|
|
||||||
@ -29,6 +30,7 @@ var BIND_STATE_BINDING = 1;
|
|||||||
var BIND_STATE_BOUND = 2;
|
var BIND_STATE_BOUND = 2;
|
||||||
|
|
||||||
// lazily loaded
|
// lazily loaded
|
||||||
|
var cluster = null;
|
||||||
var dns = null;
|
var dns = null;
|
||||||
var net = null;
|
var net = null;
|
||||||
|
|
||||||
@ -86,6 +88,24 @@ function newHandle(type) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
exports._createSocketHandle = function(address, port, addressType, fd) {
|
||||||
|
// Opening an existing fd is not supported for UDP handles.
|
||||||
|
assert(typeof fd !== 'number' || fd < 0);
|
||||||
|
|
||||||
|
var handle = newHandle(addressType);
|
||||||
|
|
||||||
|
if (port || address) {
|
||||||
|
var r = handle.bind(address, port || 0, 0);
|
||||||
|
if (r == -1) {
|
||||||
|
handle.close();
|
||||||
|
handle = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return handle;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
function Socket(type, listener) {
|
function Socket(type, listener) {
|
||||||
events.EventEmitter.call(this);
|
events.EventEmitter.call(this);
|
||||||
|
|
||||||
@ -110,41 +130,75 @@ exports.createSocket = function(type, listener) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
function startListening(socket) {
|
||||||
|
socket._handle.onmessage = onMessage;
|
||||||
|
// Todo: handle errors
|
||||||
|
socket._handle.recvStart();
|
||||||
|
socket._receiving = true;
|
||||||
|
socket._bindState = BIND_STATE_BOUND;
|
||||||
|
socket.fd = -42; // compatibility hack
|
||||||
|
|
||||||
|
socket.emit('listening');
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
Socket.prototype.bind = function(port, address, callback) {
|
Socket.prototype.bind = function(port, address, callback) {
|
||||||
var self = this;
|
var self = this;
|
||||||
|
|
||||||
self._healthCheck();
|
self._healthCheck();
|
||||||
|
|
||||||
|
if (this._bindState != BIND_STATE_UNBOUND)
|
||||||
|
throw new Error('Socket is already bound');
|
||||||
|
|
||||||
|
this._bindState = BIND_STATE_BINDING;
|
||||||
|
|
||||||
if (typeof callback === 'function')
|
if (typeof callback === 'function')
|
||||||
self.once('listening', callback);
|
self.once('listening', callback);
|
||||||
|
|
||||||
// resolve address first
|
// resolve address first
|
||||||
self._handle.lookup(address, function(err, ip) {
|
self._handle.lookup(address, function(err, ip) {
|
||||||
self._bindState = BIND_STATE_UNBOUND;
|
|
||||||
|
|
||||||
if (!self._handle)
|
|
||||||
return; // handle has been closed in the mean time
|
|
||||||
|
|
||||||
if (err) {
|
if (err) {
|
||||||
|
self._bindState = BIND_STATE_UNBOUND;
|
||||||
self.emit('error', err);
|
self.emit('error', err);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) {
|
if (!cluster)
|
||||||
self.emit('error', errnoException(errno, 'bind'));
|
cluster = require('cluster');
|
||||||
return;
|
|
||||||
|
if (cluster.isWorker) {
|
||||||
|
cluster._getServer(self, ip, port, self.type, -1, function(handle) {
|
||||||
|
if (!self._handle)
|
||||||
|
// handle has been closed in the mean time.
|
||||||
|
return handle.close();
|
||||||
|
|
||||||
|
// Set up the handle that we got from master.
|
||||||
|
handle.lookup = self._handle.lookup;
|
||||||
|
handle.bind = self._handle.bind;
|
||||||
|
handle.send = self._handle.send;
|
||||||
|
handle.owner = self;
|
||||||
|
|
||||||
|
// Replace the existing handle by the handle we got from master.
|
||||||
|
self._handle.close();
|
||||||
|
self._handle = handle;
|
||||||
|
|
||||||
|
startListening(self);
|
||||||
|
});
|
||||||
|
|
||||||
|
} else {
|
||||||
|
if (!self._handle)
|
||||||
|
return; // handle has been closed in the mean time
|
||||||
|
|
||||||
|
if (self._handle.bind(ip, port || 0, /*flags=*/ 0)) {
|
||||||
|
self.emit('error', errnoException(errno, 'bind'));
|
||||||
|
self._bindState = BIND_STATE_UNBOUND;
|
||||||
|
// Todo: close?
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
startListening(self);
|
||||||
}
|
}
|
||||||
|
|
||||||
self._handle.onmessage = onMessage;
|
|
||||||
self._handle.recvStart();
|
|
||||||
self._receiving = true;
|
|
||||||
self._bindState = BIND_STATE_BOUND;
|
|
||||||
self.fd = -42; // compatibility hack
|
|
||||||
|
|
||||||
self.emit('listening');
|
|
||||||
});
|
});
|
||||||
|
|
||||||
self._bindState = BIND_STATE_BINDING;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -53,6 +53,8 @@ class HandleWrap {
|
|||||||
static v8::Handle<v8::Value> Ref(const v8::Arguments& args);
|
static v8::Handle<v8::Value> Ref(const v8::Arguments& args);
|
||||||
static v8::Handle<v8::Value> Unref(const v8::Arguments& args);
|
static v8::Handle<v8::Value> Unref(const v8::Arguments& args);
|
||||||
|
|
||||||
|
inline uv_handle_t* GetHandle() { return handle__; };
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
HandleWrap(v8::Handle<v8::Object> object, uv_handle_t* handle);
|
HandleWrap(v8::Handle<v8::Object> object, uv_handle_t* handle);
|
||||||
virtual ~HandleWrap();
|
virtual ~HandleWrap();
|
||||||
|
@ -27,6 +27,7 @@
|
|||||||
#include "pipe_wrap.h"
|
#include "pipe_wrap.h"
|
||||||
#include "tcp_wrap.h"
|
#include "tcp_wrap.h"
|
||||||
#include "req_wrap.h"
|
#include "req_wrap.h"
|
||||||
|
#include "udp_wrap.h"
|
||||||
#include "node_counters.h"
|
#include "node_counters.h"
|
||||||
|
|
||||||
#include <stdlib.h> // abort()
|
#include <stdlib.h> // abort()
|
||||||
@ -118,7 +119,7 @@ StreamWrap::StreamWrap(Handle<Object> object, uv_stream_t* stream)
|
|||||||
|
|
||||||
void StreamWrap::SetHandle(uv_handle_t* h) {
|
void StreamWrap::SetHandle(uv_handle_t* h) {
|
||||||
HandleWrap::SetHandle(h);
|
HandleWrap::SetHandle(h);
|
||||||
stream_ = (uv_stream_t*)h;
|
stream_ = reinterpret_cast<uv_stream_t*>(h);
|
||||||
stream_->data = this;
|
stream_->data = this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -173,6 +174,28 @@ uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
template <class WrapType, class UVType>
|
||||||
|
static Local<Object> AcceptHandle(uv_stream_t* pipe) {
|
||||||
|
HandleScope scope;
|
||||||
|
Local<Object> wrap_obj;
|
||||||
|
WrapType* wrap;
|
||||||
|
UVType* handle;
|
||||||
|
|
||||||
|
wrap_obj = WrapType::Instantiate();
|
||||||
|
if (wrap_obj.IsEmpty())
|
||||||
|
return Local<Object>();
|
||||||
|
|
||||||
|
wrap = static_cast<WrapType*>(
|
||||||
|
wrap_obj->GetAlignedPointerFromInternalField(0));
|
||||||
|
handle = wrap->UVHandle();
|
||||||
|
|
||||||
|
if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle)))
|
||||||
|
abort();
|
||||||
|
|
||||||
|
return scope.Close(wrap_obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
|
void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
|
||||||
uv_buf_t buf, uv_handle_type pending) {
|
uv_buf_t buf, uv_handle_type pending) {
|
||||||
HandleScope scope;
|
HandleScope scope;
|
||||||
@ -212,19 +235,16 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
|
|||||||
|
|
||||||
Local<Object> pending_obj;
|
Local<Object> pending_obj;
|
||||||
if (pending == UV_TCP) {
|
if (pending == UV_TCP) {
|
||||||
pending_obj = TCPWrap::Instantiate();
|
pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(handle);
|
||||||
} else if (pending == UV_NAMED_PIPE) {
|
} else if (pending == UV_NAMED_PIPE) {
|
||||||
pending_obj = PipeWrap::Instantiate();
|
pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(handle);
|
||||||
|
} else if (pending == UV_UDP) {
|
||||||
|
pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(handle);
|
||||||
} else {
|
} else {
|
||||||
// We only support sending UV_TCP and UV_NAMED_PIPE right now.
|
|
||||||
assert(pending == UV_UNKNOWN_HANDLE);
|
assert(pending == UV_UNKNOWN_HANDLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pending_obj.IsEmpty()) {
|
if (!pending_obj.IsEmpty()) {
|
||||||
assert(pending_obj->InternalFieldCount() > 0);
|
|
||||||
StreamWrap* pending_wrap = static_cast<StreamWrap*>(
|
|
||||||
pending_obj->GetAlignedPointerFromInternalField(0));
|
|
||||||
if (uv_accept(handle, pending_wrap->GetStream())) abort();
|
|
||||||
argv[3] = pending_obj;
|
argv[3] = pending_obj;
|
||||||
argc++;
|
argc++;
|
||||||
}
|
}
|
||||||
@ -246,7 +266,7 @@ void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
|
|||||||
|
|
||||||
void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
|
void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
|
||||||
uv_handle_type pending) {
|
uv_handle_type pending) {
|
||||||
OnReadCommon((uv_stream_t*)handle, nread, buf, pending);
|
OnReadCommon(reinterpret_cast<uv_stream_t*>(handle), nread, buf, pending);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -404,14 +424,14 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
|
|||||||
StreamWrap::AfterWrite);
|
StreamWrap::AfterWrite);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
uv_stream_t* send_stream = NULL;
|
uv_handle_t* send_handle = NULL;
|
||||||
|
|
||||||
if (args[1]->IsObject()) {
|
if (args[1]->IsObject()) {
|
||||||
Local<Object> send_stream_obj = args[1]->ToObject();
|
Local<Object> send_handle_obj = args[1]->ToObject();
|
||||||
assert(send_stream_obj->InternalFieldCount() > 0);
|
assert(send_handle_obj->InternalFieldCount() > 0);
|
||||||
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
|
HandleWrap* send_handle_wrap = static_cast<HandleWrap*>(
|
||||||
send_stream_obj->GetAlignedPointerFromInternalField(0));
|
send_handle_obj->GetAlignedPointerFromInternalField(0));
|
||||||
send_stream = send_stream_wrap->GetStream();
|
send_handle = send_handle_wrap->GetHandle();
|
||||||
|
|
||||||
// Reference StreamWrap instance to prevent it from being garbage
|
// Reference StreamWrap instance to prevent it from being garbage
|
||||||
// collected before `AfterWrite` is called.
|
// collected before `AfterWrite` is called.
|
||||||
@ -419,14 +439,14 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
|
|||||||
handle_sym = NODE_PSYMBOL("handle");
|
handle_sym = NODE_PSYMBOL("handle");
|
||||||
}
|
}
|
||||||
assert(!req_wrap->object_.IsEmpty());
|
assert(!req_wrap->object_.IsEmpty());
|
||||||
req_wrap->object_->Set(handle_sym, send_stream_obj);
|
req_wrap->object_->Set(handle_sym, send_handle_obj);
|
||||||
}
|
}
|
||||||
|
|
||||||
r = uv_write2(&req_wrap->req_,
|
r = uv_write2(&req_wrap->req_,
|
||||||
wrap->stream_,
|
wrap->stream_,
|
||||||
&buf,
|
&buf,
|
||||||
1,
|
1,
|
||||||
send_stream,
|
reinterpret_cast<uv_stream_t*>(send_handle),
|
||||||
StreamWrap::AfterWrite);
|
StreamWrap::AfterWrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,6 +40,7 @@ typedef ReqWrap<uv_udp_send_t> SendWrap;
|
|||||||
// see tcp_wrap.cc
|
// see tcp_wrap.cc
|
||||||
Local<Object> AddressToJS(const sockaddr* addr);
|
Local<Object> AddressToJS(const sockaddr* addr);
|
||||||
|
|
||||||
|
static Persistent<Function> constructor;
|
||||||
static Persistent<String> buffer_sym;
|
static Persistent<String> buffer_sym;
|
||||||
static Persistent<String> oncomplete_sym;
|
static Persistent<String> oncomplete_sym;
|
||||||
static Persistent<String> onmessage_sym;
|
static Persistent<String> onmessage_sym;
|
||||||
@ -98,8 +99,9 @@ void UDPWrap::Initialize(Handle<Object> target) {
|
|||||||
NODE_SET_PROTOTYPE_METHOD(t, "ref", HandleWrap::Ref);
|
NODE_SET_PROTOTYPE_METHOD(t, "ref", HandleWrap::Ref);
|
||||||
NODE_SET_PROTOTYPE_METHOD(t, "unref", HandleWrap::Unref);
|
NODE_SET_PROTOTYPE_METHOD(t, "unref", HandleWrap::Unref);
|
||||||
|
|
||||||
target->Set(String::NewSymbol("UDP"),
|
constructor = Persistent<Function>::New(
|
||||||
Persistent<FunctionTemplate>::New(t)->GetFunction());
|
Persistent<FunctionTemplate>::New(t)->GetFunction());
|
||||||
|
target->Set(String::NewSymbol("UDP"), constructor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -392,6 +394,17 @@ UDPWrap* UDPWrap::Unwrap(Local<Object> obj) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Local<Object> UDPWrap::Instantiate() {
|
||||||
|
// If this assert fires then Initialize hasn't been called yet.
|
||||||
|
assert(constructor.IsEmpty() == false);
|
||||||
|
|
||||||
|
HandleScope scope;
|
||||||
|
Local<Object> obj = constructor->NewInstance();
|
||||||
|
|
||||||
|
return scope.Close(obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
uv_udp_t* UDPWrap::UVHandle() {
|
uv_udp_t* UDPWrap::UVHandle() {
|
||||||
return &handle_;
|
return &handle_;
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,7 @@ class UDPWrap: public HandleWrap {
|
|||||||
static Handle<Value> SetTTL(const Arguments& args);
|
static Handle<Value> SetTTL(const Arguments& args);
|
||||||
static UDPWrap* Unwrap(Local<Object> obj);
|
static UDPWrap* Unwrap(Local<Object> obj);
|
||||||
|
|
||||||
|
static Local<Object> Instantiate();
|
||||||
uv_udp_t* UVHandle();
|
uv_udp_t* UVHandle();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
115
test/simple/test-cluster-dgram-1.js
Normal file
115
test/simple/test-cluster-dgram-1.js
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the
|
||||||
|
// "Software"), to deal in the Software without restriction, including
|
||||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||||
|
// following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included
|
||||||
|
// in all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
var NUM_WORKERS = 4;
|
||||||
|
var PACKETS_PER_WORKER = 10;
|
||||||
|
|
||||||
|
var assert = require('assert');
|
||||||
|
var cluster = require('cluster');
|
||||||
|
var common = require('../common');
|
||||||
|
var dgram = require('dgram');
|
||||||
|
|
||||||
|
|
||||||
|
if (process.platform === 'win32') {
|
||||||
|
console.warn("dgram clustering is currently not supported on windows.");
|
||||||
|
process.exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cluster.isMaster)
|
||||||
|
master();
|
||||||
|
else
|
||||||
|
worker();
|
||||||
|
|
||||||
|
|
||||||
|
function master() {
|
||||||
|
var listening = 0;
|
||||||
|
|
||||||
|
// Fork 4 workers.
|
||||||
|
for (var i = 0; i < NUM_WORKERS; i++)
|
||||||
|
cluster.fork();
|
||||||
|
|
||||||
|
// Wait until all workers are listening.
|
||||||
|
cluster.on('listening', function() {
|
||||||
|
if (++listening < NUM_WORKERS)
|
||||||
|
return;
|
||||||
|
|
||||||
|
// Start sending messages.
|
||||||
|
var buf = new Buffer('hello world');
|
||||||
|
var socket = dgram.createSocket('udp4');
|
||||||
|
var sent = 0;
|
||||||
|
doSend();
|
||||||
|
|
||||||
|
function doSend() {
|
||||||
|
socket.send(buf, 0, buf.length, common.PORT, '127.0.0.1', afterSend);
|
||||||
|
}
|
||||||
|
|
||||||
|
function afterSend() {
|
||||||
|
sent++;
|
||||||
|
if (sent < NUM_WORKERS * PACKETS_PER_WORKER) {
|
||||||
|
doSend();
|
||||||
|
} else {
|
||||||
|
console.log('master sent %d packets', sent);
|
||||||
|
socket.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Set up event handlers for every worker. Each worker sends a message when
|
||||||
|
// it has received the expected number of packets. After that it disconnects.
|
||||||
|
for (var key in cluster.workers) {
|
||||||
|
if (cluster.workers.hasOwnProperty(key))
|
||||||
|
setupWorker(cluster.workers[key]);
|
||||||
|
}
|
||||||
|
|
||||||
|
function setupWorker(worker) {
|
||||||
|
var received = 0;
|
||||||
|
|
||||||
|
worker.on('message', function(msg) {
|
||||||
|
received = msg.received;
|
||||||
|
console.log('worker %d received %d packets', worker.id, received);
|
||||||
|
});
|
||||||
|
|
||||||
|
worker.on('disconnect', function() {
|
||||||
|
assert(received === PACKETS_PER_WORKER);
|
||||||
|
console.log('worker %d disconnected', worker.id);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
function worker() {
|
||||||
|
var received = 0;
|
||||||
|
|
||||||
|
// Create udp socket and start listening.
|
||||||
|
var socket = dgram.createSocket('udp4');
|
||||||
|
|
||||||
|
socket.on('message', function(data, info) {
|
||||||
|
received++;
|
||||||
|
|
||||||
|
// Every 10 messages, notify the master.
|
||||||
|
if (received == PACKETS_PER_WORKER) {
|
||||||
|
process.send({received: received});
|
||||||
|
process.disconnect();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
socket.bind(common.PORT);
|
||||||
|
}
|
81
test/simple/test-cluster-dgram-2.js
Normal file
81
test/simple/test-cluster-dgram-2.js
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the
|
||||||
|
// "Software"), to deal in the Software without restriction, including
|
||||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||||
|
// following condonitions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included
|
||||||
|
// in all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
var NUM_WORKERS = 4;
|
||||||
|
var PACKETS_PER_WORKER = 10;
|
||||||
|
|
||||||
|
var assert = require('assert');
|
||||||
|
var cluster = require('cluster');
|
||||||
|
var common = require('../common');
|
||||||
|
var dgram = require('dgram');
|
||||||
|
|
||||||
|
|
||||||
|
if (process.platform === 'win32') {
|
||||||
|
console.warn("dgram clustering is currently not supported on windows.");
|
||||||
|
process.exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cluster.isMaster)
|
||||||
|
master();
|
||||||
|
else
|
||||||
|
worker();
|
||||||
|
|
||||||
|
|
||||||
|
function master() {
|
||||||
|
var i;
|
||||||
|
var received = 0;
|
||||||
|
|
||||||
|
// Start listening on a socket.
|
||||||
|
var socket = dgram.createSocket('udp4');
|
||||||
|
socket.bind(common.PORT);
|
||||||
|
|
||||||
|
// Disconnect workers when the expected number of messages have been
|
||||||
|
// received.
|
||||||
|
socket.on('message', function(data, info) {
|
||||||
|
received++;
|
||||||
|
|
||||||
|
if (received == PACKETS_PER_WORKER * NUM_WORKERS) {
|
||||||
|
console.log('master received %d packets', received);
|
||||||
|
|
||||||
|
// Close the socket.
|
||||||
|
socket.close();
|
||||||
|
|
||||||
|
// Disconnect all workers.
|
||||||
|
cluster.disconnect();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Fork workers.
|
||||||
|
for (var i = 0; i < NUM_WORKERS; i++)
|
||||||
|
cluster.fork();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
function worker() {
|
||||||
|
// Create udp socket and send packets to master.
|
||||||
|
var socket = dgram.createSocket('udp4');
|
||||||
|
var buf = new Buffer('hello world');
|
||||||
|
|
||||||
|
for (var i = 0; i < PACKETS_PER_WORKER; i++)
|
||||||
|
socket.send(buf, 0, buf.length, common.PORT, '127.0.0.1');
|
||||||
|
|
||||||
|
console.log('worker %d sent %d packets', cluster.worker.id, PACKETS_PER_WORKER);
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user