stream: need to cleanup event listeners if last stream is readable

fix: https://github.com/nodejs/node/issues/35452

PR-URL: https://github.com/nodejs/node/pull/41954
Fixes: https://github.com/nodejs/node/issues/35452
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
Xuguang Mei 2022-02-15 18:15:14 +08:00 committed by GitHub
parent cc505a505a
commit 9fb7ac3bbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 120 additions and 12 deletions

View File

@ -2575,7 +2575,9 @@ run().catch(console.error);
`stream.pipeline()` leaves dangling event listeners on the streams
after the `callback` has been invoked. In the case of reuse of streams after
failure, this can cause event listener leaks and swallowed errors.
failure, this can cause event listener leaks and swallowed errors. If the last
stream is readable, dangling event listeners will be removed so that the last
stream can be consumed later.
`stream.pipeline()` closes all the streams when an error is raised.
The `IncomingRequest` usage with `pipeline` could lead to an unexpected behavior

View File

@ -31,6 +31,7 @@ const {
const {
isIterable,
isReadable,
isReadableNodeStream,
isNodeStream,
} = require('internal/streams/utils');
@ -45,14 +46,17 @@ function destroyer(stream, reading, writing) {
finished = true;
});
eos(stream, { readable: reading, writable: writing }, (err) => {
const cleanup = eos(stream, { readable: reading, writable: writing }, (err) => {
finished = !err;
});
return (err) => {
if (finished) return;
finished = true;
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
return {
destroy: (err) => {
if (finished) return;
finished = true;
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
},
cleanup
};
}
@ -159,6 +163,10 @@ function pipelineImpl(streams, callback, opts) {
const signal = ac.signal;
const outerSignal = opts?.signal;
// Need to cleanup event listeners if last stream is readable
// https://github.com/nodejs/node/issues/35452
const lastStreamCleanup = [];
validateAbortSignal(outerSignal, 'options.signal');
function abort() {
@ -194,6 +202,9 @@ function pipelineImpl(streams, callback, opts) {
ac.abort();
if (final) {
if (!error) {
lastStreamCleanup.forEach((fn) => fn());
}
process.nextTick(callback, error, value);
}
}
@ -204,14 +215,20 @@ function pipelineImpl(streams, callback, opts) {
const reading = i < streams.length - 1;
const writing = i > 0;
const end = reading || opts?.end !== false;
const isLastStream = i === streams.length - 1;
if (isNodeStream(stream)) {
if (end) {
destroys.push(destroyer(stream, reading, writing));
const { destroy, cleanup } = destroyer(stream, reading, writing);
destroys.push(destroy);
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
}
// Catch stream errors that occur after pipe/pump has completed.
stream.on('error', (err) => {
function onError(err) {
if (
err &&
err.name !== 'AbortError' &&
@ -219,7 +236,13 @@ function pipelineImpl(streams, callback, opts) {
) {
finish(err);
}
});
}
stream.on('error', onError);
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(() => {
stream.removeListener('error', onError);
});
}
}
if (i === 0) {
@ -285,12 +308,19 @@ function pipelineImpl(streams, callback, opts) {
ret = pt;
destroys.push(destroyer(ret, false, true));
const { destroy, cleanup } = destroyer(ret, false, true);
destroys.push(destroy);
if (isLastStream) {
lastStreamCleanup.push(cleanup);
}
}
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
finishCount += 2;
pipe(ret, stream, finish, { end });
const cleanup = pipe(ret, stream, finish, { end });
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
} else if (isIterable(ret)) {
finishCount++;
pump(ret, stream, finish, { end });
@ -345,7 +375,7 @@ function pipe(src, dst, finish, { end }) {
finish(err);
}
});
eos(dst, { readable: false, writable: true }, finish);
return eos(dst, { readable: false, writable: true }, finish);
}
module.exports = { pipelineImpl, pipeline };

View File

@ -0,0 +1,76 @@
'use strict';
const common = require('../common');
const { pipeline, Duplex, PassThrough, Writable } = require('stream');
const assert = require('assert');
process.on('uncaughtException', common.mustCall((err) => {
assert.strictEqual(err.message, 'no way');
}, 2));
// Ensure that listeners is removed if last stream is readble
// And other stream's listeners unchanged
const a = new PassThrough();
a.end('foobar');
const b = new Duplex({
write(chunk, encoding, callback) {
callback();
}
});
pipeline(a, b, common.mustCall((error) => {
if (error) {
assert.ifError(error);
}
assert(a.listenerCount('error') > 0);
assert.strictEqual(b.listenerCount('error'), 0);
setTimeout(() => {
assert.strictEqual(b.listenerCount('error'), 0);
b.destroy(new Error('no way'));
}, 100);
}));
// Async generators
const c = new PassThrough();
c.end('foobar');
const d = pipeline(
c,
async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
},
common.mustCall((error) => {
if (error) {
assert.ifError(error);
}
assert(c.listenerCount('error') > 0);
assert.strictEqual(d.listenerCount('error'), 0);
setTimeout(() => {
assert.strictEqual(b.listenerCount('error'), 0);
d.destroy(new Error('no way'));
}, 100);
})
);
// If last stream is not readable, will not throw and remove listeners
const e = new PassThrough();
e.end('foobar');
const f = new Writable({
write(chunk, encoding, callback) {
callback();
}
});
pipeline(e, f, common.mustCall((error) => {
if (error) {
assert.ifError(error);
}
assert(e.listenerCount('error') > 0);
assert(f.listenerCount('error') > 0);
setTimeout(() => {
assert(f.listenerCount('error') > 0);
f.destroy(new Error('no way'));
}, 100);
}));