src: keep track of open requests
Workers cannot shut down while requests are open, so keep a counter that is increased whenever libuv requests are made and decreased whenever their callback is called. This also applies to other embedders, who may want to shut down an `Environment` instance early. Many thanks for Stephen Belanger for reviewing the original version of this commit in the Ayo.js project. Fixes: https://github.com/nodejs/node/issues/20517 Refs: https://github.com/ayojs/ayo/pull/85 PR-URL: https://github.com/nodejs/node/pull/19377 Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl> Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
parent
cac8496c2f
commit
1db0039c50
@ -371,6 +371,15 @@ inline void Environment::CloseHandle(T* handle, OnCloseCallback callback) {
|
||||
});
|
||||
}
|
||||
|
||||
void Environment::IncreaseWaitingRequestCounter() {
|
||||
request_waiting_++;
|
||||
}
|
||||
|
||||
void Environment::DecreaseWaitingRequestCounter() {
|
||||
request_waiting_--;
|
||||
CHECK_GE(request_waiting_, 0);
|
||||
}
|
||||
|
||||
inline uv_loop_t* Environment::event_loop() const {
|
||||
return isolate_data()->event_loop();
|
||||
}
|
||||
|
@ -107,7 +107,6 @@ Environment::Environment(IsolateData* isolate_data,
|
||||
#if HAVE_INSPECTOR
|
||||
inspector_agent_(new inspector::Agent(this)),
|
||||
#endif
|
||||
handle_cleanup_waiting_(0),
|
||||
http_parser_buffer_(nullptr),
|
||||
fs_stats_field_array_(isolate_, kFsStatsFieldsLength * 2),
|
||||
context_(context->GetIsolate(), context) {
|
||||
@ -241,8 +240,11 @@ void Environment::CleanupHandles() {
|
||||
hc.cb_(this, hc.handle_, hc.arg_);
|
||||
handle_cleanup_queue_.clear();
|
||||
|
||||
while (handle_cleanup_waiting_ != 0 || !handle_wrap_queue_.IsEmpty())
|
||||
while (handle_cleanup_waiting_ != 0 ||
|
||||
request_waiting_ != 0 ||
|
||||
!handle_wrap_queue_.IsEmpty()) {
|
||||
uv_run(event_loop(), UV_RUN_ONCE);
|
||||
}
|
||||
}
|
||||
|
||||
void Environment::StartProfilerIdleNotifier() {
|
||||
|
@ -601,6 +601,9 @@ class Environment {
|
||||
inline uv_check_t* immediate_check_handle();
|
||||
inline uv_idle_t* immediate_idle_handle();
|
||||
|
||||
inline void IncreaseWaitingRequestCounter();
|
||||
inline void DecreaseWaitingRequestCounter();
|
||||
|
||||
inline AsyncHooks* async_hooks();
|
||||
inline ImmediateInfo* immediate_info();
|
||||
inline TickInfo* tick_info();
|
||||
@ -833,7 +836,8 @@ class Environment {
|
||||
HandleWrapQueue handle_wrap_queue_;
|
||||
ReqWrapQueue req_wrap_queue_;
|
||||
std::list<HandleCleanup> handle_cleanup_queue_;
|
||||
int handle_cleanup_waiting_;
|
||||
int handle_cleanup_waiting_ = 0;
|
||||
int request_waiting_ = 0;
|
||||
|
||||
double* heap_statistics_buffer_ = nullptr;
|
||||
double* heap_space_statistics_buffer_ = nullptr;
|
||||
|
@ -3388,6 +3388,9 @@ class Work : public node::AsyncResource {
|
||||
// Establish a handle scope here so that every callback doesn't have to.
|
||||
// Also it is needed for the exception-handling below.
|
||||
v8::HandleScope scope(env->isolate);
|
||||
node::Environment* env_ = node::Environment::GetCurrent(env->isolate);
|
||||
env_->DecreaseWaitingRequestCounter();
|
||||
|
||||
CallbackScope callback_scope(work);
|
||||
|
||||
NAPI_CALL_INTO_MODULE(env,
|
||||
@ -3488,6 +3491,8 @@ napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
|
||||
|
||||
uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
|
||||
|
||||
node::Environment* env_ = node::Environment::GetCurrent(env->isolate);
|
||||
env_->IncreaseWaitingRequestCounter();
|
||||
CALL_UV(env, uv_queue_work(event_loop,
|
||||
w->Request(),
|
||||
uvimpl::Work::ExecuteCallback,
|
||||
|
@ -4639,9 +4639,12 @@ void PBKDF2Request::After() {
|
||||
|
||||
|
||||
void PBKDF2Request::After(uv_work_t* work_req, int status) {
|
||||
CHECK_EQ(status, 0);
|
||||
std::unique_ptr<PBKDF2Request> req(
|
||||
ContainerOf(&PBKDF2Request::work_req_, work_req));
|
||||
req->env()->DecreaseWaitingRequestCounter();
|
||||
if (status == UV_ECANCELED)
|
||||
return;
|
||||
CHECK_EQ(status, 0);
|
||||
req->After();
|
||||
}
|
||||
|
||||
@ -4692,6 +4695,7 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
|
||||
if (args[5]->IsFunction()) {
|
||||
obj->Set(env->context(), env->ondone_string(), args[5]).FromJust();
|
||||
|
||||
env->IncreaseWaitingRequestCounter();
|
||||
uv_queue_work(env->event_loop(),
|
||||
req.release()->work_req(),
|
||||
PBKDF2Request::Work,
|
||||
@ -4831,10 +4835,13 @@ void RandomBytesCheck(RandomBytesRequest* req, Local<Value> (*argv)[2]) {
|
||||
|
||||
|
||||
void RandomBytesAfter(uv_work_t* work_req, int status) {
|
||||
CHECK_EQ(status, 0);
|
||||
std::unique_ptr<RandomBytesRequest> req(
|
||||
ContainerOf(&RandomBytesRequest::work_req_, work_req));
|
||||
Environment* env = req->env();
|
||||
env->DecreaseWaitingRequestCounter();
|
||||
if (status == UV_ECANCELED)
|
||||
return;
|
||||
CHECK_EQ(status, 0);
|
||||
HandleScope handle_scope(env->isolate());
|
||||
Context::Scope context_scope(env->context());
|
||||
Local<Value> argv[2];
|
||||
@ -4874,6 +4881,7 @@ void RandomBytes(const FunctionCallbackInfo<Value>& args) {
|
||||
if (args[1]->IsFunction()) {
|
||||
obj->Set(env->context(), env->ondone_string(), args[1]).FromJust();
|
||||
|
||||
env->IncreaseWaitingRequestCounter();
|
||||
uv_queue_work(env->event_loop(),
|
||||
req.release()->work_req(),
|
||||
RandomBytesWork,
|
||||
@ -4913,6 +4921,7 @@ void RandomBytesBuffer(const FunctionCallbackInfo<Value>& args) {
|
||||
if (args[3]->IsFunction()) {
|
||||
obj->Set(env->context(), env->ondone_string(), args[3]).FromJust();
|
||||
|
||||
env->IncreaseWaitingRequestCounter();
|
||||
uv_queue_work(env->event_loop(),
|
||||
req.release()->work_req(),
|
||||
RandomBytesWork,
|
||||
|
@ -214,6 +214,7 @@ class ZCtx : public AsyncWrap {
|
||||
}
|
||||
|
||||
// async version
|
||||
env->IncreaseWaitingRequestCounter();
|
||||
uv_queue_work(env->event_loop(), work_req, ZCtx::Process, ZCtx::After);
|
||||
}
|
||||
|
||||
@ -361,10 +362,17 @@ class ZCtx : public AsyncWrap {
|
||||
|
||||
// v8 land!
|
||||
static void After(uv_work_t* work_req, int status) {
|
||||
CHECK_EQ(status, 0);
|
||||
|
||||
ZCtx* ctx = ContainerOf(&ZCtx::work_req_, work_req);
|
||||
Environment* env = ctx->env();
|
||||
ctx->write_in_progress_ = false;
|
||||
|
||||
env->DecreaseWaitingRequestCounter();
|
||||
if (status == UV_ECANCELED) {
|
||||
ctx->Close();
|
||||
return;
|
||||
}
|
||||
|
||||
CHECK_EQ(status, 0);
|
||||
|
||||
HandleScope handle_scope(env->isolate());
|
||||
Context::Scope context_scope(env->context());
|
||||
@ -374,7 +382,6 @@ class ZCtx : public AsyncWrap {
|
||||
|
||||
ctx->write_result_[0] = ctx->strm_.avail_out;
|
||||
ctx->write_result_[1] = ctx->strm_.avail_in;
|
||||
ctx->write_in_progress_ = false;
|
||||
|
||||
// call the write() cb
|
||||
Local<Function> cb = PersistentToLocal(env->isolate(),
|
||||
|
@ -20,6 +20,8 @@ ReqWrap<T>::ReqWrap(Environment* env,
|
||||
// FIXME(bnoordhuis) The fact that a reinterpret_cast is needed is
|
||||
// arguably a good indicator that there should be more than one queue.
|
||||
env->req_wrap_queue()->PushBack(reinterpret_cast<ReqWrap<uv_req_t>*>(this));
|
||||
|
||||
Reset();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
@ -33,6 +35,12 @@ void ReqWrap<T>::Dispatched() {
|
||||
req_.data = this;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void ReqWrap<T>::Reset() {
|
||||
original_callback_ = nullptr;
|
||||
req_.data = nullptr;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
ReqWrap<T>* ReqWrap<T>::from_req(T* req) {
|
||||
return ContainerOf(&ReqWrap<T>::req_, req);
|
||||
@ -40,7 +48,8 @@ ReqWrap<T>* ReqWrap<T>::from_req(T* req) {
|
||||
|
||||
template <typename T>
|
||||
void ReqWrap<T>::Cancel() {
|
||||
uv_cancel(reinterpret_cast<uv_req_t*>(&req_));
|
||||
if (req_.data == this) // Only cancel if already dispatched.
|
||||
uv_cancel(reinterpret_cast<uv_req_t*>(&req_));
|
||||
}
|
||||
|
||||
// Below is dark template magic designed to invoke libuv functions that
|
||||
@ -95,7 +104,7 @@ struct CallLibuvFunction<ReqT, void(*)(ReqT*, Args...)> {
|
||||
template <typename ReqT, typename T>
|
||||
struct MakeLibuvRequestCallback {
|
||||
static T For(ReqWrap<ReqT>* req_wrap, T v) {
|
||||
static_assert(!std::is_function<T>::value,
|
||||
static_assert(!is_callable<T>::value,
|
||||
"MakeLibuvRequestCallback missed a callback");
|
||||
return v;
|
||||
}
|
||||
@ -109,6 +118,7 @@ struct MakeLibuvRequestCallback<ReqT, void(*)(ReqT*, Args...)> {
|
||||
|
||||
static void Wrapper(ReqT* req, Args... args) {
|
||||
ReqWrap<ReqT>* req_wrap = ContainerOf(&ReqWrap<ReqT>::req_, req);
|
||||
req_wrap->env()->DecreaseWaitingRequestCounter();
|
||||
F original_callback = reinterpret_cast<F>(req_wrap->original_callback_);
|
||||
original_callback(req, args...);
|
||||
}
|
||||
@ -128,23 +138,26 @@ int ReqWrap<T>::Dispatch(LibuvFunction fn, Args... args) {
|
||||
|
||||
// This expands as:
|
||||
//
|
||||
// return fn(env()->event_loop(), req(), arg1, arg2, Wrapper, arg3, ...)
|
||||
// ^ ^ ^
|
||||
// | | |
|
||||
// \-- Omitted if `fn` has no | |
|
||||
// first `uv_loop_t*` argument | |
|
||||
// | |
|
||||
// A function callback whose first argument | |
|
||||
// matches the libuv request type is replaced ---/ |
|
||||
// by the `Wrapper` method defined above |
|
||||
// |
|
||||
// Other (non-function) arguments are passed -----/
|
||||
// through verbatim
|
||||
return CallLibuvFunction<T, LibuvFunction>::Call(
|
||||
// int err = fn(env()->event_loop(), req(), arg1, arg2, Wrapper, arg3, ...)
|
||||
// ^ ^ ^
|
||||
// | | |
|
||||
// \-- Omitted if `fn` has no | |
|
||||
// first `uv_loop_t*` argument | |
|
||||
// | |
|
||||
// A function callback whose first argument | |
|
||||
// matches the libuv request type is replaced ---/ |
|
||||
// by the `Wrapper` method defined above |
|
||||
// |
|
||||
// Other (non-function) arguments are passed -----/
|
||||
// through verbatim
|
||||
int err = CallLibuvFunction<T, LibuvFunction>::Call(
|
||||
fn,
|
||||
env()->event_loop(),
|
||||
req(),
|
||||
MakeLibuvRequestCallback<T, Args>::For(this, args)...);
|
||||
if (err >= 0)
|
||||
env()->IncreaseWaitingRequestCounter();
|
||||
return err;
|
||||
}
|
||||
|
||||
} // namespace node
|
||||
|
@ -20,6 +20,8 @@ class ReqWrap : public AsyncWrap {
|
||||
// Call this after the req has been dispatched, if that did not already
|
||||
// happen by using Dispatch().
|
||||
inline void Dispatched();
|
||||
// Call this after a request has finished, if re-using this object is planned.
|
||||
inline void Reset();
|
||||
T* req() { return &req_; }
|
||||
inline void Cancel();
|
||||
|
||||
|
10
src/util.h
10
src/util.h
@ -447,8 +447,16 @@ struct MallocedBuffer {
|
||||
MallocedBuffer& operator=(const MallocedBuffer&) = delete;
|
||||
};
|
||||
|
||||
} // namespace node
|
||||
// Test whether some value can be called with ().
|
||||
template<typename T, typename = void>
|
||||
struct is_callable : std::is_function<T> { };
|
||||
|
||||
template<typename T>
|
||||
struct is_callable<T, typename std::enable_if<
|
||||
std::is_same<decltype(void(&T::operator())), void>::value
|
||||
>::type> : std::true_type { };
|
||||
|
||||
} // namespace node
|
||||
|
||||
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user