stream_wrap: introduce StreamWrapCallbacks

StreamWrapCallbacks is a helper class for incepting into uv_stream_t*
management process.
This commit is contained in:
Fedor Indutny 2013-06-11 12:55:49 +02:00
parent 6978e998ee
commit 4c48a39c65
2 changed files with 219 additions and 122 deletions

View File

@ -56,23 +56,6 @@ using v8::String;
using v8::TryCatch;
using v8::Value;
typedef class ReqWrap<uv_shutdown_t> ShutdownWrap;
class WriteWrap: public ReqWrap<uv_write_t> {
public:
void* operator new(size_t size, char* storage) { return storage; }
// This is just to keep the compiler happy. It should never be called, since
// we don't use exceptions in node.
void operator delete(void* ptr, char* storage) { assert(0); }
protected:
// People should not be using the non-placement new and delete operator on a
// WriteWrap. Ensure this never happens.
void* operator new (size_t size) { assert(0); };
void operator delete(void* ptr) { assert(0); };
};
static Persistent<String> buffer_sym;
static Persistent<String> bytes_sym;
@ -110,8 +93,10 @@ void StreamWrap::Initialize(Handle<Object> target) {
StreamWrap::StreamWrap(Handle<Object> object, uv_stream_t* stream)
: HandleWrap(object, reinterpret_cast<uv_handle_t*>(stream)) {
: HandleWrap(object, reinterpret_cast<uv_handle_t*>(stream)),
default_callbacks_(this) {
stream_ = stream;
callbacks_ = &default_callbacks_;
}
@ -173,8 +158,8 @@ Handle<Value> StreamWrap::ReadStop(const Arguments& args) {
uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
assert(wrap->stream_ == reinterpret_cast<uv_stream_t*>(handle));
char* buf = slab_allocator->Allocate(wrap->object_, suggested_size);
return uv_buf_init(buf, suggested_size);
return wrap->callbacks_->DoAlloc(handle, suggested_size);
}
@ -200,8 +185,10 @@ static Local<Object> AcceptHandle(uv_stream_t* pipe) {
}
void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
uv_buf_t buf, uv_handle_type pending) {
void StreamWrap::OnReadCommon(uv_stream_t* handle,
ssize_t nread,
uv_buf_t buf,
uv_handle_type pending) {
HandleScope scope(node_isolate);
StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
@ -210,56 +197,16 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
// uv_close() on the handle.
assert(wrap->object_.IsEmpty() == false);
if (nread < 0) {
// If libuv reports an error or EOF it *may* give us a buffer back. In that
// case, return the space to the slab.
if (buf.base != NULL) {
slab_allocator->Shrink(wrap->object_, buf.base, 0);
if (nread > 0) {
if (wrap->stream_->type == UV_TCP) {
NODE_COUNT_NET_BYTES_RECV(nread);
} else if (wrap->stream_->type == UV_NAMED_PIPE) {
NODE_COUNT_PIPE_BYTES_RECV(nread);
}
SetErrno(uv_last_error(uv_default_loop()));
MakeCallback(wrap->object_, onread_sym, 0, NULL);
return;
}
assert(buf.base != NULL);
Local<Object> slab = slab_allocator->Shrink(wrap->object_,
buf.base,
nread);
if (nread == 0) return;
assert(static_cast<size_t>(nread) <= buf.len);
int argc = 3;
Local<Value> argv[4] = {
slab,
Integer::NewFromUnsigned(buf.base - Buffer::Data(slab), node_isolate),
Integer::NewFromUnsigned(nread, node_isolate)
};
Local<Object> pending_obj;
if (pending == UV_TCP) {
pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(handle);
} else if (pending == UV_NAMED_PIPE) {
pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(handle);
} else if (pending == UV_UDP) {
pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(handle);
} else {
assert(pending == UV_UNKNOWN_HANDLE);
}
if (!pending_obj.IsEmpty()) {
argv[3] = pending_obj;
argc++;
}
if (wrap->stream_->type == UV_TCP) {
NODE_COUNT_NET_BYTES_RECV(nread);
} else if (wrap->stream_->type == UV_NAMED_PIPE) {
NODE_COUNT_PIPE_BYTES_RECV(nread);
}
MakeCallback(wrap->object_, onread_sym, argc, argv);
wrap->callbacks_->DoRead(handle, nread, buf, pending);
}
@ -294,37 +241,29 @@ Handle<Value> StreamWrap::WriteBuffer(const Arguments& args) {
assert(args.Length() >= 1 && Buffer::HasInstance(args[0]));
size_t length = Buffer::Length(args[0]);
char* storage = new char[sizeof(WriteWrap)];
WriteWrap* req_wrap = new (storage) WriteWrap();
WriteWrap* req_wrap = new (storage) WriteWrap(wrap);
req_wrap->object_->SetHiddenValue(buffer_sym, args[0]);
uv_buf_t buf;
WriteBuffer(args[0], &buf);
int r = uv_write(&req_wrap->req_,
wrap->stream_,
&buf,
1,
StreamWrap::AfterWrite);
int r = wrap->callbacks_->DoWrite(req_wrap,
&buf,
1,
NULL,
StreamWrap::AfterWrite);
req_wrap->Dispatched();
req_wrap->object_->Set(bytes_sym,
Integer::NewFromUnsigned(length, node_isolate));
wrap->UpdateWriteQueueSize();
if (r) {
SetErrno(uv_last_error(uv_default_loop()));
req_wrap->~WriteWrap();
delete[] storage;
return scope.Close(v8::Null(node_isolate));
} else {
if (wrap->stream_->type == UV_TCP) {
NODE_COUNT_NET_BYTES_SENT(length);
} else if (wrap->stream_->type == UV_NAMED_PIPE) {
NODE_COUNT_PIPE_BYTES_SENT(length);
}
return scope.Close(req_wrap->object_);
}
}
@ -359,7 +298,7 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
}
char* storage = new char[sizeof(WriteWrap) + storage_size + 15];
WriteWrap* req_wrap = new (storage) WriteWrap();
WriteWrap* req_wrap = new (storage) WriteWrap(wrap);
char* data = reinterpret_cast<char*>(ROUND_UP(
reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
@ -378,12 +317,11 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
reinterpret_cast<uv_pipe_t*>(wrap->stream_)->ipc;
if (!ipc_pipe) {
r = uv_write(&req_wrap->req_,
wrap->stream_,
&buf,
1,
StreamWrap::AfterWrite);
r = wrap->callbacks_->DoWrite(req_wrap,
&buf,
1,
NULL,
StreamWrap::AfterWrite);
} else {
uv_handle_t* send_handle = NULL;
@ -403,31 +341,22 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
req_wrap->object_->Set(handle_sym, send_handle_obj);
}
r = uv_write2(&req_wrap->req_,
wrap->stream_,
&buf,
1,
reinterpret_cast<uv_stream_t*>(send_handle),
StreamWrap::AfterWrite);
r = wrap->callbacks_->DoWrite(req_wrap,
&buf,
1,
reinterpret_cast<uv_stream_t*>(send_handle),
StreamWrap::AfterWrite);
}
req_wrap->Dispatched();
req_wrap->object_->Set(bytes_sym, Number::New((uint32_t) data_size));
wrap->UpdateWriteQueueSize();
if (r) {
SetErrno(uv_last_error(uv_default_loop()));
req_wrap->~WriteWrap();
delete[] storage;
return scope.Close(v8::Null(node_isolate));
} else {
if (wrap->stream_->type == UV_TCP) {
NODE_COUNT_NET_BYTES_SENT(buf.len);
} else if (wrap->stream_->type == UV_NAMED_PIPE) {
NODE_COUNT_PIPE_BYTES_SENT(buf.len);
}
return scope.Close(req_wrap->object_);
}
}
@ -483,7 +412,7 @@ Handle<Value> StreamWrap::Writev(const Arguments& args) {
storage_size += sizeof(WriteWrap);
char* storage = new char[storage_size];
WriteWrap* req_wrap = new (storage) WriteWrap();
WriteWrap* req_wrap = new (storage) WriteWrap(wrap);
uint32_t bytes = 0;
size_t offset = sizeof(WriteWrap);
@ -513,11 +442,11 @@ Handle<Value> StreamWrap::Writev(const Arguments& args) {
bytes += str_size;
}
int r = uv_write(&req_wrap->req_,
wrap->stream_,
bufs,
count,
StreamWrap::AfterWrite);
int r = wrap->callbacks_->DoWrite(req_wrap,
bufs,
count,
NULL,
StreamWrap::AfterWrite);
// Deallocate space
if (bufs != bufs_)
@ -526,20 +455,12 @@ Handle<Value> StreamWrap::Writev(const Arguments& args) {
req_wrap->Dispatched();
req_wrap->object_->Set(bytes_sym, Number::New(bytes));
wrap->UpdateWriteQueueSize();
if (r) {
SetErrno(uv_last_error(uv_default_loop()));
req_wrap->~WriteWrap();
delete[] storage;
return scope.Close(v8::Null(node_isolate));
} else {
if (wrap->stream_->type == UV_TCP) {
NODE_COUNT_NET_BYTES_SENT(bytes);
} else if (wrap->stream_->type == UV_NAMED_PIPE) {
NODE_COUNT_PIPE_BYTES_SENT(bytes);
}
return scope.Close(req_wrap->object_);
}
}
@ -561,8 +482,8 @@ Handle<Value> StreamWrap::WriteUcs2String(const Arguments& args) {
void StreamWrap::AfterWrite(uv_write_t* req, int status) {
WriteWrap* req_wrap = (WriteWrap*) req->data;
StreamWrap* wrap = (StreamWrap*) req->handle->data;
WriteWrap* req_wrap = container_of(req, WriteWrap, req_);
StreamWrap* wrap = req_wrap->wrap_;
HandleScope scope(node_isolate);
@ -579,7 +500,7 @@ void StreamWrap::AfterWrite(uv_write_t* req, int status) {
SetErrno(uv_last_error(uv_default_loop()));
}
wrap->UpdateWriteQueueSize();
wrap->callbacks_->AfterWrite(req_wrap);
Local<Value> argv[] = {
Integer::New(status, node_isolate),
@ -601,7 +522,7 @@ Handle<Value> StreamWrap::Shutdown(const Arguments& args) {
ShutdownWrap* req_wrap = new ShutdownWrap();
int r = uv_shutdown(&req_wrap->req_, wrap->stream_, AfterShutdown);
int r = wrap->callbacks_->DoShutdown(req_wrap, AfterShutdown);
req_wrap->Dispatched();
@ -641,4 +562,103 @@ void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
}
int StreamWrapCallbacks::DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle,
uv_write_cb cb) {
int r;
if (send_handle == NULL) {
r = uv_write(&w->req_, wrap_->stream_, bufs, count, cb);
} else {
r = uv_write2(&w->req_, wrap_->stream_, bufs, count, send_handle, cb);
}
if (!r) {
size_t bytes = 0;
for (int i = 0; i < count; i++)
bytes += bufs[i].len;
if (wrap_->stream_->type == UV_TCP) {
NODE_COUNT_NET_BYTES_SENT(bytes);
} else if (wrap_->stream_->type == UV_NAMED_PIPE) {
NODE_COUNT_PIPE_BYTES_SENT(bytes);
}
}
wrap_->UpdateWriteQueueSize();
return r;
}
void StreamWrapCallbacks::AfterWrite(WriteWrap* w) {
wrap_->UpdateWriteQueueSize();
}
uv_buf_t StreamWrapCallbacks::DoAlloc(uv_handle_t* handle,
size_t suggested_size) {
char* buf = slab_allocator->Allocate(wrap_->object_, suggested_size);
return uv_buf_init(buf, suggested_size);
}
void StreamWrapCallbacks::DoRead(uv_stream_t* handle,
ssize_t nread,
uv_buf_t buf,
uv_handle_type pending) {
HandleScope scope(node_isolate);
if (nread < 0) {
// If libuv reports an error or EOF it *may* give us a buffer back. In that
// case, return the space to the slab.
if (buf.base != NULL)
slab_allocator->Shrink(Self(), buf.base, 0);
SetErrno(uv_last_error(uv_default_loop()));
MakeCallback(Self(), onread_sym, 0, NULL);
return;
}
Local<Object> slab = slab_allocator->Shrink(wrap_->object_, buf.base, nread);
if (nread == 0) return;
assert(static_cast<size_t>(nread) <= buf.len);
int argc = 3;
Local<Value> argv[4] = {
slab,
Integer::NewFromUnsigned(buf.base - Buffer::Data(slab), node_isolate),
Integer::NewFromUnsigned(nread, node_isolate)
};
Local<Object> pending_obj;
if (pending == UV_TCP) {
pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(handle);
} else if (pending == UV_NAMED_PIPE) {
pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(handle);
} else if (pending == UV_UDP) {
pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(handle);
} else {
assert(pending == UV_UNKNOWN_HANDLE);
}
if (!pending_obj.IsEmpty()) {
argv[3] = pending_obj;
argc++;
}
MakeCallback(wrap_->object_, onread_sym, argc, argv);
}
int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) {
return uv_shutdown(&req_wrap->req_, wrap_->stream_, cb);
}
Handle<Object> StreamWrapCallbacks::Self() {
return wrap_->object_;
}
}

View File

@ -25,18 +25,83 @@
#include "v8.h"
#include "node.h"
#include "handle_wrap.h"
#include "req_wrap.h"
#include "string_bytes.h"
namespace node {
// Forward declaration
class WriteWrap;
class StreamWrap;
typedef class ReqWrap<uv_shutdown_t> ShutdownWrap;
class WriteWrap: public ReqWrap<uv_write_t> {
public:
explicit WriteWrap(StreamWrap* wrap) {
wrap_ = wrap;
}
void* operator new(size_t size, char* storage) { return storage; }
// This is just to keep the compiler happy. It should never be called, since
// we don't use exceptions in node.
void operator delete(void* ptr, char* storage) { assert(0); }
StreamWrap* wrap_;
protected:
// People should not be using the non-placement new and delete operator on a
// WriteWrap. Ensure this never happens.
void* operator new(size_t size) { assert(0); };
void operator delete(void* ptr) { assert(0); };
};
// Overridable callbacks' types
class StreamWrapCallbacks {
public:
explicit StreamWrapCallbacks(StreamWrap* wrap) : wrap_(wrap) {
}
explicit StreamWrapCallbacks(StreamWrapCallbacks* old) : wrap_(old->wrap_) {
}
virtual ~StreamWrapCallbacks() {
}
virtual int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle,
uv_write_cb cb);
virtual void AfterWrite(WriteWrap* w);
virtual uv_buf_t DoAlloc(uv_handle_t* handle, size_t suggested_size);
virtual void DoRead(uv_stream_t* handle,
ssize_t nread,
uv_buf_t buf,
uv_handle_type pending);
virtual int DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb);
v8::Handle<v8::Object> Self();
protected:
StreamWrap* wrap_;
};
class StreamWrap : public HandleWrap {
public:
uv_stream_t* GetStream() { return stream_; }
void OverrideCallbacks(StreamWrapCallbacks* callbacks) {
StreamWrapCallbacks* old = callbacks_;
callbacks_ = callbacks;
if (old != &default_callbacks_)
delete old;
}
StreamWrapCallbacks* GetCallbacks() {
return callbacks_;
}
static void Initialize(v8::Handle<v8::Object> target);
static v8::Handle<v8::Value> GetFD(v8::Local<v8::String>,
@ -53,10 +118,19 @@ class StreamWrap : public HandleWrap {
static v8::Handle<v8::Value> WriteUtf8String(const v8::Arguments& args);
static v8::Handle<v8::Value> WriteUcs2String(const v8::Arguments& args);
// Overridable callbacks
StreamWrapCallbacks* callbacks_;
protected:
static size_t WriteBuffer(v8::Handle<v8::Value> val, uv_buf_t* buf);
StreamWrap(v8::Handle<v8::Object> object, uv_stream_t* stream);
~StreamWrap() {
if (callbacks_ != &default_callbacks_) {
delete callbacks_;
callbacks_ = NULL;
}
}
void StateChange() { }
void UpdateWriteQueueSize();
@ -79,6 +153,9 @@ class StreamWrap : public HandleWrap {
size_t slab_offset_;
uv_stream_t* stream_;
StreamWrapCallbacks default_callbacks_;
friend class StreamWrapCallbacks;
};