diff --git a/lib/http2.js b/lib/http2.js index 846a4c31f20..cdbbbf69750 100644 --- a/lib/http2.js +++ b/lib/http2.js @@ -270,78 +270,6 @@ ClientRequest.prototype.finish = function (responseListener) { }; -function createIncomingMessageStream (socket, cb) { - var incoming, field, value; - - socket._parser.onMessageBegin = function () { - incoming = new IncomingMessage(socket); - field = null; - value = null; - }; - - // Only servers will get URL events. - socket._parser.onURL = function (b, start, len) { - var slice = b.asciiSlice(start, start+len); - if (incoming.url) { - incoming.url += slice; - } else { - // Almost always will branch here. - incoming.url = slice; - } - }; - - socket._parser.onHeaderField = function (b, start, len) { - var slice = b.asciiSlice(start, start+len).toLowerCase(); - if (value) { - incoming._addHeaderLine(field, value); - field = null; - value = null; - } - if (field) { - field += slice; - } else { - field = slice; - } - }; - - socket._parser.onHeaderValue = function (b, start, len) { - var slice = b.asciiSlice(start, start+len); - if (value) { - value += slice; - } else { - value = slice; - } - }; - - socket._parser.onHeadersComplete = function (info) { - if (field && value) { - incoming._addHeaderLine(field, value); - } - - incoming.httpVersionMajor = info.versionMajor; - incoming.httpVersionMinor = info.versionMinor; - - if (info.method) { - // server only - incoming.method = info.method; - } else { - // client only - incoming.statusCode = info.statusCode; - } - - cb(incoming, info.shouldKeepAlive); - }; - - socket._parser.onBody = function (b, start, len) { - incoming.emit("data", b.slice(start, start+len)); - }; - - socket._parser.onMessageComplete = function () { - incoming.emit("eof"); - }; -} - - /* Returns true if the message queue is finished and the socket * should be closed. */ function flushMessageQueue (socket, queue) { @@ -368,24 +296,106 @@ function flushMessageQueue (socket, queue) { } +var parserFreeList = []; + +function newParser (type) { + var parser; + if (parserFreeList.length) { + parser = parserFreeList.shift(); + parser.reinitialize(type); + } else { + parser = new process.HTTPParser(type); + + parser.onMessageBegin = function () { + parser.incoming = new IncomingMessage(parser.socket); + parser.field = null; + parser.value = null; + }; + + // Only servers will get URL events. + parser.onURL = function (b, start, len) { + var slice = b.asciiSlice(start, start+len); + if (parser.incoming.url) { + parser.incoming.url += slice; + } else { + // Almost always will branch here. + parser.incoming.url = slice; + } + }; + + parser.onHeaderField = function (b, start, len) { + var slice = b.asciiSlice(start, start+len).toLowerCase(); + if (parser.value) { + parser.incoming._addHeaderLine(parser.field, parser.value); + parser.field = null; + parser.value = null; + } + if (parser.field) { + parser.field += slice; + } else { + parser.field = slice; + } + }; + + parser.onHeaderValue = function (b, start, len) { + var slice = b.asciiSlice(start, start+len); + if (parser.value) { + parser.value += slice; + } else { + parser.value = slice; + } + }; + + parser.onHeadersComplete = function (info) { + if (parser.field && parser.value) { + parser.incoming._addHeaderLine(parser.field, parser.value); + } + + parser.incoming.httpVersionMajor = info.versionMajor; + parser.incoming.httpVersionMinor = info.versionMinor; + + if (info.method) { + // server only + parser.incoming.method = info.method; + } else { + // client only + parser.incoming.statusCode = info.statusCode; + } + + parser.onIncoming(parser.incoming, info.shouldKeepAlive); + }; + + parser.onBody = function (b, start, len) { + parser.incoming.emit("data", b.slice(start, start+len)); + }; + + parser.onMessageComplete = function () { + parser.incoming.emit("eof"); + }; + } + return parser; +} + +function freeParser (parser) { + if (parserFreeList.length < 1000) parserFreeList.push(parser); +} + function connectionListener (socket) { var self = this; - if (socket._parser) throw new Error("socket already has a parser?"); - socket._parser = new process.HTTPParser('request'); + var parser = newParser('request'); // An array of responses for each socket. In pipelined connections // we need to keep track of the order they were sent. var responses = []; - socket.addListener('data', function (d) { - socket._parser.execute(d, 0, d.length); + socket.addListener('dataLite', function (d, start, end) { + parser.execute(d, start, end - start); }); // is this really needed? socket.addListener('eof', function () { - socket._parser.finish(); + parser.finish(); // unref the parser for easy gc - socket._parser.host = null; - socket._parser = null; + freeParser(parser); if (responses.length == 0) { socket.close(); @@ -394,10 +404,14 @@ function connectionListener (socket) { } }); - createIncomingMessageStream(socket, function (incoming, shouldKeepAlive) { + parser.socket = socket; + // The following callback is issued after the headers have been read on a + // new message. In this callback we setup the response object and pass it + // to the user. + parser.onIncoming = function (incoming, shouldKeepAlive) { var req = incoming; - var res = new ServerResponse(req); + res.shouldKeepAlive = shouldKeepAlive; res.addListener('flush', function () { if (flushMessageQueue(socket, responses)) { @@ -407,7 +421,7 @@ function connectionListener (socket) { responses.push(res); self.emit('request', req, res); - }); + }; } diff --git a/lib/net.js b/lib/net.js index 955c482dc0d..ab79493bc2a 100644 --- a/lib/net.js +++ b/lib/net.js @@ -28,57 +28,105 @@ var getaddrinfo = process.getaddrinfo; var needsLookup = process.needsLookup; var EINPROGRESS = process.EINPROGRESS; var ENOENT = process.ENOENT; -var END_OF_FILE = 0; +var END_OF_FILE = 0; + +// This is a free list to avoid creating so many of the same object. + +function FreeList (name, max, constructor) { + this.name = name; + this.constructor = constructor; + this.max = max; + this.list = []; +} + +FreeList.prototype.alloc = function () { + debug("alloc " + this.name + " " + this.list.length); + return this.list.length ? this.list.shift() + : this.constructor.apply(this, arguments); +} + +FreeList.prototype.free = function (obj) { + debug("free " + this.name + " " + this.list.length); + if (this.list.length < this.max) { + this.list.push(obj); + } +} + + +var ioWatchers = new FreeList("iowatcher", 100, function () { + return new IOWatcher(); +}); + +var buffers = new FreeList("buffer", 100, function (l) { + return new process.Buffer(1000); +}); + +// Allocated on demand. +var recvBuffer = null; + +function allocRecvBuffer () { + recvBuffer = new process.Buffer(8*1024); + recvBuffer.used = 0; +} function Socket (peerInfo) { process.EventEmitter.call(this); var self = this; - // Allocated on demand. - self.recvBuffer = null; - - self.readWatcher = new IOWatcher(); - self.readWatcher.host = this; - self.readWatcher.callback = function () { + self._readWatcher = ioWatchers.alloc(); + self._readWatcher.callback = function () { // If this is the first recv (recvBuffer doesn't exist) or we've used up // most of the recvBuffer, allocate a new one. - if (!self.recvBuffer || - self.recvBuffer.length - self.recvBuffer.used < 128) { - self._allocateNewRecvBuf(); + if (recvBuffer) { + if (recvBuffer.length - recvBuffer.used < 128) { + // discard the old recvBuffer. Can't add to the free list because + // users might have refernces to slices on it. + recvBuffer = null; + allocRecvBuffer(); + } + } else { + allocRecvBuffer(); } - debug('recvBuffer.used ' + self.recvBuffer.used); + debug('recvBuffer.used ' + recvBuffer.used); var bytesRead; if (self.type == "unix") { bytesRead = recvMsg(self.fd, - self.recvBuffer, - self.recvBuffer.used, - self.recvBuffer.length - self.recvBuffer.used); + recvBuffer, + recvBuffer.used, + recvBuffer.length - recvBuffer.used); debug('recvMsg.fd ' + recvMsg.fd); if (recvMsg.fd) { self.emit('fd', recvMsg.fd); } } else { bytesRead = read(self.fd, - self.recvBuffer, - self.recvBuffer.used, - self.recvBuffer.length - self.recvBuffer.used); + recvBuffer, + recvBuffer.used, + recvBuffer.length - recvBuffer.used); } debug('bytesRead ' + bytesRead + '\n'); if (!recvMsg.fd && bytesRead == 0) { self.readable = false; - self.readWatcher.stop(); + self._readWatcher.stop(); self.emit('eof'); if (!self.writable) self.forceClose(); } else if (bytesRead > 0) { - var slice = self.recvBuffer.slice(self.recvBuffer.used, - self.recvBuffer.used + bytesRead); - self.recvBuffer.used += bytesRead; - self.emit('data', slice); + var start = recvBuffer.used; + var end = recvBuffer.used + bytesRead; + if (self.listeners('data').length) { + // emit a slice + self.emit('data', recvBuffer.slice(start, end)); + } + if (self.listeners('dataLite').length) { + // emit the original buffer with end points. + self.emit('dataLite', recvBuffer, start, end); + } + recvBuffer.used += bytesRead; } }; self.readable = false; @@ -99,9 +147,8 @@ function Socket (peerInfo) { self.emit("drain"); } }; - self.writeWatcher = new IOWatcher(); - self.writeWatcher.host = this; - self.writeWatcher.callback = self._doFlush; + self._writeWatcher = ioWatchers.alloc(); + self._writeWatcher.callback = self._doFlush; self.writable = false; if (peerInfo) { @@ -109,11 +156,11 @@ function Socket (peerInfo) { self.remoteAddress = peerInfo.remoteAddress; self.remotePort = peerInfo.remotePort; - self.readWatcher.set(self.fd, true, false); - self.readWatcher.start(); + self._readWatcher.set(self.fd, true, false); + self._readWatcher.start(); self.readable = true; - self.writeWatcher.set(self.fd, false, true); + self._writeWatcher.set(self.fd, false, true); self.writable = true; } }; @@ -128,33 +175,8 @@ exports.createConnection = function (port, host) { }; -Socket.prototype._allocateNewRecvBuf = function () { - var self = this; - - var newBufferSize = 128; // TODO make this adjustable from user API - - if (toRead) { - // Is the extra system call even worth it? - var bytesToRead = toRead(self.fd); - if (bytesToRead > 1024) { - newBufferSize = 4*1024; - } else if (bytesToRead == 0) { - // Probably getting an EOF - so let's not allocate so much. - if (self.recvBuffer && - self.recvBuffer.length - self.recvBuffer.used > 0) { - return; // just recv the eof on the old buf. - } - newBufferSize = 128; - } - } - - self.recvBuffer = new process.Buffer(newBufferSize); - self.recvBuffer.used = 0; -}; - - Socket.prototype._allocateSendBuffer = function () { - var b = new process.Buffer(1024); + var b = buffers.alloc(1024); b.used = 0; b.sent = 0; b.isMsg = false; @@ -313,6 +335,7 @@ Socket.prototype.flush = function () { if (b.sent == b.used) { // this can be improved - save the buffer for later? self.sendQueue.shift(); + buffers.free(b); continue; } @@ -328,7 +351,7 @@ Socket.prototype.flush = function () { } if (bytesWritten === null) { // could not flush everything - self.writeWatcher.start(); + self._writeWatcher.start(); assert(self.sendQueueSize > 0); return false; } @@ -342,7 +365,7 @@ Socket.prototype.flush = function () { debug('bytes sent: ' + b.sent); } } - self.writeWatcher.stop(); + if (self._writeWatcher) self._writeWatcher.stop(); return true; }; @@ -364,23 +387,23 @@ Socket.prototype.connect = function () { } // Don't start the read watcher until connection is established - self.readWatcher.set(self.fd, true, false); + self._readWatcher.set(self.fd, true, false); // How to connect on POSIX: Wait for fd to become writable, then call // socketError() if there isn't an error, we're connected. AFAIK this a // platform independent way determining when a non-blocking connection // is established, but I have only seen it documented in the Linux // Manual Page connect(2) under the error code EINPROGRESS. - self.writeWatcher.set(self.fd, false, true); - self.writeWatcher.start(); - self.writeWatcher.callback = function () { + self._writeWatcher.set(self.fd, false, true); + self._writeWatcher.start(); + self._writeWatcher.callback = function () { var errno = socketError(self.fd); if (errno == 0) { // connection established - self.readWatcher.start(); + self._readWatcher.start(); self.readable = true; self.writable = true; - self.writeWatcher.callback = self._doFlush; + self._writeWatcher.callback = self._doFlush; self.emit('connect'); } else if (errno != EINPROGRESS) { var e = new Error('connection error'); @@ -417,15 +440,28 @@ Socket.prototype.setNoDelay = function (v) { Socket.prototype.forceClose = function (exception) { + // recvBuffer is shared between sockets, so don't need to free it here. + + var b; + while (this.sendQueue.length) { + b = this.sendQueue.shift(); + if (b instanceof process.Buffer) buffers.free(b); + } + + if (this._writeWatcher) { + this._writeWatcher.stop(); + ioWatchers.free(this._writeWatcher); + this._writeWatcher = null; + } + + if (this._readWatcher) { + this._readWatcher.stop(); + ioWatchers.free(this._readWatcher); + this._readWatcher = null; + } + if (this.fd) { - this.readable = false; - this.writable = false; - - this.writeWatcher.stop(); - this.readWatcher.stop(); - close(this.fd); - debug('close socket ' + this.fd); this.fd = null; this.emit('close', exception); } @@ -436,6 +472,7 @@ Socket.prototype._shutdown = function () { if (this.writable) { this.writable = false; shutdown(this.fd, 'write'); + if (!this.readable) this.forceClose(); } };