diff --git a/lib/net.js b/lib/net.js index 3c537e03370..e409545d050 100644 --- a/lib/net.js +++ b/lib/net.js @@ -92,16 +92,6 @@ function allocEmptyBuffer () { emptyBuffer.length = 0; } -function _doFlush () { - var socket = this.socket; - // Stream becomes writable on connect() but don't flush if there's - // nothing actually to write - if (socket.flush()) { - if (socket._events && socket._events['drain']) socket.emit("drain"); - if (socket.ondrain) socket.ondrain(); // Optimization - } -} - function setImplmentationMethods (self) { function noData(buf, off, len) { return !buf || @@ -225,7 +215,7 @@ function setImplmentationMethods (self) { if (self.secureStream.clearPending()) { process.nextTick(function () { - if (self._readWatcher) self._readWatcher.callback(); + if (self.readable) self._onReadable(true); }); } @@ -281,74 +271,28 @@ function setImplmentationMethods (self) { } }; + +function onReadable (readable, writeable) { + assert(this.socket); + var socket = this.socket; + socket._onReadable(); +} + + +function onWritable (readable, writeable) { + assert(this.socket); + var socket = this.socket; + if (socket._connecting) { + socket._onConnect(); + } else { + socket._onWritable(); + } +} + function initStream (self) { self._readWatcher = ioWatchers.alloc(); - self._readWatcher.callback = function () { - // If this is the first recv (pool doesn't exist) or we've used up - // most of the pool, allocate a new one. - if (!pool || pool.length - pool.used < kMinPoolSpace) { - // discard the old pool. Can't add to the free list because - // users might have refernces to slices on it. - pool = null; - allocNewPool(); - } - - //debug('pool.used ' + pool.used); - var bytesRead; - - try { - bytesRead = self._readImpl(pool, - pool.used, - pool.length - pool.used, - (arguments.length > 0)); - } catch (e) { - self.destroy(e); - return; - } - - // Note that some _readImpl() implementations return -1 bytes - // read as an indication not to do any processing on the result - // (but not an error). - - if (bytesRead === 0) { - self.readable = false; - self._readWatcher.stop(); - - if (!self.writable) self.destroy(); - // Note: 'close' not emitted until nextTick. - - if (!self.allowHalfOpen) self.end(); - if (self._events && self._events['end']) self.emit('end'); - if (self.onend) self.onend(); - } else if (bytesRead > 0) { - - require('timers').active(self); - - var start = pool.used; - var end = pool.used + bytesRead; - pool.used += bytesRead; - - if (self._decoder) { - // emit String - var string = self._decoder.write(pool.slice(start, end)); - if (string.length) self.emit('data', string); - } else { - // emit buffer - if (self._events && self._events['data']) { - // emit a slice - self.emit('data', pool.slice(start, end)); - } - } - - // Optimization: emit the original buffer with end points - if (self.ondata) self.ondata(pool, start, end); - } else if (bytesRead == -2) { - // Temporary fix - need SSL refactor. - // -2 originates from SecureStream::ReadExtract - self.destroy(new Error('openssl read error')); - return false; - } - }; + self._readWatcher.socket = self; + self._readWatcher.callback = onReadable; self.readable = false; // Queue of buffers and string that need to be written to socket. @@ -358,7 +302,7 @@ function initStream (self) { self._writeWatcher = ioWatchers.alloc(); self._writeWatcher.socket = self; - self._writeWatcher.callback = _doFlush; + self._writeWatcher.callback = onWritable; self.writable = false; } @@ -707,6 +651,7 @@ function doConnect (socket, port, host) { // Don't start the read watcher until connection is established socket._readWatcher.set(socket.fd, true, false); + socket._readWatcher.socket = socket; // How to connect on POSIX: Wait for fd to become writable, then call // socketError() if there isn't an error, we're connected. AFAIK this a @@ -715,37 +660,120 @@ function doConnect (socket, port, host) { // Manual Page connect(2) under the error code EINPROGRESS. socket._writeWatcher.set(socket.fd, false, true); socket._writeWatcher.start(); - socket._writeWatcher.callback = function () { - var errno = socketError(socket.fd); - if (errno == 0) { - // connection established - socket._connecting = false; - socket.resume(); - socket.readable = socket.writable = true; - socket._writeWatcher.callback = _doFlush; - try { - socket.emit('connect'); - } catch (e) { - socket.destroy(e); - return; - } - - - if (socket._writeQueue && socket._writeQueue.length) { - // Flush socket in case any writes are queued up while connecting. - // ugly - _doFlush.call(socket._writeWatcher); - } - - } else if (errno != EINPROGRESS) { - socket.destroy(errnoException(errno, 'connect')); - } - }; + socket._writeWatcher.socket = socket; + socket._writeWatcher.callback = onWritable; } + function toPort (x) { return (x = Number(x)) >= 0 ? x : false; } +Stream.prototype._onConnect = function () { + var errno = socketError(this.fd); + if (errno == 0) { + // connection established + this._connecting = false; + this.resume(); + this.readable = this.writable = true; + try { + this.emit('connect'); + } catch (e) { + this.destroy(e); + return; + } + + + if (this._writeQueue && this._writeQueue.length) { + // Flush this in case any writes are queued up while connecting. + this._onWritable(); + } + + } else if (errno != EINPROGRESS) { + this.destroy(errnoException(errno, 'connect')); + } +}; + + +Stream.prototype._onWritable = function () { + // Stream becomes writable on connect() but don't flush if there's + // nothing actually to write + if (this.flush()) { + if (this._events && this._events['drain']) this.emit("drain"); + if (this.ondrain) this.ondrain(); // Optimization + } +}; + + +Stream.prototype._onReadable = function () { + var self = this; + + // If this is the first recv (pool doesn't exist) or we've used up + // most of the pool, allocate a new one. + if (!pool || pool.length - pool.used < kMinPoolSpace) { + // discard the old pool. Can't add to the free list because + // users might have refernces to slices on it. + pool = null; + allocNewPool(); + } + + //debug('pool.used ' + pool.used); + var bytesRead; + + try { + bytesRead = self._readImpl(pool, + pool.used, + pool.length - pool.used, + !arguments[0] /* stupid calledByIOWatcher shit */); + } catch (e) { + self.destroy(e); + return; + } + + // Note that some _readImpl() implementations return -1 bytes + // read as an indication not to do any processing on the result + // (but not an error). + + if (bytesRead === 0) { + self.readable = false; + self._readWatcher.stop(); + + if (!self.writable) self.destroy(); + // Note: 'close' not emitted until nextTick. + + if (!self.allowHalfOpen) self.end(); + if (self._events && self._events['end']) self.emit('end'); + if (self.onend) self.onend(); + } else if (bytesRead > 0) { + + require('timers').active(self); + + var start = pool.used; + var end = pool.used + bytesRead; + pool.used += bytesRead; + + if (self._decoder) { + // emit String + var string = self._decoder.write(pool.slice(start, end)); + if (string.length) self.emit('data', string); + } else { + // emit buffer + if (self._events && self._events['data']) { + // emit a slice + self.emit('data', pool.slice(start, end)); + } + } + + // Optimization: emit the original buffer with end points + if (self.ondata) self.ondata(pool, start, end); + } else if (bytesRead == -2) { + // Temporary fix - need SSL refactor. + // -2 originates from SecureStream::ReadExtract + self.destroy(new Error('openssl read error')); + return false; + } +}; + + // var stream = new Stream(); // stream.connect(80) - TCP connect to port 80 on the localhost // stream.connect(80, 'nodejs.org') - TCP connect to port 80 on nodejs.org @@ -836,12 +864,14 @@ Stream.prototype.destroy = function (exception) { if (this._writeWatcher) { this._writeWatcher.stop(); + this._writeWatcher.socket = null; ioWatchers.free(this._writeWatcher); this._writeWatcher = null; } if (this._readWatcher) { this._readWatcher.stop(); + this._readWatcher.socket = null; ioWatchers.free(this._readWatcher); this._readWatcher = null; }