diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 9f384a409e9..b9f8291874f 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -31,7 +31,7 @@ util.inherits(Readable, Stream); function ReadableState(options, stream) { options = options || {}; - // the argument passed to this._read(n,cb) + // the argument passed to this._read(n) this.bufferSize = options.bufferSize || 16 * 1024; // the point at which it stops calling _read() to fill the buffer @@ -58,10 +58,6 @@ function ReadableState(options, stream) { // not happen before the first write call. this.sync = true; - this.onread = function(er, data) { - onread(stream, er, data); - }; - // whenever we return null, then we set a flag to say // that we're awaiting a 'readable' event emission. this.needReadable = false; @@ -121,7 +117,7 @@ function readableAddChunk(stream, state, chunk, addToFront) { if (er) { stream.emit('error', er); } else if (chunk === null || chunk === undefined) { - onreadEof(stream, state); + onEofChunk(stream, state); } else if (state.objectMode || chunk && chunk.length > 0) { if (state.decoder) chunk = state.decoder.write(chunk); @@ -196,7 +192,7 @@ function howMuchToRead(n, state) { return n; } -// you can override either this method, or _read(n, cb) below. +// you can override either this method, or the async _read(n) below. Readable.prototype.read = function(n) { var state = this._readableState; var nOrig = n; @@ -264,7 +260,7 @@ Readable.prototype.read = function(n) { if (state.length === 0) state.needReadable = true; // call internal read method - this._read(state.bufferSize, state.onread); + this._read(state.bufferSize); state.sync = false; } @@ -301,18 +297,6 @@ Readable.prototype.read = function(n) { return ret; }; -// This is the function passed to _read(n,cb) as the callback. -// It should be called exactly once for every _read() call. -function onread(stream, er, chunk) { - var state = stream._readableState; - var sync = state.sync; - - if (er) - stream.emit('error', er); - else - stream.push(chunk); -} - function chunkInvalid(state, chunk) { var er = null; if (!Buffer.isBuffer(chunk) && @@ -327,7 +311,7 @@ function chunkInvalid(state, chunk) { } -function onreadEof(stream, state) { +function onEofChunk(stream, state) { state.ended = true; if (state.decoder) { var chunk = state.decoder.end(); @@ -371,7 +355,7 @@ function emitReadable_(stream) { // at this point, the user has presumably seen the 'readable' event, // and called read() to consume some data. that may have triggered -// in turn another _read(n,cb) call, in which case reading = true if +// in turn another _read(n) call, in which case reading = true if // it's in progress. // However, if we're not ended, or reading, and the length < hwm, // then go ahead and try to read some more right now preemptively. @@ -395,10 +379,8 @@ function maybeReadMore_(stream, state) { // call cb(er, data) where data is <= n in length. // for virtual (non-string, non-buffer) streams, "length" is somewhat // arbitrary, and perhaps not very meaningful. -Readable.prototype._read = function(n, cb) { - process.nextTick(function() { - cb(new Error('not implemented')); - }); +Readable.prototype._read = function(n) { + this.emit('error', new Error('not implemented')); }; Readable.prototype.pipe = function(dest, pipeOpts) { @@ -758,7 +740,7 @@ Readable.prototype.wrap = function(stream) { // when we try to consume some more bytes, simply unpause the // underlying stream. - self._read = function(n, cb) { + self._read = function(n) { if (paused) { stream.resume(); paused = false; diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 0ee5a5030ee..d8f6e605328 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -36,11 +36,11 @@ // The Transform stream has all the aspects of the readable and writable // stream classes. When you write(chunk), that calls _write(chunk,cb) // internally, and returns false if there's a lot of pending writes -// buffered up. When you call read(), that calls _read(n,cb) until +// buffered up. When you call read(), that calls _read(n) until // there's enough pending readable data buffered up. // // In a transform stream, the written data is placed in a buffer. When -// _read(n,cb) is called, it transforms the queued up data, calling the +// _read(n) is called, it transforms the queued up data, calling the // buffered _write cb's as it consumes chunks. If consuming a single // written chunk would result in multiple output chunks, then the first // outputted bit calls the readcb, and subsequent chunks just go into @@ -106,7 +106,7 @@ function afterTransform(stream, er, data) { var rs = stream._readableState; if (rs.needReadable || rs.length < rs.highWaterMark) { - stream._read(); + stream._read(rs.bufferSize); } } @@ -162,13 +162,13 @@ Transform.prototype._write = function(chunk, cb) { return; var rs = this._readableState; if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) - this._read(); + this._read(rs.bufferSize); }; // Doesn't matter what the args are here. // the output and callback functions passed to _transform do all the work. // That we got here means that the readable side wants more data. -Transform.prototype._read = function(n, cb) { +Transform.prototype._read = function(n) { var ts = this._transformState; if (ts.writechunk && ts.writecb && !ts.transforming) { diff --git a/lib/fs.js b/lib/fs.js index 8188f717000..a1bc487b3d3 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1453,10 +1453,10 @@ ReadStream.prototype.open = function() { }); }; -ReadStream.prototype._read = function(n, cb) { +ReadStream.prototype._read = function(n) { if (typeof this.fd !== 'number') return this.once('open', function() { - this._read(n, cb); + this._read(n); }); if (this.destroyed) @@ -1482,7 +1482,7 @@ ReadStream.prototype._read = function(n, cb) { // already read everything we were supposed to read! // treat as EOF. if (toRead <= 0) - return cb(); + return this.push(null); // the actual read. var self = this; @@ -1498,14 +1498,14 @@ ReadStream.prototype._read = function(n, cb) { if (self.autoClose) { self.destroy(); } - return cb(er); + self.emit('error', er); + } else { + var b = null; + if (bytesRead > 0) + b = thisPool.slice(start, start + bytesRead); + + self.push(b); } - - var b = null; - if (bytesRead > 0) - b = thisPool.slice(start, start + bytesRead); - - cb(null, b); } }; diff --git a/lib/http.js b/lib/http.js index e5c709da0f9..56d9aae4236 100644 --- a/lib/http.js +++ b/lib/http.js @@ -331,12 +331,12 @@ IncomingMessage.prototype.read = function(n) { }; -IncomingMessage.prototype._read = function(n, callback) { +IncomingMessage.prototype._read = function(n) { // We actually do almost nothing here, because the parserOnBody // function fills up our internal buffer directly. However, we // do need to unpause the underlying socket so that it flows. if (!this.socket.readable) - return callback(null, null); + this.push(null); else readStart(this.socket); }; diff --git a/lib/net.js b/lib/net.js index 83ca47cccdc..934f9f2ea2a 100644 --- a/lib/net.js +++ b/lib/net.js @@ -228,7 +228,7 @@ function afterShutdown(status, handle, req) { function onSocketEnd() { // XXX Should not have to do as much crap in this function. // ended should already be true, since this is called *after* - // the EOF errno and onread has returned null to the _read cb. + // the EOF errno and onread has eof'ed debug('onSocketEnd', this._readableState); this._readableState.ended = true; if (this._readableState.endEmitted) { @@ -335,25 +335,19 @@ Object.defineProperty(Socket.prototype, 'bufferSize', { // Just call handle.readStart until we have enough in the buffer -Socket.prototype._read = function(n, callback) { +Socket.prototype._read = function(n) { debug('_read'); + if (this._connecting || !this._handle) { debug('_read wait for connection'); - this.once('connect', this._read.bind(this, n, callback)); - return; - } - - assert(callback === this._readableState.onread); - assert(this._readableState.reading = true); - - if (!this._handle.reading) { + this.once('connect', this._read.bind(this, n)); + } else if (!this._handle.reading) { + // not already reading, start the flow debug('Socket._read readStart'); this._handle.reading = true; var r = this._handle.readStart(); if (r) this._destroy(errnoException(process._errno, 'read')); - } else { - debug('readStart already has been called.'); } }; @@ -495,7 +489,7 @@ function onread(buffer, offset, length) { if (self.onend) self.once('end', self.onend); - // send a null to the _read cb to signal the end of data. + // push a null to signal the end of data. self.push(null); // internal end event so that we know that the actual socket diff --git a/lib/tls.js b/lib/tls.js index 2a780373adf..bb0a03af22e 100644 --- a/lib/tls.js +++ b/lib/tls.js @@ -381,12 +381,13 @@ CryptoStream.prototype._writePending = function writePending() { }; -CryptoStream.prototype._read = function read(size, cb) { +CryptoStream.prototype._read = function read(size) { // XXX: EOF?! - if (!this.pair.ssl) return cb(null, null); + if (!this.pair.ssl) return this.push(null); // Wait for session to be resumed - if (this._resumingSession || !this._reading) return cb(null, ''); + // Mark that we're done reading, but don't provide data or EOF + if (this._resumingSession || !this._reading) return this.push(''); var out; if (this === this.pair.cleartext) { @@ -441,11 +442,12 @@ CryptoStream.prototype._read = function read(size, cb) { if (this === this.pair.cleartext) this._opposite._done(); - return cb(null, null); + // EOF + return this.push(null); } // Bail out - return cb(null, ''); + return this.push(''); } // Give them requested data @@ -459,7 +461,7 @@ CryptoStream.prototype._read = function read(size, cb) { self.read(bytesRead); }); } - return cb(null, pool.slice(start, start + bytesRead)); + return this.push(pool.slice(start, start + bytesRead)); }; diff --git a/test/simple/test-stream-big-push.js b/test/simple/test-stream-big-push.js index f54bcc30d38..e3787e4412f 100644 --- a/test/simple/test-stream-big-push.js +++ b/test/simple/test-stream-big-push.js @@ -33,10 +33,10 @@ var reads = 0; var eofed = false; var ended = false; -r._read = function(n, cb) { +r._read = function(n) { if (reads === 0) { setTimeout(function() { - cb(null, str); + r.push(str); }); reads++; } else if (reads === 1) { @@ -46,7 +46,7 @@ r._read = function(n, cb) { } else { assert(!eofed); eofed = true; - cb(null, null); + r.push(null); } }; diff --git a/test/simple/test-stream-push-order.js b/test/simple/test-stream-push-order.js index 900a8c778df..f2e6ec29ce3 100644 --- a/test/simple/test-stream-push-order.js +++ b/test/simple/test-stream-push-order.js @@ -30,14 +30,15 @@ var s = new Readable({ var list = ['1', '2', '3', '4', '5', '6']; -s._read = function (n, cb) { +s._read = function (n) { var one = list.shift(); - if (!one) - return cb(null, null); - - var two = list.shift(); - s.push(one); - cb(null, two); + if (!one) { + s.push(null); + } else { + var two = list.shift(); + s.push(one); + s.push(two); + } }; var v = s.read(0); diff --git a/test/simple/test-stream2-basic.js b/test/simple/test-stream2-basic.js index c24ec243f6b..d3b53fd7cd3 100644 --- a/test/simple/test-stream2-basic.js +++ b/test/simple/test-stream2-basic.js @@ -406,7 +406,7 @@ test('read(0) for ended streams', function (t) { var r = new R(); var written = false; var ended = false; - r._read = function () {}; + r._read = function (n) {}; r.push(new Buffer("foo")); r.push(null); @@ -435,8 +435,8 @@ test('read(0) for ended streams', function (t) { test('sync _read ending', function (t) { var r = new R(); var called = false; - r._read = function (n, cb) { - cb(null, null); + r._read = function (n) { + r.push(null); }; r.once('end', function () { diff --git a/test/simple/test-stream2-compatibility.js b/test/simple/test-stream2-compatibility.js index cb07278d55e..2b98c1fa8f0 100644 --- a/test/simple/test-stream2-compatibility.js +++ b/test/simple/test-stream2-compatibility.js @@ -41,8 +41,8 @@ function TestReader() { util.inherits(TestReader, R); -TestReader.prototype._read = function(n, cb) { - cb(null, this._buffer); +TestReader.prototype._read = function(n) { + this.push(this._buffer); this._buffer = new Buffer(0); }; diff --git a/test/simple/test-stream2-finish-pipe.js b/test/simple/test-stream2-finish-pipe.js index 686af2449d8..bcb57a74a0a 100644 --- a/test/simple/test-stream2-finish-pipe.js +++ b/test/simple/test-stream2-finish-pipe.js @@ -23,19 +23,19 @@ var common = require('../common.js'); var stream = require('stream'); var Buffer = require('buffer').Buffer; -var R = new stream.Readable(); -R._read = function(size, cb) { - cb(null, new Buffer(size)); +var r = new stream.Readable(); +r._read = function(size) { + r.push(new Buffer(size)); }; -var W = new stream.Writable(); -W._write = function(data, cb) { +var w = new stream.Writable(); +w._write = function(data, cb) { cb(null); }; -R.pipe(W); +r.pipe(w); // This might sound unrealistic, but it happens in net.js. When // `socket.allowHalfOpen === false`, EOF will cause `.destroySoon()` call which // ends the writable side of net.Socket. -W.end(); +w.end(); diff --git a/test/simple/test-stream2-objects.js b/test/simple/test-stream2-objects.js index 7ff30c850ed..8939ad7a682 100644 --- a/test/simple/test-stream2-objects.js +++ b/test/simple/test-stream2-objects.js @@ -126,9 +126,9 @@ test('read(n) is ignored', function(t) { test('can read objects from _read (sync)', function(t) { var r = new Readable({ objectMode: true }); var list = [{ one: '1'}, { two: '2' }]; - r._read = function(n, cb) { + r._read = function(n) { var item = list.shift(); - cb(null, item || null); + r.push(item || null); }; r.pipe(toArray(function(list) { @@ -144,10 +144,10 @@ test('can read objects from _read (sync)', function(t) { test('can read objects from _read (async)', function(t) { var r = new Readable({ objectMode: true }); var list = [{ one: '1'}, { two: '2' }]; - r._read = function(n, cb) { + r._read = function(n) { var item = list.shift(); process.nextTick(function() { - cb(null, item || null); + r.push(item || null); }); }; @@ -223,7 +223,7 @@ test('high watermark _read', function(t) { var calls = 0; var list = ['1', '2', '3', '4', '5', '6', '7', '8']; - r._read = function() { + r._read = function(n) { calls++; }; @@ -249,7 +249,7 @@ test('high watermark push', function(t) { highWaterMark: 6, objectMode: true }); - r._read = function() {}; + r._read = function(n) {}; for (var i = 0; i < 6; i++) { var bool = r.push(i); assert.equal(bool, i === 5 ? false : true); diff --git a/test/simple/test-stream2-pipe-error-handling.js b/test/simple/test-stream2-pipe-error-handling.js index c17139f5d37..82c9a79be96 100644 --- a/test/simple/test-stream2-pipe-error-handling.js +++ b/test/simple/test-stream2-pipe-error-handling.js @@ -27,10 +27,10 @@ var stream = require('stream'); var count = 1000; var source = new stream.Readable(); - source._read = function(n, cb) { + source._read = function(n) { n = Math.min(count, n); count -= n; - cb(null, new Buffer(n)); + source.push(new Buffer(n)); }; var unpipedDest; @@ -67,10 +67,10 @@ var stream = require('stream'); var count = 1000; var source = new stream.Readable(); - source._read = function(n, cb) { + source._read = function(n) { n = Math.min(count, n); count -= n; - cb(null, new Buffer(n)); + source.push(new Buffer(n)); }; var unpipedDest; diff --git a/test/simple/test-stream2-read-sync-stack.js b/test/simple/test-stream2-read-sync-stack.js index 4e5ab172955..e8a73053c8c 100644 --- a/test/simple/test-stream2-read-sync-stack.js +++ b/test/simple/test-stream2-read-sync-stack.js @@ -30,9 +30,9 @@ var N = 256 * 1024; process.maxTickDepth = N + 2; var reads = 0; -r._read = function(n, cb) { +r._read = function(n) { var chunk = reads++ === N ? null : new Buffer(1); - cb(null, chunk); + r.push(chunk); }; r.on('readable', function onReadable() { diff --git a/test/simple/test-stream2-readable-empty-buffer-no-eof.js b/test/simple/test-stream2-readable-empty-buffer-no-eof.js index 6cc7c73f08f..cd301785802 100644 --- a/test/simple/test-stream2-readable-empty-buffer-no-eof.js +++ b/test/simple/test-stream2-readable-empty-buffer-no-eof.js @@ -43,28 +43,28 @@ function test1() { var buf = new Buffer(5); buf.fill('x'); var reads = 5; - r._read = function(n, cb) { + r._read = function(n) { switch (reads--) { case 0: - return cb(null, null); // EOF + return r.push(null); // EOF case 1: - return cb(null, buf); + return r.push(buf); case 2: setTimeout(r.read.bind(r, 0), 10); - return cb(null, new Buffer(0)); // Not-EOF! + return r.push(new Buffer(0)); // Not-EOF! case 3: setTimeout(r.read.bind(r, 0), 10); return process.nextTick(function() { - return cb(null, new Buffer(0)); + return r.push(new Buffer(0)); }); case 4: setTimeout(r.read.bind(r, 0), 10); return setTimeout(function() { - return cb(null, new Buffer(0)); + return r.push(new Buffer(0)); }); case 5: return setTimeout(function() { - return cb(null, buf); + return r.push(buf); }); default: throw new Error('unreachable'); @@ -92,11 +92,11 @@ function test1() { function test2() { var r = new Readable({ encoding: 'base64' }); var reads = 5; - r._read = function(n, cb) { + r._read = function(n) { if (!reads--) - return cb(null, null); // EOF + return r.push(null); // EOF else - return cb(null, new Buffer('x')); + return r.push(new Buffer('x')); }; var results = []; diff --git a/test/simple/test-stream2-readable-legacy-drain.js b/test/simple/test-stream2-readable-legacy-drain.js index a1fffd9bb50..675da8e90de 100644 --- a/test/simple/test-stream2-readable-legacy-drain.js +++ b/test/simple/test-stream2-readable-legacy-drain.js @@ -28,8 +28,8 @@ var Readable = Stream.Readable; var r = new Readable(); var N = 256; var reads = 0; -r._read = function(n, cb) { - return cb(null, ++reads === N ? null : new Buffer(1)); +r._read = function(n) { + return r.push(++reads === N ? null : new Buffer(1)); }; var rended = false; diff --git a/test/simple/test-stream2-readable-non-empty-end.js b/test/simple/test-stream2-readable-non-empty-end.js index 351bef6098d..7314ae77b13 100644 --- a/test/simple/test-stream2-readable-non-empty-end.js +++ b/test/simple/test-stream2-readable-non-empty-end.js @@ -32,10 +32,10 @@ for (var i = 1; i <= 10; i++) { var test = new Readable(); var n = 0; -test._read = function(size, cb) { +test._read = function(size) { var chunk = chunks[n++]; setTimeout(function() { - cb(null, chunk); + test.push(chunk); }); }; diff --git a/test/simple/test-stream2-set-encoding.js b/test/simple/test-stream2-set-encoding.js index 3571bac4f12..758a4342eed 100644 --- a/test/simple/test-stream2-set-encoding.js +++ b/test/simple/test-stream2-set-encoding.js @@ -72,25 +72,25 @@ function TestReader(n, opts) { this.len = n || 100; } -TestReader.prototype._read = function(n, cb) { +TestReader.prototype._read = function(n) { setTimeout(function() { if (this.pos >= this.len) { - return cb(); + return this.push(null); } n = Math.min(n, this.len - this.pos); if (n <= 0) { - return cb(); + return this.push(null); } this.pos += n; var ret = new Buffer(n); ret.fill('a'); - console.log("cb(null, ret)", ret) + console.log("this.push(ret)", ret) - return cb(null, ret); + return this.push(ret); }.bind(this), 1); }; diff --git a/test/simple/test-stream2-unpipe-drain.js b/test/simple/test-stream2-unpipe-drain.js index 0fc963e802f..4f5b3b7532a 100644 --- a/test/simple/test-stream2-unpipe-drain.js +++ b/test/simple/test-stream2-unpipe-drain.js @@ -45,9 +45,9 @@ function TestReader(id) { } util.inherits(TestReader, stream.Readable); -TestReader.prototype._read = function (size, callback) { +TestReader.prototype._read = function (size) { this.reads += 1; - crypto.randomBytes(size, callback); + this.push(crypto.randomBytes(size)); }; var src1 = new TestReader(); diff --git a/test/simple/test-stream2-unpipe-leak.js b/test/simple/test-stream2-unpipe-leak.js index a560bfa0cb9..993dd16e387 100644 --- a/test/simple/test-stream2-unpipe-leak.js +++ b/test/simple/test-stream2-unpipe-leak.js @@ -27,30 +27,30 @@ var stream = require('stream'); var util = require('util'); function TestWriter() { - stream.Writable.call(this); + stream.Writable.call(this); } util.inherits(TestWriter, stream.Writable); -TestWriter.prototype._write = function (buffer, callback) { - callback(null); +TestWriter.prototype._write = function(buffer, callback) { + callback(null); }; var dest = new TestWriter(); function TestReader() { - stream.Readable.call(this); + stream.Readable.call(this); } util.inherits(TestReader, stream.Readable); -TestReader.prototype._read = function (size, callback) { - callback(new Buffer('hallo')); +TestReader.prototype._read = function(size) { + stream.push(new Buffer('hallo')); }; var src = new TestReader(); for (var i = 0; i < 10; i++) { - src.pipe(dest); - src.unpipe(dest); + src.pipe(dest); + src.unpipe(dest); } assert.equal(src.listeners('end').length, 0);