stream: add setter & getter for default highWaterMark (#46929)
Adds stream.(get|set)DefaultHighWaterMark to read or update the default hwm. PR-URL: https://github.com/nodejs/node/pull/46929 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Paolo Insogna <paolo@cowtech.it> Reviewed-By: Moshe Atlow <moshe@atlow.co.il> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Michael Dawson <midawson@redhat.com> Reviewed-By: Erick Wendel <erick.workspace@gmail.com>
This commit is contained in:
parent
9b104be502
commit
e7b5c0ed47
@ -3348,6 +3348,29 @@ reader.read().then(({ value, done }) => {
|
|||||||
});
|
});
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### `stream.getDefaultHighWaterMark(objectMode)`
|
||||||
|
|
||||||
|
<!-- YAML
|
||||||
|
added: REPLACEME
|
||||||
|
-->
|
||||||
|
|
||||||
|
* {boolean} objectMode
|
||||||
|
* Returns: {integer}
|
||||||
|
|
||||||
|
Returns the default highWaterMark used by streams.
|
||||||
|
Defaults to `16384` (16 KiB), or `16` for `objectMode`.
|
||||||
|
|
||||||
|
### `stream.setDefaultHighWaterMark(objectMode, value)`
|
||||||
|
|
||||||
|
<!-- YAML
|
||||||
|
added: REPLACEME
|
||||||
|
-->
|
||||||
|
|
||||||
|
* {boolean} objectMode
|
||||||
|
* {integer} highWaterMark value
|
||||||
|
|
||||||
|
Sets the default highWaterMark used by streams.
|
||||||
|
|
||||||
## API for stream implementers
|
## API for stream implementers
|
||||||
|
|
||||||
<!--type=misc-->
|
<!--type=misc-->
|
||||||
|
@ -80,12 +80,11 @@ let debug = require('internal/util/debuglog').debuglog('http', (fn) => {
|
|||||||
debug = fn;
|
debug = fn;
|
||||||
});
|
});
|
||||||
|
|
||||||
const HIGH_WATER_MARK = getDefaultHighWaterMark();
|
|
||||||
|
|
||||||
const kCorked = Symbol('corked');
|
const kCorked = Symbol('corked');
|
||||||
const kUniqueHeaders = Symbol('kUniqueHeaders');
|
const kUniqueHeaders = Symbol('kUniqueHeaders');
|
||||||
const kBytesWritten = Symbol('kBytesWritten');
|
const kBytesWritten = Symbol('kBytesWritten');
|
||||||
const kErrored = Symbol('errored');
|
const kErrored = Symbol('errored');
|
||||||
|
const kHighWaterMark = Symbol('kHighWaterMark');
|
||||||
|
|
||||||
const nop = () => {};
|
const nop = () => {};
|
||||||
|
|
||||||
@ -150,6 +149,7 @@ function OutgoingMessage() {
|
|||||||
this._onPendingData = nop;
|
this._onPendingData = nop;
|
||||||
|
|
||||||
this[kErrored] = null;
|
this[kErrored] = null;
|
||||||
|
this[kHighWaterMark] = getDefaultHighWaterMark();
|
||||||
}
|
}
|
||||||
ObjectSetPrototypeOf(OutgoingMessage.prototype, Stream.prototype);
|
ObjectSetPrototypeOf(OutgoingMessage.prototype, Stream.prototype);
|
||||||
ObjectSetPrototypeOf(OutgoingMessage, Stream);
|
ObjectSetPrototypeOf(OutgoingMessage, Stream);
|
||||||
@ -196,7 +196,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', {
|
|||||||
ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', {
|
ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', {
|
||||||
__proto__: null,
|
__proto__: null,
|
||||||
get() {
|
get() {
|
||||||
return this.socket ? this.socket.writableHighWaterMark : HIGH_WATER_MARK;
|
return this.socket ? this.socket.writableHighWaterMark : this[kHighWaterMark];
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -403,7 +403,7 @@ function _writeRaw(data, encoding, callback, size) {
|
|||||||
this.outputData.push({ data, encoding, callback });
|
this.outputData.push({ data, encoding, callback });
|
||||||
this.outputSize += data.length;
|
this.outputSize += data.length;
|
||||||
this._onPendingData(data.length);
|
this._onPendingData(data.length);
|
||||||
return this.outputSize < HIGH_WATER_MARK;
|
return this.outputSize < this[kHighWaterMark];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -4,16 +4,29 @@ const {
|
|||||||
MathFloor,
|
MathFloor,
|
||||||
NumberIsInteger,
|
NumberIsInteger,
|
||||||
} = primordials;
|
} = primordials;
|
||||||
|
const { validateInteger } = require('internal/validators');
|
||||||
|
|
||||||
const { ERR_INVALID_ARG_VALUE } = require('internal/errors').codes;
|
const { ERR_INVALID_ARG_VALUE } = require('internal/errors').codes;
|
||||||
|
|
||||||
|
let defaultHighWaterMarkBytes = 16 * 1024;
|
||||||
|
let defaultHighWaterMarkObjectMode = 16;
|
||||||
|
|
||||||
function highWaterMarkFrom(options, isDuplex, duplexKey) {
|
function highWaterMarkFrom(options, isDuplex, duplexKey) {
|
||||||
return options.highWaterMark != null ? options.highWaterMark :
|
return options.highWaterMark != null ? options.highWaterMark :
|
||||||
isDuplex ? options[duplexKey] : null;
|
isDuplex ? options[duplexKey] : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
function getDefaultHighWaterMark(objectMode) {
|
function getDefaultHighWaterMark(objectMode) {
|
||||||
return objectMode ? 16 : 16 * 1024;
|
return objectMode ? defaultHighWaterMarkObjectMode : defaultHighWaterMarkBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
function setDefaultHighWaterMark(objectMode, value) {
|
||||||
|
validateInteger(value, 'value', 0);
|
||||||
|
if (objectMode) {
|
||||||
|
defaultHighWaterMarkObjectMode = value;
|
||||||
|
} else {
|
||||||
|
defaultHighWaterMarkBytes = value;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function getHighWaterMark(state, options, duplexKey, isDuplex) {
|
function getHighWaterMark(state, options, duplexKey, isDuplex) {
|
||||||
@ -33,4 +46,5 @@ function getHighWaterMark(state, options, duplexKey, isDuplex) {
|
|||||||
module.exports = {
|
module.exports = {
|
||||||
getHighWaterMark,
|
getHighWaterMark,
|
||||||
getDefaultHighWaterMark,
|
getDefaultHighWaterMark,
|
||||||
|
setDefaultHighWaterMark,
|
||||||
};
|
};
|
||||||
|
@ -42,6 +42,7 @@ const {
|
|||||||
},
|
},
|
||||||
} = require('internal/errors');
|
} = require('internal/errors');
|
||||||
const compose = require('internal/streams/compose');
|
const compose = require('internal/streams/compose');
|
||||||
|
const { setDefaultHighWaterMark, getDefaultHighWaterMark } = require('internal/streams/state');
|
||||||
const { pipeline } = require('internal/streams/pipeline');
|
const { pipeline } = require('internal/streams/pipeline');
|
||||||
const { destroyer } = require('internal/streams/destroy');
|
const { destroyer } = require('internal/streams/destroy');
|
||||||
const eos = require('internal/streams/end-of-stream');
|
const eos = require('internal/streams/end-of-stream');
|
||||||
@ -105,6 +106,8 @@ Stream.addAbortSignal = addAbortSignal;
|
|||||||
Stream.finished = eos;
|
Stream.finished = eos;
|
||||||
Stream.destroy = destroyer;
|
Stream.destroy = destroyer;
|
||||||
Stream.compose = compose;
|
Stream.compose = compose;
|
||||||
|
Stream.setDefaultHighWaterMark = setDefaultHighWaterMark;
|
||||||
|
Stream.getDefaultHighWaterMark = getDefaultHighWaterMark;
|
||||||
|
|
||||||
ObjectDefineProperty(Stream, 'promises', {
|
ObjectDefineProperty(Stream, 'promises', {
|
||||||
__proto__: null,
|
__proto__: null,
|
||||||
|
36
test/parallel/test-stream-set-default-hwm.js
Normal file
36
test/parallel/test-stream-set-default-hwm.js
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
require('../common');
|
||||||
|
|
||||||
|
const assert = require('node:assert');
|
||||||
|
const {
|
||||||
|
setDefaultHighWaterMark,
|
||||||
|
getDefaultHighWaterMark,
|
||||||
|
Writable,
|
||||||
|
Readable,
|
||||||
|
Transform
|
||||||
|
} = require('stream');
|
||||||
|
|
||||||
|
assert.notStrictEqual(getDefaultHighWaterMark(false), 32 * 1000);
|
||||||
|
setDefaultHighWaterMark(false, 32 * 1000);
|
||||||
|
assert.strictEqual(getDefaultHighWaterMark(false), 32 * 1000);
|
||||||
|
|
||||||
|
assert.notStrictEqual(getDefaultHighWaterMark(true), 32);
|
||||||
|
setDefaultHighWaterMark(true, 32);
|
||||||
|
assert.strictEqual(getDefaultHighWaterMark(true), 32);
|
||||||
|
|
||||||
|
const w = new Writable({
|
||||||
|
write() {}
|
||||||
|
});
|
||||||
|
assert.strictEqual(w.writableHighWaterMark, 32 * 1000);
|
||||||
|
|
||||||
|
const r = new Readable({
|
||||||
|
read() {}
|
||||||
|
});
|
||||||
|
assert.strictEqual(r.readableHighWaterMark, 32 * 1000);
|
||||||
|
|
||||||
|
const t = new Transform({
|
||||||
|
transform() {}
|
||||||
|
});
|
||||||
|
assert.strictEqual(t.writableHighWaterMark, 32 * 1000);
|
||||||
|
assert.strictEqual(t.readableHighWaterMark, 32 * 1000);
|
Loading…
x
Reference in New Issue
Block a user