Send and receive file descriptors through net.Stream.
a) create a layer of indirection in net.Stream to allow swapping in different read/write implementations and b) emit an 'fd' event when file descriptors are received over a UNIX pipe, as finally as a tangential benefit c) remove a bunch of conditionals from the primary codepaths for ease-of-reading.
This commit is contained in:
parent
55a6f01732
commit
8f0b4e9111
334
lib/net.js
334
lib/net.js
@ -42,6 +42,8 @@ var setKeepAlive= binding.setKeepAlive;
|
||||
var socketError = binding.socketError;
|
||||
var getsockname = binding.getsockname;
|
||||
var errnoException = binding.errnoException;
|
||||
var sendMsg = binding.sendMsg;
|
||||
var recvMsg = binding.recvMsg;
|
||||
var EINPROGRESS = binding.EINPROGRESS;
|
||||
var ENOENT = binding.ENOENT;
|
||||
var EMFILE = binding.EMFILE;
|
||||
@ -271,6 +273,182 @@ function _doFlush () {
|
||||
}
|
||||
}
|
||||
|
||||
function setImplmentationMethods (self) {
|
||||
function noData(buf, off, len) {
|
||||
return !buf ||
|
||||
(off != undefined && off >= buf.length) ||
|
||||
(len == 0);
|
||||
};
|
||||
|
||||
if (self.type == 'unix') {
|
||||
self._writeImpl = function(buf, off, len, fd, flags) {
|
||||
// Detect and disallow zero-byte writes wth an attached file
|
||||
// descriptor. This is an implementation limitation of sendmsg(2).
|
||||
if (fd && noData(buf, off, len)) {
|
||||
throw new Error('File descriptors can only be written with data');
|
||||
}
|
||||
|
||||
return sendMsg(self.fd, buf, off, len, fd, flags);
|
||||
};
|
||||
|
||||
self._readImpl = function(buf, off, len, calledByIOWatcher) {
|
||||
var bytesRead = recvMsg(self.fd, buf, off, len);
|
||||
|
||||
// Do not emit this in the same stack, otherwise we risk corrupting
|
||||
// our buffer pool which is full of read data, but has not had
|
||||
// had its pointers updated just yet.
|
||||
if (recvMsg.fd !== null) {
|
||||
process.nextTick(function() {
|
||||
self.emit('fd', recvMsg.fd);
|
||||
});
|
||||
}
|
||||
|
||||
return bytesRead;
|
||||
};
|
||||
} else {
|
||||
self._writeImpl = function(buf, off, len, fd, flags) {
|
||||
// XXX: TLS support requires that 0-byte writes get processed
|
||||
// by the kernel for some reason. Otherwise, we'd just
|
||||
// fast-path return here.
|
||||
|
||||
return write(self.fd, buf, off, len, fd, flags);
|
||||
};
|
||||
|
||||
self._readImpl = function(buf, off, len, calledByIOWatcher) {
|
||||
return read(self.fd, buf, off, len);
|
||||
};
|
||||
}
|
||||
|
||||
self._shutdownImpl = function() {
|
||||
shutdown(self.fd, 'write')
|
||||
};
|
||||
|
||||
if (self.secure) {
|
||||
var oldWrite = self._writeImpl;
|
||||
self._writeImpl = function(buf, off, len, fd, flags) {
|
||||
assert(buf);
|
||||
assert(self.secure);
|
||||
|
||||
var bytesWritten = self.secureStream.writeInject(buf, off, len);
|
||||
|
||||
if (!securePool) {
|
||||
allocNewSecurePool();
|
||||
}
|
||||
|
||||
var secureLen = self.secureStream.writeExtract(
|
||||
securePool, 0, securePool.length
|
||||
);
|
||||
|
||||
if (secureLen == -1) {
|
||||
// Check our read again for secure handshake
|
||||
self._readWatcher.callback();
|
||||
} else {
|
||||
oldWrite(securePool, 0, secureLen, fd, flags);
|
||||
}
|
||||
|
||||
if (!self.secureEstablished && self.secureStream.isInitFinished()) {
|
||||
self.secureEstablished = true;
|
||||
|
||||
if (self._events && self._events['secure']) {
|
||||
self.emit('secure');
|
||||
}
|
||||
}
|
||||
|
||||
return bytesWritten;
|
||||
};
|
||||
|
||||
var oldRead = self._readImpl;
|
||||
self._readImpl = function(buf, off, len, calledByIOWatcher) {
|
||||
assert(self.secure);
|
||||
|
||||
var bytesRead = 0;
|
||||
var secureBytesRead = null;
|
||||
|
||||
if (!securePool) {
|
||||
allocNewSecurePool();
|
||||
}
|
||||
|
||||
if (calledByIOWatcher) {
|
||||
secureBytesRead = oldRead(securePool, 0, securePool.length);
|
||||
self.secureStream.readInject(securePool, 0, secureBytesRead);
|
||||
}
|
||||
|
||||
var chunkBytes;
|
||||
do {
|
||||
chunkBytes = self.secureStream.readExtract(
|
||||
pool,
|
||||
pool.used + bytesRead,
|
||||
pool.length - pool.used - bytesRead
|
||||
);
|
||||
|
||||
bytesRead += chunkBytes;
|
||||
} while ((chunkBytes > 0) && (pool.used + bytesRead < pool.length));
|
||||
|
||||
if (bytesRead == 0 && !calledByIOWatcher) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (self.secureStream.readPending()) {
|
||||
process.nextTick(function () {
|
||||
if(self._readWatcher)
|
||||
self._readWatcher.callback();
|
||||
});
|
||||
}
|
||||
|
||||
if (!self.secureEstablished) {
|
||||
if (self.secureStream.isInitFinished()) {
|
||||
self.secureEstablished = true;
|
||||
if (self._events && self._events['secure']) {
|
||||
self.emit('secure');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (calledByIOWatcher && secureBytesRead === null && !self.server) {
|
||||
// Client needs to write as part of handshake
|
||||
self._writeWatcher.start();
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (bytesRead == 0 && secureBytesRead > 0) {
|
||||
// Deal with SSL handshake
|
||||
if (self.server) {
|
||||
self._checkForSecureHandshake();
|
||||
} else {
|
||||
if (self.secureEstablised) {
|
||||
self.flush();
|
||||
} else {
|
||||
self._checkForSecureHandshake();
|
||||
}
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
return bytesRead;
|
||||
};
|
||||
|
||||
var oldShutdown = self._shutdownImpl;
|
||||
self._shutdownImpl = function() {
|
||||
self.secureStream.shutdown();
|
||||
|
||||
if (!securePool) {
|
||||
allocNewSecurePool();
|
||||
}
|
||||
|
||||
var secureLen = self.secureStream.writeExtract(
|
||||
securePool, 0, securePool.length
|
||||
);
|
||||
|
||||
try {
|
||||
oldWrite(securePool, 0, secureLen);
|
||||
} catch (e) { }
|
||||
|
||||
oldShutdown();
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
function initStream (self) {
|
||||
self._readWatcher = ioWatchers.alloc();
|
||||
self._readWatcher.callback = function () {
|
||||
@ -285,68 +463,19 @@ function initStream (self) {
|
||||
|
||||
//debug('pool.used ' + pool.used);
|
||||
var bytesRead;
|
||||
var secureBytesRead;
|
||||
|
||||
try {
|
||||
if (self.secure) {
|
||||
if (!securePool) allocNewSecurePool();
|
||||
var calledByNextTick = (arguments.length == 0); // IOWatcher always passes arguments
|
||||
if (!calledByNextTick) {
|
||||
secureBytesRead = read(self.fd, securePool, 0, securePool.length);
|
||||
self.secureStream.readInject(securePool, 0, secureBytesRead);
|
||||
}
|
||||
var chunkBytes;
|
||||
bytesRead = 0;
|
||||
do {
|
||||
chunkBytes = self.secureStream.readExtract(pool,
|
||||
pool.used + bytesRead,
|
||||
pool.length - pool.used - bytesRead);
|
||||
bytesRead += chunkBytes;
|
||||
} while ((chunkBytes > 0) && (pool.used + bytesRead < pool.length));
|
||||
if (bytesRead == 0 && calledByNextTick)
|
||||
return;
|
||||
if (self.secureStream.readPending()) {
|
||||
process.nextTick(function () {
|
||||
if(self._readWatcher)
|
||||
self._readWatcher.callback();
|
||||
});
|
||||
}
|
||||
if (!self.secureEstablished) {
|
||||
if (self.secureStream.isInitFinished()) {
|
||||
self.secureEstablished = true;
|
||||
if (self._events && self._events['secure']) self.emit('secure');
|
||||
}
|
||||
}
|
||||
if (secureBytesRead === null && !self.server) {
|
||||
// Client needs to write as part of handshake
|
||||
self._writeWatcher.start();
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
bytesRead = read(self.fd,
|
||||
pool,
|
||||
pool.used,
|
||||
pool.length - pool.used);
|
||||
}
|
||||
bytesRead = self._readImpl(pool, pool.used, pool.length - pool.used, (arguments.length > 0));
|
||||
} catch (e) {
|
||||
self.destroy(e);
|
||||
return;
|
||||
}
|
||||
|
||||
//debug('bytesRead ' + bytesRead + '\n');
|
||||
// Note that some _readImpl() implementations return -1 bytes
|
||||
// read as an indication not to do any processing on the result
|
||||
// (but not an error).
|
||||
|
||||
if (self.secure && bytesRead == 0 && secureBytesRead > 0) {
|
||||
// Deal with SSL handshake
|
||||
if (self.server) {
|
||||
self._checkForSecureHandshake();
|
||||
} else {
|
||||
if (self.secureEstablised) {
|
||||
self.flush();
|
||||
} else {
|
||||
self._checkForSecureHandshake();
|
||||
}
|
||||
}
|
||||
} else if (bytesRead === 0) {
|
||||
if (bytesRead === 0) {
|
||||
self.readable = false;
|
||||
self._readWatcher.stop();
|
||||
|
||||
@ -384,6 +513,7 @@ function initStream (self) {
|
||||
// Queue of buffers and string that need to be written to socket.
|
||||
self._writeQueue = [];
|
||||
self._writeQueueEncoding = [];
|
||||
self._writeQueueFD = [];
|
||||
|
||||
self._writeWatcher = ioWatchers.alloc();
|
||||
self._writeWatcher.socket = self;
|
||||
@ -391,14 +521,17 @@ function initStream (self) {
|
||||
self.writable = false;
|
||||
}
|
||||
|
||||
function Stream (fd) {
|
||||
function Stream (fd, type) {
|
||||
events.EventEmitter.call(this);
|
||||
|
||||
this.fd = null;
|
||||
this.type = null;
|
||||
this.secure = false;
|
||||
|
||||
if (parseInt(fd) >= 0) {
|
||||
this.open(fd);
|
||||
this.open(fd, type);
|
||||
} else {
|
||||
setImplmentationMethods(this);
|
||||
}
|
||||
};
|
||||
sys.inherits(Stream, events.EventEmitter);
|
||||
@ -423,6 +556,8 @@ Stream.prototype.setSecure = function(credentials) {
|
||||
}
|
||||
this.secureStream = new SecureStream(this.credentials.context, this.server ? 1 : 0, this.credentials.shouldVerify ? 1 : 0);
|
||||
|
||||
setImplmentationMethods(this);
|
||||
|
||||
if (!this.server) {
|
||||
// If client, trigger handshake
|
||||
this._checkForSecureHandshake();
|
||||
@ -439,6 +574,10 @@ Stream.prototype.verifyPeer = function() {
|
||||
|
||||
|
||||
Stream.prototype._checkForSecureHandshake = function() {
|
||||
if (!this.writable) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Do an empty write to see if we need to write out as part of handshake
|
||||
if (!emptyBuffer) allocEmptyBuffer();
|
||||
this.write(emptyBuffer);
|
||||
@ -461,13 +600,15 @@ Stream.prototype.getCipher = function() {
|
||||
}
|
||||
|
||||
|
||||
Stream.prototype.open = function (fd) {
|
||||
Stream.prototype.open = function (fd, type) {
|
||||
initStream(this);
|
||||
|
||||
this.fd = fd;
|
||||
|
||||
this.type = type || null;
|
||||
this.readable = true;
|
||||
|
||||
setImplmentationMethods(this);
|
||||
|
||||
this._writeWatcher.set(this.fd, false, true);
|
||||
this.writable = true;
|
||||
}
|
||||
@ -504,7 +645,9 @@ Object.defineProperty(Stream.prototype, 'readyState', {
|
||||
// Returns true if all the data was flushed to socket. Returns false if
|
||||
// something was queued. If data was queued, then the "drain" event will
|
||||
// signal when it has been finally flushed to socket.
|
||||
Stream.prototype.write = function (data, encoding) {
|
||||
//
|
||||
// XXX: Caller cannot close the given fd until the stream has drained.
|
||||
Stream.prototype.write = function (data, encoding, fd) {
|
||||
if (this._writeQueue && this._writeQueue.length) {
|
||||
// Slow. There is already a write queue, so let's append to it.
|
||||
if (this._writeQueueLast() === END_OF_FILE) {
|
||||
@ -519,26 +662,20 @@ Stream.prototype.write = function (data, encoding) {
|
||||
this._writeQueue.push(data);
|
||||
this._writeQueueEncoding.push(encoding);
|
||||
}
|
||||
|
||||
if (fd != undefined) {
|
||||
this._writeQueueFD.push(fd);
|
||||
}
|
||||
|
||||
return false;
|
||||
} else {
|
||||
// Fast.
|
||||
// The most common case. There is no write queue. Just push the data
|
||||
// directly to the socket.
|
||||
return this._writeOut(data, encoding);
|
||||
return this._writeOut(data, encoding, fd);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Stream.prototype._shutdownSecure = function () {
|
||||
this.secureStream.shutdown();
|
||||
if (!securePool) allocNewSecurePool();
|
||||
var secureLen = this.secureStream.writeExtract(securePool, 0, securePool.length);
|
||||
try {
|
||||
var secureBytesWritten = write(this.fd, securePool, 0, secureLen);
|
||||
} catch (e) {
|
||||
}
|
||||
}
|
||||
|
||||
// Directly writes the data to socket.
|
||||
//
|
||||
// Steps:
|
||||
@ -547,12 +684,10 @@ Stream.prototype._shutdownSecure = function () {
|
||||
// 2. Write data to socket. Return true if flushed.
|
||||
// 3. Slice out remaining
|
||||
// 4. Unshift remaining onto _writeQueue. Return false.
|
||||
Stream.prototype._writeOut = function (data, encoding) {
|
||||
Stream.prototype._writeOut = function (data, encoding, fd) {
|
||||
if (!this.writable) {
|
||||
if (this.secure) return false;
|
||||
else throw new Error('Stream is not writable');
|
||||
throw new Error('Stream is not writable');
|
||||
}
|
||||
if (!this.secure && data.length == 0) return true;
|
||||
|
||||
var buffer, off, len;
|
||||
var bytesWritten, charsWritten;
|
||||
@ -581,7 +716,9 @@ Stream.prototype._writeOut = function (data, encoding) {
|
||||
charsWritten = bytesWritten;
|
||||
}
|
||||
|
||||
if (encoding) assert(bytesWritten > 0);
|
||||
if (encoding && data.length > 0) {
|
||||
assert(bytesWritten > 0);
|
||||
}
|
||||
|
||||
buffer = pool;
|
||||
len = bytesWritten;
|
||||
@ -602,30 +739,7 @@ Stream.prototype._writeOut = function (data, encoding) {
|
||||
}
|
||||
|
||||
try {
|
||||
if (this.secure) {
|
||||
if (!buffer) return false;
|
||||
bytesWritten = this.secureStream.writeInject(buffer, off, len);
|
||||
if (!securePool) allocNewSecurePool();
|
||||
var secureLen = this.secureStream.writeExtract(securePool, 0, securePool.length);
|
||||
if (secureLen==-1) {
|
||||
// Check our read again for secure handshake
|
||||
this._readWatcher.callback();
|
||||
secureBytesWritten = 0;
|
||||
} else {
|
||||
var secureBytesWritten = write(this.fd, securePool, 0, secureLen);
|
||||
}
|
||||
if (!this.secureEstablished && this.secureStream.isInitFinished()) {
|
||||
this.secureEstablished = true;
|
||||
try {
|
||||
if (this._events && this._events['secure']) this.emit('secure');
|
||||
} catch (e) {
|
||||
this.destroy(e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
bytesWritten = write(this.fd, buffer, off, len);
|
||||
}
|
||||
bytesWritten = this._writeImpl(buffer, off, len, fd, 0);
|
||||
} catch (e) {
|
||||
this.destroy(e);
|
||||
return false;
|
||||
@ -665,6 +779,11 @@ Stream.prototype._writeOut = function (data, encoding) {
|
||||
this._writeQueue.unshift(leftOver);
|
||||
this._writeQueueEncoding.unshift(null);
|
||||
|
||||
// If didn't successfully write any bytes, enqueue our fd and try again
|
||||
if (!bytesWritten) {
|
||||
this._writeQueueFD.unshift(fd);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -675,13 +794,14 @@ Stream.prototype.flush = function () {
|
||||
while (this._writeQueue && this._writeQueue.length) {
|
||||
var data = this._writeQueue.shift();
|
||||
var encoding = this._writeQueueEncoding.shift();
|
||||
var fd = this._writeQueueFD.shift();
|
||||
|
||||
if (data === END_OF_FILE) {
|
||||
this._shutdown();
|
||||
return true;
|
||||
}
|
||||
|
||||
var flushed = this._writeOut(data,encoding);
|
||||
var flushed = this._writeOut(data,encoding,fd);
|
||||
if (!flushed) return false;
|
||||
}
|
||||
if (this._writeWatcher) this._writeWatcher.stop();
|
||||
@ -788,6 +908,8 @@ Stream.prototype.connect = function () {
|
||||
// UNIX
|
||||
self.fd = socket('unix');
|
||||
self.type = 'unix';
|
||||
|
||||
setImplmentationMethods(this);
|
||||
doConnect(self, arguments[0]);
|
||||
}
|
||||
};
|
||||
@ -890,14 +1012,10 @@ Stream.prototype._shutdown = function () {
|
||||
// readable and writable
|
||||
this.writable = false;
|
||||
|
||||
if (this.secure) {
|
||||
this._shutdownSecure();
|
||||
}
|
||||
try {
|
||||
shutdown(this.fd, 'write')
|
||||
this._shutdownImpl();
|
||||
} catch (e) {
|
||||
this.destroy(e);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// writable but not readable
|
||||
@ -951,7 +1069,7 @@ function Server (listener) {
|
||||
}
|
||||
if (!peerInfo) return;
|
||||
|
||||
var s = new Stream(peerInfo.fd);
|
||||
var s = new Stream(peerInfo.fd, self.type);
|
||||
s.remoteAddress = peerInfo.address;
|
||||
s.remotePort = peerInfo.port;
|
||||
s.type = self.type;
|
||||
@ -1078,3 +1196,5 @@ Server.prototype.close = function () {
|
||||
self.emit("close");
|
||||
}
|
||||
};
|
||||
|
||||
// vim:ts=2 sw=2
|
||||
|
149
src/node_net.cc
149
src/node_net.cc
@ -554,8 +554,6 @@ static Handle<Value> RecvMsg(const Arguments& args) {
|
||||
String::New("Length is extends beyond buffer")));
|
||||
}
|
||||
|
||||
int received_fd;
|
||||
|
||||
struct iovec iov[1];
|
||||
iov[0].iov_base = (char*)buffer->data() + off;
|
||||
iov[0].iov_len = len;
|
||||
@ -583,15 +581,36 @@ static Handle<Value> RecvMsg(const Arguments& args) {
|
||||
// that the wrapper can pick up. Since we're single threaded, this is not
|
||||
// a problem - just make sure to copy out that variable before the next
|
||||
// call to recvmsg().
|
||||
//
|
||||
// XXX: Some implementations can send multiple file descriptors in a
|
||||
// single message. We should be using CMSG_NXTHDR() to walk the
|
||||
// chain to get at them all. This would require changing the
|
||||
// API to hand these back up the caller, is a pain.
|
||||
|
||||
struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
|
||||
if (cmsg && cmsg->cmsg_type == SCM_RIGHTS) {
|
||||
received_fd = *(int *) CMSG_DATA(cmsg);
|
||||
recv_msg_template->GetFunction()->Set(fd_symbol, Integer::New(received_fd));
|
||||
} else {
|
||||
recv_msg_template->GetFunction()->Set(fd_symbol, Null());
|
||||
int received_fd = -1;
|
||||
for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
|
||||
msg.msg_controllen > 0 && cmsg != NULL;
|
||||
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
|
||||
if (cmsg->cmsg_type == SCM_RIGHTS) {
|
||||
if (received_fd != -1) {
|
||||
fprintf(stderr, "ignoring extra FD received: %d\n", received_fd);
|
||||
}
|
||||
|
||||
received_fd = *(int *) CMSG_DATA(cmsg);
|
||||
} else {
|
||||
fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
|
||||
cmsg->cmsg_type
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
recv_msg_template->GetFunction()->Set(
|
||||
fd_symbol,
|
||||
(received_fd != -1) ?
|
||||
Integer::New(received_fd) :
|
||||
Null()
|
||||
);
|
||||
|
||||
return scope.Close(Integer::New(bytes_read));
|
||||
}
|
||||
|
||||
@ -640,49 +659,115 @@ static Handle<Value> Write(const Arguments& args) {
|
||||
}
|
||||
|
||||
|
||||
// var bytesWritten = t.sendFD(self.fd)
|
||||
// returns null on EAGAIN or EINTR, raises an exception on all other errors
|
||||
static Handle<Value> SendFD(const Arguments& args) {
|
||||
// var bytes = sendmsg(fd, buf, off, len, fd, flags);
|
||||
//
|
||||
// Write a buffer with optional offset and length to the given file
|
||||
// descriptor. Note that we refuse to send 0 bytes.
|
||||
//
|
||||
// The 'fd' parameter is a numerical file descriptor, or the undefined value
|
||||
// to send none.
|
||||
//
|
||||
// The 'flags' parameter is a number representing a bitmask of MSG_* values.
|
||||
// This is passed directly to sendmsg().
|
||||
//
|
||||
// Returns null on EAGAIN or EINTR, raises an exception on all other errors
|
||||
static Handle<Value> SendMsg(const Arguments& args) {
|
||||
HandleScope scope;
|
||||
|
||||
struct iovec iov;
|
||||
|
||||
if (args.Length() < 2) {
|
||||
return ThrowException(Exception::TypeError(
|
||||
String::New("Takes 2 parameters")));
|
||||
}
|
||||
|
||||
// The first argument should be a file descriptor
|
||||
FD_ARG(args[0])
|
||||
|
||||
// TODO: make sure fd is a unix domain socket?
|
||||
|
||||
if (!args[1]->IsInt32()) {
|
||||
// Grab the actul data to be written, stuffing it into iov
|
||||
if (!Buffer::HasInstance(args[1])) {
|
||||
return ThrowException(Exception::TypeError(
|
||||
String::New("FD to send is not an integer")));
|
||||
String::New("Expected either a string or a buffer")));
|
||||
}
|
||||
|
||||
int fd_to_send = args[1]->Int32Value();
|
||||
Buffer *buf = ObjectWrap::Unwrap<Buffer>(args[1]->ToObject());
|
||||
|
||||
size_t offset = 0;
|
||||
if (args.Length() >= 3 && !args[2]->IsUndefined()) {
|
||||
if (!args[2]->IsUint32()) {
|
||||
return ThrowException(Exception::TypeError(
|
||||
String::New("Expected unsigned integer for offset")));
|
||||
}
|
||||
|
||||
offset = args[2]->Uint32Value();
|
||||
if (offset >= buf->length()) {
|
||||
return ThrowException(Exception::Error(
|
||||
String::New("Offset into buffer too large")));
|
||||
}
|
||||
}
|
||||
|
||||
size_t length = buf->length() - offset;
|
||||
if (args.Length() >= 4 && !args[3]->IsUndefined()) {
|
||||
if (!args[3]->IsUint32()) {
|
||||
return ThrowException(Exception::TypeError(
|
||||
String::New("Expected unsigned integer for length")));
|
||||
}
|
||||
|
||||
length = args[3]->Uint32Value();
|
||||
if (offset + length > buf->length()) {
|
||||
return ThrowException(Exception::Error(
|
||||
String::New("offset + length beyond buffer length")));
|
||||
}
|
||||
}
|
||||
|
||||
iov.iov_base = buf->data() + offset;
|
||||
iov.iov_len = length;
|
||||
|
||||
int fd_to_send = -1;
|
||||
if (args.Length() >= 5 && !args[4]->IsUndefined()) {
|
||||
if (!args[4]->IsUint32()) {
|
||||
return ThrowException(Exception::TypeError(
|
||||
String::New("Expected unsigned integer for a file descriptor")));
|
||||
}
|
||||
|
||||
fd_to_send = args[4]->Uint32Value();
|
||||
}
|
||||
|
||||
int flags = 0;
|
||||
if (args.Length() >= 6 && !args[5]->IsUndefined()) {
|
||||
if (!args[5]->IsUint32()) {
|
||||
return ThrowException(Exception::TypeError(
|
||||
String::New("Expected unsigned integer for a flags argument")));
|
||||
}
|
||||
|
||||
flags = args[5]->Uint32Value();
|
||||
}
|
||||
|
||||
struct msghdr msg;
|
||||
struct iovec iov[1];
|
||||
char control_msg[CMSG_SPACE(sizeof(fd_to_send))];
|
||||
struct cmsghdr *cmsg;
|
||||
static char dummy = 'd'; // Need to send at least a byte of data in the message
|
||||
char scratch[64];
|
||||
|
||||
iov[0].iov_base = &dummy;
|
||||
iov[0].iov_len = 1;
|
||||
msg.msg_iov = iov;
|
||||
msg.msg_iov = &iov;
|
||||
msg.msg_iovlen = 1;
|
||||
msg.msg_name = NULL;
|
||||
msg.msg_namelen = 0;
|
||||
msg.msg_flags = 0;
|
||||
msg.msg_control = (void *) control_msg;
|
||||
msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
|
||||
cmsg = CMSG_FIRSTHDR(&msg);
|
||||
cmsg->cmsg_level = SOL_SOCKET;
|
||||
cmsg->cmsg_type = SCM_RIGHTS;
|
||||
cmsg->cmsg_len = msg.msg_controllen;
|
||||
*(int*) CMSG_DATA(cmsg) = fd_to_send;
|
||||
msg.msg_control = NULL;
|
||||
msg.msg_controllen = 0;
|
||||
|
||||
ssize_t written = sendmsg(fd, &msg, 0);
|
||||
if (fd_to_send >= 0) {
|
||||
struct cmsghdr *cmsg;
|
||||
|
||||
msg.msg_control = (void *) scratch;
|
||||
msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
|
||||
|
||||
cmsg = CMSG_FIRSTHDR(&msg);
|
||||
cmsg->cmsg_level = SOL_SOCKET;
|
||||
cmsg->cmsg_type = SCM_RIGHTS;
|
||||
cmsg->cmsg_len = msg.msg_controllen;
|
||||
*(int*) CMSG_DATA(cmsg) = fd_to_send;
|
||||
}
|
||||
|
||||
ssize_t written = sendmsg(fd, &msg, flags);
|
||||
|
||||
if (written < 0) {
|
||||
if (errno == EAGAIN || errno == EINTR) return Null();
|
||||
@ -805,7 +890,7 @@ void InitNet(Handle<Object> target) {
|
||||
NODE_SET_METHOD(target, "write", Write);
|
||||
NODE_SET_METHOD(target, "read", Read);
|
||||
|
||||
NODE_SET_METHOD(target, "sendFD", SendFD);
|
||||
NODE_SET_METHOD(target, "sendMsg", SendMsg);
|
||||
|
||||
recv_msg_template =
|
||||
Persistent<FunctionTemplate>::New(FunctionTemplate::New(RecvMsg));
|
||||
|
57
test/fixtures/recvfd.js
vendored
Normal file
57
test/fixtures/recvfd.js
vendored
Normal file
@ -0,0 +1,57 @@
|
||||
// See test/simple/test-sendfd.js for a complete description of what this
|
||||
// script is doing and how it fits into the test as a whole.
|
||||
|
||||
var net = require('net');
|
||||
var sys = require('sys');
|
||||
|
||||
var receivedData = [];
|
||||
var receivedFDs = [];
|
||||
var numSentMessages = 0;
|
||||
|
||||
function processData(s) {
|
||||
if (receivedData.length == 0 || receivedFDs.length == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
var fd = receivedFDs.shift();
|
||||
var d = receivedData.shift();
|
||||
|
||||
// Augment our received object before sending it back across the pipe.
|
||||
d.pid = process.pid;
|
||||
|
||||
// Create a stream around the FD that we received and send a serialized
|
||||
// version of our modified object back. Clean up when we're done.
|
||||
var pipeStream = new net.Stream(fd);
|
||||
|
||||
var drainFunc = function() {
|
||||
pipeStream.destroy();
|
||||
|
||||
if (++numSentMessages == 2) {
|
||||
s.destroy();
|
||||
}
|
||||
};
|
||||
|
||||
pipeStream.addListener('drain', drainFunc);
|
||||
pipeStream.resume();
|
||||
|
||||
if (pipeStream.write(JSON.stringify(d) + '\n')) {
|
||||
drainFunc();
|
||||
}
|
||||
};
|
||||
|
||||
// Create a UNIX socket to the path defined by argv[2] and read a file
|
||||
// descriptor and misc data from it.
|
||||
var s = new net.Stream();
|
||||
s.addListener('fd', function(fd) {
|
||||
receivedFDs.unshift(fd);
|
||||
processData(s);
|
||||
});
|
||||
s.addListener('data', function(data) {
|
||||
data.toString('utf8').trim().split('\n').forEach(function(d) {
|
||||
receivedData.unshift(JSON.parse(d));
|
||||
});
|
||||
processData(s);
|
||||
});
|
||||
s.connect(process.argv[2]);
|
||||
|
||||
// vim:ts=2 sw=2 et
|
126
test/simple/test-sendfd.js
Normal file
126
test/simple/test-sendfd.js
Normal file
@ -0,0 +1,126 @@
|
||||
// Test sending and receiving a file descriptor.
|
||||
//
|
||||
// This test is pretty complex. It ends up spawning test/fixtures/recvfd.js
|
||||
// as a child to test desired behavior. What happens is
|
||||
//
|
||||
// 1. Create an in-memory pipe via pipe(2). These two file descriptors
|
||||
// are not visible to any other process, and so make a good test-case
|
||||
// for sharing.
|
||||
// 2. Create a a UNIX socket at SOCK_PATH. When a client connects to this
|
||||
// path, they are sent the write end of the pipe from above.
|
||||
// 3. The client is sent n JSON representations of the DATA variable, each
|
||||
// with a different ordinal. We send these delimited by '\n' strings
|
||||
// so that the receiving end can avoid any coalescing that hapepns
|
||||
// due to the stream nature of the socket (e.g. '{}{}' is not a valid
|
||||
// JSON string).
|
||||
// 4. The child process receives file descriptors and JSON blobs and,
|
||||
// whenever it has at least one of each, writes a modified JSON blob
|
||||
// to the FD. The blob is modified to include the child's process ID.
|
||||
// 5. Once the child process has sent n responses, it closes the write end
|
||||
// of the pipe, which signals to the parent that there is no more data
|
||||
// coming.
|
||||
// 6. The parent listens to the read end of the pipe, accumulating JSON
|
||||
// blobs (again, delimited by '\n') and verifying that a) the 'pid'
|
||||
// attribute belongs to the child and b) the 'ord' field has not been
|
||||
// seen in a response yet. This is intended to ensure that all blobs
|
||||
// sent out have been relayed back to us.
|
||||
|
||||
require('../common');
|
||||
|
||||
var buffer = require('buffer');
|
||||
var child_process = require('child_process');
|
||||
var fs = require('fs');
|
||||
var net = require('net');
|
||||
var netBinding = process.binding('net');
|
||||
var path = require('path');
|
||||
var sys = require('sys');
|
||||
|
||||
var DATA = {
|
||||
'ppid' : process.pid,
|
||||
'ord' : 0
|
||||
};
|
||||
|
||||
var SOCK_PATH = path.join(
|
||||
__dirname,
|
||||
'..',
|
||||
path.basename(__filename, '.js') + '.sock'
|
||||
);
|
||||
|
||||
var logChild = function(d) {
|
||||
if (typeof d == 'object') {
|
||||
d = d.toString();
|
||||
}
|
||||
|
||||
d.split('\n').forEach(function(l) {
|
||||
if (l.length > 0) {
|
||||
sys.debug('CHILD: ' + l);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
// Create a pipe
|
||||
//
|
||||
// We establish a listener on the read end of the pipe so that we can
|
||||
// validate any data sent back by the child. We send the write end of the
|
||||
// pipe to the child and close it off in our process.
|
||||
var pipeFDs = netBinding.pipe();
|
||||
assert.equal(pipeFDs.length, 2);
|
||||
|
||||
var seenOrdinals = [];
|
||||
|
||||
var pipeReadStream = new net.Stream();
|
||||
pipeReadStream.addListener('data', function(data) {
|
||||
data.toString('utf8').trim().split('\n').forEach(function(d) {
|
||||
var rd = JSON.parse(d);
|
||||
|
||||
assert.equal(rd.pid, cpp);
|
||||
assert.equal(seenOrdinals.indexOf(rd.ord), -1);
|
||||
|
||||
seenOrdinals.unshift(rd.ord);
|
||||
});
|
||||
});
|
||||
pipeReadStream.open(pipeFDs[0]);
|
||||
pipeReadStream.resume();
|
||||
|
||||
// Create a UNIX socket at SOCK_PATH and send DATA and the write end
|
||||
// of the pipe to whoever connects.
|
||||
//
|
||||
// We send two messages here, both with the same pipe FD: one string, and
|
||||
// one buffer. We want to make sure that both datatypes are handled
|
||||
// correctly.
|
||||
var srv = net.createServer(function(s) {
|
||||
var str = JSON.stringify(DATA) + '\n';
|
||||
|
||||
DATA.ord = DATA.ord + 1;
|
||||
var buf = new buffer.Buffer(str.length);
|
||||
buf.write(JSON.stringify(DATA) + '\n', 'utf8');
|
||||
|
||||
s.write(str, 'utf8', pipeFDs[1]);
|
||||
if (s.write(buf, undefined, pipeFDs[1])) {
|
||||
netBinding.close(pipeFDs[1]);
|
||||
} else {
|
||||
s.addListener('drain', function() {
|
||||
netBinding.close(pipeFDs[1]);
|
||||
});
|
||||
}
|
||||
});
|
||||
srv.listen(SOCK_PATH);
|
||||
|
||||
// Spawn a child running test/fixtures/recvfd.js
|
||||
var cp = child_process.spawn(process.argv[0],
|
||||
[path.join(fixturesDir, 'recvfd.js'), SOCK_PATH]);
|
||||
|
||||
cp.stdout.addListener('data', logChild);
|
||||
cp.stderr.addListener('data', logChild);
|
||||
|
||||
// When the child exits, clean up and validate its exit status
|
||||
var cpp = cp.pid;
|
||||
cp.addListener('exit', function(code, signal) {
|
||||
srv.close();
|
||||
// fs.unlinkSync(SOCK_PATH);
|
||||
|
||||
assert.equal(code, 0);
|
||||
assert.equal(seenOrdinals.length, 2);
|
||||
});
|
||||
|
||||
// vim:ts=2 sw=2 et
|
Loading…
x
Reference in New Issue
Block a user