Fix buffer bug, implement tcp recv
This commit is contained in:
parent
0ecd0fa598
commit
1da15d623e
@ -39,7 +39,7 @@ static inline struct buffer* buffer_root(buffer *buffer) {
|
||||
/* Determines the absolute position for a relative offset */
|
||||
static inline size_t buffer_abs_off(buffer *buffer, size_t off) {
|
||||
struct buffer *root = buffer_root(buffer);
|
||||
off += root->offset;
|
||||
off += buffer->offset;
|
||||
return MIN(root->length, off);
|
||||
}
|
||||
|
||||
|
@ -13,6 +13,13 @@
|
||||
#include <fcntl.h>
|
||||
#include <arpa/inet.h> /* inet_pton */
|
||||
|
||||
#include <netinet/in.h>
|
||||
#include <netinet/tcp.h>
|
||||
|
||||
#include <sys/ioctl.h>
|
||||
#include <linux/sockios.h>
|
||||
|
||||
|
||||
#include <errno.h>
|
||||
|
||||
namespace node {
|
||||
@ -427,7 +434,7 @@ static Handle<Value> Read(const Arguments& args) {
|
||||
return ThrowException(ErrnoException(errno, "read"));
|
||||
}
|
||||
|
||||
return Integer::New(bytes_read);
|
||||
return scope.Close(Integer::New(bytes_read));
|
||||
}
|
||||
|
||||
// var bytesWritten = t.write(fd, buffer, offset, length);
|
||||
@ -470,9 +477,28 @@ static Handle<Value> Write(const Arguments& args) {
|
||||
return ThrowException(ErrnoException(errno, "write"));
|
||||
}
|
||||
|
||||
return Integer::New(written);
|
||||
return scope.Close(Integer::New(written));
|
||||
}
|
||||
|
||||
|
||||
// Probably only works for Linux TCP sockets?
|
||||
// Returns the amount of data on the read queue.
|
||||
static Handle<Value> ToRead(const Arguments& args) {
|
||||
HandleScope scope;
|
||||
|
||||
FD_ARG(args[0])
|
||||
|
||||
int value;
|
||||
int r = ioctl(fd, SIOCINQ, &value);
|
||||
|
||||
if (r < 0) {
|
||||
return ThrowException(ErrnoException(errno, "ioctl"));
|
||||
}
|
||||
|
||||
return scope.Close(Integer::New(value));
|
||||
}
|
||||
|
||||
|
||||
void InitNet2(Handle<Object> target) {
|
||||
HandleScope scope;
|
||||
|
||||
@ -490,6 +516,7 @@ void InitNet2(Handle<Object> target) {
|
||||
NODE_SET_METHOD(target, "listen", Listen);
|
||||
NODE_SET_METHOD(target, "accept", Accept);
|
||||
NODE_SET_METHOD(target, "getSocketError", GetSocketError);
|
||||
NODE_SET_METHOD(target, "toRead", ToRead);
|
||||
|
||||
|
||||
target->Set(String::NewSymbol("EINPROGRESS"), Integer::New(EINPROGRESS));
|
||||
|
97
tcp.js
97
tcp.js
@ -12,18 +12,47 @@ var listen = process.listen;
|
||||
var accept = process.accept;
|
||||
var close = process.close;
|
||||
var shutdown = process.shutdown;
|
||||
var read = process.read;
|
||||
var write = process.write;
|
||||
var toRead = process.toRead;
|
||||
|
||||
var Peer = function (peerInfo) {
|
||||
process.EventEmitter.call();
|
||||
|
||||
var self = this;
|
||||
|
||||
self.fd = peerInfo.fd;
|
||||
self.remoteAddress = peerInfo.remoteAddress;
|
||||
self.remotePort = peerInfo.remotePort;
|
||||
process.mixin(self, peerInfo);
|
||||
|
||||
// Allocated on demand.
|
||||
self.recvBuffer = null;
|
||||
|
||||
self.readWatcher = new process.IOWatcher(function () {
|
||||
debug(self.fd + " readable");
|
||||
debug("\n" + self.fd + " readable");
|
||||
|
||||
// If this is the first recv (recvBuffer doesn't exist) or we've used up
|
||||
// most of the recvBuffer, allocate a new one.
|
||||
if (!self.recvBuffer ||
|
||||
self.recvBuffer.length - self.recvBufferBytesUsed < 128) {
|
||||
self._allocateNewRecvBuf();
|
||||
}
|
||||
|
||||
debug("recvBufferBytesUsed " + self.recvBufferBytesUsed);
|
||||
var bytesRead = read(self.fd,
|
||||
self.recvBuffer,
|
||||
self.recvBufferBytesUsed,
|
||||
self.recvBuffer.length - self.recvBufferBytesUsed);
|
||||
debug("bytesRead " + bytesRead + "\n");
|
||||
|
||||
if (bytesRead == 0) {
|
||||
self.readable = false;
|
||||
self.readWatcher.stop();
|
||||
self.emit("eof");
|
||||
} else {
|
||||
var slice = self.recvBuffer.slice(self.recvBufferBytesUsed,
|
||||
self.recvBufferBytesUsed + bytesRead);
|
||||
self.recvBufferBytesUsed += bytesRead;
|
||||
self.emit("receive", slice);
|
||||
}
|
||||
});
|
||||
self.readWatcher.set(self.fd, true, false);
|
||||
self.readWatcher.start();
|
||||
@ -35,9 +64,36 @@ var Peer = function (peerInfo) {
|
||||
|
||||
self.readable = true;
|
||||
self.writable = true;
|
||||
|
||||
self._out = [];
|
||||
};
|
||||
process.inherits(Peer, process.EventEmitter);
|
||||
|
||||
Peer.prototype._allocateNewRecvBuf = function () {
|
||||
var self = this;
|
||||
|
||||
var newBufferSize = 1024; // TODO make this adjustable from user API
|
||||
|
||||
if (toRead) {
|
||||
// Note: only Linux supports toRead().
|
||||
// Is the extra system call even worth it?
|
||||
var bytesToRead = toRead(self.fd);
|
||||
if (bytesToRead > 1024) {
|
||||
newBufferSize = 4*1024;
|
||||
} else if (bytesToRead == 0) {
|
||||
// Probably getting an EOF - so let's not allocate so much.
|
||||
if (self.recvBuffer &&
|
||||
self.recvBuffer.length - self.recvBufferBytesUsed > 0) {
|
||||
return; // just recv the eof on the old buf.
|
||||
}
|
||||
newBufferSize = 128;
|
||||
}
|
||||
}
|
||||
|
||||
self.recvBuffer = new process.Buffer(newBufferSize);
|
||||
self.recvBufferBytesUsed = 0;
|
||||
};
|
||||
|
||||
Peer.prototype.close = function () {
|
||||
this.readable = false;
|
||||
this.writable = false;
|
||||
@ -49,8 +105,6 @@ Peer.prototype.close = function () {
|
||||
this.fd = null;
|
||||
};
|
||||
|
||||
|
||||
|
||||
var Server = function (listener) {
|
||||
var self = this;
|
||||
|
||||
@ -73,14 +127,26 @@ var Server = function (listener) {
|
||||
};
|
||||
process.inherits(Server, process.EventEmitter);
|
||||
|
||||
Server.prototype.listen = function (port, host) {
|
||||
Server.prototype.listen = function () {
|
||||
var self = this;
|
||||
|
||||
if (self.fd) throw new Error("Already running");
|
||||
|
||||
self.fd = process.socket("TCP");
|
||||
// TODO dns resolution
|
||||
bind(self.fd, port, host);
|
||||
if (typeof(arguments[0]) == "string" && arguments.length == 1) {
|
||||
// the first argument specifies a path
|
||||
self.fd = process.socket("UNIX");
|
||||
// TODO unlink sockfile if exists?
|
||||
// if (lstat(SOCKFILE, &tstat) == 0) {
|
||||
// assert(S_ISSOCK(tstat.st_mode));
|
||||
// unlink(SOCKFILE);
|
||||
// }
|
||||
bind(self.fd, arguments[0]);
|
||||
} else {
|
||||
// the first argument is the port, the second an IP
|
||||
self.fd = process.socket("TCP");
|
||||
// TODO dns resolution on arguments[1]
|
||||
bind(self.fd, arguments[0], arguments[1]);
|
||||
}
|
||||
listen(self.fd, 128); // TODO configurable backlog
|
||||
|
||||
self.watcher.set(self.fd, true, false);
|
||||
@ -97,7 +163,12 @@ Server.prototype.close = function () {
|
||||
|
||||
///////////////////////////////////////////////////////
|
||||
|
||||
process.Buffer.prototype.toString = function () {
|
||||
return this.utf8Slice(0, this.length);
|
||||
};
|
||||
|
||||
var sys = require("sys");
|
||||
|
||||
var server = new Server(function (peer) {
|
||||
sys.puts("connection (" + peer.fd + "): "
|
||||
+ peer.remoteAddress
|
||||
@ -105,7 +176,11 @@ var server = new Server(function (peer) {
|
||||
+ peer.remotePort
|
||||
);
|
||||
sys.puts("server fd: " + server.fd);
|
||||
peer.close();
|
||||
|
||||
peer.addListener("receive", function (b) {
|
||||
sys.puts("recv (" + b.length + "): " + b);
|
||||
});
|
||||
});
|
||||
//server.listen(8000);
|
||||
server.listen(8000);
|
||||
sys.puts("server fd: " + server.fd);
|
||||
|
Loading…
x
Reference in New Issue
Block a user