stream: always defer 'readable' with nextTick
Emit 'readable' always in the next tick, resulting in a single call to _read() per microtick. This removes the need for the user to implement buffering if they wanted to call this.push() multiple times in an asynchronous fashion, as this.push() triggers this._read() call. PR-URL: https://github.com/nodejs/node/pull/17979 Fixes: https://github.com/nodejs/node/issues/3203 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
This commit is contained in:
parent
800caac236
commit
1e0f3315c7
@ -747,6 +747,12 @@ The listener callback will be passed a single `Error` object.
|
||||
##### Event: 'readable'
|
||||
<!-- YAML
|
||||
added: v0.9.4
|
||||
changes:
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/17979
|
||||
description: >
|
||||
'readable' is always emitted in the next tick after
|
||||
.push() is called
|
||||
-->
|
||||
|
||||
The `'readable'` event is emitted when there is data available to be read from
|
||||
@ -1647,6 +1653,13 @@ const myReadable = new Readable({
|
||||
```
|
||||
|
||||
#### readable.\_read(size)
|
||||
<!-- YAML
|
||||
added: v0.9.4
|
||||
changes:
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/17979
|
||||
description: call _read() only once per microtick
|
||||
-->
|
||||
|
||||
* `size` {number} Number of bytes to read asynchronously
|
||||
|
||||
@ -1666,6 +1679,8 @@ additional data onto the queue.
|
||||
|
||||
*Note*: Once the `readable._read()` method has been called, it will not be
|
||||
called again until the [`readable.push()`][stream-push] method is called.
|
||||
`readable._read()` is guaranteed to be called only once within a
|
||||
synchronous execution, i.e. a microtick.
|
||||
|
||||
The `size` argument is advisory. For implementations where a "read" is a
|
||||
single operation that returns data can use the `size` argument to determine how
|
||||
|
@ -267,7 +267,6 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
|
||||
function addChunk(stream, state, chunk, addToFront) {
|
||||
if (state.flowing && state.length === 0 && !state.sync) {
|
||||
stream.emit('data', chunk);
|
||||
stream.read(0);
|
||||
} else {
|
||||
// update the buffer info.
|
||||
state.length += state.objectMode ? 1 : chunk.length;
|
||||
@ -496,7 +495,11 @@ function onEofChunk(stream, state) {
|
||||
state.ended = true;
|
||||
|
||||
// emit 'readable' now to make sure it gets picked up.
|
||||
emitReadable(stream);
|
||||
state.needReadable = false;
|
||||
if (!state.emittedReadable) {
|
||||
state.emittedReadable = true;
|
||||
emitReadable_(stream);
|
||||
}
|
||||
}
|
||||
|
||||
// Don't emit readable right away in sync mode, because this can trigger
|
||||
@ -508,16 +511,15 @@ function emitReadable(stream) {
|
||||
if (!state.emittedReadable) {
|
||||
debug('emitReadable', state.flowing);
|
||||
state.emittedReadable = true;
|
||||
if (state.sync)
|
||||
process.nextTick(emitReadable_, stream);
|
||||
else
|
||||
emitReadable_(stream);
|
||||
process.nextTick(emitReadable_, stream);
|
||||
}
|
||||
}
|
||||
|
||||
function emitReadable_(stream) {
|
||||
var state = stream._readableState;
|
||||
debug('emit readable');
|
||||
stream.emit('readable');
|
||||
state.needReadable = !state.flowing && !state.ended;
|
||||
flow(stream);
|
||||
}
|
||||
|
||||
@ -537,7 +539,7 @@ function maybeReadMore(stream, state) {
|
||||
|
||||
function maybeReadMore_(stream, state) {
|
||||
var len = state.length;
|
||||
while (!state.reading && !state.flowing && !state.ended &&
|
||||
while (!state.reading && !state.ended &&
|
||||
state.length < state.highWaterMark) {
|
||||
debug('maybeReadMore read 0');
|
||||
stream.read(0);
|
||||
@ -644,6 +646,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
|
||||
debug('ondata');
|
||||
increasedAwaitDrain = false;
|
||||
var ret = dest.write(chunk);
|
||||
debug('dest.write', ret);
|
||||
if (false === ret && !increasedAwaitDrain) {
|
||||
// If the user unpiped during `dest.write()`, it is possible
|
||||
// to get stuck in a permanently paused state if that write
|
||||
@ -824,8 +827,8 @@ function resume(stream, state) {
|
||||
}
|
||||
|
||||
function resume_(stream, state) {
|
||||
debug('resume', state.reading);
|
||||
if (!state.reading) {
|
||||
debug('resume read 0');
|
||||
stream.read(0);
|
||||
}
|
||||
|
||||
@ -1087,6 +1090,7 @@ function copyFromBuffer(n, list) {
|
||||
function endReadable(stream) {
|
||||
var state = stream._readableState;
|
||||
|
||||
debug('endReadable', state.endEmitted);
|
||||
if (!state.endEmitted) {
|
||||
state.ended = true;
|
||||
process.nextTick(endReadableNT, state, stream);
|
||||
@ -1094,6 +1098,8 @@ function endReadable(stream) {
|
||||
}
|
||||
|
||||
function endReadableNT(state, stream) {
|
||||
debug('endReadableNT', state.endEmitted, state.length);
|
||||
|
||||
// Check that we didn't get one last unshift.
|
||||
if (!state.endEmitted && state.length === 0) {
|
||||
state.endEmitted = true;
|
||||
|
@ -8,9 +8,9 @@ const uv = process.binding('uv');
|
||||
const s = new net.Socket({
|
||||
handle: {
|
||||
readStart: function() {
|
||||
process.nextTick(() => this.onread(uv.UV_EOF, null));
|
||||
setImmediate(() => this.onread(uv.UV_EOF, null));
|
||||
},
|
||||
close: (cb) => process.nextTick(cb)
|
||||
close: (cb) => setImmediate(cb)
|
||||
},
|
||||
writable: false
|
||||
});
|
||||
@ -18,8 +18,12 @@ assert.strictEqual(s, s.resume());
|
||||
|
||||
const events = [];
|
||||
|
||||
s.on('end', () => events.push('end'));
|
||||
s.on('close', () => events.push('close'));
|
||||
s.on('end', () => {
|
||||
events.push('end');
|
||||
});
|
||||
s.on('close', () => {
|
||||
events.push('close');
|
||||
});
|
||||
|
||||
process.on('exit', () => {
|
||||
assert.deepStrictEqual(events, [ 'end', 'close' ]);
|
||||
|
@ -3,32 +3,24 @@ const common = require('../common');
|
||||
const stream = require('stream');
|
||||
const assert = require('assert');
|
||||
|
||||
const awaitDrainStates = [
|
||||
1, // after first chunk before callback
|
||||
1, // after second chunk before callback
|
||||
0 // resolving chunk pushed after first chunk, awaitDrain is decreased
|
||||
];
|
||||
|
||||
// A writable stream which pushes data onto the stream which pipes into it,
|
||||
// but only the first time it's written to. Since it's not paused at this time,
|
||||
// a second write will occur. If the pipe increases awaitDrain twice, we'll
|
||||
// never get subsequent chunks because 'drain' is only emitted once.
|
||||
const writable = new stream.Writable({
|
||||
write: common.mustCall(function(chunk, encoding, cb) {
|
||||
if (chunk.length === 32 * 1024) { // first chunk
|
||||
const beforePush = readable._readableState.awaitDrain;
|
||||
readable.push(Buffer.alloc(34 * 1024)); // above hwm
|
||||
// We should check if awaitDrain counter is increased.
|
||||
const afterPush = readable._readableState.awaitDrain;
|
||||
assert.strictEqual(afterPush - beforePush, 1,
|
||||
'Counter is not increased for awaitDrain');
|
||||
}
|
||||
|
||||
assert.strictEqual(
|
||||
awaitDrainStates.shift(),
|
||||
readable._readableState.awaitDrain,
|
||||
0,
|
||||
'State variable awaitDrain is not correct.'
|
||||
);
|
||||
|
||||
if (chunk.length === 32 * 1024) { // first chunk
|
||||
readable.push(Buffer.alloc(34 * 1024)); // above hwm
|
||||
// We should check if awaitDrain counter is increased in the next
|
||||
// tick, because awaitDrain is incremented after this method finished
|
||||
process.nextTick(() => {
|
||||
assert.strictEqual(readable._readableState.awaitDrain, 1,
|
||||
'Counter is not increased for awaitDrain');
|
||||
});
|
||||
}
|
||||
|
||||
cb();
|
||||
}, 3)
|
||||
});
|
||||
|
@ -10,30 +10,33 @@ const readable = new Readable({
|
||||
// Initialized to false.
|
||||
assert.strictEqual(readable._readableState.emittedReadable, false);
|
||||
|
||||
const expected = [Buffer.from('foobar'), Buffer.from('quo'), null];
|
||||
readable.on('readable', common.mustCall(() => {
|
||||
// emittedReadable should be true when the readable event is emitted
|
||||
assert.strictEqual(readable._readableState.emittedReadable, true);
|
||||
readable.read();
|
||||
assert.deepStrictEqual(readable.read(), expected.shift());
|
||||
// emittedReadable is reset to false during read()
|
||||
assert.strictEqual(readable._readableState.emittedReadable, false);
|
||||
}, 4));
|
||||
}, 3));
|
||||
|
||||
// When the first readable listener is just attached,
|
||||
// emittedReadable should be false
|
||||
assert.strictEqual(readable._readableState.emittedReadable, false);
|
||||
|
||||
// Each one of these should trigger a readable event.
|
||||
// These trigger a single 'readable', as things are batched up
|
||||
process.nextTick(common.mustCall(() => {
|
||||
readable.push('foo');
|
||||
}));
|
||||
process.nextTick(common.mustCall(() => {
|
||||
readable.push('bar');
|
||||
}));
|
||||
process.nextTick(common.mustCall(() => {
|
||||
|
||||
// these triggers two readable events
|
||||
setImmediate(common.mustCall(() => {
|
||||
readable.push('quo');
|
||||
}));
|
||||
process.nextTick(common.mustCall(() => {
|
||||
readable.push(null);
|
||||
process.nextTick(common.mustCall(() => {
|
||||
readable.push(null);
|
||||
}));
|
||||
}));
|
||||
|
||||
const noRead = new Readable({
|
||||
|
@ -38,7 +38,7 @@ asyncReadable.on('readable', common.mustCall(() => {
|
||||
// then we need to notify the reader on future changes.
|
||||
assert.strictEqual(asyncReadable._readableState.needReadable, true);
|
||||
}
|
||||
}, 3));
|
||||
}, 2));
|
||||
|
||||
process.nextTick(common.mustCall(() => {
|
||||
asyncReadable.push('foooo');
|
||||
@ -46,8 +46,9 @@ process.nextTick(common.mustCall(() => {
|
||||
process.nextTick(common.mustCall(() => {
|
||||
asyncReadable.push('bar');
|
||||
}));
|
||||
process.nextTick(common.mustCall(() => {
|
||||
setImmediate(common.mustCall(() => {
|
||||
asyncReadable.push(null);
|
||||
assert.strictEqual(asyncReadable._readableState.needReadable, false);
|
||||
}));
|
||||
|
||||
const flowing = new Readable({
|
||||
@ -84,13 +85,13 @@ slowProducer.on('readable', common.mustCall(() => {
|
||||
|
||||
process.nextTick(common.mustCall(() => {
|
||||
slowProducer.push('foo');
|
||||
}));
|
||||
process.nextTick(common.mustCall(() => {
|
||||
slowProducer.push('foo');
|
||||
}));
|
||||
process.nextTick(common.mustCall(() => {
|
||||
slowProducer.push('foo');
|
||||
}));
|
||||
process.nextTick(common.mustCall(() => {
|
||||
slowProducer.push(null);
|
||||
process.nextTick(common.mustCall(() => {
|
||||
slowProducer.push('foo');
|
||||
process.nextTick(common.mustCall(() => {
|
||||
slowProducer.push('foo');
|
||||
process.nextTick(common.mustCall(() => {
|
||||
slowProducer.push(null);
|
||||
}));
|
||||
}));
|
||||
}));
|
||||
}));
|
||||
|
183
test/parallel/test-stream-readable-object-multi-push-async.js
Normal file
183
test/parallel/test-stream-readable-object-multi-push-async.js
Normal file
@ -0,0 +1,183 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const { Readable } = require('stream');
|
||||
|
||||
const MAX = 42;
|
||||
const BATCH = 10;
|
||||
|
||||
{
|
||||
const readable = new Readable({
|
||||
objectMode: true,
|
||||
read: common.mustCall(function() {
|
||||
console.log('>> READ');
|
||||
fetchData((err, data) => {
|
||||
if (err) {
|
||||
this.destroy(err);
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.length === 0) {
|
||||
console.log('pushing null');
|
||||
this.push(null);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('pushing');
|
||||
data.forEach((d) => this.push(d));
|
||||
});
|
||||
}, Math.floor(MAX / BATCH) + 2)
|
||||
});
|
||||
|
||||
let i = 0;
|
||||
function fetchData(cb) {
|
||||
if (i > MAX) {
|
||||
setTimeout(cb, 10, null, []);
|
||||
} else {
|
||||
const array = [];
|
||||
const max = i + BATCH;
|
||||
for (; i < max; i++) {
|
||||
array.push(i);
|
||||
}
|
||||
setTimeout(cb, 10, null, array);
|
||||
}
|
||||
}
|
||||
|
||||
readable.on('readable', () => {
|
||||
let data;
|
||||
console.log('readable emitted');
|
||||
while (data = readable.read()) {
|
||||
console.log(data);
|
||||
}
|
||||
});
|
||||
|
||||
readable.on('end', common.mustCall(() => {
|
||||
assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const readable = new Readable({
|
||||
objectMode: true,
|
||||
read: common.mustCall(function() {
|
||||
console.log('>> READ');
|
||||
fetchData((err, data) => {
|
||||
if (err) {
|
||||
this.destroy(err);
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.length === 0) {
|
||||
console.log('pushing null');
|
||||
this.push(null);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('pushing');
|
||||
data.forEach((d) => this.push(d));
|
||||
});
|
||||
}, Math.floor(MAX / BATCH) + 2)
|
||||
});
|
||||
|
||||
let i = 0;
|
||||
function fetchData(cb) {
|
||||
if (i > MAX) {
|
||||
setTimeout(cb, 10, null, []);
|
||||
} else {
|
||||
const array = [];
|
||||
const max = i + BATCH;
|
||||
for (; i < max; i++) {
|
||||
array.push(i);
|
||||
}
|
||||
setTimeout(cb, 10, null, array);
|
||||
}
|
||||
}
|
||||
|
||||
readable.on('data', (data) => {
|
||||
console.log('data emitted', data);
|
||||
});
|
||||
|
||||
readable.on('end', common.mustCall(() => {
|
||||
assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const readable = new Readable({
|
||||
objectMode: true,
|
||||
read: common.mustCall(function() {
|
||||
console.log('>> READ');
|
||||
fetchData((err, data) => {
|
||||
if (err) {
|
||||
this.destroy(err);
|
||||
return;
|
||||
}
|
||||
|
||||
console.log('pushing');
|
||||
data.forEach((d) => this.push(d));
|
||||
|
||||
if (data[BATCH - 1] >= MAX) {
|
||||
console.log('pushing null');
|
||||
this.push(null);
|
||||
}
|
||||
});
|
||||
}, Math.floor(MAX / BATCH) + 1)
|
||||
});
|
||||
|
||||
let i = 0;
|
||||
function fetchData(cb) {
|
||||
const array = [];
|
||||
const max = i + BATCH;
|
||||
for (; i < max; i++) {
|
||||
array.push(i);
|
||||
}
|
||||
setTimeout(cb, 10, null, array);
|
||||
}
|
||||
|
||||
readable.on('data', (data) => {
|
||||
console.log('data emitted', data);
|
||||
});
|
||||
|
||||
readable.on('end', common.mustCall(() => {
|
||||
assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const readable = new Readable({
|
||||
objectMode: true,
|
||||
read: common.mustNotCall()
|
||||
});
|
||||
|
||||
readable.on('data', common.mustNotCall());
|
||||
|
||||
readable.push(null);
|
||||
|
||||
let nextTickPassed = false;
|
||||
process.nextTick(() => {
|
||||
nextTickPassed = true;
|
||||
});
|
||||
|
||||
readable.on('end', common.mustCall(() => {
|
||||
assert.strictEqual(nextTickPassed, false);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const readable = new Readable({
|
||||
objectMode: true,
|
||||
read: common.mustCall()
|
||||
});
|
||||
|
||||
readable.on('data', (data) => {
|
||||
console.log('data emitted', data);
|
||||
});
|
||||
|
||||
readable.on('end', common.mustCall());
|
||||
|
||||
setImmediate(() => {
|
||||
readable.push('aaa');
|
||||
readable.push(null);
|
||||
});
|
||||
}
|
@ -37,7 +37,8 @@ readable.on('readable', common.mustCall(() => {
|
||||
// if the stream has ended, we shouldn't be reading
|
||||
assert.strictEqual(state.ended, !state.reading);
|
||||
|
||||
if (readable.read() === null) // reached end of stream
|
||||
const data = readable.read();
|
||||
if (data === null) // reached end of stream
|
||||
process.nextTick(common.mustCall(onStreamEnd, 1));
|
||||
}, 2));
|
||||
|
||||
|
@ -306,25 +306,26 @@ const Transform = require('_stream_transform');
|
||||
pt.write(Buffer.from('foog'));
|
||||
pt.write(Buffer.from('bark'));
|
||||
|
||||
assert.strictEqual(emits, 1);
|
||||
assert.strictEqual(emits, 0);
|
||||
assert.strictEqual(pt.read(5).toString(), 'foogb');
|
||||
assert.strictEqual(String(pt.read(5)), 'null');
|
||||
assert.strictEqual(emits, 0);
|
||||
|
||||
pt.write(Buffer.from('bazy'));
|
||||
pt.write(Buffer.from('kuel'));
|
||||
|
||||
assert.strictEqual(emits, 2);
|
||||
assert.strictEqual(emits, 0);
|
||||
assert.strictEqual(pt.read(5).toString(), 'arkba');
|
||||
assert.strictEqual(pt.read(5).toString(), 'zykue');
|
||||
assert.strictEqual(pt.read(5), null);
|
||||
|
||||
pt.end();
|
||||
|
||||
assert.strictEqual(emits, 3);
|
||||
assert.strictEqual(emits, 1);
|
||||
assert.strictEqual(pt.read(5).toString(), 'l');
|
||||
assert.strictEqual(pt.read(5), null);
|
||||
|
||||
assert.strictEqual(emits, 3);
|
||||
assert.strictEqual(emits, 1);
|
||||
}
|
||||
|
||||
{
|
||||
@ -338,7 +339,7 @@ const Transform = require('_stream_transform');
|
||||
pt.write(Buffer.from('foog'));
|
||||
pt.write(Buffer.from('bark'));
|
||||
|
||||
assert.strictEqual(emits, 1);
|
||||
assert.strictEqual(emits, 0);
|
||||
assert.strictEqual(pt.read(5).toString(), 'foogb');
|
||||
assert.strictEqual(pt.read(5), null);
|
||||
|
||||
@ -352,7 +353,7 @@ const Transform = require('_stream_transform');
|
||||
pt.once('readable', common.mustCall(function() {
|
||||
assert.strictEqual(pt.read(5).toString(), 'l');
|
||||
assert.strictEqual(pt.read(5), null);
|
||||
assert.strictEqual(emits, 4);
|
||||
assert.strictEqual(emits, 3);
|
||||
}));
|
||||
pt.end();
|
||||
}));
|
||||
|
Loading…
x
Reference in New Issue
Block a user