More efficient Stream.write

This commit is contained in:
Ryan Dahl 2010-03-27 14:07:43 -07:00
parent aa6a785286
commit d1b78c3f5d

View File

@ -40,7 +40,7 @@ var needsLookup = binding.needsLookup;
var errnoException = binding.errnoException;
var EINPROGRESS = binding.EINPROGRESS;
var ENOENT = binding.ENOENT;
var END_OF_FILE = 0;
var END_OF_FILE = 42;
// IDLE TIMEOUTS
@ -254,11 +254,7 @@ function _doFlush () {
var socket = this.socket;
// Stream becomes writeable on connect() but don't flush if there's
// nothing actually to write
if (socket._writeQueueSize == 0) {
return;
}
if (socket.flush()) {
assert(socket._writeQueueSize == 0);
if (socket._events && socket._events['drain']) socket.emit("drain");
if (socket.ondrain) socket.ondrain(); // Optimization
}
@ -280,7 +276,7 @@ function initStream (self) {
allocNewPool();
}
debug('pool.used ' + pool.used);
//debug('pool.used ' + pool.used);
var bytesRead;
try {
@ -293,7 +289,7 @@ function initStream (self) {
return;
}
debug('bytesRead ' + bytesRead + '\n');
//debug('bytesRead ' + bytesRead + '\n');
if (bytesRead === 0) {
self.readable = false;
@ -342,9 +338,10 @@ function initStream (self) {
};
self.readable = false;
self._writeQueue = []; // queue of buffers that need to be written to socket
// XXX use link list?
self._writeQueueSize = 0; // in bytes, not to be confused with _writeQueue.length!
// queue of buffers that need to be written to socket
// XXX use link list?
self._writeQueue = [];
self._writeQueueEncoding = [];
self._writeWatcher = ioWatchers.alloc();
self._writeWatcher.socket = self;
@ -401,66 +398,193 @@ Object.defineProperty(Stream.prototype, 'readyState', {
});
Stream.prototype._allocateSendBuffer = function () {
var b = buffers.alloc(1024);
b.used = 0;
b.sent = 0;
b.isMsg = false;
this._writeQueue.push(b);
return b;
};
// Here's the deal. Character encodings are hard. We need to take javascript
// strings and turn them into raw binary to send them to socket. Javascript
// strings are pure unicode (I think V8 uses 16-bit arrays to hold them).
// So an encoding needs to be given to write it out to socket - this is
// usually 'utf8'.
//
// This function, encodeString, takes a buffer and writes the string to it
// starting at buffer.used. If it could fit the entire string into the
// buffer then it increases the buffer's .used member and returns buffer.
// Otherwise it creates a new buffer large enough to fit the entire string,
// writes that string into the new buffer, and then returns it.
function encodeString (buffer, string, encoding) {
encoding = (encoding || 'utf8').toLowerCase();
var bytesWritten;
Stream.prototype._writeString = function (data, encoding) {
var self = this;
if (!self.writable) throw new Error('Stream is not writable');
var buffer;
if (self._writeQueue.length == 0) {
buffer = self._allocateSendBuffer();
} else {
buffer = self.__writeQueueLast();
if (buffer.length - buffer.used < 64) {
buffer = self._allocateSendBuffer();
if (string.length < buffer.length - buffer.used) {
// Try to write
if (encoding == 'utf8' || encoding == 'utf-8') {
bytesWritten = buffer.utf8Write(string, buffer.used);
debug('wrote ' + bytesWritten + ' utf8 bytes to buffer');
if (buffer[bytesWritten-1] == 0) {
// wrote the whole string.
buffer.used += bytesWritten-1;
return buffer;
}
} else {
if (encoding == 'ascii') {
bytesWritten = buffer.asciiWrite(string, buffer.used);
buffer.used += bytesWritten; // bytesWritten-1 ?
} else {
bytesWritten = buffer.binaryWrite(string, buffer.used);
buffer.used += bytesWritten;
}
return buffer;
}
}
encoding = (encoding || 'utf8').toLowerCase(); // default to utf8
// Couldn't fit the string in the given buffer. Instead of partially
// writing it and then slicing the string, we'll do something stupid and
// just create a new temporary buffer just for that string.
// (The reasoning is: slicing is expensive.)
var charsWritten;
var bytesWritten;
var byteLength = Buffer.byteLength(string, encoding);
var newBuffer = new Buffer(byteLength);
debug('alloced new buffer for string : ' + newBuffer.length);
if (encoding == 'utf8') {
charsWritten = buffer.utf8Write(data, buffer.used);
bytesWritten = Buffer.byteLength(data.slice(0, charsWritten));
if (encoding == 'utf8' || encoding == 'utf-8') {
bytesWritten = newBuffer.utf8Write(string, 0);
} else if (encoding == 'ascii') {
// ascii
charsWritten = buffer.asciiWrite(data, buffer.used);
bytesWritten = charsWritten;
bytesWritten = newBuffer.asciiWrite(string, 0);
} else {
// binary
charsWritten = buffer.binaryWrite(data, buffer.used);
bytesWritten = charsWritten;
bytesWritten = newBuffer.binaryWrite(string, 0);
}
buffer.used += bytesWritten;
self._writeQueueSize += bytesWritten;
debug('filled up new buffer');
debug('charsWritten ' + charsWritten);
debug('buffer.used ' + buffer.used);
// If we didn't finish, then recurse with the rest of the string.
if (charsWritten < data.length) {
debug('recursive write');
self._writeString(data.slice(charsWritten), encoding);
assert(bytesWritten == byteLength);
newBuffer.used = byteLength;
newBuffer.sent = 0;
return newBuffer;
}
// Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will
// signal when it has been finally flushed to socket.
Stream.prototype.write = function (data, encoding) {
if (this._writeQueue && this._writeQueue.length) {
// Slow. There is already a write queue, so let's append to it.
if (this._writeQueueLast() == END_OF_FILE) {
throw new Error('Stream.close() called already; cannot write.');
}
this._writeQueue.push(data); // TODO if string of the same encoding concat?
this._writeQueueEncoding.push(encoding);
return false;
} else {
// Fast.
return this._writeOut(data, encoding);
}
};
Stream.prototype.__writeQueueLast = function () {
return this._writeQueue.length > 0 ? this._writeQueue[this._writeQueue.length-1]
: null;
// Directly writes the data to socket.
// unshifts remainder onto _writeQueue.
Stream.prototype._writeOut = function (data, encoding) {
if (!this.writable) throw new Error('Stream is not writable');
// The most common case. There is no write queue. Just push the data
// directly to the socket.
var buffer, off, len;
if (typeof data == 'string') {
if (!pool) allocNewPool();
var startOffset = pool.used;
buffer = encodeString(pool, data, encoding);
off = (buffer == pool ? startOffset : 0);
len = buffer.used - off;
} else {
buffer = data;
off = data.sent || 0;
len = data.length;
}
debug('write [fd, off, len] =' + JSON.stringify([this.fd, off, len]));
// Send the buffer.
var bytesWritten;
try {
bytesWritten = write(this.fd, buffer, off, len);
} catch (e) {
this.forceClose(e);
return false;
}
debug('wrote ' + bytesWritten);
timeout.active(this);
if (bytesWritten == len) {
// awesome. sent to buffer - save that space
buffer.used -= len;
return true;
}
//sys.error('write incomplete ' + bytesWritten + ' < ' + len);
this._writeWatcher.start();
if (buffer == data) {
//sys.error('string');
bytesWritten = bytesWritten || 0;
data = buffer.slice(bytesWritten, len);
data.sent = 0;
data.used = data.length;
} else if (buffer == pool) {
//sys.error('pool');
data = pool.slice(off + bytesWritten, off + len);
data.sent = 0;
data.used = data.length;
} else {
data = buffer;
data.sent = bytesWritten;
}
assert(typeof data.used == 'number');
assert(typeof data.sent == 'number');
// sys.error('data.used = ' + data.used);
// sys.error('data.sent = ' + data.sent);
// sys.error('bytes left, data.used - data.send = ' + (data.used - data.sent));
//if (!this._writeQueue) initWriteStream(this);
// data should be the next thing to write.
this._writeQueue.unshift(data);
this._writeQueueEncoding.unshift(null);
return false;
}
// Flushes the write buffer out.
// Returns true if the entire buffer was flushed.
Stream.prototype.flush = function () {
while (this._writeQueue && this._writeQueue.length) {
var data = this._writeQueue.shift();
var encoding = this._writeQueueEncoding.shift();
if (data == END_OF_FILE) {
this._shutdown();
return true;
}
var flushed = this._writeOut(data,encoding);
if (!flushed) return false;
}
if (this._writeWatcher) this._writeWatcher.stop();
return true;
};
@ -468,170 +592,18 @@ Stream.prototype.send = function () {
throw new Error('send renamed to write');
};
Stream.prototype._writeQueueLast = function () {
return this._writeQueue.length > 0 ? this._writeQueue[this._writeQueue.length-1]
: null;
};
Stream.prototype.setEncoding = function (enc) {
// TODO check values, error out on bad, and deprecation message?
this._encoding = enc.toLowerCase();
};
// Returns true if all the data was flushed to socket. Returns false if
// something was queued. If data was queued, then the "drain" event will
// signal when it has been finally flushed to socket.
Stream.prototype.write = function (data, encoding) {
var self = this;
if (!self.writable) throw new Error('Stream is not writable');
if (self._writeQueue && self._writeQueue.length) {
// slow
// There is already a write queue, so let's append to it.
return self._queueWrite(data, encoding);
} else {
// The most common case. There is no write queue. Just push the data
// directly to the socket.
var bytesWritten;
var buffer = data, off = 0, len = data.length;
if (typeof data == 'string') {
encoding = (encoding || 'utf8').toLowerCase();
var bytes = Buffer.byteLength(data, encoding);
if (!pool) allocNewPool();
if (pool.length - pool.used < bytes) {
// not enough room - go to slow case
return self._queueWrite(data, encoding);
}
var charsWritten;
if (encoding == 'utf8') {
pool.utf8Write(data, pool.used);
} else if (encoding == 'ascii') {
// ascii
pool.asciiWrite(data, pool.used);
} else {
// binary
pool.binaryWrite(data, pool.used);
}
buffer = pool;
off = pool.used;
len = bytes;
}
//debug('write [fd, off, len] =' + JSON.stringify([self.fd, off, len]));
// Send the buffer.
try {
bytesWritten = write(self.fd, buffer, off, len);
} catch (e) {
self.forceClose(e);
return false;
}
//debug('wrote ' + bytesWritten);
// Note: if using the pool - we don't need to increase
// pool.used because it was all sent. Just reuse that space.
if (bytesWritten == len) return true;
//debug('write incomplete ' + bytesWritten + ' < ' + len);
if (buffer == data) {
data.sent = bytesWritten || 0;
data.used = data.length;
} else {
// string
pool.used += bytesWritten;
data = pool.slice(off+bytesWritten, off+len+bytesWritten);
data.sent = 0;
data.used = data.length;
}
//if (!self._writeQueue) initWriteStream(self);
self._writeQueue.push(data);
self._writeQueueSize += data.used;
return false;
}
}
Stream.prototype._queueWrite = function (data, encoding) {
//debug('_queueWrite');
var self = this;
if (self.__writeQueueLast() == END_OF_FILE) {
throw new Error('socket.close() called already; cannot write.');
}
if (typeof(data) == 'string') {
self._writeString(data, encoding);
} else {
// data is a Buffer
// walk through the _writeQueue, find the first empty buffer
//var inserted = false;
data.sent = 0;
data.used = data.length;
self._writeQueue.push(data);
self._writeQueueSize += data.used;
}
return this.flush();
};
// Flushes the write buffer out.
// Returns true if the entire buffer was flushed.
Stream.prototype.flush = function () {
var self = this;
var bytesWritten;
while (self._writeQueue.length) {
if (!self.writable) throw new Error('Stream is not writable');
var b = self._writeQueue[0];
if (b == END_OF_FILE) {
self._shutdown();
return true;
}
if (b.sent == b.used) {
// shift!
self._writeQueue.shift();
buffers.free(b);
continue;
}
var fdToSend = null;
try {
bytesWritten = write(self.fd,
b,
b.sent,
b.used - b.sent);
} catch (e) {
self.forceClose(e);
return false;
}
timeout.active(self);
if (bytesWritten === null) {
// EAGAIN
debug('write EAGAIN');
self._writeWatcher.start();
assert(self._writeQueueSize > 0);
return false;
} else {
b.sent += bytesWritten;
self._writeQueueSize -= bytesWritten;
//debug('bytes sent: ' + b.sent);
}
}
if (self._writeWatcher) self._writeWatcher.stop();
return true;
};
function doConnect (socket, port, host) {
try {
@ -752,9 +724,8 @@ Stream.prototype.forceClose = function (exception) {
timeout.unenroll(this);
// FIXME Bug when this.fd == 0
if (this.fd) {
if (typeof this.fd == 'number') {
close(this.fd);
//debug('close ' + this.fd);
this.fd = null;
process.nextTick(function () {
if (exception) self.emit('error', exception);
@ -782,7 +753,7 @@ Stream.prototype._shutdown = function () {
Stream.prototype.close = function () {
if (this.writable) {
if (this.__writeQueueLast() != END_OF_FILE) {
if (this._writeQueueLast() != END_OF_FILE) {
this._writeQueue.push(END_OF_FILE);
this.flush();
}