stream: fix regression introduced in #26059

In #26059, we introduced a bug that caused 'readable' to be nextTicked
on EOF of a ReadableStream. This breaks the dicer module on CITGM.
That change was partially reverted to still fix the bug in #25810 and
not break dicer.

See: https://github.com/nodejs/node/pull/26059
Fixes: https://github.com/nodejs/node/issues/25810

PR-URL: https://github.com/nodejs/node/pull/26643
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Matteo Collina 2019-03-13 22:26:18 +01:00
parent c5e619b8ff
commit 269103a0e5
7 changed files with 28 additions and 33 deletions

View File

@ -512,12 +512,24 @@ function onEofChunk(stream, state) {
} }
} }
state.ended = true; state.ended = true;
state.needReadable = false;
// We are not protecting if emittedReadable = true, if (state.sync) {
// so 'readable' gets scheduled anyway. // If we are sync, wait until next tick to emit the data.
state.emittedReadable = true; // Otherwise we risk emitting data in the flow()
process.nextTick(emitReadable_, stream); // the readable code triggers during a read() call
emitReadable(stream);
} else {
// Emit 'readable' now to make sure it gets picked up.
state.needReadable = false;
state.emittedReadable = true;
// We have to emit readable now that we are EOF. Modules
// in the ecosystem (e.g. dicer) rely on this event being sync.
if (state.ended) {
emitReadable_(stream);
} else {
process.nextTick(emitReadable_, stream);
}
}
} }
// Don't emit readable right away in sync mode, because this can trigger // Don't emit readable right away in sync mode, because this can trigger

View File

@ -54,7 +54,7 @@ const assert = require('assert');
break; break;
assert.strictEqual(chunk.toString(), 'content'); assert.strictEqual(chunk.toString(), 'content');
} }
}, 2)); }));
} }
{ {
@ -78,7 +78,7 @@ const assert = require('assert');
break; break;
assert.strictEqual(chunk.toString(), 'content'); assert.strictEqual(chunk.toString(), 'content');
} }
}, 2)); }));
} }
{ {
@ -94,7 +94,7 @@ const assert = require('assert');
break; break;
assert.strictEqual(chunk.toString(), 'content'); assert.strictEqual(chunk.toString(), 'content');
} }
}, 2)); }));
t.push('content'); t.push('content');
t.push(null); t.push(null);

View File

@ -43,23 +43,12 @@ const noRead = new Readable({
read: () => {} read: () => {}
}); });
noRead.once('readable', common.mustCall(() => { noRead.on('readable', common.mustCall(() => {
// emittedReadable should be true when the readable event is emitted // emittedReadable should be true when the readable event is emitted
assert.strictEqual(noRead._readableState.emittedReadable, true); assert.strictEqual(noRead._readableState.emittedReadable, true);
noRead.read(0); noRead.read(0);
// emittedReadable is not reset during read(0) // emittedReadable is not reset during read(0)
assert.strictEqual(noRead._readableState.emittedReadable, true); assert.strictEqual(noRead._readableState.emittedReadable, true);
noRead.on('readable', common.mustCall(() => {
// The second 'readable' is emitted because we are ending
// emittedReadable should be true when the readable event is emitted
assert.strictEqual(noRead._readableState.emittedReadable, false);
noRead.read(0);
// emittedReadable is not reset during read(0)
assert.strictEqual(noRead._readableState.emittedReadable, false);
}));
})); }));
noRead.push('foo'); noRead.push('foo');

View File

@ -14,7 +14,7 @@ readable.on('readable', common.mustCall(() => {
// When the readable event fires, needReadable is reset. // When the readable event fires, needReadable is reset.
assert.strictEqual(readable._readableState.needReadable, false); assert.strictEqual(readable._readableState.needReadable, false);
readable.read(); readable.read();
}, 2)); }));
// If a readable listener is attached, then a readable event is needed. // If a readable listener is attached, then a readable event is needed.
assert.strictEqual(readable._readableState.needReadable, true); assert.strictEqual(readable._readableState.needReadable, true);

View File

@ -31,7 +31,7 @@ const Readable = require('stream').Readable;
assert.strictEqual(state.reading, false); assert.strictEqual(state.reading, false);
} }
const expectedReadingMore = [true, false, false]; const expectedReadingMore = [true, true, false];
readable.on('readable', common.mustCall(() => { readable.on('readable', common.mustCall(() => {
// There is only one readingMore scheduled from on('data'), // There is only one readingMore scheduled from on('data'),
// after which everything is governed by the .read() call // after which everything is governed by the .read() call

View File

@ -15,7 +15,7 @@ const server = http.createServer(function(req, res) {
while ((chunk = res.read()) !== null) { while ((chunk = res.read()) !== null) {
data += chunk; data += chunk;
} }
}, 2)); }));
res.on('end', common.mustCall(function() { res.on('end', common.mustCall(function() {
console.log('end event'); console.log('end event');
assert.strictEqual(msg, data); assert.strictEqual(msg, data);

View File

@ -321,16 +321,10 @@ const Transform = require('_stream_transform');
pt.end(); pt.end();
// The next readable is emitted on the next tick. assert.strictEqual(emits, 1);
assert.strictEqual(emits, 0); assert.strictEqual(pt.read(5).toString(), 'l');
assert.strictEqual(pt.read(5), null);
process.on('nextTick', function() { assert.strictEqual(emits, 1);
assert.strictEqual(emits, 1);
assert.strictEqual(pt.read(5).toString(), 'l');
assert.strictEqual(pt.read(5), null);
assert.strictEqual(emits, 1);
});
} }
{ {