From d1b78c3f5d948728dcee852a9c2bfd59f4e01fbd Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Sat, 27 Mar 2010 14:07:43 -0700 Subject: [PATCH] More efficient Stream.write --- lib/net.js | 401 +++++++++++++++++++++++++---------------------------- 1 file changed, 186 insertions(+), 215 deletions(-) diff --git a/lib/net.js b/lib/net.js index c2d5e532d25..1fa358e4dcb 100644 --- a/lib/net.js +++ b/lib/net.js @@ -40,7 +40,7 @@ var needsLookup = binding.needsLookup; var errnoException = binding.errnoException; var EINPROGRESS = binding.EINPROGRESS; var ENOENT = binding.ENOENT; -var END_OF_FILE = 0; +var END_OF_FILE = 42; // IDLE TIMEOUTS @@ -254,11 +254,7 @@ function _doFlush () { var socket = this.socket; // Stream becomes writeable on connect() but don't flush if there's // nothing actually to write - if (socket._writeQueueSize == 0) { - return; - } if (socket.flush()) { - assert(socket._writeQueueSize == 0); if (socket._events && socket._events['drain']) socket.emit("drain"); if (socket.ondrain) socket.ondrain(); // Optimization } @@ -280,7 +276,7 @@ function initStream (self) { allocNewPool(); } - debug('pool.used ' + pool.used); + //debug('pool.used ' + pool.used); var bytesRead; try { @@ -293,7 +289,7 @@ function initStream (self) { return; } - debug('bytesRead ' + bytesRead + '\n'); + //debug('bytesRead ' + bytesRead + '\n'); if (bytesRead === 0) { self.readable = false; @@ -342,9 +338,10 @@ function initStream (self) { }; self.readable = false; - self._writeQueue = []; // queue of buffers that need to be written to socket - // XXX use link list? - self._writeQueueSize = 0; // in bytes, not to be confused with _writeQueue.length! + // queue of buffers that need to be written to socket + // XXX use link list? + self._writeQueue = []; + self._writeQueueEncoding = []; self._writeWatcher = ioWatchers.alloc(); self._writeWatcher.socket = self; @@ -401,66 +398,193 @@ Object.defineProperty(Stream.prototype, 'readyState', { }); -Stream.prototype._allocateSendBuffer = function () { - var b = buffers.alloc(1024); - b.used = 0; - b.sent = 0; - b.isMsg = false; - this._writeQueue.push(b); - return b; -}; +// Here's the deal. Character encodings are hard. We need to take javascript +// strings and turn them into raw binary to send them to socket. Javascript +// strings are pure unicode (I think V8 uses 16-bit arrays to hold them). +// So an encoding needs to be given to write it out to socket - this is +// usually 'utf8'. +// +// This function, encodeString, takes a buffer and writes the string to it +// starting at buffer.used. If it could fit the entire string into the +// buffer then it increases the buffer's .used member and returns buffer. +// Otherwise it creates a new buffer large enough to fit the entire string, +// writes that string into the new buffer, and then returns it. +function encodeString (buffer, string, encoding) { + encoding = (encoding || 'utf8').toLowerCase(); + var bytesWritten; - -Stream.prototype._writeString = function (data, encoding) { - var self = this; - if (!self.writable) throw new Error('Stream is not writable'); - var buffer; - - if (self._writeQueue.length == 0) { - buffer = self._allocateSendBuffer(); - } else { - buffer = self.__writeQueueLast(); - if (buffer.length - buffer.used < 64) { - buffer = self._allocateSendBuffer(); + if (string.length < buffer.length - buffer.used) { + // Try to write + if (encoding == 'utf8' || encoding == 'utf-8') { + bytesWritten = buffer.utf8Write(string, buffer.used); + debug('wrote ' + bytesWritten + ' utf8 bytes to buffer'); + if (buffer[bytesWritten-1] == 0) { + // wrote the whole string. + buffer.used += bytesWritten-1; + return buffer; + } + } else { + if (encoding == 'ascii') { + bytesWritten = buffer.asciiWrite(string, buffer.used); + buffer.used += bytesWritten; // bytesWritten-1 ? + } else { + bytesWritten = buffer.binaryWrite(string, buffer.used); + buffer.used += bytesWritten; + } + return buffer; } } - encoding = (encoding || 'utf8').toLowerCase(); // default to utf8 + // Couldn't fit the string in the given buffer. Instead of partially + // writing it and then slicing the string, we'll do something stupid and + // just create a new temporary buffer just for that string. + // (The reasoning is: slicing is expensive.) - var charsWritten; - var bytesWritten; + var byteLength = Buffer.byteLength(string, encoding); + var newBuffer = new Buffer(byteLength); + debug('alloced new buffer for string : ' + newBuffer.length); - if (encoding == 'utf8') { - charsWritten = buffer.utf8Write(data, buffer.used); - bytesWritten = Buffer.byteLength(data.slice(0, charsWritten)); + if (encoding == 'utf8' || encoding == 'utf-8') { + bytesWritten = newBuffer.utf8Write(string, 0); } else if (encoding == 'ascii') { - // ascii - charsWritten = buffer.asciiWrite(data, buffer.used); - bytesWritten = charsWritten; + bytesWritten = newBuffer.asciiWrite(string, 0); } else { - // binary - charsWritten = buffer.binaryWrite(data, buffer.used); - bytesWritten = charsWritten; + bytesWritten = newBuffer.binaryWrite(string, 0); } - buffer.used += bytesWritten; - self._writeQueueSize += bytesWritten; + debug('filled up new buffer'); - debug('charsWritten ' + charsWritten); - debug('buffer.used ' + buffer.used); - // If we didn't finish, then recurse with the rest of the string. - if (charsWritten < data.length) { - debug('recursive write'); - self._writeString(data.slice(charsWritten), encoding); + assert(bytesWritten == byteLength); + + newBuffer.used = byteLength; + newBuffer.sent = 0; + + return newBuffer; +} + + +// Returns true if all the data was flushed to socket. Returns false if +// something was queued. If data was queued, then the "drain" event will +// signal when it has been finally flushed to socket. +Stream.prototype.write = function (data, encoding) { + if (this._writeQueue && this._writeQueue.length) { + // Slow. There is already a write queue, so let's append to it. + if (this._writeQueueLast() == END_OF_FILE) { + throw new Error('Stream.close() called already; cannot write.'); + } + this._writeQueue.push(data); // TODO if string of the same encoding concat? + this._writeQueueEncoding.push(encoding); + return false; + } else { + // Fast. + return this._writeOut(data, encoding); } }; -Stream.prototype.__writeQueueLast = function () { - return this._writeQueue.length > 0 ? this._writeQueue[this._writeQueue.length-1] - : null; +// Directly writes the data to socket. +// unshifts remainder onto _writeQueue. +Stream.prototype._writeOut = function (data, encoding) { + if (!this.writable) throw new Error('Stream is not writable'); + + // The most common case. There is no write queue. Just push the data + // directly to the socket. + + var buffer, off, len; + + if (typeof data == 'string') { + if (!pool) allocNewPool(); + var startOffset = pool.used; + buffer = encodeString(pool, data, encoding); + off = (buffer == pool ? startOffset : 0); + len = buffer.used - off; + } else { + buffer = data; + off = data.sent || 0; + len = data.length; + } + + debug('write [fd, off, len] =' + JSON.stringify([this.fd, off, len])); + + // Send the buffer. + + var bytesWritten; + + try { + bytesWritten = write(this.fd, buffer, off, len); + } catch (e) { + this.forceClose(e); + return false; + } + + debug('wrote ' + bytesWritten); + + timeout.active(this); + + if (bytesWritten == len) { + // awesome. sent to buffer - save that space + buffer.used -= len; + return true; + } + + //sys.error('write incomplete ' + bytesWritten + ' < ' + len); + + this._writeWatcher.start(); + + + if (buffer == data) { + //sys.error('string'); + bytesWritten = bytesWritten || 0; + data = buffer.slice(bytesWritten, len); + data.sent = 0; + data.used = data.length; + + } else if (buffer == pool) { + //sys.error('pool'); + data = pool.slice(off + bytesWritten, off + len); + data.sent = 0; + data.used = data.length; + + } else { + data = buffer; + data.sent = bytesWritten; + } + + assert(typeof data.used == 'number'); + assert(typeof data.sent == 'number'); + +// sys.error('data.used = ' + data.used); +// sys.error('data.sent = ' + data.sent); +// sys.error('bytes left, data.used - data.send = ' + (data.used - data.sent)); + //if (!this._writeQueue) initWriteStream(this); + + // data should be the next thing to write. + this._writeQueue.unshift(data); + this._writeQueueEncoding.unshift(null); + + return false; +} + + +// Flushes the write buffer out. +// Returns true if the entire buffer was flushed. +Stream.prototype.flush = function () { + while (this._writeQueue && this._writeQueue.length) { + var data = this._writeQueue.shift(); + var encoding = this._writeQueueEncoding.shift(); + + if (data == END_OF_FILE) { + this._shutdown(); + return true; + } + + var flushed = this._writeOut(data,encoding); + if (!flushed) return false; + } + if (this._writeWatcher) this._writeWatcher.stop(); + return true; }; @@ -468,170 +592,18 @@ Stream.prototype.send = function () { throw new Error('send renamed to write'); }; + +Stream.prototype._writeQueueLast = function () { + return this._writeQueue.length > 0 ? this._writeQueue[this._writeQueue.length-1] + : null; +}; + + Stream.prototype.setEncoding = function (enc) { // TODO check values, error out on bad, and deprecation message? this._encoding = enc.toLowerCase(); }; -// Returns true if all the data was flushed to socket. Returns false if -// something was queued. If data was queued, then the "drain" event will -// signal when it has been finally flushed to socket. -Stream.prototype.write = function (data, encoding) { - var self = this; - - if (!self.writable) throw new Error('Stream is not writable'); - - if (self._writeQueue && self._writeQueue.length) { - // slow - // There is already a write queue, so let's append to it. - return self._queueWrite(data, encoding); - - } else { - // The most common case. There is no write queue. Just push the data - // directly to the socket. - - var bytesWritten; - var buffer = data, off = 0, len = data.length; - - if (typeof data == 'string') { - encoding = (encoding || 'utf8').toLowerCase(); - var bytes = Buffer.byteLength(data, encoding); - - if (!pool) allocNewPool(); - - if (pool.length - pool.used < bytes) { - // not enough room - go to slow case - return self._queueWrite(data, encoding); - } - - var charsWritten; - if (encoding == 'utf8') { - pool.utf8Write(data, pool.used); - } else if (encoding == 'ascii') { - // ascii - pool.asciiWrite(data, pool.used); - } else { - // binary - pool.binaryWrite(data, pool.used); - } - - buffer = pool; - off = pool.used; - len = bytes; - } - - //debug('write [fd, off, len] =' + JSON.stringify([self.fd, off, len])); - - // Send the buffer. - try { - bytesWritten = write(self.fd, buffer, off, len); - } catch (e) { - self.forceClose(e); - return false; - } - - //debug('wrote ' + bytesWritten); - - // Note: if using the pool - we don't need to increase - // pool.used because it was all sent. Just reuse that space. - - if (bytesWritten == len) return true; - - //debug('write incomplete ' + bytesWritten + ' < ' + len); - - if (buffer == data) { - data.sent = bytesWritten || 0; - data.used = data.length; - } else { - // string - pool.used += bytesWritten; - data = pool.slice(off+bytesWritten, off+len+bytesWritten); - data.sent = 0; - data.used = data.length; - } - - //if (!self._writeQueue) initWriteStream(self); - self._writeQueue.push(data); - self._writeQueueSize += data.used; - return false; - } -} - -Stream.prototype._queueWrite = function (data, encoding) { - //debug('_queueWrite'); - var self = this; - - if (self.__writeQueueLast() == END_OF_FILE) { - throw new Error('socket.close() called already; cannot write.'); - } - - if (typeof(data) == 'string') { - self._writeString(data, encoding); - } else { - // data is a Buffer - // walk through the _writeQueue, find the first empty buffer - //var inserted = false; - data.sent = 0; - data.used = data.length; - self._writeQueue.push(data); - self._writeQueueSize += data.used; - } - return this.flush(); -}; - -// Flushes the write buffer out. -// Returns true if the entire buffer was flushed. -Stream.prototype.flush = function () { - var self = this; - - var bytesWritten; - while (self._writeQueue.length) { - if (!self.writable) throw new Error('Stream is not writable'); - - var b = self._writeQueue[0]; - - if (b == END_OF_FILE) { - self._shutdown(); - return true; - } - - if (b.sent == b.used) { - // shift! - self._writeQueue.shift(); - buffers.free(b); - continue; - } - - var fdToSend = null; - - try { - bytesWritten = write(self.fd, - b, - b.sent, - b.used - b.sent); - } catch (e) { - self.forceClose(e); - return false; - } - - timeout.active(self); - - if (bytesWritten === null) { - // EAGAIN - debug('write EAGAIN'); - self._writeWatcher.start(); - assert(self._writeQueueSize > 0); - return false; - } else { - b.sent += bytesWritten; - self._writeQueueSize -= bytesWritten; - //debug('bytes sent: ' + b.sent); - } - } - if (self._writeWatcher) self._writeWatcher.stop(); - return true; -}; - function doConnect (socket, port, host) { try { @@ -752,9 +724,8 @@ Stream.prototype.forceClose = function (exception) { timeout.unenroll(this); // FIXME Bug when this.fd == 0 - if (this.fd) { + if (typeof this.fd == 'number') { close(this.fd); - //debug('close ' + this.fd); this.fd = null; process.nextTick(function () { if (exception) self.emit('error', exception); @@ -782,7 +753,7 @@ Stream.prototype._shutdown = function () { Stream.prototype.close = function () { if (this.writable) { - if (this.__writeQueueLast() != END_OF_FILE) { + if (this._writeQueueLast() != END_OF_FILE) { this._writeQueue.push(END_OF_FILE); this.flush(); }