stream: make sure 'readable' is emitted before ending the stream
Fixes: https://github.com/nodejs/node/issues/25810
PR-URL: https://github.com/nodejs/node/pull/26059
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
parent d38cd82513
commit e95e7f9af5
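For context, the bug tracked in issue #25810 is a stream that reaches its end without a final 'readable' event reaching an attached listener. A minimal sketch of the affected pattern, adapted from the test added in this commit (the identity Transform and the 'content' payload mirror that test; the snippet is illustrative, not part of the patch):

'use strict';
const { Transform } = require('stream');

// Identity transform: every written chunk is pushed through unchanged.
const t = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk);
  }
});

t.on('readable', () => {
  // Drain whatever is currently buffered.
  let chunk;
  while ((chunk = t.read()) !== null)
    console.log('read:', chunk.toString());
});

// Before this patch, ending the stream here could finish it without the
// trailing 'readable' that signals EOF to the listener above.
t.end('content');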
@@ -505,20 +505,12 @@ function onEofChunk(stream, state) {
     }
   }
   state.ended = true;
+  state.needReadable = false;
 
-  if (state.sync) {
-    // If we are sync, wait until next tick to emit the data.
-    // Otherwise we risk emitting data in the flow()
-    // the readable code triggers during a read() call
-    emitReadable(stream);
-  } else {
-    // Emit 'readable' now to make sure it gets picked up.
-    state.needReadable = false;
-    if (!state.emittedReadable) {
-      state.emittedReadable = true;
-      emitReadable_(stream);
-    }
-  }
+  // We are not protecting if emittedReadable = true,
+  // so 'readable' gets scheduled anyway.
+  state.emittedReadable = true;
+  process.nextTick(emitReadable_, stream);
 }
 
 // Don't emit readable right away in sync mode, because this can trigger
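In other words, onEofChunk() no longer tries to emit 'readable' synchronously behind the emittedReadable guard; once the stream has ended it always schedules one more 'readable' on the next tick, so a consumer sees a final 'readable' (whose read() returns null) before 'end'. A rough sketch of the ordering a consumer can now rely on (stream name and payload are illustrative):

const { Readable } = require('stream');

const r = new Readable({ read() {} });

r.on('readable', () => {
  let chunk;
  while ((chunk = r.read()) !== null)
    console.log('data:', chunk.toString());
  // On the final 'readable', scheduled at EOF via process.nextTick,
  // the loop exits immediately because read() returns null.
});

r.on('end', () => console.log('end'));

r.push('content');
r.push(null); // EOF: one more 'readable' is scheduled before 'end' fires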
@@ -116,9 +116,6 @@ function Transform(options) {
     writeencoding: null
   };
 
-  // Start out asking for a readable event once data is transformed.
-  this._readableState.needReadable = true;
-
   // We have implemented the _read method, and done the other things
   // that Readable wants before the first _read call, so unset the
   // sync guard flag.
@@ -29,7 +29,7 @@ const server = http.createServer((req, res) => {
 };
 
 const expectedData = [helloWorld, helloAgainLater];
-const expectedRead = [helloWorld, null, helloAgainLater, null];
+const expectedRead = [helloWorld, null, helloAgainLater, null, null];
 
 const req = http.request(opts, (res) => {
   res.on('error', common.mustNotCall());
@@ -42,7 +42,7 @@ const server = http.createServer((req, res) => {
       assert.strictEqual(data, expectedRead.shift());
       next();
     } while (data !== null);
-  }, 2));
+  }, 3));
 
   res.setEncoding('utf8');
   res.on('data', common.mustCall((data) => {
test/parallel/test-stream-readable-emit-readable-short-stream.js (new file, 146 lines)
@@ -0,0 +1,146 @@
+'use strict';
+
+const common = require('../common');
+const stream = require('stream');
+const assert = require('assert');
+
+{
+  const r = new stream.Readable({
+    read: common.mustCall(function() {
+      this.push('content');
+      this.push(null);
+    })
+  });
+
+  const t = new stream.Transform({
+    transform: common.mustCall(function(chunk, encoding, callback) {
+      this.push(chunk);
+      return callback();
+    }),
+    flush: common.mustCall(function(callback) {
+      return callback();
+    })
+  });
+
+  r.pipe(t);
+  t.on('readable', common.mustCall(function() {
+    while (true) {
+      const chunk = t.read();
+      if (!chunk)
+        break;
+
+      assert.strictEqual(chunk.toString(), 'content');
+    }
+  }, 2));
+}
+
+{
+  const t = new stream.Transform({
+    transform: common.mustCall(function(chunk, encoding, callback) {
+      this.push(chunk);
+      return callback();
+    }),
+    flush: common.mustCall(function(callback) {
+      return callback();
+    })
+  });
+
+  t.end('content');
+
+  t.on('readable', common.mustCall(function() {
+    while (true) {
+      const chunk = t.read();
+      if (!chunk)
+        break;
+      assert.strictEqual(chunk.toString(), 'content');
+    }
+  }, 2));
+}
+
+{
+  const t = new stream.Transform({
+    transform: common.mustCall(function(chunk, encoding, callback) {
+      this.push(chunk);
+      return callback();
+    }),
+    flush: common.mustCall(function(callback) {
+      return callback();
+    })
+  });
+
+  t.write('content');
+  t.end();
+
+  t.on('readable', common.mustCall(function() {
+    while (true) {
+      const chunk = t.read();
+      if (!chunk)
+        break;
+      assert.strictEqual(chunk.toString(), 'content');
+    }
+  }, 2));
+}
+
+{
+  const t = new stream.Readable({
+    read() {
+    }
+  });
+
+  t.on('readable', common.mustCall(function() {
+    while (true) {
+      const chunk = t.read();
+      if (!chunk)
+        break;
+      assert.strictEqual(chunk.toString(), 'content');
+    }
+  }, 2));
+
+  t.push('content');
+  t.push(null);
+}
+
+{
+  const t = new stream.Readable({
+    read() {
+    }
+  });
+
+  t.on('readable', common.mustCall(function() {
+    while (true) {
+      const chunk = t.read();
+      if (!chunk)
+        break;
+      assert.strictEqual(chunk.toString(), 'content');
+    }
+  }, 2));
+
+  process.nextTick(() => {
+    t.push('content');
+    t.push(null);
+  });
+}
+
+{
+  const t = new stream.Transform({
+    transform: common.mustCall(function(chunk, encoding, callback) {
+      this.push(chunk);
+      return callback();
+    }),
+    flush: common.mustCall(function(callback) {
+      return callback();
+    })
+  });
+
+  t.on('readable', common.mustCall(function() {
+    while (true) {
+      const chunk = t.read();
+      if (!chunk)
+        break;
+      assert.strictEqual(chunk.toString(), 'content');
+    }
+  }, 2));
+
+  t.write('content');
+  t.end();
+}
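Each 'readable' handler in the new test is wrapped in common.mustCall(fn, 2): the test fails unless the handler runs exactly twice, once when data is buffered and once more for the EOF 'readable' this patch guarantees. Roughly what that helper enforces, as a simplified sketch (not the actual test/common implementation):

// Count calls to the wrapped listener and verify the total at process exit.
function mustCallTwice(fn) {
  const expected = 2;
  let actual = 0;
  process.on('exit', () => {
    if (actual !== expected)
      throw new Error(`expected ${expected} calls, got ${actual}`);
  });
  return function(...args) {
    actual++;
    return fn.apply(this, args);
  };
}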
@@ -43,12 +43,23 @@ const noRead = new Readable({
   read: () => {}
 });
 
-noRead.on('readable', common.mustCall(() => {
+noRead.once('readable', common.mustCall(() => {
   // emittedReadable should be true when the readable event is emitted
   assert.strictEqual(noRead._readableState.emittedReadable, true);
   noRead.read(0);
   // emittedReadable is not reset during read(0)
   assert.strictEqual(noRead._readableState.emittedReadable, true);
+
+  noRead.on('readable', common.mustCall(() => {
+    // The second 'readable' is emitted because we are ending
+
+    // emittedReadable should be true when the readable event is emitted
+    assert.strictEqual(noRead._readableState.emittedReadable, false);
+    noRead.read(0);
+    // emittedReadable is not reset during read(0)
+    assert.strictEqual(noRead._readableState.emittedReadable, false);
+
+  }));
 }));
 
 noRead.push('foo');
@@ -14,7 +14,7 @@ readable.on('readable', common.mustCall(() => {
   // When the readable event fires, needReadable is reset.
   assert.strictEqual(readable._readableState.needReadable, false);
   readable.read();
-}));
+}, 2));
 
 // If a readable listener is attached, then a readable event is needed.
 assert.strictEqual(readable._readableState.needReadable, true);
@@ -74,12 +74,14 @@ const slowProducer = new Readable({
 });
 
 slowProducer.on('readable', common.mustCall(() => {
-  if (slowProducer.read(8) === null) {
+  const chunk = slowProducer.read(8);
+  const state = slowProducer._readableState;
+  if (chunk === null) {
     // The buffer doesn't have enough data, and the stream is not need,
     // we need to notify the reader when data arrives.
-    assert.strictEqual(slowProducer._readableState.needReadable, true);
+    assert.strictEqual(state.needReadable, true);
   } else {
-    assert.strictEqual(slowProducer._readableState.needReadable, false);
+    assert.strictEqual(state.needReadable, false);
   }
 }, 4));
 
@@ -31,7 +31,7 @@ const Readable = require('stream').Readable;
   assert.strictEqual(state.reading, false);
 }
 
-const expectedReadingMore = [true, false];
+const expectedReadingMore = [true, false, false];
 readable.on('readable', common.mustCall(() => {
   // There is only one readingMore scheduled from on('data'),
   // after which everything is governed by the .read() call
@@ -40,10 +40,12 @@ const Readable = require('stream').Readable;
   // If the stream has ended, we shouldn't be reading
   assert.strictEqual(state.ended, !state.reading);
 
-  const data = readable.read();
-  if (data === null) // reached end of stream
+  // consume all the data
+  while (readable.read() !== null) {}
+
+  if (expectedReadingMore.length === 0) // reached end of stream
     process.nextTick(common.mustCall(onStreamEnd, 1));
-}, 2));
+}, 3));
 
 readable.on('end', common.mustCall(onStreamEnd));
 readable.push('pushed');
@@ -11,8 +11,11 @@ const server = http.createServer(function(req, res) {
   let data = '';
   res.on('readable', common.mustCall(function() {
     console.log('readable event');
-    data += res.read();
-  }));
+    let chunk;
+    while ((chunk = res.read()) !== null) {
+      data += chunk;
+    }
+  }, 2));
   res.on('end', common.mustCall(function() {
     console.log('end event');
     assert.strictEqual(msg, data);
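Note the changed handler above: it now drains with a while loop and is expected to fire twice (common.mustCall(fn, 2)). With this patch the extra 'readable' emitted at EOF arrives with an empty buffer, so the old single unguarded res.read() would have appended the string 'null' to the collected body.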
@@ -321,11 +321,16 @@ const Transform = require('_stream_transform');
 
   pt.end();
 
-  assert.strictEqual(emits, 1);
-  assert.strictEqual(pt.read(5).toString(), 'l');
-  assert.strictEqual(pt.read(5), null);
+  // The next readable is emitted on the next tick.
+  assert.strictEqual(emits, 0);
 
-  assert.strictEqual(emits, 1);
+  process.on('nextTick', function() {
+    assert.strictEqual(emits, 1);
+    assert.strictEqual(pt.read(5).toString(), 'l');
+    assert.strictEqual(pt.read(5), null);
+
+    assert.strictEqual(emits, 1);
+  });
 }
 
 {