Break up huge function in ClientRequest.onSocket

Conflicts:

	lib/http.js
This commit is contained in:
isaacs 2012-05-10 17:55:28 -07:00
parent 07be9fc3a6
commit a98e845516

View File

@ -1261,6 +1261,7 @@ function freeParser(parser, req) {
if (parser.socket) { if (parser.socket) {
parser.socket.onend = null; parser.socket.onend = null;
parser.socket.ondata = null; parser.socket.ondata = null;
parser.socket.parser = null;
} }
parser.socket = null; parser.socket = null;
parser.incoming = null; parser.incoming = null;
@ -1272,31 +1273,34 @@ function freeParser(parser, req) {
} }
} }
function socketCloseListener() {
ClientRequest.prototype.onSocket = function(socket) { var socket = this;
var req = this; var parser = socket.parser;
process.nextTick(function() { var req = socket._httpMessage;
var parser = parsers.alloc(); debug('HTTP socket close');
req.socket = socket; req.emit('close');
req.connection = socket; if (req.res && req.res.readable) {
parser.reinitialize(HTTPParser.RESPONSE); // Socket closed before we emitted "end" below.
parser.socket = socket; req.res.emit('aborted');
parser.incoming = null; req.res.emit('end');
req.parser = parser; req.res.emit('close');
} else if (!req.res && !req._hadError) {
// Propagate headers limit from request object to parser // This socket error fired before we started to
if (typeof req.maxHeadersCount === 'number') { // receive a response. The error needs to
parser.maxHeaderPairs = req.maxHeadersCount << 1; // fire on the request.
} else { req.emit('error', createHangUpError());
// Set default value because parser may be reused from FreeList
parser.maxHeaderPairs = 2000;
} }
socket._httpMessage = req; if (parser) {
// Setup 'drain' propogation. parser.finish();
httpSocketSetup(socket); freeParser(parser, req);
}
}
function errorListener(err) { function socketErrorListener(err) {
var socket = this;
var parser = socket.parser;
var req = socket._httpMessage;
debug('HTTP SOCKET ERROR: ' + err.message + '\n' + err.stack); debug('HTTP SOCKET ERROR: ' + err.message + '\n' + err.stack);
if (req) { if (req) {
req.emit('error', err); req.emit('error', err);
@ -1311,108 +1315,29 @@ ClientRequest.prototype.onSocket = function(socket) {
socket.destroy(); socket.destroy();
} }
socket.on('error', errorListener); function responseOnEnd() {
var req = this.req;
var socket = req.socket;
var timeoutListener = function() { if (req.shouldKeepAlive) {
req.emit('timeout'); debug('AGENT socket keep-alive');
if (parser) { socket.removeListener('close', socketCloseListener);
parser.finish(); socket.removeListener('error', socketErrorListener);
freeParser(); socket.emit('free');
}
socket.destroy();
}
socket.on('timeout', timeoutListener);
socket.ondata = function(d, start, end) {
var ret = parser.execute(d, start, end - start);
if (ret instanceof Error) {
debug('parse error');
freeParser(parser, req);
socket.destroy(ret);
} else if (parser.incoming && parser.incoming.upgrade) {
// Upgrade or CONNECT
var bytesParsed = ret;
var res = parser.incoming;
req.res = res;
socket.ondata = null;
socket.onend = null;
parser.finish();
// This is start + byteParsed
var bodyHead = d.slice(start + bytesParsed, end);
var eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade';
if (req.listeners(eventName).length) {
req.upgradeOrConnect = true;
// detach the socket
socket.emit('agentRemove');
socket.removeListener('close', closeListener);
socket.removeListener('error', errorListener);
socket.removeListener('timeout', timeoutListener);
req.emit(eventName, res, socket, bodyHead);
req.emit('close');
} else { } else {
// Got Upgrade header or CONNECT method, but have no handler. if (socket.writable) {
socket.destroy(); debug('AGENT socket.destroySoon()');
socket.destroySoon();
}
assert(!socket.writable);
} }
freeParser(parser, req);
} else if (parser.incoming && parser.incoming.complete &&
// When the status code is 100 (Continue), the server will
// send a final response after this client sends a request
// body. So, we must not free the parser.
parser.incoming.statusCode !== 100) {
freeParser(parser, req);
} }
};
socket.onend = function() { function parserOnIncomingClient(res, shouldKeepAlive) {
if (!req.res) { var parser = this;
// If we don't have a response then we know that the socket var socket = this.socket;
// ended prematurely and we need to emit an error on the request.
req.emit('error', createHangUpError());
req._hadError = true;
}
if (parser) {
parser.finish();
freeParser(parser, req);
}
socket.destroy();
};
var closeListener = function() {
debug('HTTP socket close');
var req = socket._httpMessage; var req = socket._httpMessage;
req.emit('close');
if (req.res && req.res.readable) {
// Socket closed before we emitted 'end' below.
req.res.emit('aborted');
var res = req.res;
req.res._emitPending(function() {
res._emitEnd();
res.emit('close');
res = null;
});
} else if (!req.res && !req._hadError) {
// This socket error fired before we started to
// receive a response. The error needs to
// fire on the request.
req.emit('error', createHangUpError());
}
// Nothing more to be done with this req, since the socket
// is closed, and we've emitted the appropriate abort/end/close/error
// events. Disavow all knowledge, and break the references to
// the variables trapped by closures and on the socket object.
req = null;
socket._httpMessage = null;
}
socket.on('close', closeListener);
parser.onIncoming = function(res, shouldKeepAlive) {
debug('AGENT incoming response!'); debug('AGENT incoming response!');
if (req.res) { if (req.res) {
@ -1423,12 +1348,6 @@ ClientRequest.prototype.onSocket = function(socket) {
} }
req.res = res; req.res = res;
// Responses to CONNECT request is handled as Upgrade.
if (req.method === 'CONNECT') {
res.upgrade = true;
return true; // skip body
}
// Responses to HEAD requests are crazy. // Responses to HEAD requests are crazy.
// HEAD responses aren't allowed to have an entity-body // HEAD responses aren't allowed to have an entity-body
// but *can* have a content-length which actually corresponds // but *can* have a content-length which actually corresponds
@ -1444,42 +1363,115 @@ ClientRequest.prototype.onSocket = function(socket) {
return true; return true;
} }
if (req.shouldKeepAlive && !shouldKeepAlive && !req.upgradeOrConnect) { if (req.shouldKeepAlive && !shouldKeepAlive && !req.upgraded) {
// Server MUST respond with Connection:keep-alive for us to enable it. // Server MUST respond with Connection:keep-alive for us to enable it.
// If we've been upgraded (via WebSockets) we also shouldn't try to // If we've been upgraded (via WebSockets) we also shouldn't try to
// keep the connection open. // keep the connection open.
req.shouldKeepAlive = false; req.shouldKeepAlive = false;
} }
res.addListener('end', function() {
if (!req.shouldKeepAlive) {
if (socket.writable) {
debug('AGENT socket.destroySoon()');
socket.destroySoon();
}
assert(!socket.writable);
} else {
debug('AGENT socket keep-alive');
}
});
DTRACE_HTTP_CLIENT_RESPONSE(socket, req); DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
req.emit('response', res); req.emit('response', res);
req.res = res;
res.req = req;
res.on('end', function() { res.on('end', responseOnEnd);
if (req.shouldKeepAlive) {
socket.removeListener('close', closeListener);
socket.removeListener('error', errorListener);
socket.removeListener('timeout', timeoutListener);
socket.emit('free');
}
});
return isHeadResponse; return isHeadResponse;
}; }
function socketOnEnd() {
var socket = this;
var req = this._httpMessage;
var parser = this.parser;
if (!req.res) {
// If we don't have a response then we know that the socket
// ended prematurely and we need to emit an error on the request.
req.emit('error', createHangUpError());
req._hadError = true;
}
if (parser) {
parser.finish();
freeParser(parser, req);
}
socket.destroy();
}
function socketOnData(d, start, end) {
var socket = this;
var req = this._httpMessage;
var parser = this.parser;
var ret = parser.execute(d, start, end - start);
if (ret instanceof Error) {
debug('parse error');
freeParser(parser, req);
socket.destroy(ret);
} else if (parser.incoming && parser.incoming.upgrade) {
var bytesParsed = ret;
socket.ondata = null;
socket.onend = null;
var res = parser.incoming;
req.res = res;
// This is start + byteParsed
var upgradeHead = d.slice(start + bytesParsed, end);
if (req.listeners('upgrade').length) {
// Emit 'upgrade' on the Agent.
req.upgraded = true;
req.emit('upgrade', res, socket, upgradeHead);
socket.emit('agentRemove');
} else {
// Got upgrade header, but have no handler.
socket.destroy();
}
freeParser(parser, req);
} else if (parser.incoming && parser.incoming.complete &&
// When the status code is 100 (Continue), the server will
// send a final response after this client sends a request
// body. So, we must not free the parser.
parser.incoming.statusCode !== 100) {
freeParser(parser, req);
}
}
ClientRequest.prototype.onSocket = function(socket) {
var req = this;
process.nextTick(function() {
var parser = parsers.alloc();
req.socket = socket;
req.connection = socket;
parser.socket = socket;
socket.parser = parser;
parser.reinitialize(HTTPParser.RESPONSE);
parser.incoming = null;
req.parser = parser;
// Propagate headers limit from request object to parser
if (typeof req.maxHeadersCount === 'number') {
parser.maxHeaderPairs = req.maxHeadersCount << 1;
} else {
// Set default value because parser may be reused from FreeList
parser.maxHeaderPairs = 2000;
}
socket._httpMessage = req;
// Setup "drain" propogation.
httpSocketSetup(socket);
socket.ondata = socketOnData;
socket.onend = socketOnEnd;
socket.on('error', socketErrorListener);
socket.on('close', socketCloseListener);
parser.onIncoming = parserOnIncomingClient;
req.emit('socket', socket); req.emit('socket', socket);
}); });
}; };
ClientRequest.prototype._deferToConnect = function(method, arguments_, cb) { ClientRequest.prototype._deferToConnect = function(method, arguments_, cb) {