stream: 'readable' have precedence over flowing

In Streams3 the 'readable' event/.read() method had a lower precedence
than the `'data'` event that made them impossible to use them together.
This make `.resume()` a no-op if there is a listener for the
`'readable'` event, making the stream non-flowing if there is a
`'data'`  listener.

Fixes: https://github.com/nodejs/node/issues/18058

PR-URL: https://github.com/nodejs/node/pull/18994
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
Matteo Collina 2018-02-26 09:24:30 +01:00
parent 1e07acd476
commit cf5f9867ff
5 changed files with 271 additions and 51 deletions

View File

@ -762,6 +762,9 @@ changes:
description: > description: >
'readable' is always emitted in the next tick after 'readable' is always emitted in the next tick after
.push() is called .push() is called
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/18994
description: Using 'readable' requires calling .read().
--> -->
The `'readable'` event is emitted when there is data available to be read from The `'readable'` event is emitted when there is data available to be read from
@ -770,10 +773,16 @@ cause some amount of data to be read into an internal buffer.
```javascript ```javascript
const readable = getReadableStreamSomehow(); const readable = getReadableStreamSomehow();
readable.on('readable', () => { readable.on('readable', function() {
// there is some data to read now // there is some data to read now
let data;
while (data = this.read()) {
console.log(data);
}
}); });
``` ```
The `'readable'` event will also be emitted once the end of the stream data The `'readable'` event will also be emitted once the end of the stream data
has been reached but before the `'end'` event is emitted. has been reached but before the `'end'` event is emitted.
@ -806,6 +815,10 @@ In general, the `readable.pipe()` and `'data'` event mechanisms are easier to
understand than the `'readable'` event. However, handling `'readable'` might understand than the `'readable'` event. However, handling `'readable'` might
result in increased throughput. result in increased throughput.
If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
takes precedence in controlling the flow, i.e. `'data'` will be emitted
only when [`stream.read()`][stream-read] is called.
##### readable.destroy([error]) ##### readable.destroy([error])
<!-- YAML <!-- YAML
added: v8.0.0 added: v8.0.0
@ -997,6 +1010,10 @@ the status of the `highWaterMark`.
##### readable.resume() ##### readable.resume()
<!-- YAML <!-- YAML
added: v0.9.4 added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/18994
description: Resume has no effect if there is a 'readable' event listening
--> -->
* Returns: {this} * Returns: {this}
@ -1016,6 +1033,9 @@ getReadableStreamSomehow()
}); });
``` ```
The `readable.resume()` method has no effect if there is a `'readable'`
event listener.
##### readable.setEncoding(encoding) ##### readable.setEncoding(encoding)
<!-- YAML <!-- YAML
added: v0.9.4 added: v0.9.4

View File

@ -223,6 +223,7 @@ Readable.prototype.unshift = function(chunk) {
}; };
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
debug('readableAddChunk', chunk);
var state = stream._readableState; var state = stream._readableState;
if (chunk === null) { if (chunk === null) {
state.reading = false; state.reading = false;
@ -799,20 +800,24 @@ Readable.prototype.unpipe = function(dest) {
// Ensure readable listeners eventually get something // Ensure readable listeners eventually get something
Readable.prototype.on = function(ev, fn) { Readable.prototype.on = function(ev, fn) {
const res = Stream.prototype.on.call(this, ev, fn); const res = Stream.prototype.on.call(this, ev, fn);
const state = this._readableState;
if (ev === 'data') { if (ev === 'data') {
// Start flowing on next tick if stream isn't explicitly paused // update readableListening so that resume() may be a no-op
if (this._readableState.flowing !== false) // a few lines down. This is needed to support once('readable').
state.readableListening = this.listenerCount('readable') > 0;
// Try start flowing on next tick if stream isn't explicitly paused
if (state.flowing !== false)
this.resume(); this.resume();
} else if (ev === 'readable') { } else if (ev === 'readable') {
const state = this._readableState;
if (!state.endEmitted && !state.readableListening) { if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true; state.readableListening = state.needReadable = true;
state.emittedReadable = false; state.emittedReadable = false;
if (!state.reading) { if (state.length) {
process.nextTick(nReadingNextTick, this);
} else if (state.length) {
emitReadable(this); emitReadable(this);
} else if (!state.reading) {
process.nextTick(nReadingNextTick, this);
} }
} }
} }
@ -821,6 +826,42 @@ Readable.prototype.on = function(ev, fn) {
}; };
Readable.prototype.addListener = Readable.prototype.on; Readable.prototype.addListener = Readable.prototype.on;
Readable.prototype.removeListener = function(ev, fn) {
const res = Stream.prototype.removeListener.call(this, ev, fn);
if (ev === 'readable') {
// We need to check if there is someone still listening to
// to readable and reset the state. However this needs to happen
// after readable has been emitted but before I/O (nextTick) to
// support once('readable', fn) cycles. This means that calling
// resume within the same tick will have no
// effect.
process.nextTick(updateReadableListening, this);
}
return res;
};
Readable.prototype.removeAllListeners = function(ev) {
const res = Stream.prototype.removeAllListeners.call(this, ev);
if (ev === 'readable' || ev === undefined) {
// We need to check if there is someone still listening to
// to readable and reset the state. However this needs to happen
// after readable has been emitted but before I/O (nextTick) to
// support once('readable', fn) cycles. This means that calling
// resume within the same tick will have no
// effect.
process.nextTick(updateReadableListening, this);
}
return res;
};
function updateReadableListening(self) {
self._readableState.readableListening = self.listenerCount('readable') > 0;
}
function nReadingNextTick(self) { function nReadingNextTick(self) {
debug('readable nexttick read 0'); debug('readable nexttick read 0');
self.read(0); self.read(0);
@ -832,7 +873,9 @@ Readable.prototype.resume = function() {
var state = this._readableState; var state = this._readableState;
if (!state.flowing) { if (!state.flowing) {
debug('resume'); debug('resume');
state.flowing = true; // we flow only if there is no one listening
// for readable
state.flowing = !state.readableListening;
resume(this, state); resume(this, state);
} }
return this; return this;

View File

@ -0,0 +1,52 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');
const helloWorld = 'Hello World!';
const helloAgainLater = 'Hello again later!';
const server = http.createServer((req, res) => {
res.writeHead(200, {
'Content-Length': '' + (helloWorld.length + helloAgainLater.length)
});
res.write(helloWorld);
// we need to make sure the data is flushed
setTimeout(() => {
res.end(helloAgainLater);
}, common.platformTimeout(10));
}).listen(0, function() {
const opts = {
hostname: 'localhost',
port: server.address().port,
path: '/'
};
const expectedData = [helloWorld, helloAgainLater];
const expectedRead = [helloWorld, null, helloAgainLater, null];
const req = http.request(opts, (res) => {
res.on('error', common.mustNotCall);
res.on('readable', common.mustCall(() => {
let data;
do {
data = res.read();
assert.strictEqual(data, expectedRead.shift());
} while (data !== null);
}, 2));
res.setEncoding('utf8');
res.on('data', common.mustCall((data) => {
assert.strictEqual(data, expectedData.shift());
}, 2));
res.on('end', common.mustCall(() => {
server.close();
}));
});
req.end();
});

View File

@ -3,33 +3,35 @@ const common = require('../common');
const assert = require('assert'); const assert = require('assert');
const Readable = require('stream').Readable; const Readable = require('stream').Readable;
const readable = new Readable({ {
const readable = new Readable({
read(size) {} read(size) {}
}); });
const state = readable._readableState; const state = readable._readableState;
// Starting off with false initially. // Starting off with false initially.
assert.strictEqual(state.reading, false); assert.strictEqual(state.reading, false);
assert.strictEqual(state.readingMore, false); assert.strictEqual(state.readingMore, false);
readable.on('data', common.mustCall((data) => { readable.on('data', common.mustCall((data) => {
// while in a flowing state, should try to read more. // while in a flowing state with a 'readable' listener
// we should not be reading more
if (readable.readableFlowing) if (readable.readableFlowing)
assert.strictEqual(state.readingMore, true); assert.strictEqual(state.readingMore, true);
// reading as long as we've not ended // reading as long as we've not ended
assert.strictEqual(state.reading, !state.ended); assert.strictEqual(state.reading, !state.ended);
}, 2)); }, 2));
function onStreamEnd() { function onStreamEnd() {
// End of stream; state.reading is false // End of stream; state.reading is false
// And so should be readingMore. // And so should be readingMore.
assert.strictEqual(state.readingMore, false); assert.strictEqual(state.readingMore, false);
assert.strictEqual(state.reading, false); assert.strictEqual(state.reading, false);
} }
readable.on('readable', common.mustCall(() => { readable.on('readable', common.mustCall(() => {
// 'readable' always gets called before 'end' // 'readable' always gets called before 'end'
// since 'end' hasn't been emitted, more data could be incoming // since 'end' hasn't been emitted, more data could be incoming
assert.strictEqual(state.readingMore, true); assert.strictEqual(state.readingMore, true);
@ -40,27 +42,127 @@ readable.on('readable', common.mustCall(() => {
const data = readable.read(); const data = readable.read();
if (data === null) // reached end of stream if (data === null) // reached end of stream
process.nextTick(common.mustCall(onStreamEnd, 1)); process.nextTick(common.mustCall(onStreamEnd, 1));
}, 2)); }, 2));
readable.on('end', common.mustCall(onStreamEnd)); readable.on('end', common.mustCall(onStreamEnd));
readable.push('pushed');
readable.push('pushed'); readable.read(6);
// stop emitting 'data' events // reading
readable.pause(); assert.strictEqual(state.reading, true);
assert.strictEqual(state.readingMore, true);
// read() should only be called while operating in paused mode // add chunk to front
readable.read(6); readable.unshift('unshifted');
// reading // end
assert.strictEqual(state.reading, true); readable.push(null);
assert.strictEqual(state.readingMore, true); }
// resume emitting 'data' events {
readable.resume(); const readable = new Readable({
read(size) {}
});
// add chunk to front const state = readable._readableState;
readable.unshift('unshifted');
// end // Starting off with false initially.
readable.push(null); assert.strictEqual(state.reading, false);
assert.strictEqual(state.readingMore, false);
readable.on('data', common.mustCall((data) => {
// while in a flowing state without a 'readable' listener
// we should be reading more
if (readable.readableFlowing)
assert.strictEqual(state.readingMore, true);
// reading as long as we've not ended
assert.strictEqual(state.reading, !state.ended);
}, 2));
function onStreamEnd() {
// End of stream; state.reading is false
// And so should be readingMore.
assert.strictEqual(state.readingMore, false);
assert.strictEqual(state.reading, false);
}
readable.on('end', common.mustCall(onStreamEnd));
readable.push('pushed');
// stop emitting 'data' events
assert.strictEqual(state.flowing, true);
readable.pause();
// paused
assert.strictEqual(state.reading, false);
assert.strictEqual(state.flowing, false);
readable.resume();
assert.strictEqual(state.reading, false);
assert.strictEqual(state.flowing, true);
// add chunk to front
readable.unshift('unshifted');
// end
readable.push(null);
}
{
const readable = new Readable({
read(size) {}
});
const state = readable._readableState;
// Starting off with false initially.
assert.strictEqual(state.reading, false);
assert.strictEqual(state.readingMore, false);
const onReadable = common.mustNotCall;
readable.on('readable', onReadable);
readable.on('data', common.mustCall((data) => {
// reading as long as we've not ended
assert.strictEqual(state.reading, !state.ended);
}, 2));
readable.removeListener('readable', onReadable);
function onStreamEnd() {
// End of stream; state.reading is false
// And so should be readingMore.
assert.strictEqual(state.readingMore, false);
assert.strictEqual(state.reading, false);
}
readable.on('end', common.mustCall(onStreamEnd));
readable.push('pushed');
// we are still not flowing, we will be resuming in the next tick
assert.strictEqual(state.flowing, false);
// wait for nextTick, so the readableListener flag resets
process.nextTick(function() {
readable.resume();
// stop emitting 'data' events
assert.strictEqual(state.flowing, true);
readable.pause();
// paused
assert.strictEqual(state.flowing, false);
readable.resume();
assert.strictEqual(state.flowing, true);
// add chunk to front
readable.unshift('unshifted');
// end
readable.push(null);
});
}

View File

@ -35,6 +35,7 @@ let expectEndingData = expectTotalData;
const r = new Readable({ highWaterMark: 1000 }); const r = new Readable({ highWaterMark: 1000 });
let chunks = totalChunks; let chunks = totalChunks;
r._read = function(n) { r._read = function(n) {
console.log('_read called', chunks);
if (!(chunks % 2)) if (!(chunks % 2))
setImmediate(push); setImmediate(push);
else if (!(chunks % 3)) else if (!(chunks % 3))
@ -49,6 +50,7 @@ function push() {
if (chunk) { if (chunk) {
totalPushed += chunk.length; totalPushed += chunk.length;
} }
console.log('chunks', chunks);
r.push(chunk); r.push(chunk);
} }
@ -64,6 +66,7 @@ function readn(n, then) {
expectEndingData -= n; expectEndingData -= n;
(function read() { (function read() {
const c = r.read(n); const c = r.read(n);
console.error('c', c);
if (!c) if (!c)
r.once('readable', read); r.once('readable', read);
else { else {