http: reafactor incoming message destroy

Destroy the underlying socket only if it is not ready destroyed. Wait
for the stream to finish in that case.

PR-URL: https://github.com/nodejs/node/pull/33035
Refs: https://github.com/nodejs/node/issues/30625
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
Daniele Belardi 2020-11-24 08:59:17 +01:00 committed by Node.js GitHub Bot
parent 55e83cbe95
commit 6120028ee3

View File

@ -27,7 +27,7 @@ const {
Symbol Symbol
} = primordials; } = primordials;
const Stream = require('stream'); const { Readable, finished } = require('stream');
const kHeaders = Symbol('kHeaders'); const kHeaders = Symbol('kHeaders');
const kHeadersCount = Symbol('kHeadersCount'); const kHeadersCount = Symbol('kHeadersCount');
@ -54,7 +54,7 @@ function IncomingMessage(socket) {
}; };
} }
Stream.Readable.call(this, streamOptions); Readable.call(this, streamOptions);
this._readableState.readingMore = true; this._readableState.readingMore = true;
@ -89,8 +89,8 @@ function IncomingMessage(socket) {
// read by the user, so there's no point continuing to handle it. // read by the user, so there's no point continuing to handle it.
this._dumped = false; this._dumped = false;
} }
ObjectSetPrototypeOf(IncomingMessage.prototype, Stream.Readable.prototype); ObjectSetPrototypeOf(IncomingMessage.prototype, Readable.prototype);
ObjectSetPrototypeOf(IncomingMessage, Stream.Readable); ObjectSetPrototypeOf(IncomingMessage, Readable);
ObjectDefineProperty(IncomingMessage.prototype, 'connection', { ObjectDefineProperty(IncomingMessage.prototype, 'connection', {
get: function() { get: function() {
@ -168,10 +168,18 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
this.aborted = true; this.aborted = true;
this.emit('aborted'); this.emit('aborted');
} }
if (this.socket && !this.readableEnded) {
// If aborted and the underlying socket not already destroyed,
// destroy it.
if (this.socket && !this.socket.destroyed && this.aborted) {
this.socket.destroy(err); this.socket.destroy(err);
const cleanup = finished(this.socket, (e) => {
cleanup();
onError(this, cb, e || err);
});
} else {
onError(this, cb, err);
} }
this.listenerCount('error') > 0 ? cb(err) : cb();
}; };
IncomingMessage.prototype._addHeaderLines = _addHeaderLines; IncomingMessage.prototype._addHeaderLines = _addHeaderLines;
@ -350,6 +358,10 @@ IncomingMessage.prototype._dump = function _dump() {
} }
}; };
function onError(instance, cb, error) {
instance.listenerCount('error') > 0 ? cb(error) : cb();
}
module.exports = { module.exports = {
IncomingMessage, IncomingMessage,
readStart, readStart,