streams2: Support write(chunk,[encoding],[callback])

This commit is contained in:
isaacs 2012-10-09 17:31:29 -07:00
parent 0678480b57
commit 71e2b61388

View File

@ -73,13 +73,18 @@ function Writable(options) {
// Override this method for sync streams // Override this method for sync streams
// override the _write(chunk, cb) method for async streams // override the _write(chunk, cb) method for async streams
Writable.prototype.write = function(chunk, encoding) { Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState; var state = this._writableState;
if (state.ended) { if (state.ended) {
this.emit('error', new Error('write after end')); this.emit('error', new Error('write after end'));
return; return;
} }
if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
}
var l = chunk.length; var l = chunk.length;
if (false === state.decodeStrings) if (false === state.decodeStrings)
chunk = [chunk, encoding]; chunk = [chunk, encoding];
@ -94,24 +99,43 @@ Writable.prototype.write = function(chunk, encoding) {
if (ret === false) if (ret === false)
state.needDrain = true; state.needDrain = true;
// if we're already writing something, then just put this
// in the queue, and wait our turn.
if (state.writing) { if (state.writing) {
state.buffer.push(chunk); state.buffer.push([chunk, cb]);
return ret; return ret;
} }
state.writing = true; state.writing = true;
var sync = true;
this._write(chunk, writecb.bind(this)); this._write(chunk, writecb.bind(this));
sync = false;
return ret; return ret;
function writecb(er) { function writecb(er) {
state.writing = false; state.writing = false;
if (er) { if (er) {
this.emit('error', er); if (cb) {
if (sync)
process.nextTick(cb.bind(null, er));
else
cb(er);
} else
this.emit('error', er);
return; return;
} }
state.length -= l; state.length -= l;
if (cb) {
// don't call the cb until the next tick if we're in sync mode.
// also, defer if we're about to write some more right now.
if (sync || state.buffer.length)
process.nextTick(cb);
else
cb();
}
if (state.length === 0 && (state.ended || state.ending)) { if (state.length === 0 && (state.ended || state.ending)) {
// emit 'finish' at the very end. // emit 'finish' at the very end.
this.emit('finish'); this.emit('finish');
@ -120,8 +144,15 @@ Writable.prototype.write = function(chunk, encoding) {
// if there's something in the buffer waiting, then do that, too. // if there's something in the buffer waiting, then do that, too.
if (state.buffer.length) { if (state.buffer.length) {
chunk = state.buffer.shift(); var chunkCb = state.buffer.shift();
l = chunk.length; chunk = chunkCb[0];
cb = chunkCb[1];
if (false === state.decodeStrings)
l = chunk[0].length;
else
l = chunk.length;
state.writing = true; state.writing = true;
this._write(chunk, writecb.bind(this)); this._write(chunk, writecb.bind(this));
} }