Add IOWatcher.flush()

To be called if sockets get too much data. This is to force a flush before
the tick ends.
This commit is contained in:
Ryan Dahl 2010-11-12 11:31:39 -08:00
parent 7c3c5c6861
commit 10ff559ec3
3 changed files with 21 additions and 0 deletions

View File

@ -290,6 +290,10 @@ Stream.prototype.write = function (data, encoding, fd) {
this._onWritable(); // Insert writeWatcher into the dumpQueue this._onWritable(); // Insert writeWatcher into the dumpQueue
require('timers').active(this); require('timers').active(this);
if (queueSize > (64*1024)) {
IOWatcher.flush();
}
return queueSize < (64*1024); return queueSize < (64*1024);
}; };

View File

@ -55,6 +55,10 @@ void IOWatcher::Initialize(Handle<Object> target) {
Local<Function> io_watcher = constructor_template->GetFunction(); Local<Function> io_watcher = constructor_template->GetFunction();
target->Set(String::NewSymbol("IOWatcher"), io_watcher); target->Set(String::NewSymbol("IOWatcher"), io_watcher);
NODE_SET_METHOD(constructor_template->GetFunction(),
"flush",
IOWatcher::Flush);
callback_symbol = NODE_PSYMBOL("callback"); callback_symbol = NODE_PSYMBOL("callback");
next_sym = NODE_PSYMBOL("next"); next_sym = NODE_PSYMBOL("next");
@ -194,6 +198,13 @@ Handle<Value> IOWatcher::Set(const Arguments& args) {
return Undefined(); return Undefined();
} }
Handle<Value> IOWatcher::Flush(const Arguments& args) {
HandleScope scope; // unneccessary?
IOWatcher::Dump();
return Undefined();
}
#define KB 1024 #define KB 1024
/* /*
@ -233,7 +244,11 @@ Handle<Value> IOWatcher::Set(const Arguments& args) {
void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) { void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) {
assert(revents == EV_PREPARE); assert(revents == EV_PREPARE);
assert(w == &dumper); assert(w == &dumper);
Dump();
}
void IOWatcher::Dump() {
HandleScope scope; HandleScope scope;
static struct iovec iov[IOV_MAX]; static struct iovec iov[IOV_MAX];

View File

@ -26,6 +26,7 @@ class IOWatcher : ObjectWrap {
} }
static v8::Handle<v8::Value> New(const v8::Arguments& args); static v8::Handle<v8::Value> New(const v8::Arguments& args);
static v8::Handle<v8::Value> Flush(const v8::Arguments& args);
static v8::Handle<v8::Value> Start(const v8::Arguments& args); static v8::Handle<v8::Value> Start(const v8::Arguments& args);
static v8::Handle<v8::Value> Stop(const v8::Arguments& args); static v8::Handle<v8::Value> Stop(const v8::Arguments& args);
static v8::Handle<v8::Value> Set(const v8::Arguments& args); static v8::Handle<v8::Value> Set(const v8::Arguments& args);
@ -34,6 +35,7 @@ class IOWatcher : ObjectWrap {
static void Callback(EV_P_ ev_io *watcher, int revents); static void Callback(EV_P_ ev_io *watcher, int revents);
static void Dump(EV_P_ ev_prepare *watcher, int revents); static void Dump(EV_P_ ev_prepare *watcher, int revents);
static void Dump();
void Start(); void Start();
void Stop(); void Stop();