http: avoid retaining unneeded memory

Prevent the events listeners of the sockets obtained with the HTTP
upgrade mechanism from retaining unneeded memory.

Ref: https://github.com/nodejs/node/issues/11868
PR-URL: https://github.com/nodejs/node/pull/11926
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
This commit is contained in:
Luigi Pinca 2017-03-19 20:58:31 +01:00 committed by James M Snell
parent 4eb194a2b1
commit e0a9ad1af2
3 changed files with 29 additions and 27 deletions

View File

@ -26,6 +26,7 @@ const methods = binding.methods;
const HTTPParser = binding.HTTPParser; const HTTPParser = binding.HTTPParser;
const FreeList = require('internal/freelist').FreeList; const FreeList = require('internal/freelist').FreeList;
const ondrain = require('internal/http').ondrain;
const incoming = require('_http_incoming'); const incoming = require('_http_incoming');
const IncomingMessage = incoming.IncomingMessage; const IncomingMessage = incoming.IncomingMessage;
const readStart = incoming.readStart; const readStart = incoming.readStart;
@ -222,11 +223,6 @@ function freeParser(parser, req, socket) {
} }
function ondrain() {
if (this._httpMessage) this._httpMessage.emit('drain');
}
function httpSocketSetup(socket) { function httpSocketSetup(socket) {
socket.removeListener('drain', ondrain); socket.removeListener('drain', ondrain);
socket.on('drain', ondrain); socket.on('drain', ondrain);

View File

@ -34,7 +34,7 @@ const continueExpression = common.continueExpression;
const chunkExpression = common.chunkExpression; const chunkExpression = common.chunkExpression;
const httpSocketSetup = common.httpSocketSetup; const httpSocketSetup = common.httpSocketSetup;
const OutgoingMessage = require('_http_outgoing').OutgoingMessage; const OutgoingMessage = require('_http_outgoing').OutgoingMessage;
const outHeadersKey = require('internal/http').outHeadersKey; const { outHeadersKey, ondrain } = require('internal/http');
const STATUS_CODES = { const STATUS_CODES = {
100: 'Continue', 100: 'Continue',
@ -296,7 +296,7 @@ function connectionListener(socket) {
// otherwise, destroy on timeout by default // otherwise, destroy on timeout by default
if (this.timeout) if (this.timeout)
socket.setTimeout(this.timeout); socket.setTimeout(this.timeout);
socket.on('timeout', socketOnTimeout.bind(undefined, this, socket)); socket.on('timeout', socketOnTimeout);
var parser = parsers.alloc(); var parser = parsers.alloc();
parser.reinitialize(HTTPParser.REQUEST); parser.reinitialize(HTTPParser.REQUEST);
@ -314,9 +314,9 @@ function connectionListener(socket) {
var state = { var state = {
onData: null, onData: null,
onError: null,
onEnd: null, onEnd: null,
onClose: null, onClose: null,
onDrain: null,
outgoing: [], outgoing: [],
incoming: [], incoming: [],
// `outgoingData` is an approximate amount of bytes queued through all // `outgoingData` is an approximate amount of bytes queued through all
@ -326,21 +326,20 @@ function connectionListener(socket) {
outgoingData: 0 outgoingData: 0
}; };
state.onData = socketOnData.bind(undefined, this, socket, parser, state); state.onData = socketOnData.bind(undefined, this, socket, parser, state);
state.onError = socketOnError.bind(undefined, this, socket, state);
state.onEnd = socketOnEnd.bind(undefined, this, socket, parser, state); state.onEnd = socketOnEnd.bind(undefined, this, socket, parser, state);
state.onClose = socketOnClose.bind(undefined, socket, state); state.onClose = socketOnClose.bind(undefined, socket, state);
state.onDrain = socketOnDrain.bind(undefined, socket, state);
socket.on('data', state.onData); socket.on('data', state.onData);
socket.on('error', state.onError); socket.on('error', socketOnError);
socket.on('end', state.onEnd); socket.on('end', state.onEnd);
socket.on('close', state.onClose); socket.on('close', state.onClose);
socket.on('drain', state.onDrain);
parser.onIncoming = parserOnIncoming.bind(undefined, this, socket, state); parser.onIncoming = parserOnIncoming.bind(undefined, this, socket, state);
// We are consuming socket, so it won't get any actual data // We are consuming socket, so it won't get any actual data
socket.on('resume', onSocketResume); socket.on('resume', onSocketResume);
socket.on('pause', onSocketPause); socket.on('pause', onSocketPause);
socket.on('drain', socketOnDrain.bind(undefined, socket, state));
// Override on to unconsume on `data`, `readable` listeners // Override on to unconsume on `data`, `readable` listeners
socket.on = socketOnWrap; socket.on = socketOnWrap;
@ -378,15 +377,15 @@ function socketOnDrain(socket, state) {
} }
} }
function socketOnTimeout(server, socket) { function socketOnTimeout() {
var req = socket.parser && socket.parser.incoming; var req = this.parser && this.parser.incoming;
var reqTimeout = req && !req.complete && req.emit('timeout', socket); var reqTimeout = req && !req.complete && req.emit('timeout', this);
var res = socket._httpMessage; var res = this._httpMessage;
var resTimeout = res && res.emit('timeout', socket); var resTimeout = res && res.emit('timeout', this);
var serverTimeout = server.emit('timeout', socket); var serverTimeout = this.server.emit('timeout', this);
if (!reqTimeout && !resTimeout && !serverTimeout) if (!reqTimeout && !resTimeout && !serverTimeout)
socket.destroy(); this.destroy();
} }
function socketOnClose(socket, state) { function socketOnClose(socket, state) {
@ -413,7 +412,7 @@ function socketOnEnd(server, socket, parser, state) {
if (ret instanceof Error) { if (ret instanceof Error) {
debug('parse error'); debug('parse error');
state.onError(ret); socketOnError.call(socket, ret);
return; return;
} }
@ -443,19 +442,19 @@ function onParserExecute(server, socket, parser, state, ret, d) {
onParserExecuteCommon(server, socket, parser, state, ret, undefined); onParserExecuteCommon(server, socket, parser, state, ret, undefined);
} }
function socketOnError(server, socket, state, e) { function socketOnError(e) {
// Ignore further errors // Ignore further errors
socket.removeListener('error', state.onError); this.removeListener('error', socketOnError);
socket.on('error', () => {}); this.on('error', () => {});
if (!server.emit('clientError', e, socket)) if (!this.server.emit('clientError', e, this))
socket.destroy(e); this.destroy(e);
} }
function onParserExecuteCommon(server, socket, parser, state, ret, d) { function onParserExecuteCommon(server, socket, parser, state, ret, d) {
if (ret instanceof Error) { if (ret instanceof Error) {
debug('parse error'); debug('parse error');
state.onError(ret); socketOnError.call(socket, ret);
} else if (parser.incoming && parser.incoming.upgrade) { } else if (parser.incoming && parser.incoming.upgrade) {
// Upgrade or CONNECT // Upgrade or CONNECT
var bytesParsed = ret; var bytesParsed = ret;
@ -468,6 +467,8 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) {
socket.removeListener('data', state.onData); socket.removeListener('data', state.onData);
socket.removeListener('end', state.onEnd); socket.removeListener('end', state.onEnd);
socket.removeListener('close', state.onClose); socket.removeListener('close', state.onClose);
socket.removeListener('drain', state.onDrain);
socket.removeListener('drain', ondrain);
unconsume(parser, socket); unconsume(parser, socket);
parser.finish(); parser.finish();
freeParser(parser, req, null); freeParser(parser, req, null);

View File

@ -1,5 +1,10 @@
'use strict'; 'use strict';
function ondrain() {
if (this._httpMessage) this._httpMessage.emit('drain');
}
module.exports = { module.exports = {
outHeadersKey: Symbol('outHeadersKey') outHeadersKey: Symbol('outHeadersKey'),
ondrain,
}; };