diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown index 24bafe25d70..ebb771dd559 100644 --- a/doc/api/stream.markdown +++ b/doc/api/stream.markdown @@ -92,8 +92,6 @@ method. (See below.) * `options` {Object} * `bufferSize` {Number} The size of the chunks to consume from the underlying resource. Default=16kb - * `lowWaterMark` {Number} The minimum number of bytes to store in - the internal buffer before emitting `readable`. Default=0 * `highWaterMark` {Number} The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Default=16kb @@ -193,9 +191,7 @@ myReader.on('readable', function() { ### Event: 'readable' -When there is data ready to be consumed, this event will fire. The -number of bytes that are required to be considered "readable" depends -on the `lowWaterMark` option set in the constructor. +When there is data ready to be consumed, this event will fire. When this event emits, call the `read()` method to consume the data. @@ -322,8 +318,6 @@ method. (See below.) * `options` {Object} * `highWaterMark` {Number} Buffer level when `write()` starts returning false. Default=16kb - * `lowWaterMark` {Number} The buffer level when `'drain'` is - emitted. Default=0 * `decodeStrings` {Boolean} Whether or not to decode strings into Buffers before passing them to `_write()`. Default=true @@ -371,10 +365,8 @@ flushed to the underlying resource. Returns `false` to indicate that the buffer is full, and the data will be sent out in the future. The `'drain'` event will indicate when the buffer is empty again. -The specifics of when `write()` will return false, and when a -subsequent `'drain'` event will be emitted, are determined by the -`highWaterMark` and `lowWaterMark` options provided to the -constructor. +The specifics of when `write()` will return false, is determined by +the `highWaterMark` option provided to the constructor. ### writable.end([chunk], [encoding], [callback]) diff --git a/doc/blog/feature/streams2.md b/doc/blog/feature/streams2.md index 38479e81de7..9adef39563e 100644 --- a/doc/blog/feature/streams2.md +++ b/doc/blog/feature/streams2.md @@ -111,7 +111,7 @@ feedback. A stream is an abstract interface implemented by various objects in Node. For example a request to an HTTP server is a stream, as is stdout. Streams are readable, writable, or both. All streams are -instances of EventEmitter. +instances of [EventEmitter][] You can load the Stream base classes by doing `require('stream')`. There are base classes provided for Readable streams, Writable @@ -198,13 +198,14 @@ method. (See below.) * `options` {Object} * `bufferSize` {Number} The size of the chunks to consume from the underlying resource. Default=16kb - * `lowWaterMark` {Number} The minimum number of bytes to store in - the internal buffer before emitting `readable`. Default=0 * `highWaterMark` {Number} The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Default=16kb * `encoding` {String} If specified, then buffers will be decoded to strings using the specified encoding. Default=null + * `objectMode` {Boolean} Whether this stream should behave + as a stream of objects. Meaning that stream.read(n) returns + a single value instead of a Buffer of size n In classes that extend the Readable class, make sure to call the constructor so that the buffering settings can be properly @@ -218,7 +219,7 @@ initialized. All Readable stream implementations must provide a `_read` method to fetch data from the underlying resource. -**This function MUST NOT be called directly.** It should be +Note: **This function MUST NOT be called directly.** It should be implemented by child classes, and called by the internal Readable class methods only. @@ -231,6 +232,46 @@ the class that defines it, and should not be called directly by user programs. However, you **are** expected to override this method in your own extension classes. +### readable.push(chunk) + +* `chunk` {Buffer | null | String} Chunk of data to push into the read queue +* return {Boolean} Whether or not more pushes should be performed + +The `Readable` class works by putting data into a read queue to be +pulled out later by calling the `read()` method when the `'readable'` +event fires. + +The `push()` method will explicitly insert some data into the read +queue. If it is called with `null` then it will signal the end of the +data. + +In some cases, you may be wrapping a lower-level source which has some +sort of pause/resume mechanism, and a data callback. In those cases, +you could wrap the low-level source object by doing something like +this: + +```javascript +// source is an object with readStop() and readStart() methods, +// and an `ondata` member that gets called when it has data, and +// an `onend` member that gets called when the data is over. + +var stream = new Readable(); + +source.ondata = function(chunk) { + // if push() returns false, then we need to stop reading from source + if (!stream.push(chunk)) + source.readStop(); +}; + +source.onend = function() { + stream.push(null); +}; + +// _read will be called when the stream wants to pull more data in +stream._read = function(size, cb) { + source.readStart(); +}; +``` ### readable.wrap(stream) @@ -256,9 +297,7 @@ myReader.on('readable', function() { ### Event: 'readable' -When there is data ready to be consumed, this event will fire. The -number of bytes that are required to be considered "readable" depends -on the `lowWaterMark` option set in the constructor. +When there is data ready to be consumed, this event will fire. When this event emits, call the `read()` method to consume the data. @@ -385,8 +424,6 @@ method. (See below.) * `options` {Object} * `highWaterMark` {Number} Buffer level when `write()` starts returning false. Default=16kb - * `lowWaterMark` {Number} The buffer level when `'drain'` is - emitted. Default=0 * `decodeStrings` {Boolean} Whether or not to decode strings into Buffers before passing them to `_write()`. Default=true @@ -402,7 +439,7 @@ initialized. All Writable stream implementations must provide a `_write` method to send data to the underlying resource. -**This function MUST NOT be called directly.** It should be +Note: **This function MUST NOT be called directly.** It should be implemented by child classes, and called by the internal Writable class methods only. @@ -434,16 +471,16 @@ flushed to the underlying resource. Returns `false` to indicate that the buffer is full, and the data will be sent out in the future. The `'drain'` event will indicate when the buffer is empty again. -The specifics of when `write()` will return false, and when a -subsequent `'drain'` event will be emitted, are determined by the -`highWaterMark` and `lowWaterMark` options provided to the -constructor. +The specifics of when `write()` will return false, is determined by +the `highWaterMark` option provided to the constructor. -### writable.end([chunk], [encoding]) +### writable.end([chunk], [encoding], [callback]) * `chunk` {Buffer | String} Optional final data to be written * `encoding` {String} Optional. If `chunk` is a string, then encoding defaults to `'utf8'` +* `callback` {Function} Optional. Called when the final chunk is + successfully written. Call this method to signal the end of the data being written to the stream. @@ -459,6 +496,11 @@ without buffering again. Listen for it when `stream.write()` returns Emitted when the underlying resource (for example, the backing file descriptor) has been closed. Not all streams will emit this. +### Event: 'finish' + +When `end()` is called and there are no more chunks to write, this +event is emitted. + ### Event: 'pipe' * `source` {Readable Stream} @@ -538,7 +580,7 @@ initialized. All Transform stream implementations must provide a `_transform` method to accept input and produce output. -**This function MUST NOT be called directly.** It should be +Note: **This function MUST NOT be called directly.** It should be implemented by child classes, and called by the internal Transform class methods only. @@ -564,7 +606,7 @@ your own extension classes. * `callback` {Function} Call this function (optionally with an error argument) when you are done flushing any remaining data. -**This function MUST NOT be called directly.** It MAY be implemented +Note: **This function MUST NOT be called directly.** It MAY be implemented by child classes, and if so, will be called by the internal Transform class methods only. @@ -592,3 +634,6 @@ This is a trivial implementation of a `Transform` stream that simply passes the input bytes across to the output. Its purpose is mainly for examples and testing, but there are occasionally use cases where it can come in handy. + + +[EventEmitter]: http://nodejs.org/api/events.html#events_class_events_eventemitter diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 333a3a015d8..947a0b9c9a3 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -39,18 +39,10 @@ function ReadableState(options, stream) { var hwm = options.highWaterMark; this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024; - // the minimum number of bytes to buffer before emitting 'readable' - // default to pushing everything out as fast as possible. - this.lowWaterMark = options.lowWaterMark || 0; - // cast to ints. this.bufferSize = ~~this.bufferSize; - this.lowWaterMark = ~~this.lowWaterMark; this.highWaterMark = ~~this.highWaterMark; - if (this.lowWaterMark > this.highWaterMark) - throw new Error('lowWaterMark cannot be higher than highWaterMark'); - this.buffer = []; this.length = 0; this.pipes = null; @@ -111,15 +103,15 @@ Readable.prototype.push = function(chunk) { rs.onread(null, chunk); // if it's past the high water mark, we can push in some more. - // Also, if it's still within the lowWaterMark, we can stand some - // more bytes. This is to work around cases where hwm=0 and - // lwm=0, such as the repl. Also, if the push() triggered a + // Also, if we have no data yet, we can stand some + // more bytes. This is to work around cases where hwm=0, + // such as the repl. Also, if the push() triggered a // readable event, and the user called read(largeNumber) such that // needReadable was set, then we ought to push more, so that another // 'readable' event will be triggered. return rs.needReadable || rs.length < rs.highWaterMark || - rs.length <= rs.lowWaterMark; + rs.length === 0; }; // backwards compatibility. @@ -324,12 +316,12 @@ function onread(stream, er, chunk) { state.length += state.objectMode ? 1 : chunk.length; state.buffer.push(chunk); - // if we haven't gotten enough to pass the lowWaterMark, + // if we haven't gotten any data, // and we haven't ended, then don't bother telling the user // that it's time to read more data. Otherwise, emitting 'readable' // probably will trigger another stream.read(), which can trigger // another _read(n,cb) before this one returns! - if (state.length <= state.lowWaterMark) { + if (state.length === 0) { state.reading = true; stream._read(state.bufferSize, state.onread); return; diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 8b75a52b62a..b83fedb6260 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -60,9 +60,7 @@ // // However, even in such a pathological case, only a single written chunk // would be consumed, and then the rest would wait (un-transformed) until -// the results of the previous transformed chunk were consumed. Because -// the transform happens on-demand, it will only transform as much as is -// necessary to fill the readable buffer to the specified lowWaterMark. +// the results of the previous transformed chunk were consumed. module.exports = Transform; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index d6c84d9494d..9fe4c85bd37 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -41,21 +41,13 @@ function WritableState(options, stream) { var hwm = options.highWaterMark; this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024; - // the point that it has to get to before we call _write(chunk,cb) - // default to pushing everything out as fast as possible. - this.lowWaterMark = options.lowWaterMark || 0; - // object stream flag to indicate whether or not this stream // contains buffers or objects. this.objectMode = !!options.objectMode; // cast to ints. - this.lowWaterMark = ~~this.lowWaterMark; this.highWaterMark = ~~this.highWaterMark; - if (this.lowWaterMark > this.highWaterMark) - throw new Error('lowWaterMark cannot be higher than highWaterMark'); - this.needDrain = false; // at the start of calling end() this.ending = false; @@ -225,7 +217,7 @@ function onwrite(stream, er) { return; } - if (state.length <= state.lowWaterMark && state.needDrain) { + if (state.length === 0 && state.needDrain) { // Must force callback to be called on nextTick, so that we don't // emit 'drain' before the write() consumer gets the 'false' return // value, and has a chance to attach a 'drain' listener. diff --git a/lib/fs.js b/lib/fs.js index 7209d2a3048..78940a8304a 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1390,7 +1390,6 @@ function ReadStream(path, options) { // a little bit bigger buffer and water marks by default options = util._extend({ bufferSize: 64 * 1024, - lowWaterMark: 16 * 1024, highWaterMark: 64 * 1024 }, options || {}); diff --git a/lib/tty.js b/lib/tty.js index 4ba276818d8..ccc1ebee28b 100644 --- a/lib/tty.js +++ b/lib/tty.js @@ -46,7 +46,6 @@ function ReadStream(fd, options) { options = util._extend({ highWaterMark: 0, - lowWaterMark: 0, readable: true, writable: false, handle: new TTY(fd, true) diff --git a/lib/zlib.js b/lib/zlib.js index e81559dc9f4..409286bc147 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -309,24 +309,7 @@ Zlib.prototype.reset = function reset() { }; Zlib.prototype._flush = function(output, callback) { - var rs = this._readableState; - var self = this; - this._transform(null, output, function(er) { - if (er) - return callback(er); - - // now a weird thing happens... it could be that you called flush - // but everything had already actually been consumed, but it wasn't - // enough to get over the Readable class's lowWaterMark. - // In that case, we emit 'readable' now to make sure it's consumed. - if (rs.length && - rs.length < rs.lowWaterMark && - !rs.ended && - rs.needReadable) - self.emit('readable'); - - callback(); - }); + this._transform(null, output, callback); }; Zlib.prototype.flush = function(callback) { diff --git a/src/node.js b/src/node.js index 7a0aecbc021..5ef6e3c4781 100644 --- a/src/node.js +++ b/src/node.js @@ -625,7 +625,6 @@ var tty = NativeModule.require('tty'); stdin = new tty.ReadStream(fd, { highWaterMark: 0, - lowWaterMark: 0, readable: true, writable: false }); diff --git a/test/simple/test-file-write-stream.js b/test/simple/test-file-write-stream.js index afedc5b5217..d4d146a84a8 100644 --- a/test/simple/test-file-write-stream.js +++ b/test/simple/test-file-write-stream.js @@ -26,7 +26,6 @@ var path = require('path'); var fs = require('fs'); var fn = path.join(common.tmpDir, 'write.txt'); var file = fs.createWriteStream(fn, { - lowWaterMark: 0, highWaterMark: 10 }); diff --git a/test/simple/test-file-write-stream2.js b/test/simple/test-file-write-stream2.js index 4f2e73ce824..68361bb52b6 100644 --- a/test/simple/test-file-write-stream2.js +++ b/test/simple/test-file-write-stream2.js @@ -63,7 +63,6 @@ removeTestFile(); // drain at 0, return false at 10. file = fs.createWriteStream(filepath, { - lowWaterMark: 0, highWaterMark: 11 }); diff --git a/test/simple/test-fs-read-stream-err.js b/test/simple/test-fs-read-stream-err.js index 77960f06e00..54aa21799ea 100644 --- a/test/simple/test-fs-read-stream-err.js +++ b/test/simple/test-fs-read-stream-err.js @@ -24,8 +24,7 @@ var assert = require('assert'); var fs = require('fs'); var stream = fs.createReadStream(__filename, { - bufferSize: 64, - lowWaterMark: 0 + bufferSize: 64 }); var err = new Error('BAM'); diff --git a/test/simple/test-fs-write-stream-err.js b/test/simple/test-fs-write-stream-err.js index a4d20200e5e..53ef4e34e8b 100644 --- a/test/simple/test-fs-write-stream-err.js +++ b/test/simple/test-fs-write-stream-err.js @@ -24,7 +24,6 @@ var assert = require('assert'); var fs = require('fs'); var stream = fs.createWriteStream(common.tmpDir + '/out', { - lowWaterMark: 0, highWaterMark: 10 }); var err = new Error('BAM'); diff --git a/test/simple/test-net-binary.js b/test/simple/test-net-binary.js index 6b41d72d7b0..31004658464 100644 --- a/test/simple/test-net-binary.js +++ b/test/simple/test-net-binary.js @@ -41,7 +41,6 @@ for (var i = 255; i >= 0; i--) { // safe constructor var echoServer = net.Server(function(connection) { - // connection._readableState.lowWaterMark = 0; console.error('SERVER got connection'); connection.setEncoding('binary'); connection.on('data', function(chunk) { @@ -64,8 +63,6 @@ echoServer.on('listening', function() { port: common.PORT }); - // c._readableState.lowWaterMark = 0; - c.setEncoding('binary'); c.on('data', function(chunk) { console.error('CLIENT data %j', chunk); diff --git a/test/simple/test-stream2-basic.js b/test/simple/test-stream2-basic.js index ab4b066b346..bc4ca536e28 100644 --- a/test/simple/test-stream2-basic.js +++ b/test/simple/test-stream2-basic.js @@ -450,18 +450,3 @@ test('sync _read ending', function (t) { t.end(); }) }); - -assert.throws(function() { - var bad = new R({ - highWaterMark: 10, - lowWaterMark: 1000 - }); -}); - -assert.throws(function() { - var W = require('stream').Writable; - var bad = new W({ - highWaterMark: 10, - lowWaterMark: 1000 - }); -}); diff --git a/test/simple/test-stream2-objects.js b/test/simple/test-stream2-objects.js index 5579a004532..6c1e7395ce3 100644 --- a/test/simple/test-stream2-objects.js +++ b/test/simple/test-stream2-objects.js @@ -215,35 +215,8 @@ test('falsey values', function(t) { })); }); -test('low watermark _read', function(t) { - var r = new Readable({ - lowWaterMark: 2, - highWaterMark: 6, - objectMode: true - }); - - var calls = 0; - - r._read = function(n, cb) { - calls++; - cb(null, 'foo'); - }; - - // touch to cause it - r.read(0); - - r.push(null); - - r.pipe(toArray(function(list) { - assert.deepEqual(list, ['foo', 'foo', 'foo']); - - t.end(); - })); -}); - test('high watermark _read', function(t) { var r = new Readable({ - lowWaterMark: 0, highWaterMark: 6, objectMode: true }); @@ -285,61 +258,6 @@ test('high watermark push', function(t) { t.end(); }); -test('low watermark push', function(t) { - var r = new Readable({ - lowWaterMark: 2, - highWaterMark: 4, - objectMode: true - }); - var l = console.log; - - var called = 0; - var reading = false; - - r._read = function() { - called++; - - if (reading) { - assert.equal(r.push(42), false); - } - } - - assert.equal(called, 0); - assert.equal(r.push(0), true); - assert.equal(called, 1); - assert.equal(r.push(1), true); - assert.equal(called, 2); - assert.equal(r.push(2), true); - assert.equal(called, 2); - assert.equal(r.push(3), false); - assert.equal(called, 2); - assert.equal(r.push(4), false); - assert.equal(called, 2); - assert.equal(r.push(5), false); - assert.equal(called, 2); - assert.deepEqual(r._readableState.buffer, [0, 1, 2, 3, 4, 5]); - - reading = true; - - assert.equal(r.read(), 0); - assert.equal(called, 2); - assert.equal(r.read(), 1); - assert.equal(called, 3); - assert.equal(r.read(), 2); - assert.equal(called, 4); - assert.equal(r.read(), 3); - assert.equal(called, 5); - assert.equal(r.read(), 4); - assert.equal(called, 6); - r.push(null); - - r.pipe(toArray(function(array) { - assert.deepEqual(array, [5, 42, 42, 42, 42]); - - t.end(); - })); -}); - test('stream of buffers converted to object halfway through', function(t) { var r = new Readable(); r._read = noop; diff --git a/test/simple/test-stream2-push.js b/test/simple/test-stream2-push.js index a4881c453be..29b438d32eb 100644 --- a/test/simple/test-stream2-push.js +++ b/test/simple/test-stream2-push.js @@ -32,7 +32,6 @@ var EE = require('events').EventEmitter; // a mock thing a bit like the net.Socket/tcp_wrap.handle interaction var stream = new Readable({ - lowWaterMark: 0, highWaterMark: 16, encoding: 'utf8' }); diff --git a/test/simple/test-stream2-transform.js b/test/simple/test-stream2-transform.js index 92f5784bd43..998e1f8e517 100644 --- a/test/simple/test-stream2-transform.js +++ b/test/simple/test-stream2-transform.js @@ -212,8 +212,6 @@ test('assymetric transform (compress)', function(t) { }.bind(this), 10); }; - pt._writableState.lowWaterMark = 3; - pt.write(new Buffer('aaaa')); pt.write(new Buffer('bbbb')); pt.write(new Buffer('cccc')); @@ -241,9 +239,7 @@ test('assymetric transform (compress)', function(t) { test('passthrough event emission', function(t) { - var pt = new PassThrough({ - lowWaterMark: 0 - }); + var pt = new PassThrough(); var emits = 0; pt.on('readable', function() { var state = pt._readableState; diff --git a/test/simple/test-stream2-writable.js b/test/simple/test-stream2-writable.js index 08be8fac3de..4cd37fd822c 100644 --- a/test/simple/test-stream2-writable.js +++ b/test/simple/test-stream2-writable.js @@ -82,7 +82,6 @@ process.nextTick(run); test('write fast', function(t) { var tw = new TestWriter({ - lowWaterMark: 5, highWaterMark: 100 }); @@ -100,7 +99,6 @@ test('write fast', function(t) { test('write slow', function(t) { var tw = new TestWriter({ - lowWaterMark: 5, highWaterMark: 100 }); @@ -121,7 +119,6 @@ test('write slow', function(t) { test('write backpressure', function(t) { var tw = new TestWriter({ - lowWaterMark: 5, highWaterMark: 50 }); @@ -154,7 +151,6 @@ test('write backpressure', function(t) { test('write bufferize', function(t) { var tw = new TestWriter({ - lowWaterMark: 5, highWaterMark: 100 }); @@ -185,7 +181,6 @@ test('write bufferize', function(t) { test('write no bufferize', function(t) { var tw = new TestWriter({ - lowWaterMark: 5, highWaterMark: 100, decodeStrings: false }); @@ -234,7 +229,6 @@ test('write callbacks', function (t) { callbacks._called = []; var tw = new TestWriter({ - lowWaterMark: 5, highWaterMark: 100 });