diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 7017d3ea13d..3c60a7bacf3 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -1,6 +1,9 @@ // Copyright 2009 Ryan Dahl #include +#include +#include + #include namespace node { @@ -8,19 +11,23 @@ namespace node { using namespace v8; Persistent IOWatcher::constructor_template; +Persistent callback_symbol; void IOWatcher::Initialize(Handle target) { HandleScope scope; Local t = FunctionTemplate::New(IOWatcher::New); constructor_template = Persistent::New(t); - constructor_template->InstanceTemplate()->SetInternalFieldCount(2); + constructor_template->InstanceTemplate()->SetInternalFieldCount(1); constructor_template->SetClassName(String::NewSymbol("IOWatcher")); NODE_SET_PROTOTYPE_METHOD(constructor_template, "start", IOWatcher::Start); NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", IOWatcher::Stop); + NODE_SET_PROTOTYPE_METHOD(constructor_template, "set", IOWatcher::Set); target->Set(String::NewSymbol("IOWatcher"), constructor_template->GetFunction()); + + callback_symbol = NODE_PSYMBOL("callback"); } @@ -29,7 +36,7 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) { assert(w == &io->watcher_); HandleScope scope; - Local callback_v = io->handle_->GetInternalField(1); + Local callback_v = io->handle_->Get(callback_symbol); assert(callback_v->IsFunction()); Local callback = Local::Cast(callback_v); @@ -48,13 +55,49 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) { // -// var io = new process.IOWatcher(fd, true, true, function (readable, writable) { +// var io = new process.IOWatcher(function (readable, writable) { // // }); +// io.set(fd, true, false); +// io.start(); // Handle IOWatcher::New(const Arguments& args) { HandleScope scope; + if (!args[0]->IsFunction()) { + return ThrowException(Exception::TypeError( + String::New("First arg should a callback."))); + } + + Local callback = Local::Cast(args[0]); + + IOWatcher *s = new IOWatcher(); + + s->Wrap(args.This()); + + s->handle_->Set(callback_symbol, callback); + + return args.This(); +} + + +Handle IOWatcher::Start(const Arguments& args) { + HandleScope scope; + + IOWatcher *io = ObjectWrap::Unwrap(args.Holder()); + + ev_io_start(EV_DEFAULT_UC_ &io->watcher_); + + io->Ref(); + + return Undefined(); +} + +Handle IOWatcher::Set(const Arguments& args) { + HandleScope scope; + + IOWatcher *io = ObjectWrap::Unwrap(args.Holder()); + if (!args[0]->IsInt32()) { return ThrowException(Exception::TypeError( String::New("First arg should be a file descriptor."))); @@ -78,35 +121,11 @@ Handle IOWatcher::New(const Arguments& args) { if (args[2]->IsTrue()) events |= EV_WRITE; - if (!args[3]->IsFunction()) { - return ThrowException(Exception::TypeError( - String::New("Fourth arg should a callback."))); - } - - Local callback = Local::Cast(args[3]); - - IOWatcher *s = new IOWatcher(fd, events); - - s->Wrap(args.This()); - s->handle_->SetInternalField(1, callback); - - return args.This(); -} - - -Handle IOWatcher::Start(const Arguments& args) { - HandleScope scope; - - IOWatcher *io = ObjectWrap::Unwrap(args.Holder()); - - ev_io_start(EV_DEFAULT_UC_ &io->watcher_); - - io->Ref(); + ev_io_set(&io->watcher_, fd, events); return Undefined(); } - Handle IOWatcher::Stop(const Arguments& args) { HandleScope scope; IOWatcher *io = ObjectWrap::Unwrap(args.Holder()); @@ -117,6 +136,7 @@ Handle IOWatcher::Stop(const Arguments& args) { void IOWatcher::Stop () { if (watcher_.active) { + HandleScope scope; ev_io_stop(EV_DEFAULT_UC_ &watcher_); Unref(); } diff --git a/src/node_io_watcher.h b/src/node_io_watcher.h index 4e40593241e..420c6de5cc2 100644 --- a/src/node_io_watcher.h +++ b/src/node_io_watcher.h @@ -2,7 +2,7 @@ #ifndef NODE_IO_H_ #define NODE_IO_H_ -#include +#include #include namespace node { @@ -14,8 +14,8 @@ class IOWatcher : ObjectWrap { protected: static v8::Persistent constructor_template; - IOWatcher(int fd, int events) : ObjectWrap() { - ev_io_init(&watcher_, IOWatcher::Callback, fd, events); + IOWatcher() : ObjectWrap() { + ev_init(&watcher_, IOWatcher::Callback); watcher_.data = this; } @@ -26,6 +26,7 @@ class IOWatcher : ObjectWrap { static v8::Handle New(const v8::Arguments& args); static v8::Handle Start(const v8::Arguments& args); static v8::Handle Stop(const v8::Arguments& args); + static v8::Handle Set(const v8::Arguments& args); private: static void Callback(EV_P_ ev_io *watcher, int revents); diff --git a/src/node_net2.cc b/src/node_net2.cc index 13a5aafe7c6..cf2433e5b37 100644 --- a/src/node_net2.cc +++ b/src/node_net2.cc @@ -334,31 +334,31 @@ static Handle Accept(const Arguments& args) { FD_ARG(args[0]) - struct sockaddr_storage addr; - socklen_t len; + struct sockaddr_storage address_storage; + socklen_t len = sizeof(struct sockaddr_storage); - int peer = accept(fd, (struct sockaddr*) &addr, &len); + int peer_fd = accept(fd, (struct sockaddr*) &address_storage, &len); - if (peer < 0) { - if (errno == EAGAIN) return Null(); + if (peer_fd < 0) { + if (errno == EAGAIN) return scope.Close(Null()); return ThrowException(ErrnoException(errno, "accept")); } - if (!SetNonBlock(peer)) { + if (!SetNonBlock(peer_fd)) { int fcntl_errno = errno; - close(peer); - return ThrowException(ErrnoException(fcntl_errno, "fcntl")); + close(peer_fd); + return ThrowException(ErrnoException(fcntl_errno, "fcntl", "Cannot make peer non-blocking")); } Local peer_info = Object::New(); - peer_info->Set(fd_symbol, Integer::New(fd)); + peer_info->Set(fd_symbol, Integer::New(peer_fd)); - if (addr.ss_family == AF_INET6) { - struct sockaddr_in6 *a = reinterpret_cast(&addr); + if (address_storage.ss_family == AF_INET6) { + struct sockaddr_in6 *a = (struct sockaddr_in6*)&address_storage; char ip[INET6_ADDRSTRLEN]; - inet_ntop(AF_INET6, &a->sin6_addr, ip, INET6_ADDRSTRLEN); + inet_ntop(AF_INET6, &(a->sin6_addr), ip, INET6_ADDRSTRLEN); int port = ntohs(a->sin6_port); @@ -499,11 +499,11 @@ void InitNet2(Handle target) { target->Set(String::NewSymbol("EADDRINUSE"), Integer::New(EADDRINUSE)); target->Set(String::NewSymbol("ECONNREFUSED"), Integer::New(ECONNREFUSED)); - errno_symbol = NODE_PSYMBOL("errno"); - syscall_symbol = NODE_PSYMBOL("syscall"); - fd_symbol = NODE_PSYMBOL("fd"); + errno_symbol = NODE_PSYMBOL("errno"); + syscall_symbol = NODE_PSYMBOL("syscall"); + fd_symbol = NODE_PSYMBOL("fd"); remote_address_symbol = NODE_PSYMBOL("remoteAddress"); - remote_port_symbol = NODE_PSYMBOL("remotePort"); + remote_port_symbol = NODE_PSYMBOL("remotePort"); } } // namespace node diff --git a/tcp.js b/tcp.js index af7334945f6..eafa1448707 100644 --- a/tcp.js +++ b/tcp.js @@ -1,8 +1,55 @@ -var socket = process.socket; -var bind = process.bind; -var listen = process.listen; -var accept = process.accept; -var close = process.close; +var debugLevel = 0; +if ("NODE_DEBUG" in process.ENV) debugLevel = 1; +function debug (x) { + if (debugLevel > 0) { + process.stdio.writeError(x + "\n"); + } +} + +var socket = process.socket; +var bind = process.bind; +var listen = process.listen; +var accept = process.accept; +var close = process.close; +var shutdown = process.shutdown; + +var Peer = function (peerInfo) { + process.EventEmitter.call(); + + var self = this; + + self.fd = peerInfo.fd; + self.remoteAddress = peerInfo.remoteAddress; + self.remotePort = peerInfo.remotePort; + + self.readWatcher = new process.IOWatcher(function () { + debug(self.fd + " readable"); + }); + self.readWatcher.set(self.fd, true, false); + self.readWatcher.start(); + + self.writeWatcher = new process.IOWatcher(function () { + debug(self.fd + " writable"); + }); + self.writeWatcher.set(self.fd, false, true); + + self.readable = true; + self.writable = true; +}; +process.inherits(Peer, process.EventEmitter); + +Peer.prototype.close = function () { + this.readable = false; + this.writable = false; + + this.writeWatcher.stop(); + this.readWatcher.stop(); + close(this.fd); + debug("close peer " + this.fd); + this.fd = null; +}; + + var Server = function (listener) { var self = this; @@ -10,7 +57,19 @@ var Server = function (listener) { if (listener) { self.addListener("connection", listener); } - + + self.watcher = new process.IOWatcher(function (readable, writeable) { + debug("readable " + readable); + debug("writable " + writeable); + while (self.fd) { + debug("accept from " + self.fd); + var peerInfo = accept(self.fd); + debug("accept: " + JSON.stringify(peerInfo)); + if (!peerInfo) return; + var peer = new Peer(peerInfo); + self.emit("connection", peer); + } + }); }; process.inherits(Server, process.EventEmitter); @@ -24,15 +83,7 @@ Server.prototype.listen = function (port, host) { bind(self.fd, port, host); listen(self.fd, 128); // TODO configurable backlog - self.watcher = new process.IOWatcher(self.fd, true, false, function () { - var peerInfo; - while (self.fd) { - peerInfo = accept(self.fd); - if (peerInfo === null) return; - self.emit("connection", peerInfo); - } - }); - + self.watcher.set(self.fd, true, false); self.watcher.start(); }; @@ -41,15 +92,20 @@ Server.prototype.close = function () { if (!self.fd) throw new Error("Not running"); self.watcher.stop(); close(self.fd); - this.watcher = null; - this.fd = null; + self.fd = null; }; /////////////////////////////////////////////////////// var sys = require("sys"); -var server = new Server(function () { - sys.puts("connection"); - server.close(); +var server = new Server(function (peer) { + sys.puts("connection (" + peer.fd + "): " + + peer.remoteAddress + + " port " + + peer.remotePort + ); + sys.puts("server fd: " + server.fd); + peer.close(); }); server.listen(8000); +sys.puts("server fd: " + server.fd);