Move idle timers into its own module
This commit is contained in:
parent
dcc4fffe4d
commit
5cc29b80f2
177
lib/net.js
177
lib/net.js
@ -24,7 +24,6 @@ var binding = process.binding('net');
|
|||||||
var FreeList = require('freelist').FreeList;
|
var FreeList = require('freelist').FreeList;
|
||||||
|
|
||||||
var IOWatcher = process.binding('io_watcher').IOWatcher;
|
var IOWatcher = process.binding('io_watcher').IOWatcher;
|
||||||
var Timer = process.binding('timer').Timer;
|
|
||||||
var constants = process.binding('constants');
|
var constants = process.binding('constants');
|
||||||
var assert = process.assert;
|
var assert = process.assert;
|
||||||
|
|
||||||
@ -53,163 +52,6 @@ var EMFILE = constants.EMFILE;
|
|||||||
var END_OF_FILE = 42;
|
var END_OF_FILE = 42;
|
||||||
var SecureContext, SecureStream; // lazy loaded
|
var SecureContext, SecureStream; // lazy loaded
|
||||||
|
|
||||||
// IDLE TIMEOUTS
|
|
||||||
//
|
|
||||||
// Because often many sockets will have the same idle timeout we will not
|
|
||||||
// use one timeout watcher per socket. It is too much overhead. Instead
|
|
||||||
// we'll use a single watcher for all sockets with the same timeout value
|
|
||||||
// and a linked list. This technique is described in the libev manual:
|
|
||||||
// http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#Be_smart_about_timeouts
|
|
||||||
|
|
||||||
|
|
||||||
var timeout = new (function () {
|
|
||||||
// Object containing all lists, timers
|
|
||||||
// key = time in milliseconds
|
|
||||||
// value = list
|
|
||||||
var lists = {};
|
|
||||||
|
|
||||||
// show the most idle socket
|
|
||||||
function peek (list) {
|
|
||||||
if (list._idlePrev == list) return null;
|
|
||||||
return list._idlePrev;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// remove the most idle socket from the list
|
|
||||||
function shift (list) {
|
|
||||||
var first = list._idlePrev;
|
|
||||||
remove(first);
|
|
||||||
return first;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// remove a socket from its list
|
|
||||||
function remove (socket) {
|
|
||||||
socket._idleNext._idlePrev = socket._idlePrev;
|
|
||||||
socket._idlePrev._idleNext = socket._idleNext;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// remove a socket from its list and place at the end.
|
|
||||||
function append (list, socket) {
|
|
||||||
remove(socket);
|
|
||||||
socket._idleNext = list._idleNext;
|
|
||||||
socket._idleNext._idlePrev = socket;
|
|
||||||
socket._idlePrev = list;
|
|
||||||
list._idleNext = socket;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
function normalize (msecs) {
|
|
||||||
if (!msecs || msecs <= 0) return 0;
|
|
||||||
// round up to one sec
|
|
||||||
if (msecs < 1000) return 1000;
|
|
||||||
// round down to nearest second.
|
|
||||||
return msecs - (msecs % 1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
// the main function - creates lists on demand and the watchers associated
|
|
||||||
// with them.
|
|
||||||
function insert (socket, msecs) {
|
|
||||||
socket._idleStart = new Date();
|
|
||||||
socket._idleTimeout = msecs;
|
|
||||||
|
|
||||||
if (!msecs) return;
|
|
||||||
|
|
||||||
var list;
|
|
||||||
|
|
||||||
if (lists[msecs]) {
|
|
||||||
list = lists[msecs];
|
|
||||||
} else {
|
|
||||||
list = new Timer();
|
|
||||||
list._idleNext = list;
|
|
||||||
list._idlePrev = list;
|
|
||||||
|
|
||||||
lists[msecs] = list;
|
|
||||||
|
|
||||||
list.callback = function () {
|
|
||||||
debug('timeout callback ' + msecs);
|
|
||||||
// TODO - don't stop and start the watcher all the time.
|
|
||||||
// just set its repeat
|
|
||||||
var now = new Date();
|
|
||||||
debug("now: " + now);
|
|
||||||
var first;
|
|
||||||
while (first = peek(list)) {
|
|
||||||
var diff = now - first._idleStart;
|
|
||||||
if (diff < msecs) {
|
|
||||||
list.again(msecs - diff);
|
|
||||||
debug(msecs + ' list wait because diff is ' + diff);
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
remove(first);
|
|
||||||
assert(first != peek(list));
|
|
||||||
first.emit('timeout');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
debug(msecs + ' list empty');
|
|
||||||
assert(list._idleNext == list); // list is empty
|
|
||||||
list.stop();
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
if (list._idleNext == list) {
|
|
||||||
// if empty (re)start the timer
|
|
||||||
list.again(msecs);
|
|
||||||
}
|
|
||||||
|
|
||||||
append(list, socket);
|
|
||||||
assert(list._idleNext != list); // list is not empty
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
var unenroll = this.unenroll = function (socket) {
|
|
||||||
if (socket._idleNext) {
|
|
||||||
socket._idleNext._idlePrev = socket._idlePrev;
|
|
||||||
socket._idlePrev._idleNext = socket._idleNext;
|
|
||||||
|
|
||||||
var list = lists[socket._idleTimeout];
|
|
||||||
// if empty then stop the watcher
|
|
||||||
//debug('unenroll');
|
|
||||||
if (list && list._idlePrev == list) {
|
|
||||||
//debug('unenroll: list empty');
|
|
||||||
list.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
// Does not start the time, just sets up the members needed.
|
|
||||||
this.enroll = function (socket, msecs) {
|
|
||||||
// if this socket was already in a list somewhere
|
|
||||||
// then we should unenroll it from that
|
|
||||||
if (socket._idleNext) unenroll(socket);
|
|
||||||
|
|
||||||
socket._idleTimeout = msecs;
|
|
||||||
socket._idleNext = socket;
|
|
||||||
socket._idlePrev = socket;
|
|
||||||
};
|
|
||||||
|
|
||||||
// call this whenever the socket is active (not idle)
|
|
||||||
// it will reset its timeout.
|
|
||||||
this.active = function (socket) {
|
|
||||||
var msecs = socket._idleTimeout;
|
|
||||||
if (msecs) {
|
|
||||||
var list = lists[msecs];
|
|
||||||
if (socket._idleNext == socket) {
|
|
||||||
insert(socket, msecs);
|
|
||||||
} else {
|
|
||||||
// inline append
|
|
||||||
socket._idleStart = new Date();
|
|
||||||
socket._idleNext._idlePrev = socket._idlePrev;
|
|
||||||
socket._idlePrev._idleNext = socket._idleNext;
|
|
||||||
socket._idleNext = list._idleNext;
|
|
||||||
socket._idleNext._idlePrev = socket;
|
|
||||||
socket._idlePrev = list;
|
|
||||||
list._idleNext = socket;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
})();
|
|
||||||
|
|
||||||
var ioWatchers = new FreeList("iowatcher", 100, function () {
|
var ioWatchers = new FreeList("iowatcher", 100, function () {
|
||||||
return new IOWatcher();
|
return new IOWatcher();
|
||||||
@ -480,7 +322,7 @@ function initStream (self) {
|
|||||||
if (self.onend) self.onend();
|
if (self.onend) self.onend();
|
||||||
} else if (bytesRead > 0) {
|
} else if (bytesRead > 0) {
|
||||||
|
|
||||||
timeout.active(self);
|
require('timers').active(self);
|
||||||
|
|
||||||
var start = pool.used;
|
var start = pool.used;
|
||||||
var end = pool.used + bytesRead;
|
var end = pool.used + bytesRead;
|
||||||
@ -551,6 +393,11 @@ util.inherits(Stream, stream.Stream);
|
|||||||
exports.Stream = Stream;
|
exports.Stream = Stream;
|
||||||
|
|
||||||
|
|
||||||
|
Stream.prototype._onTimeout = function () {
|
||||||
|
this.emit('timeout');
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
Stream.prototype.setSecure = function (credentials) {
|
Stream.prototype.setSecure = function (credentials) {
|
||||||
// Do we have openssl crypto?
|
// Do we have openssl crypto?
|
||||||
try {
|
try {
|
||||||
@ -774,7 +621,7 @@ Stream.prototype._writeOut = function (data, encoding, fd) {
|
|||||||
|
|
||||||
debug('wrote ' + bytesWritten + ' to socket. [fd, off, len] = ' + JSON.stringify([this.fd, off, len]) + "\n");
|
debug('wrote ' + bytesWritten + ' to socket. [fd, off, len] = ' + JSON.stringify([this.fd, off, len]) + "\n");
|
||||||
|
|
||||||
timeout.active(this);
|
require('timers').active(this);
|
||||||
|
|
||||||
if (bytesWritten == len) {
|
if (bytesWritten == len) {
|
||||||
// awesome. sent to buffer.
|
// awesome. sent to buffer.
|
||||||
@ -909,7 +756,7 @@ Stream.prototype.connect = function () {
|
|||||||
if (self.fd) throw new Error('Stream already opened');
|
if (self.fd) throw new Error('Stream already opened');
|
||||||
if (!self._readWatcher) throw new Error('No readWatcher');
|
if (!self._readWatcher) throw new Error('No readWatcher');
|
||||||
|
|
||||||
timeout.active(socket);
|
require('timers').active(socket);
|
||||||
|
|
||||||
self._connecting = true; // set false in doConnect
|
self._connecting = true; // set false in doConnect
|
||||||
self.writable = true;
|
self.writable = true;
|
||||||
@ -957,10 +804,10 @@ Stream.prototype.setKeepAlive = function (enable, time) {
|
|||||||
|
|
||||||
Stream.prototype.setTimeout = function (msecs) {
|
Stream.prototype.setTimeout = function (msecs) {
|
||||||
if (msecs > 0) {
|
if (msecs > 0) {
|
||||||
timeout.enroll(this, msecs);
|
require('timers').enroll(this, msecs);
|
||||||
if (this.fd) { timeout.active(this); }
|
if (this.fd) { require('timers').active(this); }
|
||||||
} else if (msecs === 0) {
|
} else if (msecs === 0) {
|
||||||
timeout.unenroll(this);
|
require('timers').unenroll(this);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -999,7 +846,7 @@ Stream.prototype.destroy = function (exception) {
|
|||||||
this._readWatcher = null;
|
this._readWatcher = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
timeout.unenroll(this);
|
require('timers').unenroll(this);
|
||||||
|
|
||||||
if (this.secure) {
|
if (this.secure) {
|
||||||
this.secureStream.close();
|
this.secureStream.close();
|
||||||
|
159
lib/timers.js
Normal file
159
lib/timers.js
Normal file
@ -0,0 +1,159 @@
|
|||||||
|
var Timer = process.binding('timer').Timer;
|
||||||
|
var assert = process.assert;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* To enable debug statements for the timers do NODE_DEBUG=8 ./node script.js
|
||||||
|
*/
|
||||||
|
var debugLevel = parseInt(process.env.NODE_DEBUG, 16);
|
||||||
|
function debug () {
|
||||||
|
if (debugLevel & 0x8) {
|
||||||
|
require('util').error.apply(this, arguments);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// IDLE TIMEOUTS
|
||||||
|
//
|
||||||
|
// Because often many sockets will have the same idle timeout we will not
|
||||||
|
// use one timeout watcher per socket. It is too much overhead. Instead
|
||||||
|
// we'll use a single watcher for all sockets with the same timeout value
|
||||||
|
// and a linked list. This technique is described in the libev manual:
|
||||||
|
// http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#Be_smart_about_timeouts
|
||||||
|
|
||||||
|
// Object containing all lists, timers
|
||||||
|
// key = time in milliseconds
|
||||||
|
// value = list
|
||||||
|
var lists = {};
|
||||||
|
|
||||||
|
// show the most idle socket
|
||||||
|
function peek (list) {
|
||||||
|
if (list._idlePrev == list) return null;
|
||||||
|
return list._idlePrev;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// remove the most idle socket from the list
|
||||||
|
function shift (list) {
|
||||||
|
var first = list._idlePrev;
|
||||||
|
remove(first);
|
||||||
|
return first;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// remove a socket from its list
|
||||||
|
function remove (socket) {
|
||||||
|
socket._idleNext._idlePrev = socket._idlePrev;
|
||||||
|
socket._idlePrev._idleNext = socket._idleNext;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// remove a socket from its list and place at the end.
|
||||||
|
function append (list, socket) {
|
||||||
|
remove(socket);
|
||||||
|
socket._idleNext = list._idleNext;
|
||||||
|
socket._idleNext._idlePrev = socket;
|
||||||
|
socket._idlePrev = list;
|
||||||
|
list._idleNext = socket;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// the main function - creates lists on demand and the watchers associated
|
||||||
|
// with them.
|
||||||
|
function insert (socket, msecs) {
|
||||||
|
socket._idleStart = new Date();
|
||||||
|
socket._idleTimeout = msecs;
|
||||||
|
|
||||||
|
if (!msecs) return;
|
||||||
|
|
||||||
|
var list;
|
||||||
|
|
||||||
|
if (lists[msecs]) {
|
||||||
|
list = lists[msecs];
|
||||||
|
} else {
|
||||||
|
list = new Timer();
|
||||||
|
list._idleNext = list;
|
||||||
|
list._idlePrev = list;
|
||||||
|
|
||||||
|
lists[msecs] = list;
|
||||||
|
|
||||||
|
list.callback = function () {
|
||||||
|
debug('timeout callback ' + msecs);
|
||||||
|
// TODO - don't stop and start the watcher all the time.
|
||||||
|
// just set its repeat
|
||||||
|
var now = new Date();
|
||||||
|
debug("now: " + now);
|
||||||
|
var first;
|
||||||
|
while (first = peek(list)) {
|
||||||
|
var diff = now - first._idleStart;
|
||||||
|
if (diff < msecs) {
|
||||||
|
list.again(msecs - diff);
|
||||||
|
debug(msecs + ' list wait because diff is ' + diff);
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
remove(first);
|
||||||
|
assert(first != peek(list));
|
||||||
|
if (first._onTimeout) first._onTimeout();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
debug(msecs + ' list empty');
|
||||||
|
assert(list._idleNext == list); // list is empty
|
||||||
|
list.stop();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (list._idleNext == list) {
|
||||||
|
// if empty (re)start the timer
|
||||||
|
list.again(msecs);
|
||||||
|
}
|
||||||
|
|
||||||
|
append(list, socket);
|
||||||
|
assert(list._idleNext != list); // list is not empty
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
var unenroll = exports.unenroll = function (socket) {
|
||||||
|
if (socket._idleNext) {
|
||||||
|
socket._idleNext._idlePrev = socket._idlePrev;
|
||||||
|
socket._idlePrev._idleNext = socket._idleNext;
|
||||||
|
|
||||||
|
var list = lists[socket._idleTimeout];
|
||||||
|
// if empty then stop the watcher
|
||||||
|
//debug('unenroll');
|
||||||
|
if (list && list._idlePrev == list) {
|
||||||
|
//debug('unenroll: list empty');
|
||||||
|
list.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
// Does not start the time, just sets up the members needed.
|
||||||
|
exports.enroll = function (socket, msecs) {
|
||||||
|
// if this socket was already in a list somewhere
|
||||||
|
// then we should unenroll it from that
|
||||||
|
if (socket._idleNext) unenroll(socket);
|
||||||
|
|
||||||
|
socket._idleTimeout = msecs;
|
||||||
|
socket._idleNext = socket;
|
||||||
|
socket._idlePrev = socket;
|
||||||
|
};
|
||||||
|
|
||||||
|
// call this whenever the socket is active (not idle)
|
||||||
|
// it will reset its timeout.
|
||||||
|
exports.active = function (socket) {
|
||||||
|
var msecs = socket._idleTimeout;
|
||||||
|
if (msecs) {
|
||||||
|
var list = lists[msecs];
|
||||||
|
if (socket._idleNext == socket) {
|
||||||
|
insert(socket, msecs);
|
||||||
|
} else {
|
||||||
|
// inline append
|
||||||
|
socket._idleStart = new Date();
|
||||||
|
socket._idleNext._idlePrev = socket._idlePrev;
|
||||||
|
socket._idlePrev._idleNext = socket._idleNext;
|
||||||
|
socket._idleNext = list._idleNext;
|
||||||
|
socket._idleNext._idlePrev = socket;
|
||||||
|
socket._idlePrev = list;
|
||||||
|
list._idleNext = socket;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
Loading…
x
Reference in New Issue
Block a user