http: simplify drain()
Simplify and slightly optimize draining outgoing http streams. Avoid extra event listener and inline with rest of the drain logic. PR-URL: https://github.com/nodejs/node/pull/29081 Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
a0cc62f345
commit
bdf07f4317
@ -30,7 +30,6 @@ const {
|
||||
_checkIsHttpToken: checkIsHttpToken,
|
||||
debug,
|
||||
freeParser,
|
||||
httpSocketSetup,
|
||||
parsers,
|
||||
HTTPParser,
|
||||
prepareError,
|
||||
@ -40,7 +39,7 @@ const Agent = require('_http_agent');
|
||||
const { Buffer } = require('buffer');
|
||||
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
|
||||
const { URL, urlToOptions, searchParamsSymbol } = require('internal/url');
|
||||
const { kOutHeaders, ondrain } = require('internal/http');
|
||||
const { kOutHeaders, kNeedDrain } = require('internal/http');
|
||||
const { connResetException, codes } = require('internal/errors');
|
||||
const {
|
||||
ERR_HTTP_HEADERS_SENT,
|
||||
@ -335,6 +334,14 @@ function emitAbortNT() {
|
||||
this.emit('abort');
|
||||
}
|
||||
|
||||
function ondrain() {
|
||||
const msg = this._httpMessage;
|
||||
if (msg && !msg.finished && msg[kNeedDrain]) {
|
||||
msg[kNeedDrain] = false;
|
||||
msg.emit('drain');
|
||||
}
|
||||
}
|
||||
|
||||
function socketCloseListener() {
|
||||
const socket = this;
|
||||
const req = socket._httpMessage;
|
||||
@ -652,9 +659,6 @@ function tickOnSocket(req, socket) {
|
||||
socket.parser = parser;
|
||||
socket._httpMessage = req;
|
||||
|
||||
// Setup "drain" propagation.
|
||||
httpSocketSetup(socket);
|
||||
|
||||
// Propagate headers limit from request object to parser
|
||||
if (typeof req.maxHeadersCount === 'number') {
|
||||
parser.maxHeaderPairs = req.maxHeadersCount << 1;
|
||||
@ -666,6 +670,7 @@ function tickOnSocket(req, socket) {
|
||||
socket.on('data', socketOnData);
|
||||
socket.on('end', socketOnEnd);
|
||||
socket.on('close', socketCloseListener);
|
||||
socket.on('drain', ondrain);
|
||||
|
||||
if (
|
||||
req.timeout !== undefined ||
|
||||
|
@ -31,7 +31,6 @@ const { methods, HTTPParser } =
|
||||
internalBinding('http_parser') : internalBinding('http_parser_llhttp');
|
||||
|
||||
const FreeList = require('internal/freelist');
|
||||
const { ondrain } = require('internal/http');
|
||||
const incoming = require('_http_incoming');
|
||||
const {
|
||||
IncomingMessage,
|
||||
@ -201,12 +200,6 @@ function freeParser(parser, req, socket) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
function httpSocketSetup(socket) {
|
||||
socket.removeListener('drain', ondrain);
|
||||
socket.on('drain', ondrain);
|
||||
}
|
||||
|
||||
const tokenRegExp = /^[\^_`a-zA-Z\-0-9!#$%&'*+.|~]+$/;
|
||||
/**
|
||||
* Verifies that the given val is a valid HTTP token
|
||||
@ -253,7 +246,6 @@ module.exports = {
|
||||
CRLF: '\r\n',
|
||||
debug,
|
||||
freeParser,
|
||||
httpSocketSetup,
|
||||
methods,
|
||||
parsers,
|
||||
kIncomingMessage,
|
||||
|
@ -27,7 +27,7 @@ const { getDefaultHighWaterMark } = require('internal/streams/state');
|
||||
const assert = require('internal/assert');
|
||||
const Stream = require('stream');
|
||||
const internalUtil = require('internal/util');
|
||||
const { kOutHeaders, utcDate } = require('internal/http');
|
||||
const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http');
|
||||
const { Buffer } = require('buffer');
|
||||
const common = require('_http_common');
|
||||
const checkIsHttpToken = common._checkIsHttpToken;
|
||||
@ -96,6 +96,7 @@ function OutgoingMessage() {
|
||||
this._contentLength = null;
|
||||
this._hasBody = true;
|
||||
this._trailer = '';
|
||||
this[kNeedDrain] = false;
|
||||
|
||||
this.finished = false;
|
||||
this._headerSent = false;
|
||||
@ -590,7 +591,10 @@ Object.defineProperty(OutgoingMessage.prototype, 'writableEnded', {
|
||||
|
||||
const crlf_buf = Buffer.from('\r\n');
|
||||
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
|
||||
return write_(this, chunk, encoding, callback, false);
|
||||
const ret = write_(this, chunk, encoding, callback, false);
|
||||
if (!ret)
|
||||
this[kNeedDrain] = true;
|
||||
return ret;
|
||||
};
|
||||
|
||||
function write_(msg, chunk, encoding, callback, fromEnd) {
|
||||
@ -790,8 +794,8 @@ OutgoingMessage.prototype._flush = function _flush() {
|
||||
if (this.finished) {
|
||||
// This is a queue to the server or client to bring in the next this.
|
||||
this._finish();
|
||||
} else if (ret) {
|
||||
// This is necessary to prevent https from breaking
|
||||
} else if (ret && this[kNeedDrain]) {
|
||||
this[kNeedDrain] = false;
|
||||
this.emit('drain');
|
||||
}
|
||||
}
|
||||
|
@ -32,7 +32,6 @@ const {
|
||||
CRLF,
|
||||
continueExpression,
|
||||
chunkExpression,
|
||||
httpSocketSetup,
|
||||
kIncomingMessage,
|
||||
HTTPParser,
|
||||
_checkInvalidHeaderChar: checkInvalidHeaderChar,
|
||||
@ -41,7 +40,7 @@ const {
|
||||
const { OutgoingMessage } = require('_http_outgoing');
|
||||
const {
|
||||
kOutHeaders,
|
||||
ondrain,
|
||||
kNeedDrain,
|
||||
nowDate,
|
||||
emitStatistics
|
||||
} = require('internal/http');
|
||||
@ -359,8 +358,6 @@ function connectionListener(socket) {
|
||||
function connectionListenerInternal(server, socket) {
|
||||
debug('SERVER new http connection');
|
||||
|
||||
httpSocketSetup(socket);
|
||||
|
||||
// Ensure that the server property of the socket is correctly set.
|
||||
// See https://github.com/nodejs/node/issues/13435
|
||||
if (socket.server === null)
|
||||
@ -455,6 +452,12 @@ function socketOnDrain(socket, state) {
|
||||
socket.parser.resume();
|
||||
socket.resume();
|
||||
}
|
||||
|
||||
const msg = socket._httpMessage;
|
||||
if (msg && !msg.finished && msg[kNeedDrain]) {
|
||||
msg[kNeedDrain] = false;
|
||||
msg.emit('drain');
|
||||
}
|
||||
}
|
||||
|
||||
function socketOnTimeout() {
|
||||
@ -581,7 +584,6 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) {
|
||||
socket.removeListener('end', state.onEnd);
|
||||
socket.removeListener('close', state.onClose);
|
||||
socket.removeListener('drain', state.onDrain);
|
||||
socket.removeListener('drain', ondrain);
|
||||
socket.removeListener('error', socketOnError);
|
||||
socket.removeListener('timeout', socketOnTimeout);
|
||||
unconsume(parser, socket);
|
||||
|
@ -28,10 +28,6 @@ function resetCache() {
|
||||
utcCache = undefined;
|
||||
}
|
||||
|
||||
function ondrain() {
|
||||
if (this._httpMessage) this._httpMessage.emit('drain');
|
||||
}
|
||||
|
||||
class HttpRequestTiming extends PerformanceEntry {
|
||||
constructor(statistics) {
|
||||
super();
|
||||
@ -50,7 +46,7 @@ function emitStatistics(statistics) {
|
||||
|
||||
module.exports = {
|
||||
kOutHeaders: Symbol('kOutHeaders'),
|
||||
ondrain,
|
||||
kNeedDrain: Symbol('kNeedDrain'),
|
||||
nowDate,
|
||||
utcDate,
|
||||
emitStatistics
|
||||
|
Loading…
x
Reference in New Issue
Block a user