test: tests for _readableStream.awaitDrain
Fixes: https://github.com/nodejs/node/issues/8684 PR-URL: https://github.com/nodejs/node/pull/8914 Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
55c42bc6e5
commit
21a077ae9a
@ -1,6 +1,7 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
const common = require('../common');
|
const common = require('../common');
|
||||||
const stream = require('stream');
|
const stream = require('stream');
|
||||||
|
const assert = require('assert');
|
||||||
|
|
||||||
// A consumer stream with a very low highWaterMark, which starts in a state
|
// A consumer stream with a very low highWaterMark, which starts in a state
|
||||||
// where it buffers the chunk it receives rather than indicating that they
|
// where it buffers the chunk it receives rather than indicating that they
|
||||||
@ -26,6 +27,11 @@ const readable = new stream.Readable({
|
|||||||
readable.pipe(writable);
|
readable.pipe(writable);
|
||||||
|
|
||||||
readable.once('pause', common.mustCall(() => {
|
readable.once('pause', common.mustCall(() => {
|
||||||
|
assert.strictEqual(
|
||||||
|
readable._readableState.awaitDrain,
|
||||||
|
1,
|
||||||
|
'awaitDrain doesn\'t increase'
|
||||||
|
);
|
||||||
// First pause, resume manually. The next write() to writable will still
|
// First pause, resume manually. The next write() to writable will still
|
||||||
// return false, because chunks are still being buffered, so it will increase
|
// return false, because chunks are still being buffered, so it will increase
|
||||||
// the awaitDrain counter again.
|
// the awaitDrain counter again.
|
||||||
@ -34,6 +40,11 @@ readable.once('pause', common.mustCall(() => {
|
|||||||
}));
|
}));
|
||||||
|
|
||||||
readable.once('pause', common.mustCall(() => {
|
readable.once('pause', common.mustCall(() => {
|
||||||
|
assert.strictEqual(
|
||||||
|
readable._readableState.awaitDrain,
|
||||||
|
1,
|
||||||
|
'.resume() does not reset counter'
|
||||||
|
);
|
||||||
// Second pause, handle all chunks from now on. Once all callbacks that
|
// Second pause, handle all chunks from now on. Once all callbacks that
|
||||||
// are currently queued up are handled, the awaitDrain drain counter should
|
// are currently queued up are handled, the awaitDrain drain counter should
|
||||||
// fall back to 0 and all chunks that are pending on the readable side
|
// fall back to 0 and all chunks that are pending on the readable side
|
||||||
@ -50,5 +61,10 @@ readable.push(Buffer.alloc(100)); // Should get through to the writable.
|
|||||||
readable.push(null);
|
readable.push(null);
|
||||||
|
|
||||||
writable.on('finish', common.mustCall(() => {
|
writable.on('finish', common.mustCall(() => {
|
||||||
|
assert.strictEqual(
|
||||||
|
readable._readableState.awaitDrain,
|
||||||
|
0,
|
||||||
|
'awaitDrain not 0 after all chunks are written'
|
||||||
|
);
|
||||||
// Everything okay, all chunks were written.
|
// Everything okay, all chunks were written.
|
||||||
}));
|
}));
|
||||||
|
@ -1,16 +1,34 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
const common = require('../common');
|
const common = require('../common');
|
||||||
const stream = require('stream');
|
const stream = require('stream');
|
||||||
|
const assert = require('assert');
|
||||||
|
|
||||||
|
const awaitDrainStates = [
|
||||||
|
1, // after first chunk before callback
|
||||||
|
1, // after second chunk before callback
|
||||||
|
0 // resolving chunk pushed after first chunk, awaitDrain is decreased
|
||||||
|
];
|
||||||
|
|
||||||
// A writable stream which pushes data onto the stream which pipes into it,
|
// A writable stream which pushes data onto the stream which pipes into it,
|
||||||
// but only the first time it's written to. Since it's not paused at this time,
|
// but only the first time it's written to. Since it's not paused at this time,
|
||||||
// a second write will occur. If the pipe increases awaitDrain twice, we'll
|
// a second write will occur. If the pipe increases awaitDrain twice, we'll
|
||||||
// never get subsequent chunks because 'drain' is only emitted once.
|
// never get subsequent chunks because 'drain' is only emitted once.
|
||||||
const writable = new stream.Writable({
|
const writable = new stream.Writable({
|
||||||
write: common.mustCall((chunk, encoding, cb) => {
|
write: common.mustCall(function(chunk, encoding, cb) {
|
||||||
if (chunk.length === 32 * 1024) { // first chunk
|
if (chunk.length === 32 * 1024) { // first chunk
|
||||||
readable.push(new Buffer(33 * 1024)); // above hwm
|
const beforePush = readable._readableState.awaitDrain;
|
||||||
|
readable.push(new Buffer(34 * 1024)); // above hwm
|
||||||
|
// We should check if awaitDrain counter is increased.
|
||||||
|
const afterPush = readable._readableState.awaitDrain;
|
||||||
|
assert.strictEqual(afterPush - beforePush, 1,
|
||||||
|
'Counter is not increased for awaitDrain');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert.strictEqual(
|
||||||
|
awaitDrainStates.shift(),
|
||||||
|
readable._readableState.awaitDrain,
|
||||||
|
'State variable awaitDrain is not correct.'
|
||||||
|
);
|
||||||
cb();
|
cb();
|
||||||
}, 3)
|
}, 3)
|
||||||
});
|
});
|
||||||
|
@ -1,12 +1,14 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
const common = require('../common');
|
const common = require('../common');
|
||||||
const stream = require('stream');
|
const stream = require('stream');
|
||||||
|
const assert = require('assert');
|
||||||
|
|
||||||
// This is very similar to test-stream-pipe-cleanup-pause.js.
|
// This is very similar to test-stream-pipe-cleanup-pause.js.
|
||||||
|
|
||||||
const reader = new stream.Readable();
|
const reader = new stream.Readable();
|
||||||
const writer1 = new stream.Writable();
|
const writer1 = new stream.Writable();
|
||||||
const writer2 = new stream.Writable();
|
const writer2 = new stream.Writable();
|
||||||
|
const writer3 = new stream.Writable();
|
||||||
|
|
||||||
// 560000 is chosen here because it is larger than the (default) highWaterMark
|
// 560000 is chosen here because it is larger than the (default) highWaterMark
|
||||||
// and will cause `.write()` to return false
|
// and will cause `.write()` to return false
|
||||||
@ -19,7 +21,10 @@ writer1._write = common.mustCall(function(chunk, encoding, cb) {
|
|||||||
this.emit('chunk-received');
|
this.emit('chunk-received');
|
||||||
cb();
|
cb();
|
||||||
}, 1);
|
}, 1);
|
||||||
|
|
||||||
writer1.once('chunk-received', function() {
|
writer1.once('chunk-received', function() {
|
||||||
|
assert.strictEqual(reader._readableState.awaitDrain, 0,
|
||||||
|
'initial value is not 0');
|
||||||
setImmediate(function() {
|
setImmediate(function() {
|
||||||
// This one should *not* get through to writer1 because writer2 is not
|
// This one should *not* get through to writer1 because writer2 is not
|
||||||
// "done" processing.
|
// "done" processing.
|
||||||
@ -29,12 +34,26 @@ writer1.once('chunk-received', function() {
|
|||||||
|
|
||||||
// A "slow" consumer:
|
// A "slow" consumer:
|
||||||
writer2._write = common.mustCall(function(chunk, encoding, cb) {
|
writer2._write = common.mustCall(function(chunk, encoding, cb) {
|
||||||
|
assert.strictEqual(
|
||||||
|
reader._readableState.awaitDrain, 1,
|
||||||
|
'awaitDrain isn\'t 1 after first push'
|
||||||
|
);
|
||||||
// Not calling cb here to "simulate" slow stream.
|
// Not calling cb here to "simulate" slow stream.
|
||||||
|
// This should be called exactly once, since the first .write() call
|
||||||
|
// will return false.
|
||||||
|
}, 1);
|
||||||
|
|
||||||
|
writer3._write = common.mustCall(function(chunk, encoding, cb) {
|
||||||
|
assert.strictEqual(
|
||||||
|
reader._readableState.awaitDrain, 2,
|
||||||
|
'awaitDrain isn\'t 2 after second push'
|
||||||
|
);
|
||||||
|
// Not calling cb here to "simulate" slow stream.
|
||||||
// This should be called exactly once, since the first .write() call
|
// This should be called exactly once, since the first .write() call
|
||||||
// will return false.
|
// will return false.
|
||||||
}, 1);
|
}, 1);
|
||||||
|
|
||||||
reader.pipe(writer1);
|
reader.pipe(writer1);
|
||||||
reader.pipe(writer2);
|
reader.pipe(writer2);
|
||||||
|
reader.pipe(writer3);
|
||||||
reader.push(buffer);
|
reader.push(buffer);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user