stream: Use push() for Transform._output()
This also slightly changes the semantics, in that a 'readable' event may be triggered by the first write() call, even if a user has not yet called read(). This happens because the Transform _write() handler is calling read(0) to start the flow of data. Technically, the new behavior is more 'correct', since it is more in line with the semantics of the 'readable' event in other streams.
This commit is contained in:
parent
530585b2d1
commit
b43e544140
@ -75,7 +75,7 @@ function TransformState(stream) {
|
|||||||
this.transforming = false;
|
this.transforming = false;
|
||||||
this.pendingReadCb = null;
|
this.pendingReadCb = null;
|
||||||
this.output = function(chunk) {
|
this.output = function(chunk) {
|
||||||
stream._output(chunk);
|
stream.push(chunk);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -148,9 +148,6 @@ Transform.prototype._read = function(n, readcb) {
|
|||||||
var rs = this._readableState;
|
var rs = this._readableState;
|
||||||
var ts = this._transformState;
|
var ts = this._transformState;
|
||||||
|
|
||||||
if (ts.pendingReadCb)
|
|
||||||
throw new Error('_read while _read already in progress');
|
|
||||||
|
|
||||||
ts.pendingReadCb = readcb;
|
ts.pendingReadCb = readcb;
|
||||||
|
|
||||||
// if there's nothing pending, then we just wait.
|
// if there's nothing pending, then we just wait.
|
||||||
@ -173,31 +170,6 @@ Transform.prototype._read = function(n, readcb) {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
Transform.prototype._output = function(chunk) {
|
|
||||||
if (!chunk || !chunk.length)
|
|
||||||
return;
|
|
||||||
|
|
||||||
// if we've got a pending readcb, then just call that,
|
|
||||||
// and let Readable take care of it. If not, then we fill
|
|
||||||
// the readable buffer ourselves, and emit whatever's needed.
|
|
||||||
var ts = this._transformState;
|
|
||||||
var readcb = ts.pendingReadCb;
|
|
||||||
if (readcb) {
|
|
||||||
ts.pendingReadCb = null;
|
|
||||||
readcb(null, chunk);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// otherwise, it's up to us to fill the rs buffer.
|
|
||||||
var rs = this._readableState;
|
|
||||||
var len = rs.length;
|
|
||||||
rs.buffer.push(chunk);
|
|
||||||
rs.length += chunk.length;
|
|
||||||
if (rs.needReadable) {
|
|
||||||
rs.needReadable = false;
|
|
||||||
this.emit('readable');
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
function done(stream, er) {
|
function done(stream, er) {
|
||||||
if (er)
|
if (er)
|
||||||
@ -215,17 +187,5 @@ function done(stream, er) {
|
|||||||
if (ts.transforming)
|
if (ts.transforming)
|
||||||
throw new Error('calling transform done when still transforming');
|
throw new Error('calling transform done when still transforming');
|
||||||
|
|
||||||
// if we were waiting on a read, let them know that it isn't coming.
|
return stream.push(null);
|
||||||
var readcb = ts.pendingReadCb;
|
|
||||||
if (readcb)
|
|
||||||
return readcb();
|
|
||||||
|
|
||||||
rs.ended = true;
|
|
||||||
// we may have gotten a 'null' read before, and since there is
|
|
||||||
// no more data coming from the writable side, we need to emit
|
|
||||||
// now so that the consumer knows to pick up the tail bits.
|
|
||||||
if (rs.length && rs.needReadable)
|
|
||||||
stream.emit('readable');
|
|
||||||
else if (rs.length === 0)
|
|
||||||
stream.emit('end');
|
|
||||||
}
|
}
|
||||||
|
@ -215,35 +215,40 @@ test('passthrough event emission', function(t) {
|
|||||||
var i = 0;
|
var i = 0;
|
||||||
|
|
||||||
pt.write(new Buffer('foog'));
|
pt.write(new Buffer('foog'));
|
||||||
|
|
||||||
|
console.error('need emit 0');
|
||||||
pt.write(new Buffer('bark'));
|
pt.write(new Buffer('bark'));
|
||||||
|
|
||||||
|
console.error('should have emitted readable now 1 === %d', emits);
|
||||||
|
t.equal(emits, 1);
|
||||||
|
|
||||||
t.equal(pt.read(5).toString(), 'foogb');
|
t.equal(pt.read(5).toString(), 'foogb');
|
||||||
t.equal(pt.read(5) + '', 'null');
|
t.equal(pt.read(5) + '', 'null');
|
||||||
|
|
||||||
console.error('need emit 0');
|
console.error('need emit 1');
|
||||||
|
|
||||||
pt.write(new Buffer('bazy'));
|
pt.write(new Buffer('bazy'));
|
||||||
console.error('should have emitted, but not again');
|
console.error('should have emitted, but not again');
|
||||||
pt.write(new Buffer('kuel'));
|
pt.write(new Buffer('kuel'));
|
||||||
|
|
||||||
console.error('should have emitted readable now 1 === %d', emits);
|
console.error('should have emitted readable now 2 === %d', emits);
|
||||||
t.equal(emits, 1);
|
t.equal(emits, 2);
|
||||||
|
|
||||||
t.equal(pt.read(5).toString(), 'arkba');
|
t.equal(pt.read(5).toString(), 'arkba');
|
||||||
t.equal(pt.read(5).toString(), 'zykue');
|
t.equal(pt.read(5).toString(), 'zykue');
|
||||||
t.equal(pt.read(5), null);
|
t.equal(pt.read(5), null);
|
||||||
|
|
||||||
console.error('need emit 1');
|
console.error('need emit 2');
|
||||||
|
|
||||||
pt.end();
|
pt.end();
|
||||||
|
|
||||||
t.equal(emits, 2);
|
t.equal(emits, 3);
|
||||||
|
|
||||||
t.equal(pt.read(5).toString(), 'l');
|
t.equal(pt.read(5).toString(), 'l');
|
||||||
t.equal(pt.read(5), null);
|
t.equal(pt.read(5), null);
|
||||||
|
|
||||||
console.error('should not have emitted again');
|
console.error('should not have emitted again');
|
||||||
t.equal(emits, 2);
|
t.equal(emits, 3);
|
||||||
t.end();
|
t.end();
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -256,25 +261,28 @@ test('passthrough event emission reordered', function(t) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
pt.write(new Buffer('foog'));
|
pt.write(new Buffer('foog'));
|
||||||
|
console.error('need emit 0');
|
||||||
pt.write(new Buffer('bark'));
|
pt.write(new Buffer('bark'));
|
||||||
|
console.error('should have emitted readable now 1 === %d', emits);
|
||||||
|
t.equal(emits, 1);
|
||||||
|
|
||||||
t.equal(pt.read(5).toString(), 'foogb');
|
t.equal(pt.read(5).toString(), 'foogb');
|
||||||
t.equal(pt.read(5), null);
|
t.equal(pt.read(5), null);
|
||||||
|
|
||||||
console.error('need emit 0');
|
console.error('need emit 1');
|
||||||
pt.once('readable', function() {
|
pt.once('readable', function() {
|
||||||
t.equal(pt.read(5).toString(), 'arkba');
|
t.equal(pt.read(5).toString(), 'arkba');
|
||||||
|
|
||||||
t.equal(pt.read(5), null);
|
t.equal(pt.read(5), null);
|
||||||
|
|
||||||
console.error('need emit 1');
|
console.error('need emit 2');
|
||||||
pt.once('readable', function() {
|
pt.once('readable', function() {
|
||||||
t.equal(pt.read(5).toString(), 'zykue');
|
t.equal(pt.read(5).toString(), 'zykue');
|
||||||
t.equal(pt.read(5), null);
|
t.equal(pt.read(5), null);
|
||||||
pt.once('readable', function() {
|
pt.once('readable', function() {
|
||||||
t.equal(pt.read(5).toString(), 'l');
|
t.equal(pt.read(5).toString(), 'l');
|
||||||
t.equal(pt.read(5), null);
|
t.equal(pt.read(5), null);
|
||||||
t.equal(emits, 3);
|
t.equal(emits, 4);
|
||||||
t.end();
|
t.end();
|
||||||
});
|
});
|
||||||
pt.end();
|
pt.end();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user