Add parser to agent

This commit is contained in:
Ryan Dahl 2011-01-20 10:29:54 -08:00
parent 60aea96f84
commit e576d4ec79

View File

@ -319,6 +319,13 @@ OutgoingMessage.prototype.assignSocket = function(socket) {
}; };
OutgoingMessage.prototype.detachSocket = function(socket) {
assert(socket._httpMessage == this);
socket._httpMessage = null;
this.socket = this.connection = null;
};
OutgoingMessage.prototype.destroy = function(error) { OutgoingMessage.prototype.destroy = function(error) {
this.socket.destroy(error); this.socket.destroy(error);
}; };
@ -343,7 +350,9 @@ OutgoingMessage.prototype._send = function(data, encoding) {
OutgoingMessage.prototype._writeRaw = function(data, encoding) { OutgoingMessage.prototype._writeRaw = function(data, encoding) {
if (this.connection._httpMessage === this && this.connection.writable) { if (this.connection &&
this.connection._httpMessage === this &&
this.connection.writable) {
// There might be pending data in the this.output buffer. // There might be pending data in the this.output buffer.
while (this.output.length) { while (this.output.length) {
if (!this.connection.writable) { if (!this.connection.writable) {
@ -602,8 +611,6 @@ OutgoingMessage.prototype.end = function(data, encoding) {
OutgoingMessage.prototype._finish = function() { OutgoingMessage.prototype._finish = function() {
this.socket._httpMessage = null;
this.socket = this.connection = null;
this.emit('finish'); this.emit('finish');
}; };
@ -868,6 +875,8 @@ function connectionListener(socket) {
// When we're finished writing the response, check if this is the last // When we're finished writing the response, check if this is the last
// respose, if so destroy the socket. // respose, if so destroy the socket.
res.on('finish', function() { res.on('finish', function() {
res.detachSocket(socket);
if (res._last) { if (res._last) {
socket.destroySoon(); socket.destroySoon();
} else { } else {
@ -915,37 +924,117 @@ Agent.prototype.appendMessage = function(options) {
var req = new ClientRequest(options); var req = new ClientRequest(options);
this.queue.push(req); this.queue.push(req);
/*
req.on('finish', function () { req.on('finish', function () {
self._cycle(); self._cycle();
}); });
*/
this._cycle(); this._cycle();
return req;
}; };
Agent.prototype._establishNewConnection = function(socket, message) { Agent.prototype._establishNewConnection = function() {
var self = this; var self = this;
assert(this.sockets.length < this.maxSockets); assert(this.sockets.length < this.maxSockets);
// Grab a new "socket". Depending on the implementation of _getConnection // Grab a new "socket". Depending on the implementation of _getConnection
// this could either be a raw TCP socket or a TLS stream. // this could either be a raw TCP socket or a TLS stream.
var socket = this._getConnection(this.host, this.port, function () { var socket = this._getConnection(this.host, this.port, function () {
debug("Agent _getConnection callback");
self._cycle(); self._cycle();
}); });
this.sockets.push(socket); this.sockets.push(socket);
// Add a parser to the socket.
var parser = parsers.alloc();
parser.reinitialize('response');
parser.socket = socket;
socket.ondata = function(d, start, end) {
var ret = parser.execute(d, start, end - start);
if (ret instanceof Error) {
debug('parse error');
socket.destroy(ret);
} else if (parser.incoming && parser.incoming.upgrade) {
var bytesParsed = ret;
socket.ondata = null;
socket.onend = null;
var res = parser.incoming;
// This is start + byteParsed + 1 due to the error of getting \n
// in the upgradeHead from the closing lines of the headers
var upgradeHead = d.slice(start + bytesParsed + 1, end);
if (self.listeners('upgrade').length) {
self.emit('upgrade', res, res.socket, upgradeHead);
} else {
// Got upgrade header, but have no handler.
socket.destroy();
}
}
};
socket.onend = function() {
parser.finish();
socket.destroy();
};
// When the socket closes remove it from the list of available sockets. // When the socket closes remove it from the list of available sockets.
socket.on('close', function() { socket.on('close', function() {
var i = self.sockets.indexOf(socket); var i = self.sockets.indexOf(socket);
if (i >= 0) self.sockets.splice(i, 1); if (i >= 0) self.sockets.splice(i, 1);
// unref the parser for easy gc
parsers.free(parser);
}); });
parser.onIncoming = function(res, shouldKeepAlive) {
debug('AGENT incoming response!');
var req = socket._httpMessage;
assert(req);
// Responses to HEAD requests are AWFUL. Ask Ryan.
// A major oversight in HTTP. Hence this nastiness.
var isHeadResponse = req.method == 'HEAD';
debug('AGENT isHeadResponse ' + isHeadResponse);
if (res.statusCode == 100) {
// restart the parser, as this is a continue message.
req.emit('continue');
return true;
}
if (req.shouldKeepAlive && res.headers.connection === 'close') {
req.shouldKeepAlive = false;
}
res.addListener('end', function() {
debug('AGENT request complete disconnecting.');
// For the moment we reconnect for every request. FIXME!
// All that should be required for keep-alive is to not reconnect,
// but outgoingFlush instead.
if (!req.shouldKeepAlive) socket.end();
req.detachSocket(socket);
self._cycle();
});
req.emit('response', res);
return isHeadResponse;
};
}; };
// Sub-classes can overwrite this method with e.g. something that supplies // Sub-classes can overwrite this method with e.g. something that supplies
// TLS streams. // TLS streams.
Agent.prototype._getConnection = function(host, port, cb) { Agent.prototype._getConnection = function(host, port, cb) {
debug("Agent connected!");
var c = net.createConnection(port, host); var c = net.createConnection(port, host);
c.on('connect', cb); c.on('connect', cb);
return c; return c;
@ -956,6 +1045,8 @@ Agent.prototype._getConnection = function(host, port, cb) {
// waiting sockets. If a waiting socket cannot be found, it will // waiting sockets. If a waiting socket cannot be found, it will
// start the process of establishing one. // start the process of establishing one.
Agent.prototype._cycle = function() { Agent.prototype._cycle = function() {
debug("Agent _cycle");
var first = this.queue[0]; var first = this.queue[0];
if (!first) return; if (!first) return;
@ -965,6 +1056,7 @@ Agent.prototype._cycle = function() {
// If the socket doesn't already have a message it's sending out // If the socket doesn't already have a message it's sending out
// and the socket is available for writing... // and the socket is available for writing...
if (!socket._httpMessage && (socket.writable && socket.readable)) { if (!socket._httpMessage && (socket.writable && socket.readable)) {
debug("Agent found socket, shift");
// We found an available connection! // We found an available connection!
this.queue.shift(); // remove first from queue. this.queue.shift(); // remove first from queue.
first.assignSocket(socket); first.assignSocket(socket);