stream: Remove output function from _transform

Just use stream.push(outputChunk) instead.
This commit is contained in:
isaacs 2013-03-03 19:05:44 -08:00
parent 049903e333
commit cd68d86c32
8 changed files with 69 additions and 69 deletions

View File

@ -589,11 +589,9 @@ In classes that extend the Transform class, make sure to call the
constructor so that the buffering settings can be properly constructor so that the buffering settings can be properly
initialized. initialized.
### transform.\_transform(chunk, outputFn, callback) ### transform.\_transform(chunk, callback)
* `chunk` {Buffer} The chunk to be transformed. * `chunk` {Buffer} The chunk to be transformed.
* `outputFn` {Function} Call this function with any output data to be
passed to the readable interface.
* `callback` {Function} Call this function (optionally with an error * `callback` {Function} Call this function (optionally with an error
argument) when you are done processing the supplied chunk. argument) when you are done processing the supplied chunk.
@ -609,20 +607,21 @@ Transform class, to handle the bytes being written, and pass them off
to the readable portion of the interface. Do asynchronous I/O, to the readable portion of the interface. Do asynchronous I/O,
process things, and so on. process things, and so on.
Call `transform.push(outputChunk)` 0 or more times to generate output
from this input chunk, depending on how much data you want to output
as a result of this chunk.
Call the callback function only when the current chunk is completely Call the callback function only when the current chunk is completely
consumed. Note that this may mean that you call the `outputFn` zero consumed. Note that there may or may not be output as a result of any
or more times, depending on how much data you want to output as a particular input chunk.
result of this chunk.
This method is prefixed with an underscore because it is internal to This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user the class that defines it, and should not be called directly by user
programs. However, you **are** expected to override this method in programs. However, you **are** expected to override this method in
your own extension classes. your own extension classes.
### transform.\_flush(outputFn, callback) ### transform.\_flush(callback)
* `outputFn` {Function} Call this function with any output data to be
passed to the readable interface.
* `callback` {Function} Call this function (optionally with an error * `callback` {Function} Call this function (optionally with an error
argument) when you are done flushing any remaining data. argument) when you are done flushing any remaining data.
@ -639,8 +638,9 @@ can with what is left, so that the data will be complete.
In those cases, you can implement a `_flush` method, which will be In those cases, you can implement a `_flush` method, which will be
called at the very end, after all the written data is consumed, but called at the very end, after all the written data is consumed, but
before emitting `end` to signal the end of the readable side. Just before emitting `end` to signal the end of the readable side. Just
like with `_transform`, call `outputFn` zero or more times, as like with `_transform`, call `transform.push(chunk)` zero or more
appropriate, and call `callback` when the flush operation is complete. times, as appropriate, and call `callback` when the flush operation is
complete.
This method is prefixed with an underscore because it is internal to This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user the class that defines it, and should not be called directly by user
@ -671,7 +671,7 @@ function SimpleProtocol(options) {
SimpleProtocol.prototype = Object.create( SimpleProtocol.prototype = Object.create(
Transform.prototype, { constructor: { value: SimpleProtocol }}); Transform.prototype, { constructor: { value: SimpleProtocol }});
SimpleProtocol.prototype._transform = function(chunk, output, done) { SimpleProtocol.prototype._transform = function(chunk, done) {
if (!this._inBody) { if (!this._inBody) {
// check if the chunk has a \n\n // check if the chunk has a \n\n
var split = -1; var split = -1;
@ -707,11 +707,11 @@ SimpleProtocol.prototype._transform = function(chunk, output, done) {
this.emit('header', this.header); this.emit('header', this.header);
// now, because we got some extra data, emit this first. // now, because we got some extra data, emit this first.
output(b); this.push(b);
} }
} else { } else {
// from there on, just provide the data to our consumer as-is. // from there on, just provide the data to our consumer as-is.
output(b); this.push(b);
} }
done(); done();
}; };

View File

@ -36,6 +36,6 @@ function PassThrough(options) {
Transform.call(this, options); Transform.call(this, options);
} }
PassThrough.prototype._transform = function(chunk, output, cb) { PassThrough.prototype._transform = function(chunk, cb) {
cb(null, chunk); cb(null, chunk);
}; };

View File

@ -71,10 +71,6 @@ util.inherits(Transform, Duplex);
function TransformState(options, stream) { function TransformState(options, stream) {
var ts = this; var ts = this;
this.output = function(chunk) {
ts.needTransform = false;
stream.push(chunk);
};
this.afterTransform = function(er, data) { this.afterTransform = function(er, data) {
return afterTransform(stream, er, data); return afterTransform(stream, er, data);
@ -99,7 +95,7 @@ function afterTransform(stream, er, data) {
ts.writecb = null; ts.writecb = null;
if (data !== null && data !== undefined) if (data !== null && data !== undefined)
ts.output(data); stream.push(data);
if (cb) if (cb)
cb(er); cb(er);
@ -132,7 +128,7 @@ function Transform(options) {
this.once('finish', function() { this.once('finish', function() {
if ('function' === typeof this._flush) if ('function' === typeof this._flush)
this._flush(ts.output, function(er) { this._flush(function(er) {
done(stream, er); done(stream, er);
}); });
else else
@ -140,12 +136,17 @@ function Transform(options) {
}); });
} }
Transform.prototype.push = function(chunk) {
this._transformState.needTransform = false;
return Duplex.prototype.push.call(this, chunk);
};
// This is the part where you do stuff! // This is the part where you do stuff!
// override this function in implementation classes. // override this function in implementation classes.
// 'chunk' is an input chunk. // 'chunk' is an input chunk.
// //
// Call `output(newChunk)` to pass along transformed output // Call `push(newChunk)` to pass along transformed output
// to the readable side. You may call 'output' zero or more times. // to the readable side. You may call 'push' zero or more times.
// //
// Call `cb(err)` when you are done with this chunk. If you pass // Call `cb(err)` when you are done with this chunk. If you pass
// an error, then that'll put the hurt on the whole operation. If you // an error, then that'll put the hurt on the whole operation. If you
@ -158,11 +159,13 @@ Transform.prototype._write = function(chunk, cb) {
var ts = this._transformState; var ts = this._transformState;
ts.writecb = cb; ts.writecb = cb;
ts.writechunk = chunk; ts.writechunk = chunk;
if (ts.transforming) if (!ts.transforming) {
return; var rs = this._readableState;
var rs = this._readableState; if (ts.needTransform ||
if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) rs.needReadable ||
this._read(rs.bufferSize); rs.length < rs.highWaterMark)
this._read(rs.bufferSize);
}
}; };
// Doesn't matter what the args are here. // Doesn't matter what the args are here.
@ -173,13 +176,12 @@ Transform.prototype._read = function(n) {
if (ts.writechunk && ts.writecb && !ts.transforming) { if (ts.writechunk && ts.writecb && !ts.transforming) {
ts.transforming = true; ts.transforming = true;
this._transform(ts.writechunk, ts.output, ts.afterTransform); this._transform(ts.writechunk, ts.afterTransform);
return; } else {
// mark that we need a transform, so that any data that comes in
// will get processed, now that we've asked for it.
ts.needTransform = true;
} }
// mark that we need a transform, so that any data that comes in
// will get processed, now that we've asked for it.
ts.needTransform = true;
}; };

View File

@ -160,13 +160,13 @@ function Hash(algorithm, options) {
util.inherits(Hash, stream.Transform); util.inherits(Hash, stream.Transform);
Hash.prototype._transform = function(chunk, output, callback) { Hash.prototype._transform = function(chunk, callback) {
this._binding.update(chunk); this._binding.update(chunk);
callback(); callback();
}; };
Hash.prototype._flush = function(output, callback) { Hash.prototype._flush = function(callback) {
output(this._binding.digest()); this.push(this._binding.digest());
callback(); callback();
}; };
@ -226,13 +226,13 @@ function Cipher(cipher, password, options) {
util.inherits(Cipher, stream.Transform); util.inherits(Cipher, stream.Transform);
Cipher.prototype._transform = function(chunk, output, callback) { Cipher.prototype._transform = function(chunk, callback) {
output(this._binding.update(chunk)); this.push(this._binding.update(chunk));
callback(); callback();
}; };
Cipher.prototype._flush = function(output, callback) { Cipher.prototype._flush = function(callback) {
output(this._binding.final()); this.push(this._binding.final());
callback(); callback();
}; };

View File

@ -308,8 +308,8 @@ Zlib.prototype.reset = function reset() {
return this._binding.reset(); return this._binding.reset();
}; };
Zlib.prototype._flush = function(output, callback) { Zlib.prototype._flush = function(callback) {
this._transform(null, output, callback); this._transform(null, callback);
}; };
Zlib.prototype.flush = function(callback) { Zlib.prototype.flush = function(callback) {
@ -320,12 +320,10 @@ Zlib.prototype.flush = function(callback) {
ws.needDrain = true; ws.needDrain = true;
var self = this; var self = this;
this.once('drain', function() { this.once('drain', function() {
self._flush(ts.output, callback); self._flush(callback);
}); });
return; } else
} this._flush(callback || function() {});
this._flush(ts.output, callback || function() {});
}; };
Zlib.prototype.close = function(callback) { Zlib.prototype.close = function(callback) {
@ -345,7 +343,7 @@ Zlib.prototype.close = function(callback) {
}); });
}; };
Zlib.prototype._transform = function(chunk, output, cb) { Zlib.prototype._transform = function(chunk, cb) {
var flushFlag; var flushFlag;
var ws = this._writableState; var ws = this._writableState;
var ending = ws.ending || ws.ended; var ending = ws.ending || ws.ended;
@ -392,7 +390,7 @@ Zlib.prototype._transform = function(chunk, output, cb) {
var out = self._buffer.slice(self._offset, self._offset + have); var out = self._buffer.slice(self._offset, self._offset + have);
self._offset += have; self._offset += have;
// serve some output to the consumer. // serve some output to the consumer.
output(out); self.push(out);
} }
// exhausted the output buffer, or used all the input create a new one. // exhausted the output buffer, or used all the input create a new one.

View File

@ -67,9 +67,9 @@ test('writable side consumption', function(t) {
}); });
var transformed = 0; var transformed = 0;
tx._transform = function(chunk, output, cb) { tx._transform = function(chunk, cb) {
transformed += chunk.length; transformed += chunk.length;
output(chunk); tx.push(chunk);
cb(); cb();
}; };
@ -106,10 +106,10 @@ test('passthrough', function(t) {
test('simple transform', function(t) { test('simple transform', function(t) {
var pt = new Transform; var pt = new Transform;
pt._transform = function(c, output, cb) { pt._transform = function(c, cb) {
var ret = new Buffer(c.length); var ret = new Buffer(c.length);
ret.fill('x'); ret.fill('x');
output(ret); pt.push(ret);
cb(); cb();
}; };
@ -128,9 +128,9 @@ test('simple transform', function(t) {
test('async passthrough', function(t) { test('async passthrough', function(t) {
var pt = new Transform; var pt = new Transform;
pt._transform = function(chunk, output, cb) { pt._transform = function(chunk, cb) {
setTimeout(function() { setTimeout(function() {
output(chunk); pt.push(chunk);
cb(); cb();
}, 10); }, 10);
}; };
@ -154,11 +154,11 @@ test('assymetric transform (expand)', function(t) {
var pt = new Transform; var pt = new Transform;
// emit each chunk 2 times. // emit each chunk 2 times.
pt._transform = function(chunk, output, cb) { pt._transform = function(chunk, cb) {
setTimeout(function() { setTimeout(function() {
output(chunk); pt.push(chunk);
setTimeout(function() { setTimeout(function() {
output(chunk); pt.push(chunk);
cb(); cb();
}, 10) }, 10)
}, 10); }, 10);
@ -189,24 +189,24 @@ test('assymetric transform (compress)', function(t) {
// or whatever's left. // or whatever's left.
pt.state = ''; pt.state = '';
pt._transform = function(chunk, output, cb) { pt._transform = function(chunk, cb) {
if (!chunk) if (!chunk)
chunk = ''; chunk = '';
var s = chunk.toString(); var s = chunk.toString();
setTimeout(function() { setTimeout(function() {
this.state += s.charAt(0); this.state += s.charAt(0);
if (this.state.length === 3) { if (this.state.length === 3) {
output(new Buffer(this.state)); pt.push(new Buffer(this.state));
this.state = ''; this.state = '';
} }
cb(); cb();
}.bind(this), 10); }.bind(this), 10);
}; };
pt._flush = function(output, cb) { pt._flush = function(cb) {
// just output whatever we have. // just output whatever we have.
setTimeout(function() { setTimeout(function() {
output(new Buffer(this.state)); pt.push(new Buffer(this.state));
this.state = ''; this.state = '';
cb(); cb();
}.bind(this), 10); }.bind(this), 10);
@ -359,9 +359,9 @@ test('passthrough facaded', function(t) {
test('object transform (json parse)', function(t) { test('object transform (json parse)', function(t) {
console.error('json parse stream'); console.error('json parse stream');
var jp = new Transform({ objectMode: true }); var jp = new Transform({ objectMode: true });
jp._transform = function(data, output, cb) { jp._transform = function(data, cb) {
try { try {
output(JSON.parse(data)); jp.push(JSON.parse(data));
cb(); cb();
} catch (er) { } catch (er) {
cb(er); cb(er);
@ -399,9 +399,9 @@ test('object transform (json parse)', function(t) {
test('object transform (json stringify)', function(t) { test('object transform (json stringify)', function(t) {
console.error('json parse stream'); console.error('json parse stream');
var js = new Transform({ objectMode: true }); var js = new Transform({ objectMode: true });
js._transform = function(data, output, cb) { js._transform = function(data, cb) {
try { try {
output(JSON.stringify(data)); js.push(JSON.stringify(data));
cb(); cb();
} catch (er) { } catch (er) {
cb(er); cb(er);

View File

@ -32,7 +32,7 @@ function TestWriter() {
} }
util.inherits(TestWriter, stream.Writable); util.inherits(TestWriter, stream.Writable);
TestWriter.prototype._write = function (buffer, callback) { TestWriter.prototype._write = function (buffer, encoding, callback) {
console.log('write called'); console.log('write called');
// super slow write stream (callback never called) // super slow write stream (callback never called)
}; };

View File

@ -31,7 +31,7 @@ function TestWriter() {
} }
util.inherits(TestWriter, stream.Writable); util.inherits(TestWriter, stream.Writable);
TestWriter.prototype._write = function(buffer, callback) { TestWriter.prototype._write = function(buffer, encoding, callback) {
callback(null); callback(null);
}; };