stream: _write takes an encoding argument

This vastly reduces the overhead of decodeStrings:false streams,
such as net and http.
This commit is contained in:
isaacs 2013-03-03 19:14:06 -08:00
parent cd68d86c32
commit 426b4c6258
16 changed files with 100 additions and 98 deletions

View File

@ -436,8 +436,8 @@ Resumes the incoming `'data'` events after a `pause()`.
A `Writable` Stream has the following methods, members, and events. A `Writable` Stream has the following methods, members, and events.
Note that `stream.Writable` is an abstract class designed to be Note that `stream.Writable` is an abstract class designed to be
extended with an underlying implementation of the `_write(chunk, cb)` extended with an underlying implementation of the
method. (See below.) `_write(chunk, encoding, cb)` method. (See below.)
### new stream.Writable([options]) ### new stream.Writable([options])
@ -451,10 +451,16 @@ In classes that extend the Writable class, make sure to call the
constructor so that the buffering settings can be properly constructor so that the buffering settings can be properly
initialized. initialized.
### writable.\_write(chunk, callback) ### writable.\_write(chunk, encoding, callback)
* `chunk` {Buffer | Array} The data to be written * `chunk` {Buffer | String} The chunk to be written. Will always
* `callback` {Function} Called with an error, or null when finished be a buffer unless the `decodeStrings` option was set to `false`.
* `encoding` {String} If the chunk is a string, then this is the
encoding type. Ignore if chunk is a buffer. Note that chunk will
**always** be a buffer unless the `decodeStrings` option is
explicitly set to `false`.
* `callback` {Function} Call this function (optionally with an error
argument) when you are done processing the supplied chunk.
All Writable stream implementations must provide a `_write` method to All Writable stream implementations must provide a `_write` method to
send data to the underlying resource. send data to the underlying resource.
@ -467,9 +473,12 @@ Call the callback using the standard `callback(error)` pattern to
signal that the write completed successfully or with an error. signal that the write completed successfully or with an error.
If the `decodeStrings` flag is set to `false` in the constructor options, then If the `decodeStrings` flag is set to `false` in the constructor options, then
`chunk` will be an array rather than a Buffer. This is to support `chunk` may be a string rather than a Buffer, and `encoding` will
indicate the sort of string that it is. This is to support
implementations that have an optimized handling for certain string implementations that have an optimized handling for certain string
data encodings. data encodings. If you do not explicitly set the `decodeStrings`
option to `false`, then you can safely ignore the `encoding` argument,
and assume that `chunk` will always be a Buffer.
This method is prefixed with an underscore because it is internal to This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user the class that defines it, and should not be called directly by user
@ -543,13 +552,13 @@ TCP socket connection.
Note that `stream.Duplex` is an abstract class designed to be Note that `stream.Duplex` is an abstract class designed to be
extended with an underlying implementation of the `_read(size)` extended with an underlying implementation of the `_read(size)`
and `_write(chunk, callback)` methods as you would with a Readable or and `_write(chunk, encoding, callback)` methods as you would with a Readable or
Writable stream class. Writable stream class.
Since JavaScript doesn't have multiple prototypal inheritance, this Since JavaScript doesn't have multiple prototypal inheritance, this
class prototypally inherits from Readable, and then parasitically from class prototypally inherits from Readable, and then parasitically from
Writable. It is thus up to the user to implement both the lowlevel Writable. It is thus up to the user to implement both the lowlevel
`_read(n)` method as well as the lowlevel `_write(chunk,cb)` method `_read(n)` method as well as the lowlevel `_write(chunk, encoding, cb)` method
on extension duplex classes. on extension duplex classes.
### new stream.Duplex(options) ### new stream.Duplex(options)
@ -589,9 +598,12 @@ In classes that extend the Transform class, make sure to call the
constructor so that the buffering settings can be properly constructor so that the buffering settings can be properly
initialized. initialized.
### transform.\_transform(chunk, callback) ### transform.\_transform(chunk, encoding, callback)
* `chunk` {Buffer} The chunk to be transformed. * `chunk` {Buffer | String} The chunk to be transformed. Will always
be a buffer unless the `decodeStrings` option was set to `false`.
* `encoding` {String} If the chunk is a string, then this is the
encoding type. (Ignore if chunk is a buffer.)
* `callback` {Function} Call this function (optionally with an error * `callback` {Function} Call this function (optionally with an error
argument) when you are done processing the supplied chunk. argument) when you are done processing the supplied chunk.
@ -671,7 +683,7 @@ function SimpleProtocol(options) {
SimpleProtocol.prototype = Object.create( SimpleProtocol.prototype = Object.create(
Transform.prototype, { constructor: { value: SimpleProtocol }}); Transform.prototype, { constructor: { value: SimpleProtocol }});
SimpleProtocol.prototype._transform = function(chunk, done) { SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
if (!this._inBody) { if (!this._inBody) {
// check if the chunk has a \n\n // check if the chunk has a \n\n
var split = -1; var split = -1;

View File

@ -36,6 +36,6 @@ function PassThrough(options) {
Transform.call(this, options); Transform.call(this, options);
} }
PassThrough.prototype._transform = function(chunk, cb) { PassThrough.prototype._transform = function(chunk, encoding, cb) {
cb(null, chunk); cb(null, chunk);
}; };

View File

@ -155,10 +155,11 @@ Transform.prototype._transform = function(chunk, output, cb) {
throw new Error('not implemented'); throw new Error('not implemented');
}; };
Transform.prototype._write = function(chunk, cb) { Transform.prototype._write = function(chunk, encoding, cb) {
var ts = this._transformState; var ts = this._transformState;
ts.writecb = cb; ts.writecb = cb;
ts.writechunk = chunk; ts.writechunk = chunk;
ts.writeencoding = encoding;
if (!ts.transforming) { if (!ts.transforming) {
var rs = this._readableState; var rs = this._readableState;
if (ts.needTransform || if (ts.needTransform ||
@ -176,7 +177,7 @@ Transform.prototype._read = function(n) {
if (ts.writechunk && ts.writecb && !ts.transforming) { if (ts.writechunk && ts.writecb && !ts.transforming) {
ts.transforming = true; ts.transforming = true;
this._transform(ts.writechunk, ts.afterTransform); this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
} else { } else {
// mark that we need a transform, so that any data that comes in // mark that we need a transform, so that any data that comes in
// will get processed, now that we've asked for it. // will get processed, now that we've asked for it.

View File

@ -146,15 +146,6 @@ function validChunk(stream, state, chunk, cb) {
return valid; return valid;
} }
function decodeChunk(state, chunk, encoding) {
if (!state.objectMode &&
state.decodeStrings !== false &&
typeof chunk === 'string') {
chunk = new Buffer(chunk, encoding);
}
return chunk;
}
Writable.prototype.write = function(chunk, encoding, cb) { Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState; var state = this._writableState;
var ret = false; var ret = false;
@ -177,6 +168,15 @@ Writable.prototype.write = function(chunk, encoding, cb) {
return ret; return ret;
}; };
function decodeChunk(state, chunk, encoding) {
if (!state.objectMode &&
state.decodeStrings !== false &&
typeof chunk === 'string') {
chunk = new Buffer(chunk, encoding);
}
return chunk;
}
// if we're already writing something, then just put this // if we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write // in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag. // If we return false, then we need a drain event, so set that flag.
@ -184,17 +184,13 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
chunk = decodeChunk(state, chunk, encoding); chunk = decodeChunk(state, chunk, encoding);
var len = state.objectMode ? 1 : chunk.length; var len = state.objectMode ? 1 : chunk.length;
// XXX Remove. _write() should take an encoding.
if (state.decodeStrings === false)
chunk = [chunk, encoding];
state.length += len; state.length += len;
var ret = state.length < state.highWaterMark; var ret = state.length < state.highWaterMark;
state.needDrain = !ret; state.needDrain = !ret;
if (state.writing) if (state.writing)
state.buffer.push([chunk, cb]); // XXX [chunk,encoding,cb] state.buffer.push([chunk, encoding, cb]);
else else
doWrite(stream, state, len, chunk, encoding, cb); doWrite(stream, state, len, chunk, encoding, cb);
@ -206,8 +202,7 @@ function doWrite(stream, state, len, chunk, encoding, cb) {
state.writecb = cb; state.writecb = cb;
state.writing = true; state.writing = true;
state.sync = true; state.sync = true;
// XXX stream._write(chunk, encoding, state.onwrite) stream._write(chunk, encoding, state.onwrite);
stream._write(chunk, state.onwrite);
state.sync = false; state.sync = false;
} }
@ -271,21 +266,12 @@ function onwriteDrain(stream, state) {
function clearBuffer(stream, state) { function clearBuffer(stream, state) {
state.bufferProcessing = true; state.bufferProcessing = true;
// XXX buffer entry should be [chunk, encoding, cb]
for (var c = 0; c < state.buffer.length; c++) { for (var c = 0; c < state.buffer.length; c++) {
var chunkCb = state.buffer[c]; var entry = state.buffer[c];
var chunk = chunkCb[0]; var chunk = entry[0];
var cb = chunkCb[1]; var encoding = entry[1];
var encoding = ''; var cb = entry[2];
var len; var len = state.objectMode ? 1 : chunk.length;
if (state.objectMode)
len = 1;
else if (false === state.decodeStrings) {
len = chunk[0].length;
encoding = chunk[1];
} else
len = chunk.length;
doWrite(stream, state, len, chunk, encoding, cb); doWrite(stream, state, len, chunk, encoding, cb);
@ -306,10 +292,8 @@ function clearBuffer(stream, state) {
state.buffer.length = 0; state.buffer.length = 0;
} }
Writable.prototype._write = function(chunk, cb) { Writable.prototype._write = function(chunk, encoding, cb) {
process.nextTick(function() { cb(new Error('not implemented'));
cb(new Error('not implemented'));
});
}; };
Writable.prototype.end = function(chunk, encoding, cb) { Writable.prototype.end = function(chunk, encoding, cb) {

View File

@ -160,8 +160,8 @@ function Hash(algorithm, options) {
util.inherits(Hash, stream.Transform); util.inherits(Hash, stream.Transform);
Hash.prototype._transform = function(chunk, callback) { Hash.prototype._transform = function(chunk, encoding, callback) {
this._binding.update(chunk); this._binding.update(chunk, encoding);
callback(); callback();
}; };
@ -226,8 +226,8 @@ function Cipher(cipher, password, options) {
util.inherits(Cipher, stream.Transform); util.inherits(Cipher, stream.Transform);
Cipher.prototype._transform = function(chunk, callback) { Cipher.prototype._transform = function(chunk, encoding, callback) {
this.push(this._binding.update(chunk)); this.push(this._binding.update(chunk, encoding));
callback(); callback();
}; };
@ -351,8 +351,8 @@ function Sign(algorithm, options) {
util.inherits(Sign, stream.Writable); util.inherits(Sign, stream.Writable);
Sign.prototype._write = function(chunk, callback) { Sign.prototype._write = function(chunk, encoding, callback) {
this._binding.update(chunk); this._binding.update(chunk, encoding);
callback(); callback();
}; };

View File

@ -1650,12 +1650,14 @@ WriteStream.prototype.open = function() {
}; };
WriteStream.prototype._write = function(data, cb) { WriteStream.prototype._write = function(data, encoding, cb) {
if (!Buffer.isBuffer(data)) if (!Buffer.isBuffer(data))
return this.emit('error', new Error('Invalid data')); return this.emit('error', new Error('Invalid data'));
if (typeof this.fd !== 'number') if (typeof this.fd !== 'number')
return this.once('open', this._write.bind(this, data, cb)); return this.once('open', function() {
this._write(data, encoding, cb);
});
var self = this; var self = this;
fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) { fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) {

View File

@ -161,7 +161,8 @@ function Socket(options) {
initSocketHandle(this); initSocketHandle(this);
this._pendingWrite = null; this._pendingData = null;
this._pendingEncoding = '';
// handle strings directly // handle strings directly
this._writableState.decodeStrings = false; this._writableState.decodeStrings = false;
@ -583,22 +584,20 @@ Socket.prototype.write = function(chunk, encoding, cb) {
}; };
Socket.prototype._write = function(dataEncoding, cb) { Socket.prototype._write = function(data, encoding, cb) {
// assert(Array.isArray(dataEncoding));
var data = dataEncoding[0];
var encoding = dataEncoding[1] || 'utf8';
// If we are still connecting, then buffer this for later. // If we are still connecting, then buffer this for later.
// The Writable logic will buffer up any more writes while // The Writable logic will buffer up any more writes while
// waiting for this one to be done. // waiting for this one to be done.
if (this._connecting) { if (this._connecting) {
this._pendingWrite = dataEncoding; this._pendingData = data;
this._pendingEncoding = encoding;
this.once('connect', function() { this.once('connect', function() {
this._write(dataEncoding, cb); this._write(data, encoding, cb);
}); });
return; return;
} }
this._pendingWrite = null; this._pendingData = null;
this._pendingEncoding = '';
timers.active(this); timers.active(this);
@ -651,15 +650,16 @@ function createWriteReq(handle, data, encoding) {
Socket.prototype.__defineGetter__('bytesWritten', function() { Socket.prototype.__defineGetter__('bytesWritten', function() {
var bytes = this._bytesDispatched, var bytes = this._bytesDispatched,
state = this._writableState, state = this._writableState,
pending = this._pendingWrite; data = this._pendingData,
encoding = this._pendingEncoding;
state.buffer.forEach(function(el) { state.buffer.forEach(function(el) {
el = el[0]; el = el[0];
bytes += Buffer.byteLength(el[0], el[1]); bytes += Buffer.byteLength(el[0], el[1]);
}); });
if (pending) if (data)
bytes += Buffer.byteLength(pending[0], pending[1]); bytes += Buffer.byteLength(data, encoding);
return bytes; return bytes;
}); });

View File

@ -239,6 +239,7 @@ function CryptoStream(pair, options) {
this.pair = pair; this.pair = pair;
this._pending = null; this._pending = null;
this._pendingEncoding = '';
this._pendingCallback = null; this._pendingCallback = null;
this._doneFlag = false; this._doneFlag = false;
this._resumingSession = false; this._resumingSession = false;
@ -300,7 +301,7 @@ function onCryptoStreamEnd() {
} }
CryptoStream.prototype._write = function write(data, cb) { CryptoStream.prototype._write = function write(data, encoding, cb) {
assert(this._pending === null); assert(this._pending === null);
// Black-hole data // Black-hole data
@ -361,6 +362,7 @@ CryptoStream.prototype._write = function write(data, cb) {
// No write has happened // No write has happened
this._pending = data; this._pending = data;
this._pendingEncoding = encoding;
this._pendingCallback = cb; this._pendingCallback = cb;
if (this === this.pair.cleartext) { if (this === this.pair.cleartext) {
@ -373,11 +375,13 @@ CryptoStream.prototype._write = function write(data, cb) {
CryptoStream.prototype._writePending = function writePending() { CryptoStream.prototype._writePending = function writePending() {
var data = this._pending, var data = this._pending,
encoding = this._pendingEncoding,
cb = this._pendingCallback; cb = this._pendingCallback;
this._pending = null; this._pending = null;
this._pendingEncoding = '';
this._pendingCallback = null; this._pendingCallback = null;
this._write(data, cb); this._write(data, encoding, cb);
}; };

View File

@ -309,7 +309,7 @@ Zlib.prototype.reset = function reset() {
}; };
Zlib.prototype._flush = function(callback) { Zlib.prototype._flush = function(callback) {
this._transform(null, callback); this._transform(null, '', callback);
}; };
Zlib.prototype.flush = function(callback) { Zlib.prototype.flush = function(callback) {
@ -343,7 +343,7 @@ Zlib.prototype.close = function(callback) {
}); });
}; };
Zlib.prototype._transform = function(chunk, cb) { Zlib.prototype._transform = function(chunk, encoding, cb) {
var flushFlag; var flushFlag;
var ws = this._writableState; var ws = this._writableState;
var ending = ws.ending || ws.ended; var ending = ws.ending || ws.ended;

View File

@ -29,7 +29,7 @@ r._read = function(size) {
}; };
var w = new stream.Writable(); var w = new stream.Writable();
w._write = function(data, cb) { w._write = function(data, encoding, cb) {
cb(null); cb(null);
}; };

View File

@ -261,7 +261,7 @@ test('high watermark push', function(t) {
test('can write objects to stream', function(t) { test('can write objects to stream', function(t) {
var w = new Writable({ objectMode: true }); var w = new Writable({ objectMode: true });
w._write = function(chunk, cb) { w._write = function(chunk, encoding, cb) {
assert.deepEqual(chunk, { foo: 'bar' }); assert.deepEqual(chunk, { foo: 'bar' });
cb(); cb();
}; };
@ -278,7 +278,7 @@ test('can write multiple objects to stream', function(t) {
var w = new Writable({ objectMode: true }); var w = new Writable({ objectMode: true });
var list = []; var list = [];
w._write = function(chunk, cb) { w._write = function(chunk, encoding, cb) {
list.push(chunk); list.push(chunk);
cb(); cb();
}; };
@ -303,7 +303,7 @@ test('can write strings as objects', function(t) {
}); });
var list = []; var list = [];
w._write = function(chunk, cb) { w._write = function(chunk, encoding, cb) {
list.push(chunk); list.push(chunk);
process.nextTick(cb); process.nextTick(cb);
}; };
@ -328,7 +328,7 @@ test('buffers finish until cb is called', function(t) {
}); });
var called = false; var called = false;
w._write = function(chunk, cb) { w._write = function(chunk, encoding, cb) {
assert.equal(chunk, 'foo'); assert.equal(chunk, 'foo');
process.nextTick(function() { process.nextTick(function() {

View File

@ -40,7 +40,7 @@ var stream = require('stream');
}; };
var dest = new stream.Writable(); var dest = new stream.Writable();
dest._write = function(chunk, cb) { dest._write = function(chunk, encoding, cb) {
cb(); cb();
}; };
@ -80,7 +80,7 @@ var stream = require('stream');
}; };
var dest = new stream.Writable(); var dest = new stream.Writable();
dest._write = function(chunk, cb) { dest._write = function(chunk, encoding, cb) {
cb(); cb();
}; };

View File

@ -90,9 +90,9 @@ var expectWritten =
'asdfgasdfgasdfgasdfg', 'asdfgasdfgasdfgasdfg',
'asdfgasdfgasdfgasdfg' ]; 'asdfgasdfgasdfgasdfg' ];
writer._write = function(chunk, cb) { writer._write = function(chunk, encoding, cb) {
console.error('WRITE %s', chunk[0]); console.error('WRITE %s', chunk);
written.push(chunk[0]); written.push(chunk);
process.nextTick(cb); process.nextTick(cb);
}; };

View File

@ -61,7 +61,7 @@ function child0() {
Writable.call(this, opts); Writable.call(this, opts);
} }
W.prototype._write = function(chunk, cb) { W.prototype._write = function(chunk, encoding, cb) {
var req = handle.writeUtf8String(chunk.toString() + '\n'); var req = handle.writeUtf8String(chunk.toString() + '\n');
// here's the problem. // here's the problem.
// it needs to tell the Writable machinery that it's ok to write // it needs to tell the Writable machinery that it's ok to write

View File

@ -67,7 +67,7 @@ test('writable side consumption', function(t) {
}); });
var transformed = 0; var transformed = 0;
tx._transform = function(chunk, cb) { tx._transform = function(chunk, encoding, cb) {
transformed += chunk.length; transformed += chunk.length;
tx.push(chunk); tx.push(chunk);
cb(); cb();
@ -106,7 +106,7 @@ test('passthrough', function(t) {
test('simple transform', function(t) { test('simple transform', function(t) {
var pt = new Transform; var pt = new Transform;
pt._transform = function(c, cb) { pt._transform = function(c, e, cb) {
var ret = new Buffer(c.length); var ret = new Buffer(c.length);
ret.fill('x'); ret.fill('x');
pt.push(ret); pt.push(ret);
@ -128,7 +128,7 @@ test('simple transform', function(t) {
test('async passthrough', function(t) { test('async passthrough', function(t) {
var pt = new Transform; var pt = new Transform;
pt._transform = function(chunk, cb) { pt._transform = function(chunk, encoding, cb) {
setTimeout(function() { setTimeout(function() {
pt.push(chunk); pt.push(chunk);
cb(); cb();
@ -154,7 +154,7 @@ test('assymetric transform (expand)', function(t) {
var pt = new Transform; var pt = new Transform;
// emit each chunk 2 times. // emit each chunk 2 times.
pt._transform = function(chunk, cb) { pt._transform = function(chunk, encoding, cb) {
setTimeout(function() { setTimeout(function() {
pt.push(chunk); pt.push(chunk);
setTimeout(function() { setTimeout(function() {
@ -189,7 +189,7 @@ test('assymetric transform (compress)', function(t) {
// or whatever's left. // or whatever's left.
pt.state = ''; pt.state = '';
pt._transform = function(chunk, cb) { pt._transform = function(chunk, encoding, cb) {
if (!chunk) if (!chunk)
chunk = ''; chunk = '';
var s = chunk.toString(); var s = chunk.toString();
@ -359,7 +359,7 @@ test('passthrough facaded', function(t) {
test('object transform (json parse)', function(t) { test('object transform (json parse)', function(t) {
console.error('json parse stream'); console.error('json parse stream');
var jp = new Transform({ objectMode: true }); var jp = new Transform({ objectMode: true });
jp._transform = function(data, cb) { jp._transform = function(data, encoding, cb) {
try { try {
jp.push(JSON.parse(data)); jp.push(JSON.parse(data));
cb(); cb();
@ -399,7 +399,7 @@ test('object transform (json parse)', function(t) {
test('object transform (json stringify)', function(t) { test('object transform (json stringify)', function(t) {
console.error('json parse stream'); console.error('json parse stream');
var js = new Transform({ objectMode: true }); var js = new Transform({ objectMode: true });
js._transform = function(data, cb) { js._transform = function(data, encoding, cb) {
try { try {
js.push(JSON.stringify(data)); js.push(JSON.stringify(data));
cb(); cb();

View File

@ -33,7 +33,7 @@ function TestWriter() {
this.written = 0; this.written = 0;
} }
TestWriter.prototype._write = function(chunk, cb) { TestWriter.prototype._write = function(chunk, encoding, cb) {
// simulate a small unpredictable latency // simulate a small unpredictable latency
setTimeout(function() { setTimeout(function() {
this.buffer.push(chunk.toString()); this.buffer.push(chunk.toString());
@ -186,11 +186,10 @@ test('write no bufferize', function(t) {
decodeStrings: false decodeStrings: false
}); });
tw._write = function(chunk, cb) { tw._write = function(chunk, encoding, cb) {
assert(Array.isArray(chunk)); assert(typeof chunk === 'string');
assert(typeof chunk[0] === 'string'); chunk = new Buffer(chunk, encoding);
chunk = new Buffer(chunk[0], chunk[1]); return TestWriter.prototype._write.call(this, chunk, encoding, cb);
return TestWriter.prototype._write.call(this, chunk, cb);
}; };
var encodings = var encodings =
@ -279,7 +278,7 @@ test('end callback after .write() call', function (t) {
test('encoding should be ignored for buffers', function(t) { test('encoding should be ignored for buffers', function(t) {
var tw = new W(); var tw = new W();
var hex = '018b5e9a8f6236ffe30e31baf80d2cf6eb'; var hex = '018b5e9a8f6236ffe30e31baf80d2cf6eb';
tw._write = function(chunk, cb) { tw._write = function(chunk, encoding, cb) {
t.equal(chunk.toString('hex'), hex); t.equal(chunk.toString('hex'), hex);
t.end(); t.end();
}; };