Implement stream.send()
This commit is contained in:
parent
1da15d623e
commit
bddd6e9ca3
@ -254,6 +254,90 @@ static Handle<Value> Slice(const Arguments &args) {
|
||||
return scope.Close(slice);
|
||||
}
|
||||
|
||||
|
||||
// var charsWritten = buffer.utf8Write(string, offset, length);
|
||||
static Handle<Value> Utf8Write(const Arguments &args) {
|
||||
HandleScope scope;
|
||||
|
||||
struct buffer *buffer = BufferUnwrap(args.This());
|
||||
|
||||
if (!args[0]->IsString()) {
|
||||
return ThrowException(Exception::TypeError(String::New(
|
||||
"Argument must be a string")));
|
||||
}
|
||||
|
||||
Local<String> s = args[0]->ToString();
|
||||
|
||||
size_t offset = args[1]->Int32Value();
|
||||
|
||||
char *p = buffer_p(buffer, offset);
|
||||
if (buffer_p(buffer, offset) == NULL) {
|
||||
return ThrowException(Exception::TypeError(String::New(
|
||||
"Offset is out of bounds")));
|
||||
}
|
||||
|
||||
size_t toWrite = args[2]->Int32Value();
|
||||
|
||||
if (buffer_remaining(buffer, offset) < toWrite) {
|
||||
return ThrowException(Exception::TypeError(String::New(
|
||||
"Length is out of bounds")));
|
||||
}
|
||||
|
||||
int written = s->WriteUtf8(p, toWrite);
|
||||
|
||||
return scope.Close(Integer::New(written));
|
||||
}
|
||||
|
||||
|
||||
// var charsWritten = buffer.asciiWrite(string, offset, length);
|
||||
static Handle<Value> AsciiWrite(const Arguments &args) {
|
||||
HandleScope scope;
|
||||
|
||||
struct buffer *buffer = BufferUnwrap(args.This());
|
||||
|
||||
if (!args[0]->IsString()) {
|
||||
return ThrowException(Exception::TypeError(String::New(
|
||||
"Argument must be a string")));
|
||||
}
|
||||
|
||||
Local<String> s = args[0]->ToString();
|
||||
|
||||
size_t offset = args[1]->Int32Value();
|
||||
|
||||
char *p = buffer_p(buffer, offset);
|
||||
if (buffer_p(buffer, offset) == NULL) {
|
||||
return ThrowException(Exception::TypeError(String::New(
|
||||
"Offset is out of bounds")));
|
||||
}
|
||||
|
||||
size_t toWrite = args[2]->Int32Value();
|
||||
|
||||
if (buffer_remaining(buffer, offset) < toWrite) {
|
||||
return ThrowException(Exception::TypeError(String::New(
|
||||
"Length is out of bounds")));
|
||||
}
|
||||
|
||||
// TODO Expose the second argument of WriteAscii?
|
||||
// Could avoid doing slices when the string doesn't fit in a buffer. V8
|
||||
// slice() does copy the string, so exposing that argument would help.
|
||||
|
||||
int written = s->WriteAscii(p, 0, toWrite);
|
||||
|
||||
return scope.Close(Integer::New(written));
|
||||
}
|
||||
|
||||
|
||||
static Handle<Value> Utf8Length(const Arguments &args) {
|
||||
HandleScope scope;
|
||||
if (!args[0]->IsString()) {
|
||||
return ThrowException(Exception::TypeError(String::New(
|
||||
"Argument must be a string")));
|
||||
}
|
||||
Local<String> s = args[0]->ToString();
|
||||
return scope.Close(Integer::New(s->Utf8Length()));
|
||||
}
|
||||
|
||||
|
||||
void InitBuffer(Handle<Object> target) {
|
||||
HandleScope scope;
|
||||
|
||||
@ -271,6 +355,11 @@ void InitBuffer(Handle<Object> target) {
|
||||
// copy
|
||||
NODE_SET_PROTOTYPE_METHOD(constructor_template, "utf8Slice", Utf8Slice);
|
||||
|
||||
NODE_SET_PROTOTYPE_METHOD(constructor_template, "utf8Write", Utf8Write);
|
||||
NODE_SET_PROTOTYPE_METHOD(constructor_template, "asciiWrite", AsciiWrite);
|
||||
|
||||
NODE_SET_METHOD(constructor_template->GetFunction(), "utf8Length", Utf8Length);
|
||||
|
||||
target->Set(String::NewSymbol("Buffer"), constructor_template->GetFunction());
|
||||
}
|
||||
|
||||
|
@ -425,9 +425,9 @@ static Handle<Value> Read(const Arguments& args) {
|
||||
String::New("Length is extends beyond buffer")));
|
||||
}
|
||||
|
||||
size_t bytes_read = read(fd,
|
||||
buffer_p(buffer, off),
|
||||
buffer_remaining(buffer, off));
|
||||
ssize_t bytes_read = read(fd,
|
||||
buffer_p(buffer, off),
|
||||
buffer_remaining(buffer, off));
|
||||
|
||||
if (bytes_read < 0) {
|
||||
if (errno == EAGAIN || errno == EINTR) return Null();
|
||||
@ -457,20 +457,20 @@ static Handle<Value> Write(const Arguments& args) {
|
||||
struct buffer * buffer = BufferUnwrap(args[1]);
|
||||
|
||||
size_t off = args[2]->Int32Value();
|
||||
if (buffer_p(buffer, off) == NULL) {
|
||||
char *p = buffer_p(buffer, off);
|
||||
if (p == NULL) {
|
||||
return ThrowException(Exception::Error(
|
||||
String::New("Offset is out of bounds")));
|
||||
}
|
||||
|
||||
size_t len = args[3]->Int32Value();
|
||||
if (buffer_remaining(buffer, off) < len) {
|
||||
size_t remaining = buffer_remaining(buffer, off);
|
||||
if (remaining < len) {
|
||||
return ThrowException(Exception::Error(
|
||||
String::New("Length is extends beyond buffer")));
|
||||
}
|
||||
|
||||
size_t written = write(fd,
|
||||
buffer_p(buffer, off),
|
||||
buffer_remaining(buffer, off));
|
||||
ssize_t written = write(fd, p, len);
|
||||
|
||||
if (written < 0) {
|
||||
if (errno == EAGAIN || errno == EINTR) return Null();
|
||||
|
153
tcp.js
153
tcp.js
@ -16,15 +16,18 @@ var read = process.read;
|
||||
var write = process.write;
|
||||
var toRead = process.toRead;
|
||||
|
||||
var Peer = function (peerInfo) {
|
||||
var Stream = function (peerInfo) {
|
||||
process.EventEmitter.call();
|
||||
|
||||
var self = this;
|
||||
|
||||
process.mixin(self, peerInfo);
|
||||
self.fd = peerInfo.fd;
|
||||
self.remoteAddress = peerInfo.remoteAddress;
|
||||
self.remotePort = peerInfo.remotePort;
|
||||
|
||||
// Allocated on demand.
|
||||
self.recvBuffer = null;
|
||||
self.sendQueue = [];
|
||||
|
||||
self.readWatcher = new process.IOWatcher(function () {
|
||||
debug("\n" + self.fd + " readable");
|
||||
@ -32,25 +35,25 @@ var Peer = function (peerInfo) {
|
||||
// If this is the first recv (recvBuffer doesn't exist) or we've used up
|
||||
// most of the recvBuffer, allocate a new one.
|
||||
if (!self.recvBuffer ||
|
||||
self.recvBuffer.length - self.recvBufferBytesUsed < 128) {
|
||||
self.recvBuffer.length - self.recvBuffer.used < 128) {
|
||||
self._allocateNewRecvBuf();
|
||||
}
|
||||
|
||||
debug("recvBufferBytesUsed " + self.recvBufferBytesUsed);
|
||||
debug("recvBuffer.used " + self.recvBuffer.used);
|
||||
var bytesRead = read(self.fd,
|
||||
self.recvBuffer,
|
||||
self.recvBufferBytesUsed,
|
||||
self.recvBuffer.length - self.recvBufferBytesUsed);
|
||||
self.recvBuffer.used,
|
||||
self.recvBuffer.length - self.recvBuffer.used);
|
||||
debug("bytesRead " + bytesRead + "\n");
|
||||
|
||||
if (bytesRead == 0) {
|
||||
if (bytesRead == 0) {
|
||||
self.readable = false;
|
||||
self.readWatcher.stop();
|
||||
self.emit("eof");
|
||||
} else {
|
||||
var slice = self.recvBuffer.slice(self.recvBufferBytesUsed,
|
||||
self.recvBufferBytesUsed + bytesRead);
|
||||
self.recvBufferBytesUsed += bytesRead;
|
||||
var slice = self.recvBuffer.slice(self.recvBuffer.used,
|
||||
self.recvBuffer.used + bytesRead);
|
||||
self.recvBuffer.used += bytesRead;
|
||||
self.emit("receive", slice);
|
||||
}
|
||||
});
|
||||
@ -58,18 +61,16 @@ var Peer = function (peerInfo) {
|
||||
self.readWatcher.start();
|
||||
|
||||
self.writeWatcher = new process.IOWatcher(function () {
|
||||
debug(self.fd + " writable");
|
||||
self.flush();
|
||||
});
|
||||
self.writeWatcher.set(self.fd, false, true);
|
||||
|
||||
self.readable = true;
|
||||
self.writable = true;
|
||||
|
||||
self._out = [];
|
||||
};
|
||||
process.inherits(Peer, process.EventEmitter);
|
||||
process.inherits(Stream, process.EventEmitter);
|
||||
|
||||
Peer.prototype._allocateNewRecvBuf = function () {
|
||||
Stream.prototype._allocateNewRecvBuf = function () {
|
||||
var self = this;
|
||||
|
||||
var newBufferSize = 1024; // TODO make this adjustable from user API
|
||||
@ -83,7 +84,7 @@ Peer.prototype._allocateNewRecvBuf = function () {
|
||||
} else if (bytesToRead == 0) {
|
||||
// Probably getting an EOF - so let's not allocate so much.
|
||||
if (self.recvBuffer &&
|
||||
self.recvBuffer.length - self.recvBufferBytesUsed > 0) {
|
||||
self.recvBuffer.length - self.recvBuffer.used > 0) {
|
||||
return; // just recv the eof on the old buf.
|
||||
}
|
||||
newBufferSize = 128;
|
||||
@ -91,10 +92,115 @@ Peer.prototype._allocateNewRecvBuf = function () {
|
||||
}
|
||||
|
||||
self.recvBuffer = new process.Buffer(newBufferSize);
|
||||
self.recvBufferBytesUsed = 0;
|
||||
self.recvBuffer.used = 0;
|
||||
};
|
||||
|
||||
Peer.prototype.close = function () {
|
||||
Stream.prototype._allocateSendBuffer = function () {
|
||||
var b = new process.Buffer(1024);
|
||||
b.used = 0;
|
||||
b.sent = 0;
|
||||
this.sendQueue.push(b);
|
||||
return b;
|
||||
};
|
||||
|
||||
Stream.prototype.send = function (data, encoding) {
|
||||
var self = this;
|
||||
if (typeof(data) == "string") {
|
||||
var buffer;
|
||||
if (self.sendQueue.length == 0) {
|
||||
buffer = self._allocateSendBuffer();
|
||||
} else {
|
||||
// walk through the sendQueue, find the first empty buffer
|
||||
for (var i = 0; i < self.sendQueue.length; i++) {
|
||||
if (self.sendQueue[i].used == 0) {
|
||||
buffer = self.sendQueue[i];
|
||||
break;
|
||||
}
|
||||
}
|
||||
// if we didn't find one, take the last
|
||||
if (!buffer) {
|
||||
buffer = self.sendQueue[self.sendQueue.length-1];
|
||||
// if last buffer is empty
|
||||
if (buffer.length == buffer.used) buffer = self._allocateSendBuffer();
|
||||
}
|
||||
}
|
||||
|
||||
encoding = encoding || "ascii"; // default to ascii since it's faster
|
||||
|
||||
var charsWritten;
|
||||
|
||||
if (encoding.toLowerCase() == "utf8") {
|
||||
charsWritten = buffer.utf8Write(data,
|
||||
buffer.used,
|
||||
buffer.length - buffer.used);
|
||||
buffer.used += process.Buffer.utf8Length(data.slice(0, charsWritten));
|
||||
} else {
|
||||
// ascii
|
||||
charsWritten = buffer.asciiWrite(data,
|
||||
buffer.used,
|
||||
buffer.length - buffer.used);
|
||||
buffer.used += charsWritten;
|
||||
debug("ascii charsWritten " + charsWritten);
|
||||
debug("ascii buffer.used " + buffer.used);
|
||||
}
|
||||
|
||||
|
||||
// If we didn't finish, then recurse with the rest of the string.
|
||||
if (charsWritten < data.length) {
|
||||
debug("recursive send");
|
||||
self.send(data.slice(charsWritten), encoding);
|
||||
}
|
||||
} else {
|
||||
// data is a process.Buffer
|
||||
|
||||
// walk through the sendQueue, find the first empty buffer
|
||||
var inserted = false;
|
||||
data.sent = 0;
|
||||
data.used = data.length;
|
||||
for (var i = 0; i < self.sendQueue.length; i++) {
|
||||
if (self.sendQueue[i].used == 0) {
|
||||
// if found, insert the data there
|
||||
self.sendQueue.splice(i, 0, data);
|
||||
inserted = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!inserted) self.sendQueue.push(data);
|
||||
}
|
||||
this.flush();
|
||||
};
|
||||
|
||||
// returns true if flushed without getting EAGAIN
|
||||
// false if it got EAGAIN
|
||||
Stream.prototype.flush = function () {
|
||||
var self = this;
|
||||
var bytesWritten;
|
||||
while (self.sendQueue.length > 0) {
|
||||
var b = self.sendQueue[0];
|
||||
|
||||
if (b.sent == b.used) {
|
||||
// this can be improved - save the buffer for later?
|
||||
self.sendQueue.shift()
|
||||
continue;
|
||||
}
|
||||
|
||||
bytesWritten = write(self.fd,
|
||||
b,
|
||||
b.sent,
|
||||
b.used - b.sent);
|
||||
if (bytesWritten === null) {
|
||||
this.writeWatcher.start();
|
||||
return false;
|
||||
}
|
||||
b.sent += bytesWritten;
|
||||
debug("bytes sent: " + b.sent);
|
||||
}
|
||||
this.writeWatcher.stop();
|
||||
return true;
|
||||
};
|
||||
|
||||
Stream.prototype.close = function () {
|
||||
this.readable = false;
|
||||
this.writable = false;
|
||||
|
||||
@ -113,14 +219,11 @@ var Server = function (listener) {
|
||||
}
|
||||
|
||||
self.watcher = new process.IOWatcher(function (readable, writeable) {
|
||||
debug("readable " + readable);
|
||||
debug("writable " + writeable);
|
||||
while (self.fd) {
|
||||
debug("accept from " + self.fd);
|
||||
var peerInfo = accept(self.fd);
|
||||
debug("accept: " + JSON.stringify(peerInfo));
|
||||
if (!peerInfo) return;
|
||||
var peer = new Peer(peerInfo);
|
||||
var peer = new Stream(peerInfo);
|
||||
self.emit("connection", peer);
|
||||
}
|
||||
});
|
||||
@ -132,6 +235,7 @@ Server.prototype.listen = function () {
|
||||
|
||||
if (self.fd) throw new Error("Already running");
|
||||
|
||||
var backlogIndex;
|
||||
if (typeof(arguments[0]) == "string" && arguments.length == 1) {
|
||||
// the first argument specifies a path
|
||||
self.fd = process.socket("UNIX");
|
||||
@ -141,13 +245,15 @@ Server.prototype.listen = function () {
|
||||
// unlink(SOCKFILE);
|
||||
// }
|
||||
bind(self.fd, arguments[0]);
|
||||
backlogIndex = 1;
|
||||
} else {
|
||||
// the first argument is the port, the second an IP
|
||||
self.fd = process.socket("TCP");
|
||||
// TODO dns resolution on arguments[1]
|
||||
bind(self.fd, arguments[0], arguments[1]);
|
||||
backlogIndex = typeof(arguments[1]) == "string" ? 2 : 1;
|
||||
}
|
||||
listen(self.fd, 128); // TODO configurable backlog
|
||||
listen(self.fd, arguments[backlogIndex] ? arguments[backlogIndex] : 128);
|
||||
|
||||
self.watcher.set(self.fd, true, false);
|
||||
self.watcher.start();
|
||||
@ -179,6 +285,7 @@ var server = new Server(function (peer) {
|
||||
|
||||
peer.addListener("receive", function (b) {
|
||||
sys.puts("recv (" + b.length + "): " + b);
|
||||
peer.send("pong\r\n");
|
||||
});
|
||||
});
|
||||
//server.listen(8000);
|
||||
|
Loading…
x
Reference in New Issue
Block a user