src: unify thread pool work

Instead of using the libuv mechanism directly, provide an internal
`ThreadPoolWork` wrapper that takes care of increasing/decreasing
the waiting request counter.

PR-URL: https://github.com/nodejs/node/pull/19377
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Anna Henningsen 2018-05-09 17:40:24 +02:00
parent 2b3150466e
commit c072057049
No known key found for this signature in database
GPG Key ID: 9C63F3A6CD2AD8F9
4 changed files with 108 additions and 128 deletions

View File

@ -3338,7 +3338,7 @@ static napi_status ConvertUVErrorCode(int code) {
} }
// Wrapper around uv_work_t which calls user-provided callbacks. // Wrapper around uv_work_t which calls user-provided callbacks.
class Work : public node::AsyncResource { class Work : public node::AsyncResource, public node::ThreadPoolWork {
private: private:
explicit Work(napi_env env, explicit Work(napi_env env,
v8::Local<v8::Object> async_resource, v8::Local<v8::Object> async_resource,
@ -3349,15 +3349,14 @@ class Work : public node::AsyncResource {
: AsyncResource(env->isolate, : AsyncResource(env->isolate,
async_resource, async_resource,
*v8::String::Utf8Value(env->isolate, async_resource_name)), *v8::String::Utf8Value(env->isolate, async_resource_name)),
ThreadPoolWork(node::Environment::GetCurrent(env->isolate)),
_env(env), _env(env),
_data(data), _data(data),
_execute(execute), _execute(execute),
_complete(complete) { _complete(complete) {
memset(&_request, 0, sizeof(_request));
_request.data = this;
} }
~Work() { } virtual ~Work() { }
public: public:
static Work* New(napi_env env, static Work* New(napi_env env,
@ -3374,47 +3373,36 @@ class Work : public node::AsyncResource {
delete work; delete work;
} }
static void ExecuteCallback(uv_work_t* req) { void DoThreadPoolWork() override {
Work* work = static_cast<Work*>(req->data); _execute(_env, _data);
work->_execute(work->_env, work->_data);
} }
static void CompleteCallback(uv_work_t* req, int status) { void AfterThreadPoolWork(int status) {
Work* work = static_cast<Work*>(req->data); if (_complete == nullptr)
return;
if (work->_complete != nullptr) {
napi_env env = work->_env;
// Establish a handle scope here so that every callback doesn't have to. // Establish a handle scope here so that every callback doesn't have to.
// Also it is needed for the exception-handling below. // Also it is needed for the exception-handling below.
v8::HandleScope scope(env->isolate); v8::HandleScope scope(_env->isolate);
node::Environment* env_ = node::Environment::GetCurrent(env->isolate);
env_->DecreaseWaitingRequestCounter();
CallbackScope callback_scope(work); CallbackScope callback_scope(this);
NAPI_CALL_INTO_MODULE(env, NAPI_CALL_INTO_MODULE(_env,
work->_complete(env, ConvertUVErrorCode(status), work->_data), _complete(_env, ConvertUVErrorCode(status), _data),
[env] (v8::Local<v8::Value> local_err) { [this] (v8::Local<v8::Value> local_err) {
// If there was an unhandled exception in the complete callback, // If there was an unhandled exception in the complete callback,
// report it as a fatal exception. (There is no JavaScript on the // report it as a fatal exception. (There is no JavaScript on the
// callstack that can possibly handle it.) // callstack that can possibly handle it.)
v8impl::trigger_fatal_exception(env, local_err); v8impl::trigger_fatal_exception(_env, local_err);
}); });
// Note: Don't access `work` after this point because it was // Note: Don't access `work` after this point because it was
// likely deleted by the complete callback. // likely deleted by the complete callback.
} }
}
uv_work_t* Request() {
return &_request;
}
private: private:
napi_env _env; napi_env _env;
void* _data; void* _data;
uv_work_t _request;
napi_async_execute_callback _execute; napi_async_execute_callback _execute;
napi_async_complete_callback _complete; napi_async_complete_callback _complete;
}; };
@ -3491,12 +3479,7 @@ napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work); uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
node::Environment* env_ = node::Environment::GetCurrent(env->isolate); w->ScheduleWork();
env_->IncreaseWaitingRequestCounter();
CALL_UV(env, uv_queue_work(event_loop,
w->Request(),
uvimpl::Work::ExecuteCallback,
uvimpl::Work::CompleteCallback));
return napi_clear_last_error(env); return napi_clear_last_error(env);
} }
@ -3507,7 +3490,7 @@ napi_status napi_cancel_async_work(napi_env env, napi_async_work work) {
uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work); uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
CALL_UV(env, uv_cancel(reinterpret_cast<uv_req_t*>(w->Request()))); CALL_UV(env, w->CancelWork());
return napi_clear_last_error(env); return napi_clear_last_error(env);
} }

View File

@ -4556,7 +4556,7 @@ bool ECDH::IsKeyPairValid() {
} }
class PBKDF2Request : public AsyncWrap { class PBKDF2Request : public AsyncWrap, public ThreadPoolWork {
public: public:
PBKDF2Request(Environment* env, PBKDF2Request(Environment* env,
Local<Object> object, Local<Object> object,
@ -4566,6 +4566,7 @@ class PBKDF2Request : public AsyncWrap {
int keylen, int keylen,
int iteration_count) int iteration_count)
: AsyncWrap(env, object, AsyncWrap::PROVIDER_PBKDF2REQUEST), : AsyncWrap(env, object, AsyncWrap::PROVIDER_PBKDF2REQUEST),
ThreadPoolWork(env),
digest_(digest), digest_(digest),
success_(false), success_(false),
pass_(std::move(pass)), pass_(std::move(pass)),
@ -4574,21 +4575,14 @@ class PBKDF2Request : public AsyncWrap {
iteration_count_(iteration_count) { iteration_count_(iteration_count) {
} }
uv_work_t* work_req() {
return &work_req_;
}
size_t self_size() const override { return sizeof(*this); } size_t self_size() const override { return sizeof(*this); }
static void Work(uv_work_t* work_req); void DoThreadPoolWork() override;
void Work(); void AfterThreadPoolWork(int status) override;
static void After(uv_work_t* work_req, int status);
void After(Local<Value> (*argv)[2]); void After(Local<Value> (*argv)[2]);
void After();
private: private:
uv_work_t work_req_;
const EVP_MD* digest_; const EVP_MD* digest_;
bool success_; bool success_;
MallocedBuffer<char> pass_; MallocedBuffer<char> pass_;
@ -4598,7 +4592,7 @@ class PBKDF2Request : public AsyncWrap {
}; };
void PBKDF2Request::Work() { void PBKDF2Request::DoThreadPoolWork() {
success_ = success_ =
PKCS5_PBKDF2_HMAC( PKCS5_PBKDF2_HMAC(
pass_.data, pass_.size, pass_.data, pass_.size,
@ -4611,12 +4605,6 @@ void PBKDF2Request::Work() {
} }
void PBKDF2Request::Work(uv_work_t* work_req) {
PBKDF2Request* req = ContainerOf(&PBKDF2Request::work_req_, work_req);
req->Work();
}
void PBKDF2Request::After(Local<Value> (*argv)[2]) { void PBKDF2Request::After(Local<Value> (*argv)[2]) {
if (success_) { if (success_) {
(*argv)[0] = Null(env()->isolate()); (*argv)[0] = Null(env()->isolate());
@ -4629,7 +4617,12 @@ void PBKDF2Request::After(Local<Value> (*argv)[2]) {
} }
void PBKDF2Request::After() { void PBKDF2Request::AfterThreadPoolWork(int status) {
std::unique_ptr<PBKDF2Request> req(this);
if (status == UV_ECANCELED)
return;
CHECK_EQ(status, 0);
HandleScope handle_scope(env()->isolate()); HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context()); Context::Scope context_scope(env()->context());
Local<Value> argv[2]; Local<Value> argv[2];
@ -4638,17 +4631,6 @@ void PBKDF2Request::After() {
} }
void PBKDF2Request::After(uv_work_t* work_req, int status) {
std::unique_ptr<PBKDF2Request> req(
ContainerOf(&PBKDF2Request::work_req_, work_req));
req->env()->DecreaseWaitingRequestCounter();
if (status == UV_ECANCELED)
return;
CHECK_EQ(status, 0);
req->After();
}
void PBKDF2(const FunctionCallbackInfo<Value>& args) { void PBKDF2(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args); Environment* env = Environment::GetCurrent(args);
@ -4695,14 +4677,10 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
if (args[5]->IsFunction()) { if (args[5]->IsFunction()) {
obj->Set(env->context(), env->ondone_string(), args[5]).FromJust(); obj->Set(env->context(), env->ondone_string(), args[5]).FromJust();
env->IncreaseWaitingRequestCounter(); req.release()->ScheduleWork();
uv_queue_work(env->event_loop(),
req.release()->work_req(),
PBKDF2Request::Work,
PBKDF2Request::After);
} else { } else {
env->PrintSyncTrace(); env->PrintSyncTrace();
req->Work(); req->DoThreadPoolWork();
Local<Value> argv[2]; Local<Value> argv[2];
req->After(&argv); req->After(&argv);
@ -4715,7 +4693,7 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
// Only instantiate within a valid HandleScope. // Only instantiate within a valid HandleScope.
class RandomBytesRequest : public AsyncWrap { class RandomBytesRequest : public AsyncWrap, public ThreadPoolWork {
public: public:
enum FreeMode { FREE_DATA, DONT_FREE_DATA }; enum FreeMode { FREE_DATA, DONT_FREE_DATA };
@ -4725,16 +4703,13 @@ class RandomBytesRequest : public AsyncWrap {
char* data, char* data,
FreeMode free_mode) FreeMode free_mode)
: AsyncWrap(env, object, AsyncWrap::PROVIDER_RANDOMBYTESREQUEST), : AsyncWrap(env, object, AsyncWrap::PROVIDER_RANDOMBYTESREQUEST),
ThreadPoolWork(env),
error_(0), error_(0),
size_(size), size_(size),
data_(data), data_(data),
free_mode_(free_mode) { free_mode_(free_mode) {
} }
uv_work_t* work_req() {
return &work_req_;
}
inline size_t size() const { inline size_t size() const {
return size_; return size_;
} }
@ -4772,7 +4747,8 @@ class RandomBytesRequest : public AsyncWrap {
size_t self_size() const override { return sizeof(*this); } size_t self_size() const override { return sizeof(*this); }
uv_work_t work_req_; void DoThreadPoolWork() override;
void AfterThreadPoolWork(int status) override;
private: private:
unsigned long error_; // NOLINT(runtime/int) unsigned long error_; // NOLINT(runtime/int)
@ -4782,21 +4758,17 @@ class RandomBytesRequest : public AsyncWrap {
}; };
void RandomBytesWork(uv_work_t* work_req) { void RandomBytesRequest::DoThreadPoolWork() {
RandomBytesRequest* req =
ContainerOf(&RandomBytesRequest::work_req_, work_req);
// Ensure that OpenSSL's PRNG is properly seeded. // Ensure that OpenSSL's PRNG is properly seeded.
CheckEntropy(); CheckEntropy();
const int r = RAND_bytes(reinterpret_cast<unsigned char*>(req->data()), const int r = RAND_bytes(reinterpret_cast<unsigned char*>(data_), size_);
req->size());
// RAND_bytes() returns 0 on error. // RAND_bytes() returns 0 on error.
if (r == 0) { if (r == 0) {
req->set_error(ERR_get_error()); // NOLINT(runtime/int) set_error(ERR_get_error()); // NOLINT(runtime/int)
} else if (r == -1) { } else if (r == -1) {
req->set_error(static_cast<unsigned long>(-1)); // NOLINT(runtime/int) set_error(static_cast<unsigned long>(-1)); // NOLINT(runtime/int)
} }
} }
@ -4834,19 +4806,16 @@ void RandomBytesCheck(RandomBytesRequest* req, Local<Value> (*argv)[2]) {
} }
void RandomBytesAfter(uv_work_t* work_req, int status) { void RandomBytesRequest::AfterThreadPoolWork(int status) {
std::unique_ptr<RandomBytesRequest> req( std::unique_ptr<RandomBytesRequest> req(this);
ContainerOf(&RandomBytesRequest::work_req_, work_req));
Environment* env = req->env();
env->DecreaseWaitingRequestCounter();
if (status == UV_ECANCELED) if (status == UV_ECANCELED)
return; return;
CHECK_EQ(status, 0); CHECK_EQ(status, 0);
HandleScope handle_scope(env->isolate()); HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env->context()); Context::Scope context_scope(env()->context());
Local<Value> argv[2]; Local<Value> argv[2];
RandomBytesCheck(req.get(), &argv); RandomBytesCheck(this, &argv);
req->MakeCallback(env->ondone_string(), arraysize(argv), argv); MakeCallback(env()->ondone_string(), arraysize(argv), argv);
} }
@ -4854,7 +4823,7 @@ void RandomBytesProcessSync(Environment* env,
std::unique_ptr<RandomBytesRequest> req, std::unique_ptr<RandomBytesRequest> req,
Local<Value> (*argv)[2]) { Local<Value> (*argv)[2]) {
env->PrintSyncTrace(); env->PrintSyncTrace();
RandomBytesWork(req->work_req()); req->DoThreadPoolWork();
RandomBytesCheck(req.get(), argv); RandomBytesCheck(req.get(), argv);
if (!(*argv)[0]->IsNull()) if (!(*argv)[0]->IsNull())
@ -4881,11 +4850,7 @@ void RandomBytes(const FunctionCallbackInfo<Value>& args) {
if (args[1]->IsFunction()) { if (args[1]->IsFunction()) {
obj->Set(env->context(), env->ondone_string(), args[1]).FromJust(); obj->Set(env->context(), env->ondone_string(), args[1]).FromJust();
env->IncreaseWaitingRequestCounter(); req.release()->ScheduleWork();
uv_queue_work(env->event_loop(),
req.release()->work_req(),
RandomBytesWork,
RandomBytesAfter);
args.GetReturnValue().Set(obj); args.GetReturnValue().Set(obj);
} else { } else {
Local<Value> argv[2]; Local<Value> argv[2];
@ -4921,11 +4886,7 @@ void RandomBytesBuffer(const FunctionCallbackInfo<Value>& args) {
if (args[3]->IsFunction()) { if (args[3]->IsFunction()) {
obj->Set(env->context(), env->ondone_string(), args[3]).FromJust(); obj->Set(env->context(), env->ondone_string(), args[3]).FromJust();
env->IncreaseWaitingRequestCounter(); req.release()->ScheduleWork();
uv_queue_work(env->event_loop(),
req.release()->work_req(),
RandomBytesWork,
RandomBytesAfter);
args.GetReturnValue().Set(obj); args.GetReturnValue().Set(obj);
} else { } else {
Local<Value> argv[2]; Local<Value> argv[2];

View File

@ -503,6 +503,41 @@ class InternalCallbackScope {
bool closed_ = false; bool closed_ = false;
}; };
class ThreadPoolWork {
public:
explicit inline ThreadPoolWork(Environment* env) : env_(env) {}
inline void ScheduleWork();
inline int CancelWork();
virtual void DoThreadPoolWork() = 0;
virtual void AfterThreadPoolWork(int status) = 0;
private:
Environment* env_;
uv_work_t work_req_;
};
void ThreadPoolWork::ScheduleWork() {
env_->IncreaseWaitingRequestCounter();
int status = uv_queue_work(
env_->event_loop(),
&work_req_,
[](uv_work_t* req) {
ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req);
self->DoThreadPoolWork();
},
[](uv_work_t* req, int status) {
ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req);
self->env_->DecreaseWaitingRequestCounter();
self->AfterThreadPoolWork(status);
});
CHECK_EQ(status, 0);
}
int ThreadPoolWork::CancelWork() {
return uv_cancel(reinterpret_cast<uv_req_t*>(&work_req_));
}
static inline const char *errno_string(int errorno) { static inline const char *errno_string(int errorno) {
#define ERRNO_CASE(e) case e: return #e; #define ERRNO_CASE(e) case e: return #e;
switch (errorno) { switch (errorno) {

View File

@ -70,10 +70,11 @@ enum node_zlib_mode {
/** /**
* Deflate/Inflate * Deflate/Inflate
*/ */
class ZCtx : public AsyncWrap { class ZCtx : public AsyncWrap, public ThreadPoolWork {
public: public:
ZCtx(Environment* env, Local<Object> wrap, node_zlib_mode mode) ZCtx(Environment* env, Local<Object> wrap, node_zlib_mode mode)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_ZLIB), : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_ZLIB),
ThreadPoolWork(env),
dictionary_(nullptr), dictionary_(nullptr),
dictionary_len_(0), dictionary_len_(0),
err_(0), err_(0),
@ -191,9 +192,6 @@ class ZCtx : public AsyncWrap {
CHECK(Buffer::IsWithinBounds(out_off, out_len, Buffer::Length(out_buf))); CHECK(Buffer::IsWithinBounds(out_off, out_len, Buffer::Length(out_buf)));
out = reinterpret_cast<Bytef *>(Buffer::Data(out_buf) + out_off); out = reinterpret_cast<Bytef *>(Buffer::Data(out_buf) + out_off);
// build up the work request
uv_work_t* work_req = &(ctx->work_req_);
ctx->strm_.avail_in = in_len; ctx->strm_.avail_in = in_len;
ctx->strm_.next_in = in; ctx->strm_.next_in = in;
ctx->strm_.avail_out = out_len; ctx->strm_.avail_out = out_len;
@ -203,7 +201,7 @@ class ZCtx : public AsyncWrap {
if (!async) { if (!async) {
// sync version // sync version
env->PrintSyncTrace(); env->PrintSyncTrace();
Process(work_req); ctx->DoThreadPoolWork();
if (CheckError(ctx)) { if (CheckError(ctx)) {
ctx->write_result_[0] = ctx->strm_.avail_out; ctx->write_result_[0] = ctx->strm_.avail_out;
ctx->write_result_[1] = ctx->strm_.avail_in; ctx->write_result_[1] = ctx->strm_.avail_in;
@ -214,18 +212,24 @@ class ZCtx : public AsyncWrap {
} }
// async version // async version
env->IncreaseWaitingRequestCounter(); ctx->ScheduleWork();
uv_queue_work(env->event_loop(), work_req, ZCtx::Process, ZCtx::After);
} }
// TODO(addaleax): Make these methods non-static. It's a significant bunch
// of churn that's better left for a separate PR.
void DoThreadPoolWork() {
Process(this);
}
void AfterThreadPoolWork(int status) {
After(this, status);
}
// thread pool! // thread pool!
// This function may be called multiple times on the uv_work pool // This function may be called multiple times on the uv_work pool
// for a single write() call, until all of the input bytes have // for a single write() call, until all of the input bytes have
// been consumed. // been consumed.
static void Process(uv_work_t* work_req) { static void Process(ZCtx* ctx) {
ZCtx *ctx = ContainerOf(&ZCtx::work_req_, work_req);
const Bytef* next_expected_header_byte = nullptr; const Bytef* next_expected_header_byte = nullptr;
// If the avail_out is left at 0, then it means that it ran out // If the avail_out is left at 0, then it means that it ran out
@ -361,12 +365,10 @@ class ZCtx : public AsyncWrap {
// v8 land! // v8 land!
static void After(uv_work_t* work_req, int status) { static void After(ZCtx* ctx, int status) {
ZCtx* ctx = ContainerOf(&ZCtx::work_req_, work_req);
Environment* env = ctx->env(); Environment* env = ctx->env();
ctx->write_in_progress_ = false; ctx->write_in_progress_ = false;
env->DecreaseWaitingRequestCounter();
if (status == UV_ECANCELED) { if (status == UV_ECANCELED) {
ctx->Close(); ctx->Close();
return; return;
@ -685,7 +687,6 @@ class ZCtx : public AsyncWrap {
int strategy_; int strategy_;
z_stream strm_; z_stream strm_;
int windowBits_; int windowBits_;
uv_work_t work_req_;
bool write_in_progress_; bool write_in_progress_;
bool pending_close_; bool pending_close_;
unsigned int refs_; unsigned int refs_;