http_server: pause socket properly
Account pending response data to decide whether pause the socket or not. Writable stream state is a not reliable measure, because it just says how much data is pending on a **current** request, thus not helping much with problem we are trying to solve here. PR-URL: https://github.com/nodejs/node/pull/3128
This commit is contained in:
parent
05040664c2
commit
b3d96782d4
@ -49,6 +49,7 @@ function OutgoingMessage() {
|
|||||||
this.output = [];
|
this.output = [];
|
||||||
this.outputEncodings = [];
|
this.outputEncodings = [];
|
||||||
this.outputCallbacks = [];
|
this.outputCallbacks = [];
|
||||||
|
this.outputSize = 0;
|
||||||
|
|
||||||
this.writable = true;
|
this.writable = true;
|
||||||
|
|
||||||
@ -71,6 +72,8 @@ function OutgoingMessage() {
|
|||||||
this._header = null;
|
this._header = null;
|
||||||
this._headers = null;
|
this._headers = null;
|
||||||
this._headerNames = {};
|
this._headerNames = {};
|
||||||
|
|
||||||
|
this._onPendingData = null;
|
||||||
}
|
}
|
||||||
util.inherits(OutgoingMessage, Stream);
|
util.inherits(OutgoingMessage, Stream);
|
||||||
|
|
||||||
@ -120,6 +123,9 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) {
|
|||||||
this.output.unshift(this._header);
|
this.output.unshift(this._header);
|
||||||
this.outputEncodings.unshift('binary');
|
this.outputEncodings.unshift('binary');
|
||||||
this.outputCallbacks.unshift(null);
|
this.outputCallbacks.unshift(null);
|
||||||
|
this.outputSize += this._header.length;
|
||||||
|
if (this._onPendingData !== null)
|
||||||
|
this._onPendingData(this._header.length);
|
||||||
}
|
}
|
||||||
this._headerSent = true;
|
this._headerSent = true;
|
||||||
}
|
}
|
||||||
@ -152,6 +158,9 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) {
|
|||||||
this.output = [];
|
this.output = [];
|
||||||
this.outputEncodings = [];
|
this.outputEncodings = [];
|
||||||
this.outputCallbacks = [];
|
this.outputCallbacks = [];
|
||||||
|
if (this._onPendingData !== null)
|
||||||
|
this._onPendingData(-this.outputSize);
|
||||||
|
this.outputSize = 0;
|
||||||
} else if (data.length === 0) {
|
} else if (data.length === 0) {
|
||||||
if (typeof callback === 'function')
|
if (typeof callback === 'function')
|
||||||
process.nextTick(callback);
|
process.nextTick(callback);
|
||||||
@ -175,6 +184,9 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) {
|
|||||||
this.output.push(data);
|
this.output.push(data);
|
||||||
this.outputEncodings.push(encoding);
|
this.outputEncodings.push(encoding);
|
||||||
this.outputCallbacks.push(callback);
|
this.outputCallbacks.push(callback);
|
||||||
|
this.outputSize += data.length;
|
||||||
|
if (this._onPendingData !== null)
|
||||||
|
this._onPendingData(data.length);
|
||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -241,6 +241,8 @@ function Server(requestListener) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
this.timeout = 2 * 60 * 1000;
|
this.timeout = 2 * 60 * 1000;
|
||||||
|
|
||||||
|
this._pendingResponseData = 0;
|
||||||
}
|
}
|
||||||
util.inherits(Server, net.Server);
|
util.inherits(Server, net.Server);
|
||||||
|
|
||||||
@ -260,6 +262,13 @@ function connectionListener(socket) {
|
|||||||
var self = this;
|
var self = this;
|
||||||
var outgoing = [];
|
var outgoing = [];
|
||||||
var incoming = [];
|
var incoming = [];
|
||||||
|
var outgoingData = 0;
|
||||||
|
|
||||||
|
function updateOutgoingData(delta) {
|
||||||
|
outgoingData += delta;
|
||||||
|
if (socket._paused && outgoingData < socket._writableState.highWaterMark)
|
||||||
|
return socketOnDrain();
|
||||||
|
}
|
||||||
|
|
||||||
function abortIncoming() {
|
function abortIncoming() {
|
||||||
while (incoming.length) {
|
while (incoming.length) {
|
||||||
@ -425,8 +434,10 @@ function connectionListener(socket) {
|
|||||||
|
|
||||||
socket._paused = false;
|
socket._paused = false;
|
||||||
function socketOnDrain() {
|
function socketOnDrain() {
|
||||||
|
var needPause = outgoingData > socket._writableState.highWaterMark;
|
||||||
|
|
||||||
// If we previously paused, then start reading again.
|
// If we previously paused, then start reading again.
|
||||||
if (socket._paused) {
|
if (socket._paused && !needPause) {
|
||||||
socket._paused = false;
|
socket._paused = false;
|
||||||
socket.parser.resume();
|
socket.parser.resume();
|
||||||
socket.resume();
|
socket.resume();
|
||||||
@ -440,7 +451,8 @@ function connectionListener(socket) {
|
|||||||
// so that we don't become overwhelmed by a flood of
|
// so that we don't become overwhelmed by a flood of
|
||||||
// pipelined requests that may never be resolved.
|
// pipelined requests that may never be resolved.
|
||||||
if (!socket._paused) {
|
if (!socket._paused) {
|
||||||
var needPause = socket._writableState.needDrain;
|
var needPause = socket._writableState.needDrain ||
|
||||||
|
outgoingData >= socket._writableState.highWaterMark;
|
||||||
if (needPause) {
|
if (needPause) {
|
||||||
socket._paused = true;
|
socket._paused = true;
|
||||||
// We also need to pause the parser, but don't do that until after
|
// We also need to pause the parser, but don't do that until after
|
||||||
@ -451,6 +463,7 @@ function connectionListener(socket) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var res = new ServerResponse(req);
|
var res = new ServerResponse(req);
|
||||||
|
res._onPendingData = updateOutgoingData;
|
||||||
|
|
||||||
res.shouldKeepAlive = shouldKeepAlive;
|
res.shouldKeepAlive = shouldKeepAlive;
|
||||||
DTRACE_HTTP_SERVER_REQUEST(req, socket);
|
DTRACE_HTTP_SERVER_REQUEST(req, socket);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user