streams2: Remove function.bind() usage

It's too slow, unfortunately.
This commit is contained in:
isaacs 2012-11-17 15:24:14 +11:00
parent 2ff499c022
commit b15e19a232
3 changed files with 72 additions and 51 deletions

View File

@ -69,7 +69,9 @@ function ReadableState(options, stream) {
this.endEmitted = false; this.endEmitted = false;
this.reading = false; this.reading = false;
this.sync = false; this.sync = false;
this.onread = onread.bind(stream); this.onread = function(er, data) {
onread(stream, er, data);
};
// whenever we return null, then we set a flag to say // whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission. // that we're awaiting a 'readable' event emission.
@ -208,13 +210,13 @@ Readable.prototype.read = function(n) {
return ret; return ret;
}; };
function onread(er, chunk) { function onread(stream, er, chunk) {
var state = this._readableState; var state = stream._readableState;
var sync = state.sync; var sync = state.sync;
state.reading = false; state.reading = false;
if (er) if (er)
return this.emit('error', er); return stream.emit('error', er);
if (!chunk || !chunk.length) { if (!chunk || !chunk.length) {
// eof // eof
@ -233,10 +235,10 @@ function onread(er, chunk) {
state.needReadable = false; state.needReadable = false;
if (!state.emittedReadable) { if (!state.emittedReadable) {
state.emittedReadable = true; state.emittedReadable = true;
this.emit('readable'); stream.emit('readable');
} }
} else } else
endReadable(this); endReadable(stream);
} }
return; return;
} }
@ -257,7 +259,7 @@ function onread(er, chunk) {
// another _read(n,cb) before this one returns! // another _read(n,cb) before this one returns!
if (state.length <= state.lowWaterMark) { if (state.length <= state.lowWaterMark) {
state.reading = true; state.reading = true;
this._read(state.bufferSize, state.onread); stream._read(state.bufferSize, state.onread);
return; return;
} }
@ -265,7 +267,7 @@ function onread(er, chunk) {
state.needReadable = false; state.needReadable = false;
if (!state.emittedReadable) { if (!state.emittedReadable) {
state.emittedReadable = true; state.emittedReadable = true;
this.emit('readable'); stream.emit('readable');
} }
} }
} }
@ -275,7 +277,9 @@ function onread(er, chunk) {
// for virtual (non-string, non-buffer) streams, "length" is somewhat // for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful. // arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function(n, cb) { Readable.prototype._read = function(n, cb) {
process.nextTick(cb.bind(this, new Error('not implemented'))); process.nextTick(function() {
cb(new Error('not implemented'));
});
}; };
Readable.prototype.pipe = function(dest, pipeOpts) { Readable.prototype.pipe = function(dest, pipeOpts) {
@ -316,7 +320,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// start the flow. // start the flow.
if (!state.flowing) { if (!state.flowing) {
state.flowing = true; state.flowing = true;
process.nextTick(flow.bind(null, src, pipeOpts)); process.nextTick(function() {
flow(src, pipeOpts);
});
} }
return dest; return dest;
@ -371,7 +377,9 @@ function flow(src, pipeOpts) {
// at this point, no one needed a drain, so we just ran out of data // at this point, no one needed a drain, so we just ran out of data
// on the next readable event, start it over again. // on the next readable event, start it over again.
src.once('readable', flow.bind(null, src, pipeOpts)); src.once('readable', function() {
flow(src, pipeOpts);
});
} }
Readable.prototype.unpipe = function(dest) { Readable.prototype.unpipe = function(dest) {
@ -504,6 +512,7 @@ Readable.prototype.wrap = function(stream) {
var state = this._readableState; var state = this._readableState;
var paused = false; var paused = false;
var self = this;
stream.on('end', function() { stream.on('end', function() {
state.ended = true; state.ended = true;
if (state.decoder) { if (state.decoder) {
@ -515,10 +524,10 @@ Readable.prototype.wrap = function(stream) {
} }
if (state.length > 0) if (state.length > 0)
this.emit('readable'); self.emit('readable');
else else
endReadable(this); endReadable(self);
}.bind(this)); });
stream.on('data', function(chunk) { stream.on('data', function(chunk) {
if (state.decoder) if (state.decoder)
@ -528,14 +537,14 @@ Readable.prototype.wrap = function(stream) {
state.buffer.push(chunk); state.buffer.push(chunk);
state.length += chunk.length; state.length += chunk.length;
this.emit('readable'); self.emit('readable');
// if not consumed, then pause the stream. // if not consumed, then pause the stream.
if (state.length > state.lowWaterMark && !paused) { if (state.length > state.lowWaterMark && !paused) {
paused = true; paused = true;
stream.pause(); stream.pause();
} }
}.bind(this)); });
// proxy all the other methods. // proxy all the other methods.
// important when wrapping filters and duplexes. // important when wrapping filters and duplexes.
@ -551,8 +560,8 @@ Readable.prototype.wrap = function(stream) {
// proxy certain important events. // proxy certain important events.
var events = ['error', 'close', 'destroy', 'pause', 'resume']; var events = ['error', 'close', 'destroy', 'pause', 'resume'];
events.forEach(function(ev) { events.forEach(function(ev) {
stream.on(ev, this.emit.bind(this, ev)); stream.on(ev, self.emit.bind(self, ev));
}.bind(this)); });
// consume some bytes. if not all is consumed, then // consume some bytes. if not all is consumed, then
// pause the underlying stream. // pause the underlying stream.
@ -660,5 +669,7 @@ function endReadable(stream) {
return; return;
state.ended = true; state.ended = true;
state.endEmitted = true; state.endEmitted = true;
process.nextTick(stream.emit.bind(stream, 'end')); process.nextTick(function() {
stream.emit('end');
});
} }

View File

@ -70,10 +70,13 @@ var Duplex = require('_stream_duplex');
var util = require('util'); var util = require('util');
util.inherits(Transform, Duplex); util.inherits(Transform, Duplex);
function TransformState() { function TransformState(stream) {
this.buffer = []; this.buffer = [];
this.transforming = false; this.transforming = false;
this.pendingReadCb = null; this.pendingReadCb = null;
this.output = function(chunk) {
stream._output(chunk);
};
} }
function Transform(options) { function Transform(options) {
@ -83,17 +86,19 @@ function Transform(options) {
Duplex.call(this, options); Duplex.call(this, options);
// bind output so that it can be passed around as a regular function. // bind output so that it can be passed around as a regular function.
this._output = this._output.bind(this); var stream = this;
// the queue of _write chunks that are pending being transformed // the queue of _write chunks that are pending being transformed
this._transformState = new TransformState(); var ts = this._transformState = new TransformState(stream);
// when the writable side finishes, then flush out anything remaining. // when the writable side finishes, then flush out anything remaining.
this.once('finish', function() { this.once('finish', function() {
if ('function' === typeof this._flush) if ('function' === typeof this._flush)
this._flush(this._output, done.bind(this)); this._flush(ts.output, function(er) {
done(stream, er);
});
else else
done.call(this); done(stream);
}); });
} }
@ -159,14 +164,13 @@ Transform.prototype._read = function(n, readcb) {
var req = ts.buffer.shift(); var req = ts.buffer.shift();
var chunk = req[0]; var chunk = req[0];
var writecb = req[1]; var writecb = req[1];
var output = this._output;
ts.transforming = true; ts.transforming = true;
this._transform(chunk, output, function(er, data) { this._transform(chunk, ts.output, function(er, data) {
ts.transforming = false; ts.transforming = false;
if (data) if (data)
output(data); ts.output(data);
writecb(er); writecb(er);
}.bind(this)); });
}; };
Transform.prototype._output = function(chunk) { Transform.prototype._output = function(chunk) {
@ -185,25 +189,25 @@ Transform.prototype._output = function(chunk) {
} }
// otherwise, it's up to us to fill the rs buffer. // otherwise, it's up to us to fill the rs buffer.
var state = this._readableState; var rs = this._readableState;
var len = state.length; var len = rs.length;
state.buffer.push(chunk); rs.buffer.push(chunk);
state.length += chunk.length; rs.length += chunk.length;
if (state.needReadable) { if (rs.needReadable) {
state.needReadable = false; rs.needReadable = false;
this.emit('readable'); this.emit('readable');
} }
}; };
function done(er) { function done(stream, er) {
if (er) if (er)
return this.emit('error', er); return stream.emit('error', er);
// if there's nothing in the write buffer, then that means // if there's nothing in the write buffer, then that means
// that nothing more will ever be provided // that nothing more will ever be provided
var ws = this._writableState; var ws = stream._writableState;
var rs = this._readableState; var rs = stream._readableState;
var ts = this._transformState; var ts = stream._transformState;
if (ws.length) if (ws.length)
throw new Error('calling transform done when ws.length != 0'); throw new Error('calling transform done when ws.length != 0');
@ -221,7 +225,7 @@ function done(er) {
// no more data coming from the writable side, we need to emit // no more data coming from the writable side, we need to emit
// now so that the consumer knows to pick up the tail bits. // now so that the consumer knows to pick up the tail bits.
if (rs.length && rs.needReadable) if (rs.length && rs.needReadable)
this.emit('readable'); stream.emit('readable');
else if (rs.length === 0) else if (rs.length === 0)
this.emit('end'); stream.emit('end');
} }

View File

@ -82,7 +82,9 @@ function WritableState(options, stream) {
this.sync = false; this.sync = false;
// the callback that's passed to _write(chunk,cb) // the callback that's passed to _write(chunk,cb)
this.onwrite = onwrite.bind(stream); this.onwrite = function(er) {
onwrite(stream, er);
};
// the callback that the user supplies to write(chunk,encoding,cb) // the callback that the user supplies to write(chunk,encoding,cb)
this.writecb = null; this.writecb = null;
@ -155,8 +157,8 @@ Writable.prototype.write = function(chunk, encoding, cb) {
return ret; return ret;
}; };
function onwrite(er) { function onwrite(stream, er) {
var state = this._writableState; var state = stream._writableState;
var sync = state.sync; var sync = state.sync;
var cb = state.writecb; var cb = state.writecb;
var l = state.writelen; var l = state.writelen;
@ -168,11 +170,13 @@ function onwrite(er) {
if (er) { if (er) {
if (cb) { if (cb) {
if (sync) if (sync)
process.nextTick(cb.bind(null, er)); process.nextTick(function() {
cb(er);
});
else else
cb(er); cb(er);
} else } else
this.emit('error', er); stream.emit('error', er);
return; return;
} }
state.length -= l; state.length -= l;
@ -189,7 +193,7 @@ function onwrite(er) {
if (state.length === 0 && (state.ended || state.ending)) { if (state.length === 0 && (state.ended || state.ending)) {
// emit 'finish' at the very end. // emit 'finish' at the very end.
state.finishing = true; state.finishing = true;
this.emit('finish'); stream.emit('finish');
state.finished = true; state.finished = true;
return; return;
} }
@ -209,7 +213,7 @@ function onwrite(er) {
state.writecb = cb; state.writecb = cb;
state.writechunk = chunk; state.writechunk = chunk;
state.writing = true; state.writing = true;
this._write(chunk, state.onwrite); stream._write(chunk, state.onwrite);
} }
if (state.length <= state.lowWaterMark && state.needDrain) { if (state.length <= state.lowWaterMark && state.needDrain) {
@ -220,13 +224,15 @@ function onwrite(er) {
if (!state.needDrain) if (!state.needDrain)
return; return;
state.needDrain = false; state.needDrain = false;
this.emit('drain'); stream.emit('drain');
}.bind(this)); });
} }
} }
Writable.prototype._write = function(chunk, cb) { Writable.prototype._write = function(chunk, cb) {
process.nextTick(cb.bind(this, new Error('not implemented'))); process.nextTick(function() {
cb(new Error('not implemented'));
});
}; };
Writable.prototype.end = function(chunk, encoding) { Writable.prototype.end = function(chunk, encoding) {