From faa4d9ff5f7ff676db06240e09140f9780e2638c Mon Sep 17 00:00:00 2001 From: isaacs Date: Tue, 15 May 2012 14:19:46 -0700 Subject: [PATCH] Re-apply http fixes from v0.6 branch properly --- lib/http.js | 203 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 116 insertions(+), 87 deletions(-) diff --git a/lib/http.js b/lib/http.js index ff9b3cfc039..354ca70c26a 100644 --- a/lib/http.js +++ b/lib/http.js @@ -1273,17 +1273,23 @@ function freeParser(parser, req) { } } + function socketCloseListener() { var socket = this; var parser = socket.parser; var req = socket._httpMessage; debug('HTTP socket close'); + var req = socket._httpMessage; req.emit('close'); if (req.res && req.res.readable) { - // Socket closed before we emitted "end" below. + // Socket closed before we emitted 'end' below. req.res.emit('aborted'); - req.res._emitEnd(); - req.res.emit('close'); + var res = req.res; + req.res._emitPending(function() { + res._emitEnd(); + res.emit('close'); + res = null; + }); } else if (!req.res && !req._hadError) { // This socket error fired before we started to // receive a response. The error needs to @@ -1302,12 +1308,14 @@ function socketErrorListener(err) { var parser = socket.parser; var req = socket._httpMessage; debug('HTTP SOCKET ERROR: ' + err.message + '\n' + err.stack); + if (req) { req.emit('error', err); // For Safety. Some additional errors might fire later on // and we need to make sure we don't double-fire the error event. req._hadError = true; } + if (parser) { parser.finish(); freeParser(parser, req); @@ -1315,76 +1323,11 @@ function socketErrorListener(err) { socket.destroy(); } - -function responseOnEnd() { - var req = this.req; - var socket = req.socket; - - if (req.shouldKeepAlive) { - debug('AGENT socket keep-alive'); - socket.removeListener('close', socketCloseListener); - socket.removeListener('error', socketErrorListener); - socket.emit('free'); - } else { - if (socket.writable) { - debug('AGENT socket.destroySoon()'); - socket.destroySoon(); - } - assert(!socket.writable); - } -} - -function parserOnIncomingClient(res, shouldKeepAlive) { - var parser = this; - var socket = this.socket; - var req = socket._httpMessage; - - debug('AGENT incoming response!'); - - if (req.res) { - // We already have a response object, this means the server - // sent a double response. - socket.destroy(); - return; - } - req.res = res; - - // Responses to HEAD requests are crazy. - // HEAD responses aren't allowed to have an entity-body - // but *can* have a content-length which actually corresponds - // to the content-length of the entity-body had the request - // been a GET. - var isHeadResponse = req.method == 'HEAD'; - debug('AGENT isHeadResponse ' + isHeadResponse); - - if (res.statusCode == 100) { - // restart the parser, as this is a continue message. - delete req.res; // Clear res so that we don't hit double-responses. - req.emit('continue'); - return true; - } - - if (req.shouldKeepAlive && !shouldKeepAlive && !req.upgraded) { - // Server MUST respond with Connection:keep-alive for us to enable it. - // If we've been upgraded (via WebSockets) we also shouldn't try to - // keep the connection open. - req.shouldKeepAlive = false; - } - - DTRACE_HTTP_CLIENT_RESPONSE(socket, req); - req.emit('response', res); - req.res = res; - res.req = req; - - res.on('end', responseOnEnd); - - return isHeadResponse; -} - function socketOnEnd() { var socket = this; var req = this._httpMessage; var parser = this.parser; + if (!req.res) { // If we don't have a response then we know that the socket // ended prematurely and we need to emit an error on the request. @@ -1409,22 +1352,31 @@ function socketOnData(d, start, end) { freeParser(parser, req); socket.destroy(ret); } else if (parser.incoming && parser.incoming.upgrade) { + // Upgrade or CONNECT var bytesParsed = ret; - socket.ondata = null; - socket.onend = null; - var res = parser.incoming; req.res = res; + socket.ondata = null; + socket.onend = null; + parser.finish(); + // This is start + byteParsed - var upgradeHead = d.slice(start + bytesParsed, end); - if (req.listeners('upgrade').length) { - // Emit 'upgrade' on the Agent. - req.upgraded = true; - req.emit('upgrade', res, socket, upgradeHead); + var bodyHead = d.slice(start + bytesParsed, end); + + var eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade'; + if (req.listeners(eventName).length) { + req.upgradeOrConnect = true; + + // detach the socket socket.emit('agentRemove'); + socket.removeListener('close', socketCloseListener); + socket.removeListener('error', socketErrorListener); + + req.emit(eventName, res, socket, bodyHead); + req.emit('close'); } else { - // Got upgrade header, but have no handler. + // Got Upgrade header or CONNECT method, but have no handler. socket.destroy(); } freeParser(parser, req); @@ -1437,20 +1389,100 @@ function socketOnData(d, start, end) { } } + +function parserOnIncomingClient(res, shouldKeepAlive) { + var parser = this; + var socket = this.socket; + var req = socket._httpMessage; + + debug('AGENT incoming response!'); + + if (req.res) { + // We already have a response object, this means the server + // sent a double response. + socket.destroy(); + return; + } + req.res = res; + + // Responses to CONNECT request is handled as Upgrade. + if (req.method === 'CONNECT') { + res.upgrade = true; + return true; // skip body + } + + // Responses to HEAD requests are crazy. + // HEAD responses aren't allowed to have an entity-body + // but *can* have a content-length which actually corresponds + // to the content-length of the entity-body had the request + // been a GET. + var isHeadResponse = req.method == 'HEAD'; + debug('AGENT isHeadResponse ' + isHeadResponse); + + if (res.statusCode == 100) { + // restart the parser, as this is a continue message. + delete req.res; // Clear res so that we don't hit double-responses. + req.emit('continue'); + return true; + } + + if (req.shouldKeepAlive && !shouldKeepAlive && !req.upgradeOrConnect) { + // Server MUST respond with Connection:keep-alive for us to enable it. + // If we've been upgraded (via WebSockets) we also shouldn't try to + // keep the connection open. + req.shouldKeepAlive = false; + } + + + DTRACE_HTTP_CLIENT_RESPONSE(socket, req); + req.emit('response', res); + req.res = res; + res.req = req; + + res.on('end', responseOnEnd); + + return isHeadResponse; +} + +function responseOnEnd() { + var res = this; + var req = res.req; + var socket = req.socket; + + if (!req.shouldKeepAlive) { + if (socket.writable) { + debug('AGENT socket.destroySoon()'); + socket.destroySoon(); + } + assert(!socket.writable); + } else { + debug('AGENT socket keep-alive'); + socket.removeListener('close', socketCloseListener); + socket.removeListener('error', socketErrorListener); + socket.emit('free'); + } +} + ClientRequest.prototype.onSocket = function(socket) { var req = this; process.nextTick(function() { var parser = parsers.alloc(); - req.socket = socket; req.connection = socket; - parser.socket = socket; - socket.parser = parser; parser.reinitialize(HTTPParser.RESPONSE); + parser.socket = socket; parser.incoming = null; req.parser = parser; + parser.socket = socket; + socket.parser = parser; + parser.incoming = null; + socket._httpMessage = req; + + // Setup "drain" propogation. + httpSocketSetup(socket); + // Propagate headers limit from request object to parser if (typeof req.maxHeadersCount === 'number') { parser.maxHeaderPairs = req.maxHeadersCount << 1; @@ -1459,18 +1491,14 @@ ClientRequest.prototype.onSocket = function(socket) { parser.maxHeaderPairs = 2000; } - socket._httpMessage = req; - - // Setup "drain" propogation. - httpSocketSetup(socket); + socket.on('error', socketErrorListener); socket.ondata = socketOnData; socket.onend = socketOnEnd; - socket.on('error', socketErrorListener); socket.on('close', socketCloseListener); parser.onIncoming = parserOnIncomingClient; - req.emit('socket', socket); }); + }; ClientRequest.prototype._deferToConnect = function(method, arguments_, cb) { @@ -1624,6 +1652,7 @@ function connectionListener(socket) { var parser = parsers.alloc(); parser.reinitialize(HTTPParser.REQUEST); parser.socket = socket; + socket.parser = parser; parser.incoming = null; // Propagate headers limit from server instance to parser