diff --git a/lib/stream.js b/lib/stream.js index 632c87d2e26..ca7fc28c024 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -28,13 +28,9 @@ function Stream() { util.inherits(Stream, events.EventEmitter); exports.Stream = Stream; -var pipes = []; - Stream.prototype.pipe = function(dest, options) { var source = this; - pipes.push(dest); - function ondata(chunk) { if (dest.writable) { if (false === dest.write(chunk)) source.pause(); @@ -49,31 +45,48 @@ Stream.prototype.pipe = function(dest, options) { dest.on('drain', ondrain); - /* - * If the 'end' option is not supplied, dest.end() will be called when - * source gets the 'end' event. - */ - + // If the 'end' option is not supplied, dest.end() will be called when + // source gets the 'end' or 'close' events. Only dest.end() once, and + // only when all sources have ended. if (!options || options.end !== false) { - function onend() { - var index = pipes.indexOf(dest); - pipes.splice(index, 1); - - if (pipes.indexOf(dest) > -1) { - return; - } - - dest.end(); - } + dest._pipeCount = dest._pipeCount || 0; + dest._pipeCount++; source.on('end', onend); source.on('close', onend); } - /* - * Questionable: - */ + var didOnEnd = false; + function onend() { + if (didOnEnd) return; + didOnEnd = true; + dest._pipeCount--; + + // remove the listeners + cleanup(); + + if (dest._pipeCount > 0) { + // waiting for other incoming streams to end. + return; + } + + dest.end(); + } + + // don't leave dangling pipes when there are errors. + function onerror(er) { + cleanup(); + if (this.listeners('error').length === 1) { + throw er; // Unhandled stream error in pipe. + } + } + + source.on('error', onerror); + dest.on('error', onerror); + + // guarantee that source streams can be paused and resumed, even + // if the only effect is to proxy the event back up the pipe chain. if (!source.pause) { source.pause = function() { source.emit('pause'); @@ -86,27 +99,32 @@ Stream.prototype.pipe = function(dest, options) { }; } - var onpause = function() { + function onpause() { source.pause(); } dest.on('pause', onpause); - var onresume = function() { + function onresume() { if (source.readable) source.resume(); - }; + } dest.on('resume', onresume); - var cleanup = function () { + // remove all the event listeners that were added. + function cleanup() { source.removeListener('data', ondata); dest.removeListener('drain', ondrain); + source.removeListener('end', onend); source.removeListener('close', onend); dest.removeListener('pause', onpause); dest.removeListener('resume', onresume); + source.removeListener('error', onerror); + dest.removeListener('error', onerror); + source.removeListener('end', cleanup); source.removeListener('close', cleanup);