stream: remove usage of *State.highWaterMark

Replaced _readableState.highWaterMark with a .readableHighWaterMark
getter and _writableState.highWaterMark with a .writableHighWaterMark
getter.
The getters are non-enumerable because they break some prototype
manipulation that happen in the ecosystem.

Ref: https://github.com/nodejs/node/issues/445.

PR-URL: https://github.com/nodejs/node/pull/12860
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
This commit is contained in:
Calvin Metcalf 2017-05-05 15:57:57 +02:00 committed by Matteo Collina
parent 9c1b18a59f
commit 157df5a47c
11 changed files with 73 additions and 17 deletions

View File

@ -437,6 +437,14 @@ process.nextTick(() => {
See also: [`writable.cork()`][]. See also: [`writable.cork()`][].
##### writable.writableHighWaterMark
<!-- YAML
added: REPLACEME
-->
Return the value of `highWaterMark` passed when constructing this
`Writable`.
##### writable.write(chunk[, encoding][, callback]) ##### writable.write(chunk[, encoding][, callback])
<!-- YAML <!-- YAML
added: v0.9.4 added: v0.9.4
@ -879,6 +887,14 @@ to prevent memory leaks.
never closed until the Node.js process exits, regardless of the specified never closed until the Node.js process exits, regardless of the specified
options. options.
##### readable.readableHighWaterMark
<!-- YAML
added: REPLACEME
-->
Return the value of `highWaterMark` passed when constructing this
`Readable`.
##### readable.read([size]) ##### readable.read([size])
<!-- YAML <!-- YAML
added: v0.9.4 added: v0.9.4

View File

@ -373,13 +373,13 @@ function connectionListener(socket) {
function updateOutgoingData(socket, state, delta) { function updateOutgoingData(socket, state, delta) {
state.outgoingData += delta; state.outgoingData += delta;
if (socket._paused && if (socket._paused &&
state.outgoingData < socket._writableState.highWaterMark) { state.outgoingData < socket.writeHWM) {
return socketOnDrain(socket, state); return socketOnDrain(socket, state);
} }
} }
function socketOnDrain(socket, state) { function socketOnDrain(socket, state) {
var needPause = state.outgoingData > socket._writableState.highWaterMark; var needPause = state.outgoingData > socket.writeHWM;
// If we previously paused, then start reading again. // If we previously paused, then start reading again.
if (socket._paused && !needPause) { if (socket._paused && !needPause) {
@ -569,7 +569,7 @@ function parserOnIncoming(server, socket, state, req, keepAlive) {
// pipelined requests that may never be resolved. // pipelined requests that may never be resolved.
if (!socket._paused) { if (!socket._paused) {
var ws = socket._writableState; var ws = socket._writableState;
if (ws.needDrain || state.outgoingData >= ws.highWaterMark) { if (ws.needDrain || state.outgoingData >= socket.writableHighWaterMark) {
socket._paused = true; socket._paused = true;
// We also need to pause the parser, but don't do that until after // We also need to pause the parser, but don't do that until after
// the call to execute, because we may still be processing the last // the call to execute, because we may still be processing the last

View File

@ -34,11 +34,14 @@ const Writable = require('_stream_writable');
util.inherits(Duplex, Readable); util.inherits(Duplex, Readable);
var keys = Object.keys(Writable.prototype); {
for (var v = 0; v < keys.length; v++) { // avoid scope creep, the keys array can then be collected
var method = keys[v]; const keys = Object.keys(Writable.prototype);
if (!Duplex.prototype[method]) for (var v = 0; v < keys.length; v++) {
Duplex.prototype[method] = Writable.prototype[method]; const method = keys[v];
if (!Duplex.prototype[method])
Duplex.prototype[method] = Writable.prototype[method];
}
} }
function Duplex(options) { function Duplex(options) {
@ -61,6 +64,16 @@ function Duplex(options) {
this.once('end', onend); this.once('end', onend);
} }
Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._writableState.highWaterMark;
}
});
// the no-half-open enforcer // the no-half-open enforcer
function onend() { function onend() {
// if we allow half-open state, or if the writable side ended, // if we allow half-open state, or if the writable side ended,

View File

@ -916,6 +916,15 @@ Readable.prototype.wrap = function(stream) {
return self; return self;
}; };
Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._readableState.highWaterMark;
}
});
// exposed for testing purposes only. // exposed for testing purposes only.
Readable._fromList = fromList; Readable._fromList = fromList;

View File

@ -334,6 +334,16 @@ function decodeChunk(state, chunk, encoding) {
return chunk; return chunk;
} }
Object.defineProperty(Writable.prototype, 'writableHighWaterMark', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._writableState.highWaterMark;
}
});
// if we're already writing something, then just put this // if we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write // in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag. // If we return false, then we need a drain event, so set that flag.

View File

@ -2072,7 +2072,7 @@ ReadStream.prototype._read = function(n) {
if (!pool || pool.length - pool.used < kMinPoolSpace) { if (!pool || pool.length - pool.used < kMinPoolSpace) {
// discard the old pool. // discard the old pool.
allocNewPool(this._readableState.highWaterMark); allocNewPool(this.readableHighWaterMark);
} }
// Grab another reference to the pool in the case that while we're // Grab another reference to the pool in the case that while we're

View File

@ -30,7 +30,7 @@ const server = http.createServer(function(req, res) {
res.end(chunk); res.end(chunk);
} }
size += res.outputSize; size += res.outputSize;
if (size <= req.socket._writableState.highWaterMark) { if (size <= req.socket.writableHighWaterMark) {
more(); more();
return; return;
} }

View File

@ -44,7 +44,7 @@ s1.pipe(s3);
s2.pipe(s3, { end: false }); s2.pipe(s3, { end: false });
// We must write a buffer larger than highWaterMark // We must write a buffer larger than highWaterMark
const big = Buffer.alloc(s1._writableState.highWaterMark + 1, 'x'); const big = Buffer.alloc(s1.writableHighWaterMark + 1, 'x');
// Since big is larger than highWaterMark, it will be buffered internally. // Since big is larger than highWaterMark, it will be buffered internally.
assert(!s1.write(big)); assert(!s1.write(big));

View File

@ -68,7 +68,7 @@ flow(stream, 5000, function() {
process.on('exit', function(code) { process.on('exit', function(code) {
assert.strictEqual(reads, 2); assert.strictEqual(reads, 2);
// we pushed up the high water mark // we pushed up the high water mark
assert.strictEqual(stream._readableState.highWaterMark, 8192); assert.strictEqual(stream.readableHighWaterMark, 8192);
// length is 0 right now, because we pulled it all out. // length is 0 right now, because we pulled it all out.
assert.strictEqual(stream._readableState.length, 0); assert.strictEqual(stream._readableState.length, 0);
assert(!code); assert(!code);

View File

@ -29,8 +29,12 @@ const parser = new Transform({ readableObjectMode: true });
assert(parser._readableState.objectMode); assert(parser._readableState.objectMode);
assert(!parser._writableState.objectMode); assert(!parser._writableState.objectMode);
assert.strictEqual(parser._readableState.highWaterMark, 16); assert.strictEqual(parser.readableHighWaterMark, 16);
assert.strictEqual(parser._writableState.highWaterMark, 16 * 1024); assert.strictEqual(parser.writableHighWaterMark, 16 * 1024);
assert.strictEqual(parser.readableHighWaterMark,
parser._readableState.highWaterMark);
assert.strictEqual(parser.writableHighWaterMark,
parser._writableState.highWaterMark);
parser._transform = function(chunk, enc, callback) { parser._transform = function(chunk, enc, callback) {
callback(null, { val: chunk[0] }); callback(null, { val: chunk[0] });
@ -53,8 +57,12 @@ const serializer = new Transform({ writableObjectMode: true });
assert(!serializer._readableState.objectMode); assert(!serializer._readableState.objectMode);
assert(serializer._writableState.objectMode); assert(serializer._writableState.objectMode);
assert.strictEqual(serializer._readableState.highWaterMark, 16 * 1024); assert.strictEqual(serializer.readableHighWaterMark, 16 * 1024);
assert.strictEqual(serializer._writableState.highWaterMark, 16); assert.strictEqual(serializer.writableHighWaterMark, 16);
assert.strictEqual(parser.readableHighWaterMark,
parser._readableState.highWaterMark);
assert.strictEqual(parser.writableHighWaterMark,
parser._writableState.highWaterMark);
serializer._transform = function(obj, _, callback) { serializer._transform = function(obj, _, callback) {
callback(null, Buffer.from([obj.val])); callback(null, Buffer.from([obj.val]));

View File

@ -68,6 +68,6 @@ console.error(src._readableState);
process.on('exit', function() { process.on('exit', function() {
src._readableState.buffer.length = 0; src._readableState.buffer.length = 0;
console.error(src._readableState); console.error(src._readableState);
assert(src._readableState.length >= src._readableState.highWaterMark); assert(src._readableState.length >= src.readableHighWaterMark);
console.log('ok'); console.log('ok');
}); });