IOWatcher callback isn't internal, fix bug in Accept

This commit is contained in:
Ryan Dahl 2009-12-15 17:17:45 +01:00
parent 469e2648e5
commit 0ecd0fa598
4 changed files with 144 additions and 67 deletions

View File

@ -1,6 +1,9 @@
// Copyright 2009 Ryan Dahl <ry@tinyclouds.org>
#include <node_io_watcher.h>
#include <node.h>
#include <v8.h>
#include <assert.h>
namespace node {
@ -8,19 +11,23 @@ namespace node {
using namespace v8;
Persistent<FunctionTemplate> IOWatcher::constructor_template;
Persistent<String> callback_symbol;
void IOWatcher::Initialize(Handle<Object> target) {
HandleScope scope;
Local<FunctionTemplate> t = FunctionTemplate::New(IOWatcher::New);
constructor_template = Persistent<FunctionTemplate>::New(t);
constructor_template->InstanceTemplate()->SetInternalFieldCount(2);
constructor_template->InstanceTemplate()->SetInternalFieldCount(1);
constructor_template->SetClassName(String::NewSymbol("IOWatcher"));
NODE_SET_PROTOTYPE_METHOD(constructor_template, "start", IOWatcher::Start);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", IOWatcher::Stop);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "set", IOWatcher::Set);
target->Set(String::NewSymbol("IOWatcher"), constructor_template->GetFunction());
callback_symbol = NODE_PSYMBOL("callback");
}
@ -29,7 +36,7 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) {
assert(w == &io->watcher_);
HandleScope scope;
Local<Value> callback_v = io->handle_->GetInternalField(1);
Local<Value> callback_v = io->handle_->Get(callback_symbol);
assert(callback_v->IsFunction());
Local<Function> callback = Local<Function>::Cast(callback_v);
@ -48,13 +55,49 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) {
//
// var io = new process.IOWatcher(fd, true, true, function (readable, writable) {
// var io = new process.IOWatcher(function (readable, writable) {
//
// });
// io.set(fd, true, false);
// io.start();
//
Handle<Value> IOWatcher::New(const Arguments& args) {
HandleScope scope;
if (!args[0]->IsFunction()) {
return ThrowException(Exception::TypeError(
String::New("First arg should a callback.")));
}
Local<Function> callback = Local<Function>::Cast(args[0]);
IOWatcher *s = new IOWatcher();
s->Wrap(args.This());
s->handle_->Set(callback_symbol, callback);
return args.This();
}
Handle<Value> IOWatcher::Start(const Arguments& args) {
HandleScope scope;
IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
ev_io_start(EV_DEFAULT_UC_ &io->watcher_);
io->Ref();
return Undefined();
}
Handle<Value> IOWatcher::Set(const Arguments& args) {
HandleScope scope;
IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
if (!args[0]->IsInt32()) {
return ThrowException(Exception::TypeError(
String::New("First arg should be a file descriptor.")));
@ -78,35 +121,11 @@ Handle<Value> IOWatcher::New(const Arguments& args) {
if (args[2]->IsTrue()) events |= EV_WRITE;
if (!args[3]->IsFunction()) {
return ThrowException(Exception::TypeError(
String::New("Fourth arg should a callback.")));
}
Local<Function> callback = Local<Function>::Cast(args[3]);
IOWatcher *s = new IOWatcher(fd, events);
s->Wrap(args.This());
s->handle_->SetInternalField(1, callback);
return args.This();
}
Handle<Value> IOWatcher::Start(const Arguments& args) {
HandleScope scope;
IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
ev_io_start(EV_DEFAULT_UC_ &io->watcher_);
io->Ref();
ev_io_set(&io->watcher_, fd, events);
return Undefined();
}
Handle<Value> IOWatcher::Stop(const Arguments& args) {
HandleScope scope;
IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
@ -117,6 +136,7 @@ Handle<Value> IOWatcher::Stop(const Arguments& args) {
void IOWatcher::Stop () {
if (watcher_.active) {
HandleScope scope;
ev_io_stop(EV_DEFAULT_UC_ &watcher_);
Unref();
}

View File

@ -2,7 +2,7 @@
#ifndef NODE_IO_H_
#define NODE_IO_H_
#include <node.h>
#include <node_object_wrap.h>
#include <ev.h>
namespace node {
@ -14,8 +14,8 @@ class IOWatcher : ObjectWrap {
protected:
static v8::Persistent<v8::FunctionTemplate> constructor_template;
IOWatcher(int fd, int events) : ObjectWrap() {
ev_io_init(&watcher_, IOWatcher::Callback, fd, events);
IOWatcher() : ObjectWrap() {
ev_init(&watcher_, IOWatcher::Callback);
watcher_.data = this;
}
@ -26,6 +26,7 @@ class IOWatcher : ObjectWrap {
static v8::Handle<v8::Value> New(const v8::Arguments& args);
static v8::Handle<v8::Value> Start(const v8::Arguments& args);
static v8::Handle<v8::Value> Stop(const v8::Arguments& args);
static v8::Handle<v8::Value> Set(const v8::Arguments& args);
private:
static void Callback(EV_P_ ev_io *watcher, int revents);

View File

@ -334,31 +334,31 @@ static Handle<Value> Accept(const Arguments& args) {
FD_ARG(args[0])
struct sockaddr_storage addr;
socklen_t len;
struct sockaddr_storage address_storage;
socklen_t len = sizeof(struct sockaddr_storage);
int peer = accept(fd, (struct sockaddr*) &addr, &len);
int peer_fd = accept(fd, (struct sockaddr*) &address_storage, &len);
if (peer < 0) {
if (errno == EAGAIN) return Null();
if (peer_fd < 0) {
if (errno == EAGAIN) return scope.Close(Null());
return ThrowException(ErrnoException(errno, "accept"));
}
if (!SetNonBlock(peer)) {
if (!SetNonBlock(peer_fd)) {
int fcntl_errno = errno;
close(peer);
return ThrowException(ErrnoException(fcntl_errno, "fcntl"));
close(peer_fd);
return ThrowException(ErrnoException(fcntl_errno, "fcntl", "Cannot make peer non-blocking"));
}
Local<Object> peer_info = Object::New();
peer_info->Set(fd_symbol, Integer::New(fd));
peer_info->Set(fd_symbol, Integer::New(peer_fd));
if (addr.ss_family == AF_INET6) {
struct sockaddr_in6 *a = reinterpret_cast<struct sockaddr_in6*>(&addr);
if (address_storage.ss_family == AF_INET6) {
struct sockaddr_in6 *a = (struct sockaddr_in6*)&address_storage;
char ip[INET6_ADDRSTRLEN];
inet_ntop(AF_INET6, &a->sin6_addr, ip, INET6_ADDRSTRLEN);
inet_ntop(AF_INET6, &(a->sin6_addr), ip, INET6_ADDRSTRLEN);
int port = ntohs(a->sin6_port);
@ -499,11 +499,11 @@ void InitNet2(Handle<Object> target) {
target->Set(String::NewSymbol("EADDRINUSE"), Integer::New(EADDRINUSE));
target->Set(String::NewSymbol("ECONNREFUSED"), Integer::New(ECONNREFUSED));
errno_symbol = NODE_PSYMBOL("errno");
syscall_symbol = NODE_PSYMBOL("syscall");
fd_symbol = NODE_PSYMBOL("fd");
errno_symbol = NODE_PSYMBOL("errno");
syscall_symbol = NODE_PSYMBOL("syscall");
fd_symbol = NODE_PSYMBOL("fd");
remote_address_symbol = NODE_PSYMBOL("remoteAddress");
remote_port_symbol = NODE_PSYMBOL("remotePort");
remote_port_symbol = NODE_PSYMBOL("remotePort");
}
} // namespace node

96
tcp.js
View File

@ -1,8 +1,55 @@
var socket = process.socket;
var bind = process.bind;
var listen = process.listen;
var accept = process.accept;
var close = process.close;
var debugLevel = 0;
if ("NODE_DEBUG" in process.ENV) debugLevel = 1;
function debug (x) {
if (debugLevel > 0) {
process.stdio.writeError(x + "\n");
}
}
var socket = process.socket;
var bind = process.bind;
var listen = process.listen;
var accept = process.accept;
var close = process.close;
var shutdown = process.shutdown;
var Peer = function (peerInfo) {
process.EventEmitter.call();
var self = this;
self.fd = peerInfo.fd;
self.remoteAddress = peerInfo.remoteAddress;
self.remotePort = peerInfo.remotePort;
self.readWatcher = new process.IOWatcher(function () {
debug(self.fd + " readable");
});
self.readWatcher.set(self.fd, true, false);
self.readWatcher.start();
self.writeWatcher = new process.IOWatcher(function () {
debug(self.fd + " writable");
});
self.writeWatcher.set(self.fd, false, true);
self.readable = true;
self.writable = true;
};
process.inherits(Peer, process.EventEmitter);
Peer.prototype.close = function () {
this.readable = false;
this.writable = false;
this.writeWatcher.stop();
this.readWatcher.stop();
close(this.fd);
debug("close peer " + this.fd);
this.fd = null;
};
var Server = function (listener) {
var self = this;
@ -10,7 +57,19 @@ var Server = function (listener) {
if (listener) {
self.addListener("connection", listener);
}
self.watcher = new process.IOWatcher(function (readable, writeable) {
debug("readable " + readable);
debug("writable " + writeable);
while (self.fd) {
debug("accept from " + self.fd);
var peerInfo = accept(self.fd);
debug("accept: " + JSON.stringify(peerInfo));
if (!peerInfo) return;
var peer = new Peer(peerInfo);
self.emit("connection", peer);
}
});
};
process.inherits(Server, process.EventEmitter);
@ -24,15 +83,7 @@ Server.prototype.listen = function (port, host) {
bind(self.fd, port, host);
listen(self.fd, 128); // TODO configurable backlog
self.watcher = new process.IOWatcher(self.fd, true, false, function () {
var peerInfo;
while (self.fd) {
peerInfo = accept(self.fd);
if (peerInfo === null) return;
self.emit("connection", peerInfo);
}
});
self.watcher.set(self.fd, true, false);
self.watcher.start();
};
@ -41,15 +92,20 @@ Server.prototype.close = function () {
if (!self.fd) throw new Error("Not running");
self.watcher.stop();
close(self.fd);
this.watcher = null;
this.fd = null;
self.fd = null;
};
///////////////////////////////////////////////////////
var sys = require("sys");
var server = new Server(function () {
sys.puts("connection");
server.close();
var server = new Server(function (peer) {
sys.puts("connection (" + peer.fd + "): "
+ peer.remoteAddress
+ " port "
+ peer.remotePort
);
sys.puts("server fd: " + server.fd);
peer.close();
});
server.listen(8000);
sys.puts("server fd: " + server.fd);