diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index a3603f42a63..40917de7abf 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -110,14 +110,28 @@ Transform.prototype._transform = function(chunk, output, cb) { Transform.prototype._write = function(chunk, cb) { var ts = this._transformState; + var rs = this._readableState; ts.buffer.push([chunk, cb]); + // no need for auto-pull if already in the midst of one. + if (ts.transforming) + return; + // now we have something to transform, if we were waiting for it. - if (ts.pendingReadCb && !ts.transforming) { + // kick off a _read to pull it in. + if (ts.pendingReadCb) { var readcb = ts.pendingReadCb; ts.pendingReadCb = null; this._read(-1, readcb); } + + // if we weren't waiting for it, but nothing is queued up, then + // still kick off a transform, just so it's there when the user asks. + if (rs.length === 0) { + var ret = this.read(); + if (ret !== null) + return cb(new Error('invalid stream transform state')); + } }; Transform.prototype._read = function(n, readcb) {