Sockets should never be attached to a ClientRequest before nextTick().
This way the API for interacting directly with the socket object is consistent before and after the Agent pool is exhausted. Fixes #1601.
This commit is contained in:
parent
1a0edbca86
commit
7c87e092fb
250
lib/http2.js
250
lib/http2.js
@ -1070,148 +1070,148 @@ ClientRequest.prototype.abort = function() {
|
||||
};
|
||||
|
||||
ClientRequest.prototype.onSocket = function(socket) {
|
||||
var parser = parsers.alloc();
|
||||
var req = this;
|
||||
process.nextTick(function () {
|
||||
var parser = parsers.alloc();
|
||||
req.socket = socket;
|
||||
req.connection = socket;
|
||||
parser.reinitialize('response');
|
||||
parser.socket = socket;
|
||||
parser.incoming = null;
|
||||
req.parser = parser;
|
||||
|
||||
req.socket = socket;
|
||||
req.connection = socket;
|
||||
parser.reinitialize('response');
|
||||
parser.socket = socket;
|
||||
parser.incoming = null;
|
||||
req.parser = parser;
|
||||
socket._httpMessage = req;
|
||||
// Setup "drain" propogation.
|
||||
httpSocketSetup(socket);
|
||||
|
||||
socket._httpMessage = req;
|
||||
// Setup "drain" propogation.
|
||||
httpSocketSetup(socket);
|
||||
var errorListener = function(err) {
|
||||
debug('HTTP SOCKET ERROR: ' + err.message + '\n' + err.stack);
|
||||
req.emit('error', err);
|
||||
// For Safety. Some additional errors might fire later on
|
||||
// and we need to make sure we don't double-fire the error event.
|
||||
req._hadError = true;
|
||||
parser.finish();
|
||||
socket.destroy();
|
||||
}
|
||||
socket.on('error', errorListener);
|
||||
|
||||
var errorListener = function(err) {
|
||||
debug('HTTP SOCKET ERROR: ' + err.message + '\n' + err.stack);
|
||||
req.emit('error', err);
|
||||
// For Safety. Some additional errors might fire later on
|
||||
// and we need to make sure we don't double-fire the error event.
|
||||
req._hadError = true;
|
||||
parser.finish();
|
||||
socket.destroy();
|
||||
}
|
||||
socket.on('error', errorListener);
|
||||
socket.ondata = function(d, start, end) {
|
||||
var ret = parser.execute(d, start, end - start);
|
||||
if (ret instanceof Error) {
|
||||
debug('parse error');
|
||||
socket.destroy(ret);
|
||||
} else if (parser.incoming && parser.incoming.upgrade) {
|
||||
var bytesParsed = ret;
|
||||
socket.ondata = null;
|
||||
socket.onend = null;
|
||||
|
||||
socket.ondata = function(d, start, end) {
|
||||
var ret = parser.execute(d, start, end - start);
|
||||
if (ret instanceof Error) {
|
||||
debug('parse error');
|
||||
socket.destroy(ret);
|
||||
} else if (parser.incoming && parser.incoming.upgrade) {
|
||||
var bytesParsed = ret;
|
||||
socket.ondata = null;
|
||||
socket.onend = null;
|
||||
var res = parser.incoming;
|
||||
req.res = res;
|
||||
|
||||
var res = parser.incoming;
|
||||
// This is start + byteParsed
|
||||
var upgradeHead = d.slice(start + bytesParsed, end);
|
||||
if (req.listeners('upgrade').length) {
|
||||
// Emit 'upgrade' on the Agent.
|
||||
req.upgraded = true;
|
||||
req.emit('upgrade', res, socket, upgradeHead);
|
||||
socket.emit('agentRemove');
|
||||
} else {
|
||||
// Got upgrade header, but have no handler.
|
||||
socket.destroy();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
socket.onend = function() {
|
||||
if (!req.res) {
|
||||
// If we don't have a response then we know that the socket
|
||||
// ended prematurely and we need to emit an error on the request.
|
||||
req.emit('error', new Error("Request ended prematurely."));
|
||||
req._hadError = true;
|
||||
}
|
||||
parser.finish();
|
||||
parsers.free(parser); // I don't know if this is necessary --Mikeal
|
||||
socket.destroy();
|
||||
};
|
||||
|
||||
var closeListener = function() {
|
||||
debug('HTTP socket close');
|
||||
req.emit('close');
|
||||
if (req.res && req.res.readable) {
|
||||
// Socket closed before we emitted "end" below.
|
||||
req.res.emit('aborted');
|
||||
req.res.emit('end');
|
||||
req.res.emit('close');
|
||||
} else if (!req.res && !req._hadError) {
|
||||
// This socket error fired before we started to
|
||||
// receive a response. The error needs to
|
||||
// fire on the request.
|
||||
req.emit('error', new Error('socket hang up'));
|
||||
}
|
||||
}
|
||||
socket.on('close', closeListener);
|
||||
|
||||
parser.onIncoming = function(res, shouldKeepAlive) {
|
||||
debug('AGENT incoming response!');
|
||||
|
||||
if (req.res) {
|
||||
// We already have a response object, this means the server
|
||||
// sent a double response.
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
req.res = res;
|
||||
|
||||
// This is start + byteParsed
|
||||
var upgradeHead = d.slice(start + bytesParsed, end);
|
||||
if (req.listeners('upgrade').length) {
|
||||
// Emit 'upgrade' on the Agent.
|
||||
req.upgraded = true;
|
||||
req.emit('upgrade', res, socket, upgradeHead);
|
||||
socket.emit('agentRemove');
|
||||
} else {
|
||||
// Got upgrade header, but have no handler.
|
||||
socket.destroy();
|
||||
// Responses to HEAD requests are crazy.
|
||||
// HEAD responses aren't allowed to have an entity-body
|
||||
// but *can* have a content-length which actually corresponds
|
||||
// to the content-length of the entity-body had the request
|
||||
// been a GET.
|
||||
var isHeadResponse = req.method == 'HEAD';
|
||||
debug('AGENT isHeadResponse ' + isHeadResponse);
|
||||
|
||||
if (res.statusCode == 100) {
|
||||
// restart the parser, as this is a continue message.
|
||||
delete req.res; // Clear res so that we don't hit double-responses.
|
||||
req.emit('continue');
|
||||
return true;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
socket.onend = function() {
|
||||
if (!req.res) {
|
||||
// If we don't have a response then we know that the socket
|
||||
// ended prematurely and we need to emit an error on the request.
|
||||
req.emit('error', new Error("Request ended prematurely."));
|
||||
req._hadError = true;
|
||||
}
|
||||
parser.finish();
|
||||
parsers.free(parser); // I don't know if this is necessary --Mikeal
|
||||
socket.destroy();
|
||||
};
|
||||
if (req.shouldKeepAlive && res.headers.connection !== 'keep-alive' && !req.upgraded) {
|
||||
// Server MUST respond with Connection:keep-alive for us to enable it.
|
||||
// If we've been upgraded (via WebSockets) we also shouldn't try to
|
||||
// keep the connection open.
|
||||
req.shouldKeepAlive = false;
|
||||
}
|
||||
|
||||
var closeListener = function() {
|
||||
debug('HTTP socket close');
|
||||
req.emit('close');
|
||||
if (req.res && req.res.readable) {
|
||||
// Socket closed before we emitted "end" below.
|
||||
req.res.emit('aborted');
|
||||
req.res.emit('end');
|
||||
req.res.emit('close');
|
||||
} else if (!req.res && !req._hadError) {
|
||||
// This socket error fired before we started to
|
||||
// receive a response. The error needs to
|
||||
// fire on the request.
|
||||
req.emit('error', new Error('socket hang up'));
|
||||
}
|
||||
}
|
||||
socket.on('close', closeListener);
|
||||
|
||||
parser.onIncoming = function(res, shouldKeepAlive) {
|
||||
debug('AGENT incoming response!');
|
||||
|
||||
if (req.res) {
|
||||
// We already have a response object, this means the server
|
||||
// sent a double response.
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
req.res = res;
|
||||
|
||||
// Responses to HEAD requests are crazy.
|
||||
// HEAD responses aren't allowed to have an entity-body
|
||||
// but *can* have a content-length which actually corresponds
|
||||
// to the content-length of the entity-body had the request
|
||||
// been a GET.
|
||||
var isHeadResponse = req.method == 'HEAD';
|
||||
debug('AGENT isHeadResponse ' + isHeadResponse);
|
||||
|
||||
if (res.statusCode == 100) {
|
||||
// restart the parser, as this is a continue message.
|
||||
delete req.res; // Clear res so that we don't hit double-responses.
|
||||
req.emit('continue');
|
||||
return true;
|
||||
}
|
||||
|
||||
if (req.shouldKeepAlive && res.headers.connection !== 'keep-alive' && !req.upgraded) {
|
||||
// Server MUST respond with Connection:keep-alive for us to enable it.
|
||||
// If we've been upgraded (via WebSockets) we also shouldn't try to
|
||||
// keep the connection open.
|
||||
req.shouldKeepAlive = false;
|
||||
}
|
||||
|
||||
res.addListener('end', function() {
|
||||
if (!req.shouldKeepAlive) {
|
||||
if (socket.writable) {
|
||||
debug('AGENT socket.destroySoon()');
|
||||
socket.destroySoon();
|
||||
res.addListener('end', function() {
|
||||
if (!req.shouldKeepAlive) {
|
||||
if (socket.writable) {
|
||||
debug('AGENT socket.destroySoon()');
|
||||
socket.destroySoon();
|
||||
}
|
||||
assert(!socket.writable);
|
||||
} else {
|
||||
debug('AGENT socket keep-alive');
|
||||
}
|
||||
assert(!socket.writable);
|
||||
} else {
|
||||
debug('AGENT socket keep-alive');
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
|
||||
req.emit('response', res);
|
||||
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
|
||||
req.emit('response', res);
|
||||
|
||||
res.on('end', function() {
|
||||
if (req.shouldKeepAlive) {
|
||||
socket.removeListener('close', closeListener);
|
||||
socket.removeListener('error', errorListener);
|
||||
socket.emit('free');
|
||||
}
|
||||
});
|
||||
res.on('end', function() {
|
||||
if (req.shouldKeepAlive) {
|
||||
socket.removeListener('close', closeListener);
|
||||
socket.removeListener('error', errorListener);
|
||||
socket.emit('free');
|
||||
}
|
||||
});
|
||||
|
||||
return isHeadResponse;
|
||||
};
|
||||
process.nextTick(function() {
|
||||
return isHeadResponse;
|
||||
};
|
||||
req.emit('socket', socket);
|
||||
});
|
||||
|
||||
};
|
||||
ClientRequest.prototype._deferToConnect = function(method, arguments, cb) {
|
||||
// This function is for calls that need to happen once the socket is
|
||||
|
Loading…
x
Reference in New Issue
Block a user