diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index c598c91306f..e045ead396d 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -62,7 +62,8 @@ function ReadableState(options, stream) { this.buffer = []; this.length = 0; - this.pipes = []; + this.pipes = null; + this.pipesCount = 0; this.flowing = false; this.ended = false; this.endEmitted = false; @@ -282,7 +283,19 @@ Readable.prototype.pipe = function(dest, pipeOpts) { var state = this._readableState; if (!pipeOpts) pipeOpts = {}; - state.pipes.push(dest); + + switch (state.pipesCount) { + case 0: + state.pipes = dest; + break; + case 1: + state.pipes = [ state.pipes, dest ]; + break; + default: + state.pipes.push(dest); + break; + } + state.pipesCount += 1; if ((!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && @@ -320,15 +333,22 @@ function flow(src, pipeOpts) { flow(src, pipeOpts); } - while (state.pipes.length && + function write(dest, i, list) { + var written = dest.write(chunk); + if (false === written) { + needDrain++; + dest.once('drain', ondrain); + } + } + + while (state.pipesCount && null !== (chunk = src.read(pipeOpts.chunkSize))) { - state.pipes.forEach(function(dest, i, list) { - var written = dest.write(chunk); - if (false === written) { - needDrain++; - dest.once('drain', ondrain); - } - }); + + if (state.pipesCount === 1) + write(state.pipes, 0, null); + else + state.pipes.forEach(write); + src.emit('data', chunk); // if anyone needs a drain, then we have to wait for that. @@ -340,7 +360,7 @@ function flow(src, pipeOpts) { // function, or in the while loop, then stop flowing. // // NB: This is a pretty rare edge case. - if (state.pipes.length === 0) { + if (state.pipesCount === 0) { state.flowing = false; // if there were data event listeners added, then switch to old mode. @@ -356,19 +376,55 @@ function flow(src, pipeOpts) { Readable.prototype.unpipe = function(dest) { var state = this._readableState; - if (!dest) { - // remove all of them. - state.pipes.forEach(function(dest, i, list) { + + // if we're not piping anywhere, then do nothing. + if (state.pipesCount === 0) + return this; + + // just one destination. most common case. + if (state.pipesCount === 1) { + // passed in one, but it's not the right one. + if (dest && dest !== state.pipes) + return this; + + if (!dest) + dest = state.pipes; + + // got a match. + state.pipes = null; + state.pipesCount = 0; + if (dest) dest.emit('unpipe', this); - }, this); - state.pipes.length = 0; - } else { - var i = state.pipes.indexOf(dest); - if (i !== -1) { - dest.emit('unpipe', this); - state.pipes.splice(i, 1); - } + return this; } + + // slow case. multiple pipe destinations. + + if (!dest) { + // remove all. + var dests = state.pipes; + var len = state.pipesCount; + state.pipes = null; + state.pipesCount = 0; + + for (var i = 0; i < len; i++) + dests[i].emit('unpipe', this); + + return this; + } + + // try to find the right one. + var i = state.pipes.indexOf(dest); + if (i === -1) + return this; + + state.pipes.splice(i, 1); + state.pipesCount -= 1; + if (state.pipesCount === 1) + state.pipes = state.pipes[0]; + + dest.emit('unpipe', this); + return this; }; diff --git a/test/simple/test-stream2-basic.js b/test/simple/test-stream2-basic.js index c28cdc917a6..0b4f4cf2b5e 100644 --- a/test/simple/test-stream2-basic.js +++ b/test/simple/test-stream2-basic.js @@ -209,19 +209,27 @@ test('pipe', function(t) { w[0].on('write', function() { if (--writes === 0) { r.unpipe(); + t.equal(r._readableState.pipes, null); w[0].end(); r.pipe(w[1]); + t.equal(r._readableState.pipes, w[1]); } }); var ended = 0; + var ended0 = false; + var ended1 = false; w[0].on('end', function(results) { + t.equal(ended0, false); + ended0 = true; ended++; t.same(results, expect[0]); }); w[1].on('end', function(results) { + t.equal(ended1, false); + ended1 = true; ended++; t.equal(ended, 2); t.same(results, expect[1]);