stream: add flow and buffer properties to streams

This adds computed properties to readable and writable streams to
allow access to the readable buffer, the writable buffer, and flow
state without accessing the readable or writable state.

These are the only uses of readable and writable state in the docs
so adding these work arounds allows them to be removed from the docs.

This also updates net, http_client and http_server to use the new
methods instead of manipulating readable and writable state directly.

See: https://github.com/nodejs/node/issues/445
PR-URL: https://github.com/nodejs/node/pull/12855
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Calvin Metcalf 2017-05-16 11:08:49 -04:00 committed by Matteo Collina
parent 31addac8bb
commit e20af3371b
13 changed files with 118 additions and 24 deletions

53
benchmark/http/upgrade.js Normal file
View File

@ -0,0 +1,53 @@
'use strict';
const common = require('../common.js');
const PORT = common.PORT;
const net = require('net');
const bench = common.createBenchmark(main, {
n: [5, 1000]
});
const reqData = 'GET / HTTP/1.1\r\n' +
'Upgrade: WebSocket\r\n' +
'Connection: Upgrade\r\n' +
'\r\n' +
'WjN}|M(6';
const resData = 'HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
'Upgrade: WebSocket\r\n' +
'Connection: Upgrade\r\n' +
'\r\n\r\n';
function main({ n }) {
process.env.PORT = PORT;
var server = require('../fixtures/simple-http-server.js')
.listen(common.PORT)
.on('listening', function() {
bench.start();
doBench(server.address(), n, function() {
bench.end(n);
server.close();
});
})
.on('upgrade', function(req, socket, upgradeHead) {
socket.resume();
socket.write(resData);
socket.end();
});
}
function doBench(address, count, done) {
if (count === 0) {
done();
return;
}
const conn = net.createConnection(address.port);
conn.write(reqData);
conn.resume();
conn.on('end', function() {
doBench(address, count - 1, done);
});
}

View File

@ -63,8 +63,8 @@ object mode is not safe.
<!--type=misc-->
Both [Writable][] and [Readable][] streams will store data in an internal
buffer that can be retrieved using `writable._writableState.getBuffer()` or
`readable._readableState.buffer`, respectively.
buffer that can be retrieved using `writable.writableBuffer` or
`readable.readableBuffer`, respectively.
The amount of data potentially buffered depends on the `highWaterMark` option
passed into the streams constructor. For normal streams, the `highWaterMark`
@ -602,22 +602,22 @@ Readable stream implementation.
Specifically, at any given point in time, every Readable is in one of three
possible states:
* `readable._readableState.flowing = null`
* `readable._readableState.flowing = false`
* `readable._readableState.flowing = true`
* `readable.readableFlowing = null`
* `readable.readableFlowing = false`
* `readable.readableFlowing = true`
When `readable._readableState.flowing` is `null`, no mechanism for consuming the
When `readable.readableFlowing` is `null`, no mechanism for consuming the
streams data is provided so the stream will not generate its data. While in this
state, attaching a listener for the `'data'` event, calling the `readable.pipe()`
method, or calling the `readable.resume()` method will switch
`readable._readableState.flowing` to `true`, causing the Readable to begin
`readable.readableFlowing` to `true`, causing the Readable to begin
actively emitting events as data is generated.
Calling `readable.pause()`, `readable.unpipe()`, or receiving "back pressure"
will cause the `readable._readableState.flowing` to be set as `false`,
will cause the `readable.readableFlowing` to be set as `false`,
temporarily halting the flowing of events but *not* halting the generation of
data. While in this state, attaching a listener for the `'data'` event
would not cause `readable._readableState.flowing` to switch to `true`.
would not cause `readable.readableFlowing` to switch to `true`.
```js
const { PassThrough, Writable } = require('stream');
@ -626,14 +626,14 @@ const writable = new Writable();
pass.pipe(writable);
pass.unpipe(writable);
// flowing is now false
// readableFlowing is now false
pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok'); // will not emit 'data'
pass.resume(); // must be called to make 'data' being emitted
```
While `readable._readableState.flowing` is `false`, data may be accumulating
While `readable.readableFlowing` is `false`, data may be accumulating
within the streams internal buffer.
#### Choose One

View File

@ -432,9 +432,7 @@ function socketOnData(d) {
socket.removeListener('close', socketCloseListener);
socket.removeListener('error', socketErrorListener);
// TODO(isaacs): Need a way to reset a stream to fresh state
// IE, not flowing, and not explicitly paused.
socket._readableState.flowing = null;
socket.readableFlowing = null;
req.emit(eventName, res, socket, bodyHead);
req.emit('close');

View File

@ -502,9 +502,7 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) {
debug('SERVER have listener for %s', eventName);
var bodyHead = d.slice(bytesParsed, d.length);
// TODO(isaacs): Need a way to reset a stream to fresh state
// IE, not flowing, and not explicitly paused.
socket._readableState.flowing = null;
socket.readableFlowing = null;
server.emit(eventName, req, socket, bodyHead);
} else {
// Got upgrade header or CONNECT method, but have no handler.

View File

@ -74,6 +74,16 @@ Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', {
}
});
Object.defineProperty(Duplex.prototype, 'writableBuffer', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._writableState && this._writableState.getBuffer();
}
});
// the no-half-open enforcer
function onend() {
// if we allow half-open state, or if the writable side ended,

View File

@ -925,6 +925,31 @@ Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
}
});
Object.defineProperty(Readable.prototype, 'readableBuffer', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._readableState && this._readableState.buffer;
}
});
Object.defineProperty(Readable.prototype, 'readableFlowing', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._readableState.flowing;
},
set: function(state) {
if (this._readableState) {
this._readableState.flowing = state;
}
}
});
// exposed for testing purposes only.
Readable._fromList = fromList;

View File

@ -324,6 +324,16 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
return this;
};
Object.defineProperty(Writable.prototype, 'writableBuffer', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._writableState && this._writableState.getBuffer();
}
});
function decodeChunk(state, chunk, encoding) {
if (!state.objectMode &&
state.decodeStrings !== false &&

View File

@ -254,7 +254,7 @@ function Socket(options) {
// stop the handle from reading and pause the stream
this._handle.reading = false;
this._handle.readStop();
this._readableState.flowing = false;
this.readableFlowing = false;
} else if (!options.manualStart) {
this.read(0);
}
@ -819,7 +819,7 @@ protoGetter('bytesWritten', function bytesWritten() {
if (!state)
return undefined;
state.getBuffer().forEach(function(el) {
this.writableBuffer.forEach(function(el) {
if (el.chunk instanceof Buffer)
bytes += el.chunk.length;
else

View File

@ -47,6 +47,6 @@ s.read(0);
// ACTUALLY [1, 3, 5, 6, 4, 2]
process.on('exit', function() {
assert.deepStrictEqual(s._readableState.buffer.join(','), '1,2,3,4,5,6');
assert.deepStrictEqual(s.readableBuffer.join(','), '1,2,3,4,5,6');
console.log('ok');
});

View File

@ -15,7 +15,7 @@ assert.strictEqual(state.readingMore, false);
readable.on('data', common.mustCall((data) => {
// while in a flowing state, should try to read more.
if (state.flowing)
if (readable.readableFlowing)
assert.strictEqual(state.readingMore, true);
// reading as long as we've not ended

View File

@ -46,7 +46,7 @@ const Transform = require('_stream_transform');
assert.strictEqual(tx._readableState.length, 10);
assert.strictEqual(transformed, 10);
assert.strictEqual(tx._transformState.writechunk.length, 5);
assert.deepStrictEqual(tx._writableState.getBuffer().map(function(c) {
assert.deepStrictEqual(tx.writableBuffer.map(function(c) {
return c.chunk.length;
}), [6, 7, 8, 9, 10]);
}

View File

@ -66,7 +66,7 @@ assert.strictEqual(dest.listeners('finish').length, 0);
console.error(src._readableState);
process.on('exit', function() {
src._readableState.buffer.length = 0;
src.readableBuffer.length = 0;
console.error(src._readableState);
assert(src._readableState.length >= src.readableHighWaterMark);
console.log('ok');

View File

@ -68,7 +68,7 @@ function readn(n, then) {
r.once('readable', read);
else {
assert.strictEqual(c.length, n);
assert(!r._readableState.flowing);
assert(!r.readableFlowing);
then();
}
})();