Optimize, clean up net2 net.js and http2.js

This commit is contained in:
Ryan Dahl 2010-01-28 14:30:31 -08:00
parent c328f3e6c6
commit aadce8e1a9
2 changed files with 202 additions and 151 deletions

View File

@ -270,78 +270,6 @@ ClientRequest.prototype.finish = function (responseListener) {
}; };
function createIncomingMessageStream (socket, cb) {
var incoming, field, value;
socket._parser.onMessageBegin = function () {
incoming = new IncomingMessage(socket);
field = null;
value = null;
};
// Only servers will get URL events.
socket._parser.onURL = function (b, start, len) {
var slice = b.asciiSlice(start, start+len);
if (incoming.url) {
incoming.url += slice;
} else {
// Almost always will branch here.
incoming.url = slice;
}
};
socket._parser.onHeaderField = function (b, start, len) {
var slice = b.asciiSlice(start, start+len).toLowerCase();
if (value) {
incoming._addHeaderLine(field, value);
field = null;
value = null;
}
if (field) {
field += slice;
} else {
field = slice;
}
};
socket._parser.onHeaderValue = function (b, start, len) {
var slice = b.asciiSlice(start, start+len);
if (value) {
value += slice;
} else {
value = slice;
}
};
socket._parser.onHeadersComplete = function (info) {
if (field && value) {
incoming._addHeaderLine(field, value);
}
incoming.httpVersionMajor = info.versionMajor;
incoming.httpVersionMinor = info.versionMinor;
if (info.method) {
// server only
incoming.method = info.method;
} else {
// client only
incoming.statusCode = info.statusCode;
}
cb(incoming, info.shouldKeepAlive);
};
socket._parser.onBody = function (b, start, len) {
incoming.emit("data", b.slice(start, start+len));
};
socket._parser.onMessageComplete = function () {
incoming.emit("eof");
};
}
/* Returns true if the message queue is finished and the socket /* Returns true if the message queue is finished and the socket
* should be closed. */ * should be closed. */
function flushMessageQueue (socket, queue) { function flushMessageQueue (socket, queue) {
@ -368,24 +296,106 @@ function flushMessageQueue (socket, queue) {
} }
var parserFreeList = [];
function newParser (type) {
var parser;
if (parserFreeList.length) {
parser = parserFreeList.shift();
parser.reinitialize(type);
} else {
parser = new process.HTTPParser(type);
parser.onMessageBegin = function () {
parser.incoming = new IncomingMessage(parser.socket);
parser.field = null;
parser.value = null;
};
// Only servers will get URL events.
parser.onURL = function (b, start, len) {
var slice = b.asciiSlice(start, start+len);
if (parser.incoming.url) {
parser.incoming.url += slice;
} else {
// Almost always will branch here.
parser.incoming.url = slice;
}
};
parser.onHeaderField = function (b, start, len) {
var slice = b.asciiSlice(start, start+len).toLowerCase();
if (parser.value) {
parser.incoming._addHeaderLine(parser.field, parser.value);
parser.field = null;
parser.value = null;
}
if (parser.field) {
parser.field += slice;
} else {
parser.field = slice;
}
};
parser.onHeaderValue = function (b, start, len) {
var slice = b.asciiSlice(start, start+len);
if (parser.value) {
parser.value += slice;
} else {
parser.value = slice;
}
};
parser.onHeadersComplete = function (info) {
if (parser.field && parser.value) {
parser.incoming._addHeaderLine(parser.field, parser.value);
}
parser.incoming.httpVersionMajor = info.versionMajor;
parser.incoming.httpVersionMinor = info.versionMinor;
if (info.method) {
// server only
parser.incoming.method = info.method;
} else {
// client only
parser.incoming.statusCode = info.statusCode;
}
parser.onIncoming(parser.incoming, info.shouldKeepAlive);
};
parser.onBody = function (b, start, len) {
parser.incoming.emit("data", b.slice(start, start+len));
};
parser.onMessageComplete = function () {
parser.incoming.emit("eof");
};
}
return parser;
}
function freeParser (parser) {
if (parserFreeList.length < 1000) parserFreeList.push(parser);
}
function connectionListener (socket) { function connectionListener (socket) {
var self = this; var self = this;
if (socket._parser) throw new Error("socket already has a parser?"); var parser = newParser('request');
socket._parser = new process.HTTPParser('request');
// An array of responses for each socket. In pipelined connections // An array of responses for each socket. In pipelined connections
// we need to keep track of the order they were sent. // we need to keep track of the order they were sent.
var responses = []; var responses = [];
socket.addListener('data', function (d) { socket.addListener('dataLite', function (d, start, end) {
socket._parser.execute(d, 0, d.length); parser.execute(d, start, end - start);
}); });
// is this really needed? // is this really needed?
socket.addListener('eof', function () { socket.addListener('eof', function () {
socket._parser.finish(); parser.finish();
// unref the parser for easy gc // unref the parser for easy gc
socket._parser.host = null; freeParser(parser);
socket._parser = null;
if (responses.length == 0) { if (responses.length == 0) {
socket.close(); socket.close();
@ -394,10 +404,14 @@ function connectionListener (socket) {
} }
}); });
createIncomingMessageStream(socket, function (incoming, shouldKeepAlive) { parser.socket = socket;
// The following callback is issued after the headers have been read on a
// new message. In this callback we setup the response object and pass it
// to the user.
parser.onIncoming = function (incoming, shouldKeepAlive) {
var req = incoming; var req = incoming;
var res = new ServerResponse(req); var res = new ServerResponse(req);
res.shouldKeepAlive = shouldKeepAlive; res.shouldKeepAlive = shouldKeepAlive;
res.addListener('flush', function () { res.addListener('flush', function () {
if (flushMessageQueue(socket, responses)) { if (flushMessageQueue(socket, responses)) {
@ -407,7 +421,7 @@ function connectionListener (socket) {
responses.push(res); responses.push(res);
self.emit('request', req, res); self.emit('request', req, res);
}); };
} }

View File

@ -30,55 +30,103 @@ var EINPROGRESS = process.EINPROGRESS;
var ENOENT = process.ENOENT; var ENOENT = process.ENOENT;
var END_OF_FILE = 0; var END_OF_FILE = 0;
// This is a free list to avoid creating so many of the same object.
function FreeList (name, max, constructor) {
this.name = name;
this.constructor = constructor;
this.max = max;
this.list = [];
}
FreeList.prototype.alloc = function () {
debug("alloc " + this.name + " " + this.list.length);
return this.list.length ? this.list.shift()
: this.constructor.apply(this, arguments);
}
FreeList.prototype.free = function (obj) {
debug("free " + this.name + " " + this.list.length);
if (this.list.length < this.max) {
this.list.push(obj);
}
}
var ioWatchers = new FreeList("iowatcher", 100, function () {
return new IOWatcher();
});
var buffers = new FreeList("buffer", 100, function (l) {
return new process.Buffer(1000);
});
// Allocated on demand.
var recvBuffer = null;
function allocRecvBuffer () {
recvBuffer = new process.Buffer(8*1024);
recvBuffer.used = 0;
}
function Socket (peerInfo) { function Socket (peerInfo) {
process.EventEmitter.call(this); process.EventEmitter.call(this);
var self = this; var self = this;
// Allocated on demand. self._readWatcher = ioWatchers.alloc();
self.recvBuffer = null; self._readWatcher.callback = function () {
self.readWatcher = new IOWatcher();
self.readWatcher.host = this;
self.readWatcher.callback = function () {
// If this is the first recv (recvBuffer doesn't exist) or we've used up // If this is the first recv (recvBuffer doesn't exist) or we've used up
// most of the recvBuffer, allocate a new one. // most of the recvBuffer, allocate a new one.
if (!self.recvBuffer || if (recvBuffer) {
self.recvBuffer.length - self.recvBuffer.used < 128) { if (recvBuffer.length - recvBuffer.used < 128) {
self._allocateNewRecvBuf(); // discard the old recvBuffer. Can't add to the free list because
// users might have refernces to slices on it.
recvBuffer = null;
allocRecvBuffer();
}
} else {
allocRecvBuffer();
} }
debug('recvBuffer.used ' + self.recvBuffer.used); debug('recvBuffer.used ' + recvBuffer.used);
var bytesRead; var bytesRead;
if (self.type == "unix") { if (self.type == "unix") {
bytesRead = recvMsg(self.fd, bytesRead = recvMsg(self.fd,
self.recvBuffer, recvBuffer,
self.recvBuffer.used, recvBuffer.used,
self.recvBuffer.length - self.recvBuffer.used); recvBuffer.length - recvBuffer.used);
debug('recvMsg.fd ' + recvMsg.fd); debug('recvMsg.fd ' + recvMsg.fd);
if (recvMsg.fd) { if (recvMsg.fd) {
self.emit('fd', recvMsg.fd); self.emit('fd', recvMsg.fd);
} }
} else { } else {
bytesRead = read(self.fd, bytesRead = read(self.fd,
self.recvBuffer, recvBuffer,
self.recvBuffer.used, recvBuffer.used,
self.recvBuffer.length - self.recvBuffer.used); recvBuffer.length - recvBuffer.used);
} }
debug('bytesRead ' + bytesRead + '\n'); debug('bytesRead ' + bytesRead + '\n');
if (!recvMsg.fd && bytesRead == 0) { if (!recvMsg.fd && bytesRead == 0) {
self.readable = false; self.readable = false;
self.readWatcher.stop(); self._readWatcher.stop();
self.emit('eof'); self.emit('eof');
if (!self.writable) self.forceClose(); if (!self.writable) self.forceClose();
} else if (bytesRead > 0) { } else if (bytesRead > 0) {
var slice = self.recvBuffer.slice(self.recvBuffer.used, var start = recvBuffer.used;
self.recvBuffer.used + bytesRead); var end = recvBuffer.used + bytesRead;
self.recvBuffer.used += bytesRead; if (self.listeners('data').length) {
self.emit('data', slice); // emit a slice
self.emit('data', recvBuffer.slice(start, end));
}
if (self.listeners('dataLite').length) {
// emit the original buffer with end points.
self.emit('dataLite', recvBuffer, start, end);
}
recvBuffer.used += bytesRead;
} }
}; };
self.readable = false; self.readable = false;
@ -99,9 +147,8 @@ function Socket (peerInfo) {
self.emit("drain"); self.emit("drain");
} }
}; };
self.writeWatcher = new IOWatcher(); self._writeWatcher = ioWatchers.alloc();
self.writeWatcher.host = this; self._writeWatcher.callback = self._doFlush;
self.writeWatcher.callback = self._doFlush;
self.writable = false; self.writable = false;
if (peerInfo) { if (peerInfo) {
@ -109,11 +156,11 @@ function Socket (peerInfo) {
self.remoteAddress = peerInfo.remoteAddress; self.remoteAddress = peerInfo.remoteAddress;
self.remotePort = peerInfo.remotePort; self.remotePort = peerInfo.remotePort;
self.readWatcher.set(self.fd, true, false); self._readWatcher.set(self.fd, true, false);
self.readWatcher.start(); self._readWatcher.start();
self.readable = true; self.readable = true;
self.writeWatcher.set(self.fd, false, true); self._writeWatcher.set(self.fd, false, true);
self.writable = true; self.writable = true;
} }
}; };
@ -128,33 +175,8 @@ exports.createConnection = function (port, host) {
}; };
Socket.prototype._allocateNewRecvBuf = function () {
var self = this;
var newBufferSize = 128; // TODO make this adjustable from user API
if (toRead) {
// Is the extra system call even worth it?
var bytesToRead = toRead(self.fd);
if (bytesToRead > 1024) {
newBufferSize = 4*1024;
} else if (bytesToRead == 0) {
// Probably getting an EOF - so let's not allocate so much.
if (self.recvBuffer &&
self.recvBuffer.length - self.recvBuffer.used > 0) {
return; // just recv the eof on the old buf.
}
newBufferSize = 128;
}
}
self.recvBuffer = new process.Buffer(newBufferSize);
self.recvBuffer.used = 0;
};
Socket.prototype._allocateSendBuffer = function () { Socket.prototype._allocateSendBuffer = function () {
var b = new process.Buffer(1024); var b = buffers.alloc(1024);
b.used = 0; b.used = 0;
b.sent = 0; b.sent = 0;
b.isMsg = false; b.isMsg = false;
@ -313,6 +335,7 @@ Socket.prototype.flush = function () {
if (b.sent == b.used) { if (b.sent == b.used) {
// this can be improved - save the buffer for later? // this can be improved - save the buffer for later?
self.sendQueue.shift(); self.sendQueue.shift();
buffers.free(b);
continue; continue;
} }
@ -328,7 +351,7 @@ Socket.prototype.flush = function () {
} }
if (bytesWritten === null) { if (bytesWritten === null) {
// could not flush everything // could not flush everything
self.writeWatcher.start(); self._writeWatcher.start();
assert(self.sendQueueSize > 0); assert(self.sendQueueSize > 0);
return false; return false;
} }
@ -342,7 +365,7 @@ Socket.prototype.flush = function () {
debug('bytes sent: ' + b.sent); debug('bytes sent: ' + b.sent);
} }
} }
self.writeWatcher.stop(); if (self._writeWatcher) self._writeWatcher.stop();
return true; return true;
}; };
@ -364,23 +387,23 @@ Socket.prototype.connect = function () {
} }
// Don't start the read watcher until connection is established // Don't start the read watcher until connection is established
self.readWatcher.set(self.fd, true, false); self._readWatcher.set(self.fd, true, false);
// How to connect on POSIX: Wait for fd to become writable, then call // How to connect on POSIX: Wait for fd to become writable, then call
// socketError() if there isn't an error, we're connected. AFAIK this a // socketError() if there isn't an error, we're connected. AFAIK this a
// platform independent way determining when a non-blocking connection // platform independent way determining when a non-blocking connection
// is established, but I have only seen it documented in the Linux // is established, but I have only seen it documented in the Linux
// Manual Page connect(2) under the error code EINPROGRESS. // Manual Page connect(2) under the error code EINPROGRESS.
self.writeWatcher.set(self.fd, false, true); self._writeWatcher.set(self.fd, false, true);
self.writeWatcher.start(); self._writeWatcher.start();
self.writeWatcher.callback = function () { self._writeWatcher.callback = function () {
var errno = socketError(self.fd); var errno = socketError(self.fd);
if (errno == 0) { if (errno == 0) {
// connection established // connection established
self.readWatcher.start(); self._readWatcher.start();
self.readable = true; self.readable = true;
self.writable = true; self.writable = true;
self.writeWatcher.callback = self._doFlush; self._writeWatcher.callback = self._doFlush;
self.emit('connect'); self.emit('connect');
} else if (errno != EINPROGRESS) { } else if (errno != EINPROGRESS) {
var e = new Error('connection error'); var e = new Error('connection error');
@ -417,15 +440,28 @@ Socket.prototype.setNoDelay = function (v) {
Socket.prototype.forceClose = function (exception) { Socket.prototype.forceClose = function (exception) {
// recvBuffer is shared between sockets, so don't need to free it here.
var b;
while (this.sendQueue.length) {
b = this.sendQueue.shift();
if (b instanceof process.Buffer) buffers.free(b);
}
if (this._writeWatcher) {
this._writeWatcher.stop();
ioWatchers.free(this._writeWatcher);
this._writeWatcher = null;
}
if (this._readWatcher) {
this._readWatcher.stop();
ioWatchers.free(this._readWatcher);
this._readWatcher = null;
}
if (this.fd) { if (this.fd) {
this.readable = false;
this.writable = false;
this.writeWatcher.stop();
this.readWatcher.stop();
close(this.fd); close(this.fd);
debug('close socket ' + this.fd);
this.fd = null; this.fd = null;
this.emit('close', exception); this.emit('close', exception);
} }
@ -436,6 +472,7 @@ Socket.prototype._shutdown = function () {
if (this.writable) { if (this.writable) {
this.writable = false; this.writable = false;
shutdown(this.fd, 'write'); shutdown(this.fd, 'write');
if (!this.readable) this.forceClose();
} }
}; };