stream: improve Transform performance
PR-URL: https://github.com/nodejs/node/pull/13322 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
e5a494857a
commit
e5dc934ef6
23
benchmark/streams/transform-creation.js
Normal file
23
benchmark/streams/transform-creation.js
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
'use strict';
|
||||||
|
var common = require('../common.js');
|
||||||
|
var Transform = require('stream').Transform;
|
||||||
|
var inherits = require('util').inherits;
|
||||||
|
|
||||||
|
var bench = common.createBenchmark(main, {
|
||||||
|
n: [1e6]
|
||||||
|
});
|
||||||
|
|
||||||
|
function MyTransform() {
|
||||||
|
Transform.call(this);
|
||||||
|
}
|
||||||
|
inherits(MyTransform, Transform);
|
||||||
|
MyTransform.prototype._transform = function() {};
|
||||||
|
|
||||||
|
function main(conf) {
|
||||||
|
var n = +conf.n;
|
||||||
|
|
||||||
|
bench.start();
|
||||||
|
for (var i = 0; i < n; ++i)
|
||||||
|
new MyTransform();
|
||||||
|
bench.end(n);
|
||||||
|
}
|
@ -70,41 +70,29 @@ const util = require('util');
|
|||||||
util.inherits(Transform, Duplex);
|
util.inherits(Transform, Duplex);
|
||||||
|
|
||||||
|
|
||||||
function TransformState(stream) {
|
function afterTransform(er, data) {
|
||||||
this.afterTransform = function(er, data) {
|
var ts = this._transformState;
|
||||||
return afterTransform(stream, er, data);
|
|
||||||
};
|
|
||||||
|
|
||||||
this.needTransform = false;
|
|
||||||
this.transforming = false;
|
|
||||||
this.writecb = null;
|
|
||||||
this.writechunk = null;
|
|
||||||
this.writeencoding = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
function afterTransform(stream, er, data) {
|
|
||||||
var ts = stream._transformState;
|
|
||||||
ts.transforming = false;
|
ts.transforming = false;
|
||||||
|
|
||||||
var cb = ts.writecb;
|
var cb = ts.writecb;
|
||||||
|
|
||||||
if (!cb) {
|
if (!cb) {
|
||||||
return stream.emit('error',
|
return this.emit('error',
|
||||||
new Error('write callback called multiple times'));
|
new Error('write callback called multiple times'));
|
||||||
}
|
}
|
||||||
|
|
||||||
ts.writechunk = null;
|
ts.writechunk = null;
|
||||||
ts.writecb = null;
|
ts.writecb = null;
|
||||||
|
|
||||||
if (data !== null && data !== undefined)
|
if (data != null) // single equals check for both `null` and `undefined`
|
||||||
stream.push(data);
|
this.push(data);
|
||||||
|
|
||||||
cb(er);
|
cb(er);
|
||||||
|
|
||||||
var rs = stream._readableState;
|
var rs = this._readableState;
|
||||||
rs.reading = false;
|
rs.reading = false;
|
||||||
if (rs.needReadable || rs.length < rs.highWaterMark) {
|
if (rs.needReadable || rs.length < rs.highWaterMark) {
|
||||||
stream._read(rs.highWaterMark);
|
this._read(rs.highWaterMark);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -115,9 +103,14 @@ function Transform(options) {
|
|||||||
|
|
||||||
Duplex.call(this, options);
|
Duplex.call(this, options);
|
||||||
|
|
||||||
this._transformState = new TransformState(this);
|
this._transformState = {
|
||||||
|
afterTransform: afterTransform.bind(this),
|
||||||
var stream = this;
|
needTransform: false,
|
||||||
|
transforming: false,
|
||||||
|
writecb: null,
|
||||||
|
writechunk: null,
|
||||||
|
writeencoding: null
|
||||||
|
};
|
||||||
|
|
||||||
// start out asking for a readable event once data is transformed.
|
// start out asking for a readable event once data is transformed.
|
||||||
this._readableState.needReadable = true;
|
this._readableState.needReadable = true;
|
||||||
@ -136,14 +129,17 @@ function Transform(options) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// When the writable side finishes, then flush out anything remaining.
|
// When the writable side finishes, then flush out anything remaining.
|
||||||
this.once('prefinish', function() {
|
this.on('prefinish', prefinish);
|
||||||
if (typeof this._flush === 'function')
|
}
|
||||||
this._flush(function(er, data) {
|
|
||||||
done(stream, er, data);
|
function prefinish() {
|
||||||
});
|
if (typeof this._flush === 'function') {
|
||||||
else
|
this._flush((er, data) => {
|
||||||
done(stream);
|
done(this, er, data);
|
||||||
});
|
});
|
||||||
|
} else {
|
||||||
|
done(this, null, null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Transform.prototype.push = function(chunk, encoding) {
|
Transform.prototype.push = function(chunk, encoding) {
|
||||||
@ -208,18 +204,15 @@ function done(stream, er, data) {
|
|||||||
if (er)
|
if (er)
|
||||||
return stream.emit('error', er);
|
return stream.emit('error', er);
|
||||||
|
|
||||||
if (data !== null && data !== undefined)
|
if (data != null) // single equals check for both `null` and `undefined`
|
||||||
stream.push(data);
|
stream.push(data);
|
||||||
|
|
||||||
// if there's nothing in the write buffer, then that means
|
// if there's nothing in the write buffer, then that means
|
||||||
// that nothing more will ever be provided
|
// that nothing more will ever be provided
|
||||||
var ws = stream._writableState;
|
if (stream._writableState.length)
|
||||||
var ts = stream._transformState;
|
|
||||||
|
|
||||||
if (ws.length)
|
|
||||||
throw new Error('Calling transform done when ws.length != 0');
|
throw new Error('Calling transform done when ws.length != 0');
|
||||||
|
|
||||||
if (ts.transforming)
|
if (stream._transformState.transforming)
|
||||||
throw new Error('Calling transform done when still transforming');
|
throw new Error('Calling transform done when still transforming');
|
||||||
|
|
||||||
return stream.push(null);
|
return stream.push(null);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user