stream: There is no _read cb, there is only push

This makes it so that `stream.push(chunk)` is the only way to signal the
end of reading, removing the confusing disparity between the
callback-style _read method, and the fact that most real-world streams
do not have a 1:1 corollation between the "please give me data" event,
and the actual arrival of a chunk of data.

It is still possible, of course, to implement a `CallbackReadable` on
top of this.  Simply provide a method like this as the callback:

    function readCallback(er, chunk) {
      if (er)
        stream.emit('error', er);
      else
        stream.push(chunk);
    }

However, *only* fs streams actually would behave in this way, so it
makes not a lot of sense to make TCP, TLS, HTTP, and all the rest have
to bend into this uncomfortable paradigm.
This commit is contained in:
isaacs 2013-02-28 15:32:32 -08:00
parent 4b67f0be6d
commit 88644eaa2d
20 changed files with 105 additions and 126 deletions

View File

@ -31,7 +31,7 @@ util.inherits(Readable, Stream);
function ReadableState(options, stream) { function ReadableState(options, stream) {
options = options || {}; options = options || {};
// the argument passed to this._read(n,cb) // the argument passed to this._read(n)
this.bufferSize = options.bufferSize || 16 * 1024; this.bufferSize = options.bufferSize || 16 * 1024;
// the point at which it stops calling _read() to fill the buffer // the point at which it stops calling _read() to fill the buffer
@ -58,10 +58,6 @@ function ReadableState(options, stream) {
// not happen before the first write call. // not happen before the first write call.
this.sync = true; this.sync = true;
this.onread = function(er, data) {
onread(stream, er, data);
};
// whenever we return null, then we set a flag to say // whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission. // that we're awaiting a 'readable' event emission.
this.needReadable = false; this.needReadable = false;
@ -121,7 +117,7 @@ function readableAddChunk(stream, state, chunk, addToFront) {
if (er) { if (er) {
stream.emit('error', er); stream.emit('error', er);
} else if (chunk === null || chunk === undefined) { } else if (chunk === null || chunk === undefined) {
onreadEof(stream, state); onEofChunk(stream, state);
} else if (state.objectMode || chunk && chunk.length > 0) { } else if (state.objectMode || chunk && chunk.length > 0) {
if (state.decoder) if (state.decoder)
chunk = state.decoder.write(chunk); chunk = state.decoder.write(chunk);
@ -196,7 +192,7 @@ function howMuchToRead(n, state) {
return n; return n;
} }
// you can override either this method, or _read(n, cb) below. // you can override either this method, or the async _read(n) below.
Readable.prototype.read = function(n) { Readable.prototype.read = function(n) {
var state = this._readableState; var state = this._readableState;
var nOrig = n; var nOrig = n;
@ -264,7 +260,7 @@ Readable.prototype.read = function(n) {
if (state.length === 0) if (state.length === 0)
state.needReadable = true; state.needReadable = true;
// call internal read method // call internal read method
this._read(state.bufferSize, state.onread); this._read(state.bufferSize);
state.sync = false; state.sync = false;
} }
@ -301,18 +297,6 @@ Readable.prototype.read = function(n) {
return ret; return ret;
}; };
// This is the function passed to _read(n,cb) as the callback.
// It should be called exactly once for every _read() call.
function onread(stream, er, chunk) {
var state = stream._readableState;
var sync = state.sync;
if (er)
stream.emit('error', er);
else
stream.push(chunk);
}
function chunkInvalid(state, chunk) { function chunkInvalid(state, chunk) {
var er = null; var er = null;
if (!Buffer.isBuffer(chunk) && if (!Buffer.isBuffer(chunk) &&
@ -327,7 +311,7 @@ function chunkInvalid(state, chunk) {
} }
function onreadEof(stream, state) { function onEofChunk(stream, state) {
state.ended = true; state.ended = true;
if (state.decoder) { if (state.decoder) {
var chunk = state.decoder.end(); var chunk = state.decoder.end();
@ -371,7 +355,7 @@ function emitReadable_(stream) {
// at this point, the user has presumably seen the 'readable' event, // at this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data. that may have triggered // and called read() to consume some data. that may have triggered
// in turn another _read(n,cb) call, in which case reading = true if // in turn another _read(n) call, in which case reading = true if
// it's in progress. // it's in progress.
// However, if we're not ended, or reading, and the length < hwm, // However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more right now preemptively. // then go ahead and try to read some more right now preemptively.
@ -395,10 +379,8 @@ function maybeReadMore_(stream, state) {
// call cb(er, data) where data is <= n in length. // call cb(er, data) where data is <= n in length.
// for virtual (non-string, non-buffer) streams, "length" is somewhat // for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful. // arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function(n, cb) { Readable.prototype._read = function(n) {
process.nextTick(function() { this.emit('error', new Error('not implemented'));
cb(new Error('not implemented'));
});
}; };
Readable.prototype.pipe = function(dest, pipeOpts) { Readable.prototype.pipe = function(dest, pipeOpts) {
@ -758,7 +740,7 @@ Readable.prototype.wrap = function(stream) {
// when we try to consume some more bytes, simply unpause the // when we try to consume some more bytes, simply unpause the
// underlying stream. // underlying stream.
self._read = function(n, cb) { self._read = function(n) {
if (paused) { if (paused) {
stream.resume(); stream.resume();
paused = false; paused = false;

View File

@ -36,11 +36,11 @@
// The Transform stream has all the aspects of the readable and writable // The Transform stream has all the aspects of the readable and writable
// stream classes. When you write(chunk), that calls _write(chunk,cb) // stream classes. When you write(chunk), that calls _write(chunk,cb)
// internally, and returns false if there's a lot of pending writes // internally, and returns false if there's a lot of pending writes
// buffered up. When you call read(), that calls _read(n,cb) until // buffered up. When you call read(), that calls _read(n) until
// there's enough pending readable data buffered up. // there's enough pending readable data buffered up.
// //
// In a transform stream, the written data is placed in a buffer. When // In a transform stream, the written data is placed in a buffer. When
// _read(n,cb) is called, it transforms the queued up data, calling the // _read(n) is called, it transforms the queued up data, calling the
// buffered _write cb's as it consumes chunks. If consuming a single // buffered _write cb's as it consumes chunks. If consuming a single
// written chunk would result in multiple output chunks, then the first // written chunk would result in multiple output chunks, then the first
// outputted bit calls the readcb, and subsequent chunks just go into // outputted bit calls the readcb, and subsequent chunks just go into
@ -106,7 +106,7 @@ function afterTransform(stream, er, data) {
var rs = stream._readableState; var rs = stream._readableState;
if (rs.needReadable || rs.length < rs.highWaterMark) { if (rs.needReadable || rs.length < rs.highWaterMark) {
stream._read(); stream._read(rs.bufferSize);
} }
} }
@ -162,13 +162,13 @@ Transform.prototype._write = function(chunk, cb) {
return; return;
var rs = this._readableState; var rs = this._readableState;
if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark)
this._read(); this._read(rs.bufferSize);
}; };
// Doesn't matter what the args are here. // Doesn't matter what the args are here.
// the output and callback functions passed to _transform do all the work. // the output and callback functions passed to _transform do all the work.
// That we got here means that the readable side wants more data. // That we got here means that the readable side wants more data.
Transform.prototype._read = function(n, cb) { Transform.prototype._read = function(n) {
var ts = this._transformState; var ts = this._transformState;
if (ts.writechunk && ts.writecb && !ts.transforming) { if (ts.writechunk && ts.writecb && !ts.transforming) {

View File

@ -1453,10 +1453,10 @@ ReadStream.prototype.open = function() {
}); });
}; };
ReadStream.prototype._read = function(n, cb) { ReadStream.prototype._read = function(n) {
if (typeof this.fd !== 'number') if (typeof this.fd !== 'number')
return this.once('open', function() { return this.once('open', function() {
this._read(n, cb); this._read(n);
}); });
if (this.destroyed) if (this.destroyed)
@ -1482,7 +1482,7 @@ ReadStream.prototype._read = function(n, cb) {
// already read everything we were supposed to read! // already read everything we were supposed to read!
// treat as EOF. // treat as EOF.
if (toRead <= 0) if (toRead <= 0)
return cb(); return this.push(null);
// the actual read. // the actual read.
var self = this; var self = this;
@ -1498,14 +1498,14 @@ ReadStream.prototype._read = function(n, cb) {
if (self.autoClose) { if (self.autoClose) {
self.destroy(); self.destroy();
} }
return cb(er); self.emit('error', er);
} else {
var b = null;
if (bytesRead > 0)
b = thisPool.slice(start, start + bytesRead);
self.push(b);
} }
var b = null;
if (bytesRead > 0)
b = thisPool.slice(start, start + bytesRead);
cb(null, b);
} }
}; };

View File

@ -331,12 +331,12 @@ IncomingMessage.prototype.read = function(n) {
}; };
IncomingMessage.prototype._read = function(n, callback) { IncomingMessage.prototype._read = function(n) {
// We actually do almost nothing here, because the parserOnBody // We actually do almost nothing here, because the parserOnBody
// function fills up our internal buffer directly. However, we // function fills up our internal buffer directly. However, we
// do need to unpause the underlying socket so that it flows. // do need to unpause the underlying socket so that it flows.
if (!this.socket.readable) if (!this.socket.readable)
return callback(null, null); this.push(null);
else else
readStart(this.socket); readStart(this.socket);
}; };

View File

@ -228,7 +228,7 @@ function afterShutdown(status, handle, req) {
function onSocketEnd() { function onSocketEnd() {
// XXX Should not have to do as much crap in this function. // XXX Should not have to do as much crap in this function.
// ended should already be true, since this is called *after* // ended should already be true, since this is called *after*
// the EOF errno and onread has returned null to the _read cb. // the EOF errno and onread has eof'ed
debug('onSocketEnd', this._readableState); debug('onSocketEnd', this._readableState);
this._readableState.ended = true; this._readableState.ended = true;
if (this._readableState.endEmitted) { if (this._readableState.endEmitted) {
@ -335,25 +335,19 @@ Object.defineProperty(Socket.prototype, 'bufferSize', {
// Just call handle.readStart until we have enough in the buffer // Just call handle.readStart until we have enough in the buffer
Socket.prototype._read = function(n, callback) { Socket.prototype._read = function(n) {
debug('_read'); debug('_read');
if (this._connecting || !this._handle) { if (this._connecting || !this._handle) {
debug('_read wait for connection'); debug('_read wait for connection');
this.once('connect', this._read.bind(this, n, callback)); this.once('connect', this._read.bind(this, n));
return; } else if (!this._handle.reading) {
} // not already reading, start the flow
assert(callback === this._readableState.onread);
assert(this._readableState.reading = true);
if (!this._handle.reading) {
debug('Socket._read readStart'); debug('Socket._read readStart');
this._handle.reading = true; this._handle.reading = true;
var r = this._handle.readStart(); var r = this._handle.readStart();
if (r) if (r)
this._destroy(errnoException(process._errno, 'read')); this._destroy(errnoException(process._errno, 'read'));
} else {
debug('readStart already has been called.');
} }
}; };
@ -495,7 +489,7 @@ function onread(buffer, offset, length) {
if (self.onend) self.once('end', self.onend); if (self.onend) self.once('end', self.onend);
// send a null to the _read cb to signal the end of data. // push a null to signal the end of data.
self.push(null); self.push(null);
// internal end event so that we know that the actual socket // internal end event so that we know that the actual socket

View File

@ -381,12 +381,13 @@ CryptoStream.prototype._writePending = function writePending() {
}; };
CryptoStream.prototype._read = function read(size, cb) { CryptoStream.prototype._read = function read(size) {
// XXX: EOF?! // XXX: EOF?!
if (!this.pair.ssl) return cb(null, null); if (!this.pair.ssl) return this.push(null);
// Wait for session to be resumed // Wait for session to be resumed
if (this._resumingSession || !this._reading) return cb(null, ''); // Mark that we're done reading, but don't provide data or EOF
if (this._resumingSession || !this._reading) return this.push('');
var out; var out;
if (this === this.pair.cleartext) { if (this === this.pair.cleartext) {
@ -441,11 +442,12 @@ CryptoStream.prototype._read = function read(size, cb) {
if (this === this.pair.cleartext) if (this === this.pair.cleartext)
this._opposite._done(); this._opposite._done();
return cb(null, null); // EOF
return this.push(null);
} }
// Bail out // Bail out
return cb(null, ''); return this.push('');
} }
// Give them requested data // Give them requested data
@ -459,7 +461,7 @@ CryptoStream.prototype._read = function read(size, cb) {
self.read(bytesRead); self.read(bytesRead);
}); });
} }
return cb(null, pool.slice(start, start + bytesRead)); return this.push(pool.slice(start, start + bytesRead));
}; };

View File

@ -33,10 +33,10 @@ var reads = 0;
var eofed = false; var eofed = false;
var ended = false; var ended = false;
r._read = function(n, cb) { r._read = function(n) {
if (reads === 0) { if (reads === 0) {
setTimeout(function() { setTimeout(function() {
cb(null, str); r.push(str);
}); });
reads++; reads++;
} else if (reads === 1) { } else if (reads === 1) {
@ -46,7 +46,7 @@ r._read = function(n, cb) {
} else { } else {
assert(!eofed); assert(!eofed);
eofed = true; eofed = true;
cb(null, null); r.push(null);
} }
}; };

View File

@ -30,14 +30,15 @@ var s = new Readable({
var list = ['1', '2', '3', '4', '5', '6']; var list = ['1', '2', '3', '4', '5', '6'];
s._read = function (n, cb) { s._read = function (n) {
var one = list.shift(); var one = list.shift();
if (!one) if (!one) {
return cb(null, null); s.push(null);
} else {
var two = list.shift(); var two = list.shift();
s.push(one); s.push(one);
cb(null, two); s.push(two);
}
}; };
var v = s.read(0); var v = s.read(0);

View File

@ -406,7 +406,7 @@ test('read(0) for ended streams', function (t) {
var r = new R(); var r = new R();
var written = false; var written = false;
var ended = false; var ended = false;
r._read = function () {}; r._read = function (n) {};
r.push(new Buffer("foo")); r.push(new Buffer("foo"));
r.push(null); r.push(null);
@ -435,8 +435,8 @@ test('read(0) for ended streams', function (t) {
test('sync _read ending', function (t) { test('sync _read ending', function (t) {
var r = new R(); var r = new R();
var called = false; var called = false;
r._read = function (n, cb) { r._read = function (n) {
cb(null, null); r.push(null);
}; };
r.once('end', function () { r.once('end', function () {

View File

@ -41,8 +41,8 @@ function TestReader() {
util.inherits(TestReader, R); util.inherits(TestReader, R);
TestReader.prototype._read = function(n, cb) { TestReader.prototype._read = function(n) {
cb(null, this._buffer); this.push(this._buffer);
this._buffer = new Buffer(0); this._buffer = new Buffer(0);
}; };

View File

@ -23,19 +23,19 @@ var common = require('../common.js');
var stream = require('stream'); var stream = require('stream');
var Buffer = require('buffer').Buffer; var Buffer = require('buffer').Buffer;
var R = new stream.Readable(); var r = new stream.Readable();
R._read = function(size, cb) { r._read = function(size) {
cb(null, new Buffer(size)); r.push(new Buffer(size));
}; };
var W = new stream.Writable(); var w = new stream.Writable();
W._write = function(data, cb) { w._write = function(data, cb) {
cb(null); cb(null);
}; };
R.pipe(W); r.pipe(w);
// This might sound unrealistic, but it happens in net.js. When // This might sound unrealistic, but it happens in net.js. When
// `socket.allowHalfOpen === false`, EOF will cause `.destroySoon()` call which // `socket.allowHalfOpen === false`, EOF will cause `.destroySoon()` call which
// ends the writable side of net.Socket. // ends the writable side of net.Socket.
W.end(); w.end();

View File

@ -126,9 +126,9 @@ test('read(n) is ignored', function(t) {
test('can read objects from _read (sync)', function(t) { test('can read objects from _read (sync)', function(t) {
var r = new Readable({ objectMode: true }); var r = new Readable({ objectMode: true });
var list = [{ one: '1'}, { two: '2' }]; var list = [{ one: '1'}, { two: '2' }];
r._read = function(n, cb) { r._read = function(n) {
var item = list.shift(); var item = list.shift();
cb(null, item || null); r.push(item || null);
}; };
r.pipe(toArray(function(list) { r.pipe(toArray(function(list) {
@ -144,10 +144,10 @@ test('can read objects from _read (sync)', function(t) {
test('can read objects from _read (async)', function(t) { test('can read objects from _read (async)', function(t) {
var r = new Readable({ objectMode: true }); var r = new Readable({ objectMode: true });
var list = [{ one: '1'}, { two: '2' }]; var list = [{ one: '1'}, { two: '2' }];
r._read = function(n, cb) { r._read = function(n) {
var item = list.shift(); var item = list.shift();
process.nextTick(function() { process.nextTick(function() {
cb(null, item || null); r.push(item || null);
}); });
}; };
@ -223,7 +223,7 @@ test('high watermark _read', function(t) {
var calls = 0; var calls = 0;
var list = ['1', '2', '3', '4', '5', '6', '7', '8']; var list = ['1', '2', '3', '4', '5', '6', '7', '8'];
r._read = function() { r._read = function(n) {
calls++; calls++;
}; };
@ -249,7 +249,7 @@ test('high watermark push', function(t) {
highWaterMark: 6, highWaterMark: 6,
objectMode: true objectMode: true
}); });
r._read = function() {}; r._read = function(n) {};
for (var i = 0; i < 6; i++) { for (var i = 0; i < 6; i++) {
var bool = r.push(i); var bool = r.push(i);
assert.equal(bool, i === 5 ? false : true); assert.equal(bool, i === 5 ? false : true);

View File

@ -27,10 +27,10 @@ var stream = require('stream');
var count = 1000; var count = 1000;
var source = new stream.Readable(); var source = new stream.Readable();
source._read = function(n, cb) { source._read = function(n) {
n = Math.min(count, n); n = Math.min(count, n);
count -= n; count -= n;
cb(null, new Buffer(n)); source.push(new Buffer(n));
}; };
var unpipedDest; var unpipedDest;
@ -67,10 +67,10 @@ var stream = require('stream');
var count = 1000; var count = 1000;
var source = new stream.Readable(); var source = new stream.Readable();
source._read = function(n, cb) { source._read = function(n) {
n = Math.min(count, n); n = Math.min(count, n);
count -= n; count -= n;
cb(null, new Buffer(n)); source.push(new Buffer(n));
}; };
var unpipedDest; var unpipedDest;

View File

@ -30,9 +30,9 @@ var N = 256 * 1024;
process.maxTickDepth = N + 2; process.maxTickDepth = N + 2;
var reads = 0; var reads = 0;
r._read = function(n, cb) { r._read = function(n) {
var chunk = reads++ === N ? null : new Buffer(1); var chunk = reads++ === N ? null : new Buffer(1);
cb(null, chunk); r.push(chunk);
}; };
r.on('readable', function onReadable() { r.on('readable', function onReadable() {

View File

@ -43,28 +43,28 @@ function test1() {
var buf = new Buffer(5); var buf = new Buffer(5);
buf.fill('x'); buf.fill('x');
var reads = 5; var reads = 5;
r._read = function(n, cb) { r._read = function(n) {
switch (reads--) { switch (reads--) {
case 0: case 0:
return cb(null, null); // EOF return r.push(null); // EOF
case 1: case 1:
return cb(null, buf); return r.push(buf);
case 2: case 2:
setTimeout(r.read.bind(r, 0), 10); setTimeout(r.read.bind(r, 0), 10);
return cb(null, new Buffer(0)); // Not-EOF! return r.push(new Buffer(0)); // Not-EOF!
case 3: case 3:
setTimeout(r.read.bind(r, 0), 10); setTimeout(r.read.bind(r, 0), 10);
return process.nextTick(function() { return process.nextTick(function() {
return cb(null, new Buffer(0)); return r.push(new Buffer(0));
}); });
case 4: case 4:
setTimeout(r.read.bind(r, 0), 10); setTimeout(r.read.bind(r, 0), 10);
return setTimeout(function() { return setTimeout(function() {
return cb(null, new Buffer(0)); return r.push(new Buffer(0));
}); });
case 5: case 5:
return setTimeout(function() { return setTimeout(function() {
return cb(null, buf); return r.push(buf);
}); });
default: default:
throw new Error('unreachable'); throw new Error('unreachable');
@ -92,11 +92,11 @@ function test1() {
function test2() { function test2() {
var r = new Readable({ encoding: 'base64' }); var r = new Readable({ encoding: 'base64' });
var reads = 5; var reads = 5;
r._read = function(n, cb) { r._read = function(n) {
if (!reads--) if (!reads--)
return cb(null, null); // EOF return r.push(null); // EOF
else else
return cb(null, new Buffer('x')); return r.push(new Buffer('x'));
}; };
var results = []; var results = [];

View File

@ -28,8 +28,8 @@ var Readable = Stream.Readable;
var r = new Readable(); var r = new Readable();
var N = 256; var N = 256;
var reads = 0; var reads = 0;
r._read = function(n, cb) { r._read = function(n) {
return cb(null, ++reads === N ? null : new Buffer(1)); return r.push(++reads === N ? null : new Buffer(1));
}; };
var rended = false; var rended = false;

View File

@ -32,10 +32,10 @@ for (var i = 1; i <= 10; i++) {
var test = new Readable(); var test = new Readable();
var n = 0; var n = 0;
test._read = function(size, cb) { test._read = function(size) {
var chunk = chunks[n++]; var chunk = chunks[n++];
setTimeout(function() { setTimeout(function() {
cb(null, chunk); test.push(chunk);
}); });
}; };

View File

@ -72,25 +72,25 @@ function TestReader(n, opts) {
this.len = n || 100; this.len = n || 100;
} }
TestReader.prototype._read = function(n, cb) { TestReader.prototype._read = function(n) {
setTimeout(function() { setTimeout(function() {
if (this.pos >= this.len) { if (this.pos >= this.len) {
return cb(); return this.push(null);
} }
n = Math.min(n, this.len - this.pos); n = Math.min(n, this.len - this.pos);
if (n <= 0) { if (n <= 0) {
return cb(); return this.push(null);
} }
this.pos += n; this.pos += n;
var ret = new Buffer(n); var ret = new Buffer(n);
ret.fill('a'); ret.fill('a');
console.log("cb(null, ret)", ret) console.log("this.push(ret)", ret)
return cb(null, ret); return this.push(ret);
}.bind(this), 1); }.bind(this), 1);
}; };

View File

@ -45,9 +45,9 @@ function TestReader(id) {
} }
util.inherits(TestReader, stream.Readable); util.inherits(TestReader, stream.Readable);
TestReader.prototype._read = function (size, callback) { TestReader.prototype._read = function (size) {
this.reads += 1; this.reads += 1;
crypto.randomBytes(size, callback); this.push(crypto.randomBytes(size));
}; };
var src1 = new TestReader(); var src1 = new TestReader();

View File

@ -27,30 +27,30 @@ var stream = require('stream');
var util = require('util'); var util = require('util');
function TestWriter() { function TestWriter() {
stream.Writable.call(this); stream.Writable.call(this);
} }
util.inherits(TestWriter, stream.Writable); util.inherits(TestWriter, stream.Writable);
TestWriter.prototype._write = function (buffer, callback) { TestWriter.prototype._write = function(buffer, callback) {
callback(null); callback(null);
}; };
var dest = new TestWriter(); var dest = new TestWriter();
function TestReader() { function TestReader() {
stream.Readable.call(this); stream.Readable.call(this);
} }
util.inherits(TestReader, stream.Readable); util.inherits(TestReader, stream.Readable);
TestReader.prototype._read = function (size, callback) { TestReader.prototype._read = function(size) {
callback(new Buffer('hallo')); stream.push(new Buffer('hallo'));
}; };
var src = new TestReader(); var src = new TestReader();
for (var i = 0; i < 10; i++) { for (var i = 0; i < 10; i++) {
src.pipe(dest); src.pipe(dest);
src.unpipe(dest); src.unpipe(dest);
} }
assert.equal(src.listeners('end').length, 0); assert.equal(src.listeners('end').length, 0);