[net2] Add Socket.setTimeout
Still seeing crashes and performance problems.
This commit is contained in:
parent
4635ed7fde
commit
ca862d75de
196
lib/net.js
196
lib/net.js
@ -34,6 +34,167 @@ var EINPROGRESS = process.EINPROGRESS;
|
||||
var ENOENT = process.ENOENT;
|
||||
var END_OF_FILE = 0;
|
||||
|
||||
|
||||
// IDLE TIMEOUTS
|
||||
//
|
||||
// Because often many sockets will have the same idle timeout we will not
|
||||
// use one timeout watcher per socket. It is too much overhead. Instead
|
||||
// we'll use a single watcher for all sockets with the same timeout value
|
||||
// and a linked list. This technique is described in the libev manual:
|
||||
// http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#Be_smart_about_timeouts
|
||||
|
||||
|
||||
var timeout = new (function () {
|
||||
// Object containing all lists, timers
|
||||
// key = time in milliseconds
|
||||
// value = list
|
||||
var lists = {};
|
||||
|
||||
// show the most idle socket
|
||||
function peek (list) {
|
||||
if (list._idlePrev == list) return null;
|
||||
return list._idlePrev;
|
||||
}
|
||||
|
||||
|
||||
// remove the most idle socket from the list
|
||||
function shift (list) {
|
||||
var first = list._idlePrev;
|
||||
remove(first);
|
||||
return first;
|
||||
}
|
||||
|
||||
|
||||
// remove a socket from its list
|
||||
function remove (socket) {
|
||||
socket._idleNext._idlePrev = socket._idlePrev;
|
||||
socket._idlePrev._idleNext = socket._idleNext;
|
||||
}
|
||||
|
||||
|
||||
// remove a socket from its list and place at the end.
|
||||
function append (list, socket) {
|
||||
remove(socket);
|
||||
socket._idleNext = list._idleNext;
|
||||
socket._idleNext._idlePrev = socket;
|
||||
socket._idlePrev = list
|
||||
list._idleNext = socket;
|
||||
}
|
||||
|
||||
|
||||
function normalize (msecs) {
|
||||
if (!msecs || msecs <= 0) return 0;
|
||||
// round up to one sec
|
||||
if (msecs < 1000) return 1000;
|
||||
// round down to nearest second.
|
||||
return msecs - (msecs % 1000);
|
||||
}
|
||||
|
||||
// the main function - creates lists on demand and the watchers associated
|
||||
// with them.
|
||||
function insert (socket, msecs) {
|
||||
socket._idleStart = process.now;
|
||||
socket._idleTimeout = msecs;
|
||||
|
||||
if (!msecs) return;
|
||||
|
||||
var list;
|
||||
|
||||
if (lists[msecs]) {
|
||||
list = lists[msecs];
|
||||
} else {
|
||||
list = new process.Timer();
|
||||
list._idleNext = list;
|
||||
list._idlePrev = list;
|
||||
|
||||
lists[msecs] = list;
|
||||
|
||||
list.callback = function () {
|
||||
sys.puts('timeout callback ' + msecs);
|
||||
// TODO - don't stop and start the watcher all the time.
|
||||
// just set its repeat
|
||||
var now = process.now;
|
||||
var first;
|
||||
while (first = peek(list)) {
|
||||
var diff = now - first._idleStart;
|
||||
if (diff < msecs) {
|
||||
list.again(msecs - diff);
|
||||
sys.puts(msecs + ' list wait');
|
||||
return;
|
||||
} else {
|
||||
remove(first);
|
||||
assert(first != peek(list));
|
||||
first.emit('timeout');
|
||||
first.forceClose(new Error('idle timeout'));
|
||||
}
|
||||
}
|
||||
sys.puts(msecs + ' list empty');
|
||||
assert(list._idleNext == list); // list is empty
|
||||
list.stop();
|
||||
};
|
||||
}
|
||||
|
||||
if (list._idleNext == list) {
|
||||
// if empty (re)start the timer
|
||||
list.again(msecs);
|
||||
}
|
||||
|
||||
append(list, socket);
|
||||
assert(list._idleNext != list); // list is not empty
|
||||
}
|
||||
|
||||
|
||||
var unenroll = this.unenroll = function (socket) {
|
||||
socket._idleNext._idlePrev = socket._idlePrev;
|
||||
socket._idlePrev._idleNext = socket._idleNext;
|
||||
|
||||
var list = lists[socket._idleTimeout];
|
||||
// if empty then stop the watcher
|
||||
//sys.puts('unenroll');
|
||||
if (list && list._idlePrev == list) {
|
||||
//sys.puts('unenroll: list empty');
|
||||
list.stop();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
// Does not start the time, just sets up the members needed.
|
||||
this.enroll = function (socket, msecs) {
|
||||
// if this socket was already in a list somewhere
|
||||
// then we should unenroll it from that
|
||||
if (socket._idleNext) unenroll(socket);
|
||||
|
||||
socket._idleTimeout = msecs;
|
||||
socket._idleNext = socket;
|
||||
socket._idlePrev = socket;
|
||||
};
|
||||
|
||||
// call this whenever the socket is active (not idle)
|
||||
// it will reset its timeout.
|
||||
this.active = function (socket) {
|
||||
var msecs = socket._idleTimeout;
|
||||
if (msecs) {
|
||||
var list = lists[msecs];
|
||||
if (socket._idleNext == socket) {
|
||||
insert(socket, msecs);
|
||||
} else {
|
||||
// inline append
|
||||
socket._idleStart = process.now;
|
||||
socket._idleNext._idlePrev = socket._idlePrev;
|
||||
socket._idlePrev._idleNext = socket._idleNext;
|
||||
socket._idleNext = list._idleNext;
|
||||
socket._idleNext._idlePrev = socket;
|
||||
socket._idlePrev = list
|
||||
list._idleNext = socket;
|
||||
}
|
||||
}
|
||||
};
|
||||
})();
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
// This is a free list to avoid creating so many of the same object.
|
||||
|
||||
function FreeList (name, max, constructor) {
|
||||
@ -43,29 +204,33 @@ function FreeList (name, max, constructor) {
|
||||
this.list = [];
|
||||
}
|
||||
|
||||
|
||||
FreeList.prototype.alloc = function () {
|
||||
//debug("alloc " + this.name + " " + this.list.length);
|
||||
return this.list.length ? this.list.shift()
|
||||
: this.constructor.apply(this, arguments);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
FreeList.prototype.free = function (obj) {
|
||||
//debug("free " + this.name + " " + this.list.length);
|
||||
if (this.list.length < this.max) {
|
||||
this.list.push(obj);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
var ioWatchers = new FreeList("iowatcher", 100, function () {
|
||||
return new IOWatcher();
|
||||
});
|
||||
|
||||
|
||||
var nb = 0;
|
||||
var buffers = new FreeList("buffer", 100, function (l) {
|
||||
return new Buffer(500);
|
||||
return new Buffer(l);
|
||||
});
|
||||
|
||||
|
||||
// Allocated on demand.
|
||||
var recvBuffer = null;
|
||||
function allocRecvBuffer () {
|
||||
@ -73,6 +238,7 @@ function allocRecvBuffer () {
|
||||
recvBuffer.used = 0;
|
||||
}
|
||||
|
||||
|
||||
function _doFlush () {
|
||||
var socket = this.socket;
|
||||
// Socket becomes writeable on connect() but don't flush if there's
|
||||
@ -90,6 +256,8 @@ function _doFlush () {
|
||||
}
|
||||
|
||||
function initSocket (self) {
|
||||
timeout.enroll(self, 60*1000); // default timeout, 60 seconds
|
||||
|
||||
self._readWatcher = ioWatchers.alloc();
|
||||
self._readWatcher.callback = function () {
|
||||
// If this is the first recv (recvBuffer doesn't exist) or we've used up
|
||||
@ -140,6 +308,9 @@ function initSocket (self) {
|
||||
|
||||
if (!self.writable) self.forceClose();
|
||||
} else if (bytesRead > 0) {
|
||||
|
||||
timeout.active(self);
|
||||
|
||||
var start = recvBuffer.used;
|
||||
var end = recvBuffer.used + bytesRead;
|
||||
|
||||
@ -215,6 +386,7 @@ exports.createConnection = function (port, host) {
|
||||
|
||||
Object.defineProperty(Socket.prototype, 'readyState', {
|
||||
get: function () {
|
||||
sys.error('readyState is depricated. Use stream.readable or stream.writable');
|
||||
if (this.readable && this.writable) {
|
||||
return 'open';
|
||||
} else if (this.readable && !this.writable){
|
||||
@ -396,6 +568,9 @@ Socket.prototype.flush = function () {
|
||||
return false;
|
||||
}
|
||||
|
||||
timeout.active(self);
|
||||
|
||||
|
||||
if (bytesWritten === null) {
|
||||
// EAGAIN
|
||||
debug('write EAGAIN');
|
||||
@ -459,6 +634,8 @@ Socket.prototype.connect = function () {
|
||||
|
||||
var self = this;
|
||||
if (self.fd) throw new Error('Socket already opened');
|
||||
|
||||
timeout.active(socket);
|
||||
|
||||
if (typeof(arguments[0]) == 'string') {
|
||||
self.fd = socket('unix');
|
||||
@ -477,28 +654,34 @@ Socket.prototype.connect = function () {
|
||||
}
|
||||
};
|
||||
|
||||
var xxx = 0;
|
||||
|
||||
|
||||
Socket.prototype.address = function () {
|
||||
return getsockname(this.fd);
|
||||
};
|
||||
|
||||
|
||||
Socket.prototype.setNoDelay = function (v) {
|
||||
if (this.type == 'tcp') setNoDelay(this.fd, v);
|
||||
};
|
||||
|
||||
|
||||
Socket.prototype.setTimeout = function (msecs) {
|
||||
timeout.enroll(this, msecs);
|
||||
};
|
||||
|
||||
|
||||
Socket.prototype.pause = function () {
|
||||
this._readWatcher.stop();
|
||||
};
|
||||
|
||||
|
||||
Socket.prototype.resume = function () {
|
||||
if (!this.fd) throw new Error('Cannot resume() closed Socket.');
|
||||
this._readWatcher.set(this.fd, true, false);
|
||||
this._readWatcher.start();
|
||||
};
|
||||
|
||||
|
||||
Socket.prototype.forceClose = function (exception) {
|
||||
// recvBuffer is shared between sockets, so don't need to free it here.
|
||||
var self = this;
|
||||
@ -521,6 +704,8 @@ Socket.prototype.forceClose = function (exception) {
|
||||
this._readWatcher = null;
|
||||
}
|
||||
|
||||
timeout.unenroll(this);
|
||||
|
||||
if (this.fd) {
|
||||
close(this.fd);
|
||||
debug('close ' + this.fd);
|
||||
@ -580,6 +765,7 @@ function Server (listener) {
|
||||
// The 'connect' event probably should be removed for server-side
|
||||
// sockets. It's redundent.
|
||||
peer.emit('connect');
|
||||
timeout.active(peer);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user