src: simplify handles for libuv streams
Instead of passing along the handle object, just set it as a property on the stream handle object and let the read handler grab it from there. PR-URL: https://github.com/nodejs/node/pull/18334 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Anatoli Papirovski <apapirovski@mac.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
7c4b09b24b
commit
5898dc3d0c
@ -465,7 +465,10 @@ function setupChannel(target, channel) {
|
|||||||
var jsonBuffer = '';
|
var jsonBuffer = '';
|
||||||
var pendingHandle = null;
|
var pendingHandle = null;
|
||||||
channel.buffering = false;
|
channel.buffering = false;
|
||||||
channel.onread = function(nread, pool, recvHandle) {
|
channel.pendingHandle = null;
|
||||||
|
channel.onread = function(nread, pool) {
|
||||||
|
const recvHandle = channel.pendingHandle;
|
||||||
|
channel.pendingHandle = null;
|
||||||
// TODO(bnoordhuis) Check that nread > 0.
|
// TODO(bnoordhuis) Check that nread > 0.
|
||||||
if (pool) {
|
if (pool) {
|
||||||
if (recvHandle)
|
if (recvHandle)
|
||||||
|
@ -210,6 +210,7 @@ class ModuleWrap;
|
|||||||
V(owner_string, "owner") \
|
V(owner_string, "owner") \
|
||||||
V(parse_error_string, "Parse Error") \
|
V(parse_error_string, "Parse Error") \
|
||||||
V(path_string, "path") \
|
V(path_string, "path") \
|
||||||
|
V(pending_handle_string, "pendingHandle") \
|
||||||
V(pbkdf2_error_string, "PBKDF2 Error") \
|
V(pbkdf2_error_string, "PBKDF2 Error") \
|
||||||
V(pid_string, "pid") \
|
V(pid_string, "pid") \
|
||||||
V(pipe_string, "pipe") \
|
V(pipe_string, "pipe") \
|
||||||
|
@ -33,9 +33,7 @@ inline StreamListener::~StreamListener() {
|
|||||||
|
|
||||||
inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
|
inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
|
||||||
CHECK_NE(previous_listener_, nullptr);
|
CHECK_NE(previous_listener_, nullptr);
|
||||||
previous_listener_->OnStreamRead(nread,
|
previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
|
||||||
uv_buf_init(nullptr, 0),
|
|
||||||
UV_UNKNOWN_HANDLE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -85,12 +83,10 @@ inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
|
|||||||
return listener_->OnStreamAlloc(suggested_size);
|
return listener_->OnStreamAlloc(suggested_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void StreamResource::EmitRead(ssize_t nread,
|
inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
|
||||||
const uv_buf_t& buf,
|
|
||||||
uv_handle_type pending) {
|
|
||||||
if (nread > 0)
|
if (nread > 0)
|
||||||
bytes_read_ += static_cast<uint64_t>(nread);
|
bytes_read_ += static_cast<uint64_t>(nread);
|
||||||
listener_->OnStreamRead(nread, buf, pending);
|
listener_->OnStreamRead(nread, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
|
inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
|
||||||
|
@ -437,23 +437,17 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void StreamBase::CallJSOnreadMethod(ssize_t nread,
|
void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) {
|
||||||
Local<Object> buf,
|
|
||||||
Local<Object> handle) {
|
|
||||||
Environment* env = env_;
|
Environment* env = env_;
|
||||||
|
|
||||||
Local<Value> argv[] = {
|
Local<Value> argv[] = {
|
||||||
Integer::New(env->isolate(), nread),
|
Integer::New(env->isolate(), nread),
|
||||||
buf,
|
buf
|
||||||
handle
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if (argv[1].IsEmpty())
|
if (argv[1].IsEmpty())
|
||||||
argv[1] = Undefined(env->isolate());
|
argv[1] = Undefined(env->isolate());
|
||||||
|
|
||||||
if (argv[2].IsEmpty())
|
|
||||||
argv[2] = Undefined(env->isolate());
|
|
||||||
|
|
||||||
AsyncWrap* wrap = GetAsyncWrap();
|
AsyncWrap* wrap = GetAsyncWrap();
|
||||||
CHECK_NE(wrap, nullptr);
|
CHECK_NE(wrap, nullptr);
|
||||||
wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
|
wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
|
||||||
@ -495,19 +489,6 @@ uv_buf_t StreamListener::OnStreamAlloc(size_t suggested_size) {
|
|||||||
return uv_buf_init(Malloc(suggested_size), suggested_size);
|
return uv_buf_init(Malloc(suggested_size), suggested_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
void StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
|
|
||||||
// This cannot be virtual because it is just as valid to override the other
|
|
||||||
// OnStreamRead() callback.
|
|
||||||
CHECK(0 && "OnStreamRead() needs to be implemented");
|
|
||||||
}
|
|
||||||
|
|
||||||
void StreamListener::OnStreamRead(ssize_t nread,
|
|
||||||
const uv_buf_t& buf,
|
|
||||||
uv_handle_type pending) {
|
|
||||||
CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
|
|
||||||
OnStreamRead(nread, buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
|
void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
|
||||||
CHECK_NE(stream_, nullptr);
|
CHECK_NE(stream_, nullptr);
|
||||||
|
@ -150,17 +150,8 @@ class StreamListener {
|
|||||||
// with base nullpptr in case of an error.
|
// with base nullpptr in case of an error.
|
||||||
// `nread` is the number of read bytes (which is at most the buffer length),
|
// `nread` is the number of read bytes (which is at most the buffer length),
|
||||||
// or, if negative, a libuv error code.
|
// or, if negative, a libuv error code.
|
||||||
// The variant with a `uv_handle_type` argument is used by libuv-backed
|
|
||||||
// streams for handle transfers (e.g. passing net.Socket instances between
|
|
||||||
// cluster workers). For all other streams, overriding the simple variant
|
|
||||||
// should be sufficient.
|
|
||||||
// By default, the second variant crashes if `pending` is set and otherwise
|
|
||||||
// calls the simple variant.
|
|
||||||
virtual void OnStreamRead(ssize_t nread,
|
virtual void OnStreamRead(ssize_t nread,
|
||||||
const uv_buf_t& buf) = 0;
|
const uv_buf_t& buf) = 0;
|
||||||
virtual void OnStreamRead(ssize_t nread,
|
|
||||||
const uv_buf_t& buf,
|
|
||||||
uv_handle_type pending);
|
|
||||||
|
|
||||||
// This is called once a Write has finished. `status` may be 0 or,
|
// This is called once a Write has finished. `status` may be 0 or,
|
||||||
// if negative, a libuv error code.
|
// if negative, a libuv error code.
|
||||||
@ -229,9 +220,7 @@ class StreamResource {
|
|||||||
uv_buf_t EmitAlloc(size_t suggested_size);
|
uv_buf_t EmitAlloc(size_t suggested_size);
|
||||||
// Call the current listener's OnStreamRead() method and update the
|
// Call the current listener's OnStreamRead() method and update the
|
||||||
// stream's read byte counter.
|
// stream's read byte counter.
|
||||||
void EmitRead(ssize_t nread,
|
void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));
|
||||||
const uv_buf_t& buf = uv_buf_init(nullptr, 0),
|
|
||||||
uv_handle_type pending = UV_UNKNOWN_HANDLE);
|
|
||||||
// Call the current listener's OnStreamAfterWrite() method.
|
// Call the current listener's OnStreamAfterWrite() method.
|
||||||
void EmitAfterWrite(WriteWrap* w, int status);
|
void EmitAfterWrite(WriteWrap* w, int status);
|
||||||
|
|
||||||
@ -260,10 +249,7 @@ class StreamBase : public StreamResource {
|
|||||||
virtual bool IsIPCPipe();
|
virtual bool IsIPCPipe();
|
||||||
virtual int GetFD();
|
virtual int GetFD();
|
||||||
|
|
||||||
void CallJSOnreadMethod(
|
void CallJSOnreadMethod(ssize_t nread, v8::Local<v8::Object> buf);
|
||||||
ssize_t nread,
|
|
||||||
v8::Local<v8::Object> buf,
|
|
||||||
v8::Local<v8::Object> handle = v8::Local<v8::Object>());
|
|
||||||
|
|
||||||
// These are called by the respective {Write,Shutdown}Wrap class.
|
// These are called by the respective {Write,Shutdown}Wrap class.
|
||||||
virtual void AfterShutdown(ShutdownWrap* req, int status);
|
virtual void AfterShutdown(ShutdownWrap* req, int status);
|
||||||
|
@ -93,7 +93,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
|
|||||||
provider),
|
provider),
|
||||||
StreamBase(env),
|
StreamBase(env),
|
||||||
stream_(stream) {
|
stream_(stream) {
|
||||||
PushStreamListener(this);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -146,7 +145,13 @@ bool LibuvStreamWrap::IsIPCPipe() {
|
|||||||
|
|
||||||
|
|
||||||
int LibuvStreamWrap::ReadStart() {
|
int LibuvStreamWrap::ReadStart() {
|
||||||
return uv_read_start(stream(), OnAlloc, OnRead);
|
return uv_read_start(stream(), [](uv_handle_t* handle,
|
||||||
|
size_t suggested_size,
|
||||||
|
uv_buf_t* buf) {
|
||||||
|
static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);
|
||||||
|
}, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
|
||||||
|
static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -155,16 +160,11 @@ int LibuvStreamWrap::ReadStop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void LibuvStreamWrap::OnAlloc(uv_handle_t* handle,
|
void LibuvStreamWrap::OnUvAlloc(size_t suggested_size, uv_buf_t* buf) {
|
||||||
size_t suggested_size,
|
HandleScope scope(env()->isolate());
|
||||||
uv_buf_t* buf) {
|
Context::Scope context_scope(env()->context());
|
||||||
LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(handle->data);
|
|
||||||
HandleScope scope(wrap->env()->isolate());
|
|
||||||
Context::Scope context_scope(wrap->env()->context());
|
|
||||||
|
|
||||||
CHECK_EQ(wrap->stream(), reinterpret_cast<uv_stream_t*>(handle));
|
*buf = EmitAlloc(suggested_size);
|
||||||
|
|
||||||
*buf = wrap->EmitAlloc(suggested_size);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -190,64 +190,47 @@ static Local<Object> AcceptHandle(Environment* env, LibuvStreamWrap* parent) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void LibuvStreamWrap::OnStreamRead(ssize_t nread,
|
void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) {
|
||||||
const uv_buf_t& buf,
|
HandleScope scope(env()->isolate());
|
||||||
uv_handle_type pending) {
|
|
||||||
HandleScope handle_scope(env()->isolate());
|
|
||||||
Context::Scope context_scope(env()->context());
|
Context::Scope context_scope(env()->context());
|
||||||
|
|
||||||
if (nread <= 0) {
|
|
||||||
free(buf.base);
|
|
||||||
if (nread < 0)
|
|
||||||
CallJSOnreadMethod(nread, Local<Object>());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
CHECK_LE(static_cast<size_t>(nread), buf.len);
|
|
||||||
|
|
||||||
Local<Object> pending_obj;
|
|
||||||
|
|
||||||
if (pending == UV_TCP) {
|
|
||||||
pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env(), this);
|
|
||||||
} else if (pending == UV_NAMED_PIPE) {
|
|
||||||
pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env(), this);
|
|
||||||
} else if (pending == UV_UDP) {
|
|
||||||
pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env(), this);
|
|
||||||
} else {
|
|
||||||
CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
|
|
||||||
}
|
|
||||||
|
|
||||||
Local<Object> obj = Buffer::New(env(), buf.base, nread).ToLocalChecked();
|
|
||||||
CallJSOnreadMethod(nread, obj, pending_obj);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void LibuvStreamWrap::OnRead(uv_stream_t* handle,
|
|
||||||
ssize_t nread,
|
|
||||||
const uv_buf_t* buf) {
|
|
||||||
LibuvStreamWrap* wrap = static_cast<LibuvStreamWrap*>(handle->data);
|
|
||||||
HandleScope scope(wrap->env()->isolate());
|
|
||||||
Context::Scope context_scope(wrap->env()->context());
|
|
||||||
uv_handle_type type = UV_UNKNOWN_HANDLE;
|
uv_handle_type type = UV_UNKNOWN_HANDLE;
|
||||||
|
|
||||||
if (wrap->is_named_pipe_ipc() &&
|
if (is_named_pipe_ipc() &&
|
||||||
uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(handle)) > 0) {
|
uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(stream())) > 0) {
|
||||||
type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(handle));
|
type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(stream()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// We should not be getting this callback if someone as already called
|
// We should not be getting this callback if someone as already called
|
||||||
// uv_close() on the handle.
|
// uv_close() on the handle.
|
||||||
CHECK_EQ(wrap->persistent().IsEmpty(), false);
|
CHECK_EQ(persistent().IsEmpty(), false);
|
||||||
|
|
||||||
if (nread > 0) {
|
if (nread > 0) {
|
||||||
if (wrap->is_tcp()) {
|
if (is_tcp()) {
|
||||||
NODE_COUNT_NET_BYTES_RECV(nread);
|
NODE_COUNT_NET_BYTES_RECV(nread);
|
||||||
} else if (wrap->is_named_pipe()) {
|
} else if (is_named_pipe()) {
|
||||||
NODE_COUNT_PIPE_BYTES_RECV(nread);
|
NODE_COUNT_PIPE_BYTES_RECV(nread);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Local<Object> pending_obj;
|
||||||
|
|
||||||
|
if (type == UV_TCP) {
|
||||||
|
pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env(), this);
|
||||||
|
} else if (type == UV_NAMED_PIPE) {
|
||||||
|
pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env(), this);
|
||||||
|
} else if (type == UV_UDP) {
|
||||||
|
pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env(), this);
|
||||||
|
} else {
|
||||||
|
CHECK_EQ(type, UV_UNKNOWN_HANDLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pending_obj.IsEmpty()) {
|
||||||
|
object()->Set(env()->context(),
|
||||||
|
env()->pending_handle_string(),
|
||||||
|
pending_obj).FromJust();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
wrap->EmitRead(nread, *buf, type);
|
EmitRead(nread, *buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -373,11 +356,6 @@ void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
|
|||||||
req_wrap->Done(status);
|
req_wrap->Done(status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
|
|
||||||
StreamBase::AfterWrite(w, status);
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace node
|
} // namespace node
|
||||||
|
|
||||||
NODE_BUILTIN_MODULE_CONTEXT_AWARE(stream_wrap,
|
NODE_BUILTIN_MODULE_CONTEXT_AWARE(stream_wrap,
|
||||||
|
@ -33,9 +33,7 @@
|
|||||||
|
|
||||||
namespace node {
|
namespace node {
|
||||||
|
|
||||||
class LibuvStreamWrap : public HandleWrap,
|
class LibuvStreamWrap : public HandleWrap, public StreamBase {
|
||||||
public StreamListener,
|
|
||||||
public StreamBase {
|
|
||||||
public:
|
public:
|
||||||
static void Initialize(v8::Local<v8::Object> target,
|
static void Initialize(v8::Local<v8::Object> target,
|
||||||
v8::Local<v8::Value> unused,
|
v8::Local<v8::Value> unused,
|
||||||
@ -93,30 +91,12 @@ class LibuvStreamWrap : public HandleWrap,
|
|||||||
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
|
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||||
|
|
||||||
// Callbacks for libuv
|
// Callbacks for libuv
|
||||||
static void OnAlloc(uv_handle_t* handle,
|
void OnUvAlloc(size_t suggested_size, uv_buf_t* buf);
|
||||||
size_t suggested_size,
|
void OnUvRead(ssize_t nread, const uv_buf_t* buf);
|
||||||
uv_buf_t* buf);
|
|
||||||
|
|
||||||
static void OnRead(uv_stream_t* handle,
|
|
||||||
ssize_t nread,
|
|
||||||
const uv_buf_t* buf);
|
|
||||||
static void AfterUvWrite(uv_write_t* req, int status);
|
static void AfterUvWrite(uv_write_t* req, int status);
|
||||||
static void AfterUvShutdown(uv_shutdown_t* req, int status);
|
static void AfterUvShutdown(uv_shutdown_t* req, int status);
|
||||||
|
|
||||||
// Resource interface implementation
|
|
||||||
void OnStreamRead(ssize_t nread,
|
|
||||||
const uv_buf_t& buf) override {
|
|
||||||
CHECK(0 && "must not be called");
|
|
||||||
}
|
|
||||||
void OnStreamRead(ssize_t nread,
|
|
||||||
const uv_buf_t& buf,
|
|
||||||
uv_handle_type pending) override;
|
|
||||||
void OnStreamAfterWrite(WriteWrap* w, int status) override {
|
|
||||||
previous_listener_->OnStreamAfterWrite(w, status);
|
|
||||||
}
|
|
||||||
|
|
||||||
void AfterWrite(WriteWrap* req_wrap, int status) override;
|
|
||||||
|
|
||||||
uv_stream_t* const stream_;
|
uv_stream_t* const stream_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user