diff --git a/src/env-inl.h b/src/env-inl.h index 917ddd1b6bc..f115656353c 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -371,6 +371,15 @@ inline void Environment::CloseHandle(T* handle, OnCloseCallback callback) { }); } +void Environment::IncreaseWaitingRequestCounter() { + request_waiting_++; +} + +void Environment::DecreaseWaitingRequestCounter() { + request_waiting_--; + CHECK_GE(request_waiting_, 0); +} + inline uv_loop_t* Environment::event_loop() const { return isolate_data()->event_loop(); } diff --git a/src/env.cc b/src/env.cc index 6526c680ac1..e5b9c0fd6aa 100644 --- a/src/env.cc +++ b/src/env.cc @@ -107,7 +107,6 @@ Environment::Environment(IsolateData* isolate_data, #if HAVE_INSPECTOR inspector_agent_(new inspector::Agent(this)), #endif - handle_cleanup_waiting_(0), http_parser_buffer_(nullptr), fs_stats_field_array_(isolate_, kFsStatsFieldsLength * 2), context_(context->GetIsolate(), context) { @@ -241,8 +240,11 @@ void Environment::CleanupHandles() { hc.cb_(this, hc.handle_, hc.arg_); handle_cleanup_queue_.clear(); - while (handle_cleanup_waiting_ != 0 || !handle_wrap_queue_.IsEmpty()) + while (handle_cleanup_waiting_ != 0 || + request_waiting_ != 0 || + !handle_wrap_queue_.IsEmpty()) { uv_run(event_loop(), UV_RUN_ONCE); + } } void Environment::StartProfilerIdleNotifier() { diff --git a/src/env.h b/src/env.h index 79351666c11..de3014249ea 100644 --- a/src/env.h +++ b/src/env.h @@ -601,6 +601,9 @@ class Environment { inline uv_check_t* immediate_check_handle(); inline uv_idle_t* immediate_idle_handle(); + inline void IncreaseWaitingRequestCounter(); + inline void DecreaseWaitingRequestCounter(); + inline AsyncHooks* async_hooks(); inline ImmediateInfo* immediate_info(); inline TickInfo* tick_info(); @@ -833,7 +836,8 @@ class Environment { HandleWrapQueue handle_wrap_queue_; ReqWrapQueue req_wrap_queue_; std::list handle_cleanup_queue_; - int handle_cleanup_waiting_; + int handle_cleanup_waiting_ = 0; + int request_waiting_ = 0; double* heap_statistics_buffer_ = nullptr; double* heap_space_statistics_buffer_ = nullptr; diff --git a/src/node_api.cc b/src/node_api.cc index d5437d70d93..91a47a12d92 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -3388,6 +3388,9 @@ class Work : public node::AsyncResource { // Establish a handle scope here so that every callback doesn't have to. // Also it is needed for the exception-handling below. v8::HandleScope scope(env->isolate); + node::Environment* env_ = node::Environment::GetCurrent(env->isolate); + env_->DecreaseWaitingRequestCounter(); + CallbackScope callback_scope(work); NAPI_CALL_INTO_MODULE(env, @@ -3488,6 +3491,8 @@ napi_status napi_queue_async_work(napi_env env, napi_async_work work) { uvimpl::Work* w = reinterpret_cast(work); + node::Environment* env_ = node::Environment::GetCurrent(env->isolate); + env_->IncreaseWaitingRequestCounter(); CALL_UV(env, uv_queue_work(event_loop, w->Request(), uvimpl::Work::ExecuteCallback, diff --git a/src/node_crypto.cc b/src/node_crypto.cc index f611f81f16a..10e4f593914 100644 --- a/src/node_crypto.cc +++ b/src/node_crypto.cc @@ -4639,9 +4639,12 @@ void PBKDF2Request::After() { void PBKDF2Request::After(uv_work_t* work_req, int status) { - CHECK_EQ(status, 0); std::unique_ptr req( ContainerOf(&PBKDF2Request::work_req_, work_req)); + req->env()->DecreaseWaitingRequestCounter(); + if (status == UV_ECANCELED) + return; + CHECK_EQ(status, 0); req->After(); } @@ -4692,6 +4695,7 @@ void PBKDF2(const FunctionCallbackInfo& args) { if (args[5]->IsFunction()) { obj->Set(env->context(), env->ondone_string(), args[5]).FromJust(); + env->IncreaseWaitingRequestCounter(); uv_queue_work(env->event_loop(), req.release()->work_req(), PBKDF2Request::Work, @@ -4831,10 +4835,13 @@ void RandomBytesCheck(RandomBytesRequest* req, Local (*argv)[2]) { void RandomBytesAfter(uv_work_t* work_req, int status) { - CHECK_EQ(status, 0); std::unique_ptr req( ContainerOf(&RandomBytesRequest::work_req_, work_req)); Environment* env = req->env(); + env->DecreaseWaitingRequestCounter(); + if (status == UV_ECANCELED) + return; + CHECK_EQ(status, 0); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); Local argv[2]; @@ -4874,6 +4881,7 @@ void RandomBytes(const FunctionCallbackInfo& args) { if (args[1]->IsFunction()) { obj->Set(env->context(), env->ondone_string(), args[1]).FromJust(); + env->IncreaseWaitingRequestCounter(); uv_queue_work(env->event_loop(), req.release()->work_req(), RandomBytesWork, @@ -4913,6 +4921,7 @@ void RandomBytesBuffer(const FunctionCallbackInfo& args) { if (args[3]->IsFunction()) { obj->Set(env->context(), env->ondone_string(), args[3]).FromJust(); + env->IncreaseWaitingRequestCounter(); uv_queue_work(env->event_loop(), req.release()->work_req(), RandomBytesWork, diff --git a/src/node_zlib.cc b/src/node_zlib.cc index ec447638e2a..3249905dfbf 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -214,6 +214,7 @@ class ZCtx : public AsyncWrap { } // async version + env->IncreaseWaitingRequestCounter(); uv_queue_work(env->event_loop(), work_req, ZCtx::Process, ZCtx::After); } @@ -361,10 +362,17 @@ class ZCtx : public AsyncWrap { // v8 land! static void After(uv_work_t* work_req, int status) { - CHECK_EQ(status, 0); - ZCtx* ctx = ContainerOf(&ZCtx::work_req_, work_req); Environment* env = ctx->env(); + ctx->write_in_progress_ = false; + + env->DecreaseWaitingRequestCounter(); + if (status == UV_ECANCELED) { + ctx->Close(); + return; + } + + CHECK_EQ(status, 0); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); @@ -374,7 +382,6 @@ class ZCtx : public AsyncWrap { ctx->write_result_[0] = ctx->strm_.avail_out; ctx->write_result_[1] = ctx->strm_.avail_in; - ctx->write_in_progress_ = false; // call the write() cb Local cb = PersistentToLocal(env->isolate(), diff --git a/src/req_wrap-inl.h b/src/req_wrap-inl.h index 54abf74430f..e3b26c1f5c6 100644 --- a/src/req_wrap-inl.h +++ b/src/req_wrap-inl.h @@ -20,6 +20,8 @@ ReqWrap::ReqWrap(Environment* env, // FIXME(bnoordhuis) The fact that a reinterpret_cast is needed is // arguably a good indicator that there should be more than one queue. env->req_wrap_queue()->PushBack(reinterpret_cast*>(this)); + + Reset(); } template @@ -33,6 +35,12 @@ void ReqWrap::Dispatched() { req_.data = this; } +template +void ReqWrap::Reset() { + original_callback_ = nullptr; + req_.data = nullptr; +} + template ReqWrap* ReqWrap::from_req(T* req) { return ContainerOf(&ReqWrap::req_, req); @@ -40,7 +48,8 @@ ReqWrap* ReqWrap::from_req(T* req) { template void ReqWrap::Cancel() { - uv_cancel(reinterpret_cast(&req_)); + if (req_.data == this) // Only cancel if already dispatched. + uv_cancel(reinterpret_cast(&req_)); } // Below is dark template magic designed to invoke libuv functions that @@ -95,7 +104,7 @@ struct CallLibuvFunction { template struct MakeLibuvRequestCallback { static T For(ReqWrap* req_wrap, T v) { - static_assert(!std::is_function::value, + static_assert(!is_callable::value, "MakeLibuvRequestCallback missed a callback"); return v; } @@ -109,6 +118,7 @@ struct MakeLibuvRequestCallback { static void Wrapper(ReqT* req, Args... args) { ReqWrap* req_wrap = ContainerOf(&ReqWrap::req_, req); + req_wrap->env()->DecreaseWaitingRequestCounter(); F original_callback = reinterpret_cast(req_wrap->original_callback_); original_callback(req, args...); } @@ -128,23 +138,26 @@ int ReqWrap::Dispatch(LibuvFunction fn, Args... args) { // This expands as: // - // return fn(env()->event_loop(), req(), arg1, arg2, Wrapper, arg3, ...) - // ^ ^ ^ - // | | | - // \-- Omitted if `fn` has no | | - // first `uv_loop_t*` argument | | - // | | - // A function callback whose first argument | | - // matches the libuv request type is replaced ---/ | - // by the `Wrapper` method defined above | - // | - // Other (non-function) arguments are passed -----/ - // through verbatim - return CallLibuvFunction::Call( + // int err = fn(env()->event_loop(), req(), arg1, arg2, Wrapper, arg3, ...) + // ^ ^ ^ + // | | | + // \-- Omitted if `fn` has no | | + // first `uv_loop_t*` argument | | + // | | + // A function callback whose first argument | | + // matches the libuv request type is replaced ---/ | + // by the `Wrapper` method defined above | + // | + // Other (non-function) arguments are passed -----/ + // through verbatim + int err = CallLibuvFunction::Call( fn, env()->event_loop(), req(), MakeLibuvRequestCallback::For(this, args)...); + if (err >= 0) + env()->IncreaseWaitingRequestCounter(); + return err; } } // namespace node diff --git a/src/req_wrap.h b/src/req_wrap.h index d1818172182..8f8d0cf2885 100644 --- a/src/req_wrap.h +++ b/src/req_wrap.h @@ -20,6 +20,8 @@ class ReqWrap : public AsyncWrap { // Call this after the req has been dispatched, if that did not already // happen by using Dispatch(). inline void Dispatched(); + // Call this after a request has finished, if re-using this object is planned. + inline void Reset(); T* req() { return &req_; } inline void Cancel(); diff --git a/src/util.h b/src/util.h index 7a1c6c109fd..2c66104e9d2 100644 --- a/src/util.h +++ b/src/util.h @@ -447,8 +447,16 @@ struct MallocedBuffer { MallocedBuffer& operator=(const MallocedBuffer&) = delete; }; -} // namespace node +// Test whether some value can be called with (). +template +struct is_callable : std::is_function { }; +template +struct is_callable::value + >::type> : std::true_type { }; + +} // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS