stream: Throw on 'error' if listeners removed

In this situation:

    writable.on('error', handler);
    readable.pipe(writable);
    writable.removeListener('error', handler);
    writable.emit('error', new Error('boom'));

there is actually no error handler, but it doesn't throw, because of the
fix for stream.once('error', handler), in 23d92ec.

Note that simply reverting that change is not valid either, because
otherwise this will emit twice, being handled the first time, and then
throwing the second:

    writable.once('error', handler);
    readable.pipe(writable);
    writable.emit('error', new Error('boom'));

Fix this with a horrible hack to make the stream pipe onerror handler
added before any other userland handlers, so that our handler is not
affected by adding or removing any userland handlers.

Closes #6007.
This commit is contained in:
isaacs 2013-08-19 07:59:39 -07:00
parent 0c2960ef4a
commit 545807918e
2 changed files with 85 additions and 4 deletions

View File

@ -511,14 +511,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// if the dest has an error, then stop piping into it.
// however, don't suppress the throwing behavior for this.
// check for listeners before emit removes one-time listeners.
var errListeners = EE.listenerCount(dest, 'error');
function onerror(er) {
unpipe();
if (errListeners === 0 && EE.listenerCount(dest, 'error') === 0)
dest.removeListener('error', onerror);
if (EE.listenerCount(dest, 'error') === 0)
dest.emit('error', er);
}
dest.once('error', onerror);
// This is a brutally ugly hack to make sure that our error handler
// is attached before any userland ones. NEVER DO THIS.
if (!dest._events.error)
dest.on('error', onerror);
else if (Array.isArray(dest._events.error))
dest._events.error.unshift(onerror);
else
dest._events.error = [onerror, dest._events.error];
// Both close and finish should trigger unpipe, but only once.
function onclose() {

View File

@ -56,3 +56,76 @@ var Stream = require('stream').Stream;
assert.strictEqual(gotErr, err);
})();
(function testErrorWithRemovedListenerThrows() {
var EE = require('events').EventEmitter;
var R = Stream.Readable;
var W = Stream.Writable;
var r = new R;
var w = new W;
var removed = false;
var didTest = false;
process.on('exit', function() {
assert(didTest);
console.log('ok');
});
r._read = function() {
setTimeout(function() {
assert(removed);
assert.throws(function() {
w.emit('error', new Error('fail'));
});
didTest = true;
});
};
w.on('error', myOnError);
r.pipe(w);
w.removeListener('error', myOnError);
removed = true;
function myOnError(er) {
throw new Error('this should not happen');
}
})();
(function testErrorWithRemovedListenerThrows() {
var EE = require('events').EventEmitter;
var R = Stream.Readable;
var W = Stream.Writable;
var r = new R;
var w = new W;
var removed = false;
var didTest = false;
var caught = false;
process.on('exit', function() {
assert(didTest);
console.log('ok');
});
r._read = function() {
setTimeout(function() {
assert(removed);
w.emit('error', new Error('fail'));
didTest = true;
});
};
w.on('error', myOnError);
w._write = function() {};
r.pipe(w);
// Removing some OTHER random listener should not do anything
w.removeListener('error', function() {});
removed = true;
function myOnError(er) {
assert(!caught);
caught = true;
}
})();