worker: refactor thread life cycle management
The current mechanism of uses two async handles, one owned by the creator of the worker thread to terminate a running worker, and another one employed by the worker to interrupt its creator on its natural termination. The force termination piggybacks on the message- passing mechanism to inform the worker to quiesce. Also there are few flags that represent the other thread's state / request state because certain code path is shared by multiple control flows, and there are certain code path where the async handles may not have come to life. Refactor into an AsyncRequest abstraction that exposes routines to install a handle as well as to save a state. PR-URL: https://github.com/nodejs/node/pull/26099 Refs: https://github.com/nodejs/node/pull/21283 Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
parent
584305841d
commit
d14cba401a
@ -584,13 +584,6 @@ void MessagePort::OnMessage() {
|
||||
// Get the head of the message queue.
|
||||
Mutex::ScopedLock lock(data_->mutex_);
|
||||
|
||||
if (stop_event_loop_) {
|
||||
Debug(this, "MessagePort stops loop as requested");
|
||||
CHECK(!data_->receiving_messages_);
|
||||
uv_stop(env()->event_loop());
|
||||
break;
|
||||
}
|
||||
|
||||
Debug(this, "MessagePort has message, receiving = %d",
|
||||
static_cast<int>(data_->receiving_messages_));
|
||||
|
||||
@ -740,15 +733,6 @@ void MessagePort::Stop() {
|
||||
data_->receiving_messages_ = false;
|
||||
}
|
||||
|
||||
void MessagePort::StopEventLoop() {
|
||||
Mutex::ScopedLock lock(data_->mutex_);
|
||||
data_->receiving_messages_ = false;
|
||||
stop_event_loop_ = true;
|
||||
|
||||
Debug(this, "Received StopEventLoop request");
|
||||
TriggerAsync();
|
||||
}
|
||||
|
||||
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
|
||||
Environment* env = Environment::GetCurrent(args);
|
||||
MessagePort* port;
|
||||
|
@ -159,9 +159,6 @@ class MessagePort : public HandleWrap {
|
||||
void Start();
|
||||
// Stop processing messages on this port as a receiving end.
|
||||
void Stop();
|
||||
// Stop processing messages on this port as a receiving end,
|
||||
// and stop the event loop that this port is associated with.
|
||||
void StopEventLoop();
|
||||
|
||||
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
@ -206,7 +203,6 @@ class MessagePort : public HandleWrap {
|
||||
inline uv_async_t* async();
|
||||
|
||||
std::unique_ptr<MessagePortData> data_ = nullptr;
|
||||
bool stop_event_loop_ = false;
|
||||
|
||||
friend class MessagePortData;
|
||||
};
|
||||
|
@ -58,6 +58,46 @@ void WaitForWorkerInspectorToStop(Environment* child) {
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
|
||||
Mutex::ScopedLock lock(mutex_);
|
||||
env_ = env;
|
||||
async_ = new uv_async_t;
|
||||
if (data != nullptr) async_->data = data;
|
||||
CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
|
||||
}
|
||||
|
||||
void AsyncRequest::Uninstall() {
|
||||
Mutex::ScopedLock lock(mutex_);
|
||||
if (async_ != nullptr)
|
||||
env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
|
||||
}
|
||||
|
||||
void AsyncRequest::Stop() {
|
||||
Mutex::ScopedLock lock(mutex_);
|
||||
stop_ = true;
|
||||
if (async_ != nullptr) uv_async_send(async_);
|
||||
}
|
||||
|
||||
void AsyncRequest::SetStopped(bool flag) {
|
||||
Mutex::ScopedLock lock(mutex_);
|
||||
stop_ = flag;
|
||||
}
|
||||
|
||||
bool AsyncRequest::IsStopped() const {
|
||||
Mutex::ScopedLock lock(mutex_);
|
||||
return stop_;
|
||||
}
|
||||
|
||||
uv_async_t* AsyncRequest::GetHandle() {
|
||||
Mutex::ScopedLock lock(mutex_);
|
||||
return async_;
|
||||
}
|
||||
|
||||
void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
|
||||
Mutex::ScopedLock lock(mutex_);
|
||||
if (async_ != nullptr) tracker->TrackField("async_request", *async_);
|
||||
}
|
||||
|
||||
Worker::Worker(Environment* env,
|
||||
Local<Object> wrap,
|
||||
const std::string& url,
|
||||
@ -98,8 +138,7 @@ Worker::Worker(Environment* env,
|
||||
}
|
||||
|
||||
bool Worker::is_stopped() const {
|
||||
Mutex::ScopedLock stopped_lock(stopped_mutex_);
|
||||
return stopped_;
|
||||
return thread_stopper_.IsStopped();
|
||||
}
|
||||
|
||||
// This class contains data that is only relevant to the child thread itself,
|
||||
@ -207,6 +246,8 @@ void Worker::Run() {
|
||||
Context::Scope context_scope(env_->context());
|
||||
if (child_port != nullptr)
|
||||
child_port->Close();
|
||||
thread_stopper_.Uninstall();
|
||||
thread_stopper_.SetStopped(true);
|
||||
env_->stop_sub_worker_contexts();
|
||||
env_->RunCleanup();
|
||||
RunAtExit(env_.get());
|
||||
@ -215,11 +256,6 @@ void Worker::Run() {
|
||||
WaitForWorkerInspectorToStop(env_.get());
|
||||
#endif
|
||||
|
||||
{
|
||||
Mutex::ScopedLock stopped_lock(stopped_mutex_);
|
||||
stopped_ = true;
|
||||
}
|
||||
|
||||
// This call needs to be made while the `Environment` is still alive
|
||||
// because we assume that it is available for async tracking in the
|
||||
// NodePlatform implementation.
|
||||
@ -227,11 +263,12 @@ void Worker::Run() {
|
||||
}
|
||||
});
|
||||
|
||||
if (thread_stopper_.IsStopped()) return;
|
||||
{
|
||||
HandleScope handle_scope(isolate_);
|
||||
Local<Context> context = NewContext(isolate_);
|
||||
if (is_stopped()) return;
|
||||
|
||||
if (thread_stopper_.IsStopped()) return;
|
||||
CHECK(!context.IsEmpty());
|
||||
Context::Scope context_scope(context);
|
||||
{
|
||||
@ -253,6 +290,14 @@ void Worker::Run() {
|
||||
Debug(this, "Created Environment for worker with id %llu", thread_id_);
|
||||
|
||||
if (is_stopped()) return;
|
||||
thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) {
|
||||
Environment* env_ = static_cast<Environment*>(handle->data);
|
||||
uv_stop(env_->event_loop());
|
||||
});
|
||||
uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper_.GetHandle()));
|
||||
|
||||
Debug(this, "Created Environment for worker with id %llu", thread_id_);
|
||||
if (thread_stopper_.IsStopped()) return;
|
||||
{
|
||||
HandleScope handle_scope(isolate_);
|
||||
Mutex::ScopedLock lock(mutex_);
|
||||
@ -268,7 +313,7 @@ void Worker::Run() {
|
||||
Debug(this, "Created message port for worker %llu", thread_id_);
|
||||
}
|
||||
|
||||
if (is_stopped()) return;
|
||||
if (thread_stopper_.IsStopped()) return;
|
||||
{
|
||||
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
|
||||
StartWorkerInspector(env_.get(),
|
||||
@ -289,22 +334,21 @@ void Worker::Run() {
|
||||
Debug(this, "Loaded environment for worker %llu", thread_id_);
|
||||
}
|
||||
|
||||
if (is_stopped()) return;
|
||||
if (thread_stopper_.IsStopped()) return;
|
||||
{
|
||||
SealHandleScope seal(isolate_);
|
||||
bool more;
|
||||
env_->performance_state()->Mark(
|
||||
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
|
||||
do {
|
||||
if (is_stopped()) break;
|
||||
if (thread_stopper_.IsStopped()) break;
|
||||
uv_run(&data.loop_, UV_RUN_DEFAULT);
|
||||
if (is_stopped()) break;
|
||||
if (thread_stopper_.IsStopped()) break;
|
||||
|
||||
platform_->DrainTasks(isolate_);
|
||||
|
||||
more = uv_loop_alive(&data.loop_);
|
||||
if (more && !is_stopped())
|
||||
continue;
|
||||
if (more && !thread_stopper_.IsStopped()) continue;
|
||||
|
||||
EmitBeforeExit(env_.get());
|
||||
|
||||
@ -319,7 +363,7 @@ void Worker::Run() {
|
||||
|
||||
{
|
||||
int exit_code;
|
||||
bool stopped = is_stopped();
|
||||
bool stopped = thread_stopper_.IsStopped();
|
||||
if (!stopped)
|
||||
exit_code = EmitExit(env_.get());
|
||||
Mutex::ScopedLock lock(mutex_);
|
||||
@ -341,34 +385,11 @@ void Worker::JoinThread() {
|
||||
thread_joined_ = true;
|
||||
|
||||
env()->remove_sub_worker_context(this);
|
||||
|
||||
if (thread_exit_async_) {
|
||||
env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
|
||||
delete async;
|
||||
});
|
||||
|
||||
if (scheduled_on_thread_stopped_)
|
||||
OnThreadStopped();
|
||||
}
|
||||
OnThreadStopped();
|
||||
on_thread_finished_.Uninstall();
|
||||
}
|
||||
|
||||
void Worker::OnThreadStopped() {
|
||||
{
|
||||
Mutex::ScopedLock lock(mutex_);
|
||||
scheduled_on_thread_stopped_ = false;
|
||||
|
||||
Debug(this, "Worker %llu thread stopped", thread_id_);
|
||||
|
||||
{
|
||||
Mutex::ScopedLock stopped_lock(stopped_mutex_);
|
||||
CHECK(stopped_);
|
||||
}
|
||||
|
||||
parent_port_ = nullptr;
|
||||
}
|
||||
|
||||
JoinThread();
|
||||
|
||||
{
|
||||
HandleScope handle_scope(env()->isolate());
|
||||
Context::Scope context_scope(env()->context());
|
||||
@ -391,7 +412,7 @@ Worker::~Worker() {
|
||||
Mutex::ScopedLock lock(mutex_);
|
||||
JoinThread();
|
||||
|
||||
CHECK(stopped_);
|
||||
CHECK(thread_stopper_.IsStopped());
|
||||
CHECK(thread_joined_);
|
||||
|
||||
// This has most likely already happened within the worker thread -- this
|
||||
@ -480,16 +501,15 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
|
||||
Mutex::ScopedLock lock(w->mutex_);
|
||||
|
||||
w->env()->add_sub_worker_context(w);
|
||||
w->stopped_ = false;
|
||||
w->thread_joined_ = false;
|
||||
w->thread_stopper_.SetStopped(false);
|
||||
|
||||
w->thread_exit_async_.reset(new uv_async_t);
|
||||
w->thread_exit_async_->data = w;
|
||||
CHECK_EQ(uv_async_init(w->env()->event_loop(),
|
||||
w->thread_exit_async_.get(),
|
||||
[](uv_async_t* handle) {
|
||||
static_cast<Worker*>(handle->data)->OnThreadStopped();
|
||||
}), 0);
|
||||
w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
|
||||
Worker* w_ = static_cast<Worker*>(handle->data);
|
||||
CHECK(w_->thread_stopper_.IsStopped());
|
||||
w_->parent_port_ = nullptr;
|
||||
w_->JoinThread();
|
||||
});
|
||||
|
||||
uv_thread_options_t thread_options;
|
||||
thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
|
||||
@ -505,9 +525,7 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
|
||||
w->Run();
|
||||
|
||||
Mutex::ScopedLock lock(w->mutex_);
|
||||
CHECK(w->thread_exit_async_);
|
||||
w->scheduled_on_thread_stopped_ = true;
|
||||
uv_async_send(w->thread_exit_async_.get());
|
||||
w->on_thread_finished_.Stop();
|
||||
}, static_cast<void*>(w)), 0);
|
||||
}
|
||||
|
||||
@ -523,28 +541,23 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
|
||||
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
|
||||
Worker* w;
|
||||
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
|
||||
if (w->thread_exit_async_)
|
||||
uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
|
||||
uv_ref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
|
||||
}
|
||||
|
||||
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
|
||||
Worker* w;
|
||||
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
|
||||
if (w->thread_exit_async_)
|
||||
uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
|
||||
uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
|
||||
}
|
||||
|
||||
void Worker::Exit(int code) {
|
||||
Mutex::ScopedLock lock(mutex_);
|
||||
Mutex::ScopedLock stopped_lock(stopped_mutex_);
|
||||
|
||||
Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
|
||||
|
||||
if (!stopped_) {
|
||||
stopped_ = true;
|
||||
if (!thread_stopper_.IsStopped()) {
|
||||
exit_code_ = code;
|
||||
if (child_port_ != nullptr)
|
||||
child_port_->StopEventLoop();
|
||||
Debug(this, "Received StopEventLoop request");
|
||||
thread_stopper_.Stop();
|
||||
if (isolate_ != nullptr)
|
||||
isolate_->TerminateExecution();
|
||||
}
|
||||
|
@ -3,14 +3,35 @@
|
||||
|
||||
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
|
||||
|
||||
#include "node_messaging.h"
|
||||
#include <unordered_map>
|
||||
#include "node_messaging.h"
|
||||
#include "uv.h"
|
||||
|
||||
namespace node {
|
||||
namespace worker {
|
||||
|
||||
class WorkerThreadData;
|
||||
|
||||
class AsyncRequest : public MemoryRetainer {
|
||||
public:
|
||||
AsyncRequest() {}
|
||||
void Install(Environment* env, void* data, uv_async_cb target);
|
||||
void Uninstall();
|
||||
void Stop();
|
||||
void SetStopped(bool flag);
|
||||
bool IsStopped() const;
|
||||
uv_async_t* GetHandle();
|
||||
void MemoryInfo(MemoryTracker* tracker) const override;
|
||||
SET_MEMORY_INFO_NAME(AsyncRequest)
|
||||
SET_SELF_SIZE(AsyncRequest)
|
||||
|
||||
private:
|
||||
Environment* env_;
|
||||
uv_async_t* async_ = nullptr;
|
||||
mutable Mutex mutex_;
|
||||
bool stop_ = true;
|
||||
};
|
||||
|
||||
// A worker thread, as represented in its parent thread.
|
||||
class Worker : public AsyncWrap {
|
||||
public:
|
||||
@ -31,11 +52,9 @@ class Worker : public AsyncWrap {
|
||||
void JoinThread();
|
||||
|
||||
void MemoryInfo(MemoryTracker* tracker) const override {
|
||||
tracker->TrackFieldWithSize(
|
||||
"isolate_data", sizeof(IsolateData), "IsolateData");
|
||||
tracker->TrackFieldWithSize("env", sizeof(Environment), "Environment");
|
||||
tracker->TrackField("thread_exit_async", *thread_exit_async_);
|
||||
tracker->TrackField("parent_port", parent_port_);
|
||||
tracker->TrackInlineField(&thread_stopper_, "thread_stopper_");
|
||||
tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_");
|
||||
}
|
||||
|
||||
SET_MEMORY_INFO_NAME(Worker)
|
||||
@ -67,16 +86,6 @@ class Worker : public AsyncWrap {
|
||||
// This mutex protects access to all variables listed below it.
|
||||
mutable Mutex mutex_;
|
||||
|
||||
// Currently only used for telling the parent thread that the child
|
||||
// thread exited.
|
||||
std::unique_ptr<uv_async_t> thread_exit_async_;
|
||||
bool scheduled_on_thread_stopped_ = false;
|
||||
|
||||
// This mutex only protects stopped_. If both locks are acquired, this needs
|
||||
// to be the latter one.
|
||||
mutable Mutex stopped_mutex_;
|
||||
bool stopped_ = true;
|
||||
|
||||
bool thread_joined_ = true;
|
||||
int exit_code_ = 0;
|
||||
uint64_t thread_id_ = -1;
|
||||
@ -96,6 +105,9 @@ class Worker : public AsyncWrap {
|
||||
// instance refers to it via its [kPort] property.
|
||||
MessagePort* parent_port_ = nullptr;
|
||||
|
||||
AsyncRequest thread_stopper_;
|
||||
AsyncRequest on_thread_finished_;
|
||||
|
||||
friend class WorkerThreadData;
|
||||
};
|
||||
|
||||
|
@ -9,8 +9,8 @@ const worker = new Worker('setInterval(() => {}, 100);', { eval: true });
|
||||
validateSnapshotNodes('Node / Worker', [
|
||||
{
|
||||
children: [
|
||||
{ node_name: 'Node / uv_async_t', edge_name: 'thread_exit_async' },
|
||||
{ node_name: 'Node / Environment', edge_name: 'env' },
|
||||
{ node_name: 'Node / AsyncRequest', edge_name: 'thread_stopper_' },
|
||||
{ node_name: 'Node / AsyncRequest', edge_name: 'on_thread_finished_' },
|
||||
{ node_name: 'Node / MessagePort', edge_name: 'parent_port' },
|
||||
{ node_name: 'Worker', edge_name: 'wrapped' }
|
||||
]
|
||||
|
Loading…
x
Reference in New Issue
Block a user