src: add native debugging code to workers
Now that we have better native debugging utilities in core, let’s use them :) PR-URL: https://github.com/nodejs/node/pull/21423 Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com> Reviewed-By: Matheus Marchini <matheus@sthima.com>
This commit is contained in:
parent
c403eeb7fd
commit
018d6186a7
@ -803,7 +803,8 @@ void EmitAsyncDestroy(Isolate* isolate, async_context asyncContext) {
|
|||||||
|
|
||||||
std::string AsyncWrap::diagnostic_name() const {
|
std::string AsyncWrap::diagnostic_name() const {
|
||||||
return std::string(provider_names[provider_type()]) +
|
return std::string(provider_names[provider_type()]) +
|
||||||
" (" + std::to_string(static_cast<int64_t>(async_id_)) + ")";
|
" (" + std::to_string(env()->thread_id()) + ":" +
|
||||||
|
std::to_string(static_cast<int64_t>(async_id_)) + ")";
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace node
|
} // namespace node
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#include "debug_utils.h"
|
||||||
#include "node_messaging.h"
|
#include "node_messaging.h"
|
||||||
#include "node_internals.h"
|
#include "node_internals.h"
|
||||||
#include "node_buffer.h"
|
#include "node_buffer.h"
|
||||||
@ -305,8 +306,10 @@ void MessagePortData::AddToIncomingQueue(Message&& message) {
|
|||||||
Mutex::ScopedLock lock(mutex_);
|
Mutex::ScopedLock lock(mutex_);
|
||||||
incoming_messages_.emplace_back(std::move(message));
|
incoming_messages_.emplace_back(std::move(message));
|
||||||
|
|
||||||
if (owner_ != nullptr)
|
if (owner_ != nullptr) {
|
||||||
|
Debug(owner_, "Adding message to incoming queue");
|
||||||
owner_->TriggerAsync();
|
owner_->TriggerAsync();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool MessagePortData::IsSiblingClosed() const {
|
bool MessagePortData::IsSiblingClosed() const {
|
||||||
@ -380,6 +383,8 @@ MessagePort::MessagePort(Environment* env,
|
|||||||
Local<Function> init = fn.As<Function>();
|
Local<Function> init = fn.As<Function>();
|
||||||
USE(init->Call(context, wrap, 0, nullptr));
|
USE(init->Call(context, wrap, 0, nullptr));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Debug(this, "Created message port");
|
||||||
}
|
}
|
||||||
|
|
||||||
void MessagePort::AddToIncomingQueue(Message&& message) {
|
void MessagePort::AddToIncomingQueue(Message&& message) {
|
||||||
@ -396,6 +401,8 @@ void MessagePort::TriggerAsync() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
|
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
|
||||||
|
Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));
|
||||||
|
|
||||||
if (data_) {
|
if (data_) {
|
||||||
// Wrap this call with accessing the mutex, so that TriggerAsync()
|
// Wrap this call with accessing the mutex, so that TriggerAsync()
|
||||||
// can check IsHandleClosing() without race conditions.
|
// can check IsHandleClosing() without race conditions.
|
||||||
@ -447,6 +454,7 @@ MessagePort* MessagePort::New(
|
|||||||
}
|
}
|
||||||
|
|
||||||
void MessagePort::OnMessage() {
|
void MessagePort::OnMessage() {
|
||||||
|
Debug(this, "Running MessagePort::OnMessage()");
|
||||||
HandleScope handle_scope(env()->isolate());
|
HandleScope handle_scope(env()->isolate());
|
||||||
Local<Context> context = object(env()->isolate())->CreationContext();
|
Local<Context> context = object(env()->isolate())->CreationContext();
|
||||||
|
|
||||||
@ -461,11 +469,15 @@ void MessagePort::OnMessage() {
|
|||||||
Mutex::ScopedLock lock(data_->mutex_);
|
Mutex::ScopedLock lock(data_->mutex_);
|
||||||
|
|
||||||
if (stop_event_loop_) {
|
if (stop_event_loop_) {
|
||||||
|
Debug(this, "MessagePort stops loop as requested");
|
||||||
CHECK(!data_->receiving_messages_);
|
CHECK(!data_->receiving_messages_);
|
||||||
uv_stop(env()->event_loop());
|
uv_stop(env()->event_loop());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Debug(this, "MessagePort has message, receiving = %d",
|
||||||
|
static_cast<int>(data_->receiving_messages_));
|
||||||
|
|
||||||
if (!data_->receiving_messages_)
|
if (!data_->receiving_messages_)
|
||||||
break;
|
break;
|
||||||
if (data_->incoming_messages_.empty())
|
if (data_->incoming_messages_.empty())
|
||||||
@ -475,6 +487,7 @@ void MessagePort::OnMessage() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!env()->can_call_into_js()) {
|
if (!env()->can_call_into_js()) {
|
||||||
|
Debug(this, "MessagePort drains queue because !can_call_into_js()");
|
||||||
// In this case there is nothing to do but to drain the current queue.
|
// In this case there is nothing to do but to drain the current queue.
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -508,6 +521,7 @@ bool MessagePort::IsSiblingClosed() const {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void MessagePort::OnClose() {
|
void MessagePort::OnClose() {
|
||||||
|
Debug(this, "MessagePort::OnClose()");
|
||||||
if (data_) {
|
if (data_) {
|
||||||
data_->owner_ = nullptr;
|
data_->owner_ = nullptr;
|
||||||
data_->Disentangle();
|
data_->Disentangle();
|
||||||
@ -557,6 +571,7 @@ void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
|
|||||||
|
|
||||||
void MessagePort::Start() {
|
void MessagePort::Start() {
|
||||||
Mutex::ScopedLock lock(data_->mutex_);
|
Mutex::ScopedLock lock(data_->mutex_);
|
||||||
|
Debug(this, "Start receiving messages");
|
||||||
data_->receiving_messages_ = true;
|
data_->receiving_messages_ = true;
|
||||||
if (!data_->incoming_messages_.empty())
|
if (!data_->incoming_messages_.empty())
|
||||||
TriggerAsync();
|
TriggerAsync();
|
||||||
@ -564,6 +579,7 @@ void MessagePort::Start() {
|
|||||||
|
|
||||||
void MessagePort::Stop() {
|
void MessagePort::Stop() {
|
||||||
Mutex::ScopedLock lock(data_->mutex_);
|
Mutex::ScopedLock lock(data_->mutex_);
|
||||||
|
Debug(this, "Stop receiving messages");
|
||||||
data_->receiving_messages_ = false;
|
data_->receiving_messages_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -572,6 +588,7 @@ void MessagePort::StopEventLoop() {
|
|||||||
data_->receiving_messages_ = false;
|
data_->receiving_messages_ = false;
|
||||||
stop_event_loop_ = true;
|
stop_event_loop_ = true;
|
||||||
|
|
||||||
|
Debug(this, "Received StopEventLoop request");
|
||||||
TriggerAsync();
|
TriggerAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,6 +44,8 @@ Worker::Worker(Environment* env, Local<Object> wrap)
|
|||||||
Mutex::ScopedLock next_thread_id_lock(next_thread_id_mutex);
|
Mutex::ScopedLock next_thread_id_lock(next_thread_id_mutex);
|
||||||
thread_id_ = next_thread_id++;
|
thread_id_ = next_thread_id++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Debug(this, "Creating worker with id %llu", thread_id_);
|
||||||
wrap->Set(env->context(),
|
wrap->Set(env->context(),
|
||||||
env->thread_id_string(),
|
env->thread_id_string(),
|
||||||
Number::New(env->isolate(),
|
Number::New(env->isolate(),
|
||||||
@ -107,6 +109,8 @@ Worker::Worker(Environment* env, Local<Object> wrap)
|
|||||||
|
|
||||||
// The new isolate won't be bothered on this thread again.
|
// The new isolate won't be bothered on this thread again.
|
||||||
isolate_->DiscardThreadSpecificMetadata();
|
isolate_->DiscardThreadSpecificMetadata();
|
||||||
|
|
||||||
|
Debug(this, "Set up worker with id %llu", thread_id_);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Worker::is_stopped() const {
|
bool Worker::is_stopped() const {
|
||||||
@ -123,6 +127,7 @@ void Worker::Run() {
|
|||||||
MultiIsolatePlatform* platform = isolate_data_->platform();
|
MultiIsolatePlatform* platform = isolate_data_->platform();
|
||||||
CHECK_NE(platform, nullptr);
|
CHECK_NE(platform, nullptr);
|
||||||
|
|
||||||
|
Debug(this, "Starting worker with id %llu", thread_id_);
|
||||||
{
|
{
|
||||||
Locker locker(isolate_);
|
Locker locker(isolate_);
|
||||||
Isolate::Scope isolate_scope(isolate_);
|
Isolate::Scope isolate_scope(isolate_);
|
||||||
@ -143,6 +148,8 @@ void Worker::Run() {
|
|||||||
// within it.
|
// within it.
|
||||||
if (child_port_ != nullptr)
|
if (child_port_ != nullptr)
|
||||||
env_->set_message_port(child_port_->object(isolate_));
|
env_->set_message_port(child_port_->object(isolate_));
|
||||||
|
|
||||||
|
Debug(this, "Created message port for worker %llu", thread_id_);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!is_stopped()) {
|
if (!is_stopped()) {
|
||||||
@ -152,6 +159,8 @@ void Worker::Run() {
|
|||||||
// This loads the Node bootstrapping code.
|
// This loads the Node bootstrapping code.
|
||||||
LoadEnvironment(env_.get());
|
LoadEnvironment(env_.get());
|
||||||
env_->async_hooks()->pop_async_id(1);
|
env_->async_hooks()->pop_async_id(1);
|
||||||
|
|
||||||
|
Debug(this, "Loaded environment for worker %llu", thread_id_);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
@ -189,6 +198,9 @@ void Worker::Run() {
|
|||||||
Mutex::ScopedLock lock(mutex_);
|
Mutex::ScopedLock lock(mutex_);
|
||||||
if (exit_code_ == 0 && !stopped)
|
if (exit_code_ == 0 && !stopped)
|
||||||
exit_code_ = exit_code;
|
exit_code_ = exit_code;
|
||||||
|
|
||||||
|
Debug(this, "Exiting thread for worker %llu with exit code %d",
|
||||||
|
thread_id_, exit_code_);
|
||||||
}
|
}
|
||||||
|
|
||||||
env_->set_can_call_into_js(false);
|
env_->set_can_call_into_js(false);
|
||||||
@ -237,12 +249,15 @@ void Worker::Run() {
|
|||||||
scheduled_on_thread_stopped_ = true;
|
scheduled_on_thread_stopped_ = true;
|
||||||
uv_async_send(thread_exit_async_.get());
|
uv_async_send(thread_exit_async_.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Debug(this, "Worker %llu thread stops", thread_id_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Worker::DisposeIsolate() {
|
void Worker::DisposeIsolate() {
|
||||||
if (isolate_ == nullptr)
|
if (isolate_ == nullptr)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
Debug(this, "Worker %llu dispose isolate", thread_id_);
|
||||||
CHECK(isolate_data_);
|
CHECK(isolate_data_);
|
||||||
MultiIsolatePlatform* platform = isolate_data_->platform();
|
MultiIsolatePlatform* platform = isolate_data_->platform();
|
||||||
platform->CancelPendingDelayedTasks(isolate_);
|
platform->CancelPendingDelayedTasks(isolate_);
|
||||||
@ -275,6 +290,8 @@ void Worker::OnThreadStopped() {
|
|||||||
Mutex::ScopedLock lock(mutex_);
|
Mutex::ScopedLock lock(mutex_);
|
||||||
scheduled_on_thread_stopped_ = false;
|
scheduled_on_thread_stopped_ = false;
|
||||||
|
|
||||||
|
Debug(this, "Worker %llu thread stopped", thread_id_);
|
||||||
|
|
||||||
{
|
{
|
||||||
Mutex::ScopedLock stopped_lock(stopped_mutex_);
|
Mutex::ScopedLock stopped_lock(stopped_mutex_);
|
||||||
CHECK(stopped_);
|
CHECK(stopped_);
|
||||||
@ -318,6 +335,8 @@ Worker::~Worker() {
|
|||||||
// This has most likely already happened within the worker thread -- this
|
// This has most likely already happened within the worker thread -- this
|
||||||
// is just in case Worker creation failed early.
|
// is just in case Worker creation failed early.
|
||||||
DisposeIsolate();
|
DisposeIsolate();
|
||||||
|
|
||||||
|
Debug(this, "Worker %llu destroyed", thread_id_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Worker::New(const FunctionCallbackInfo<Value>& args) {
|
void Worker::New(const FunctionCallbackInfo<Value>& args) {
|
||||||
@ -371,6 +390,9 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
|
|||||||
void Worker::Exit(int code) {
|
void Worker::Exit(int code) {
|
||||||
Mutex::ScopedLock lock(mutex_);
|
Mutex::ScopedLock lock(mutex_);
|
||||||
Mutex::ScopedLock stopped_lock(stopped_mutex_);
|
Mutex::ScopedLock stopped_lock(stopped_mutex_);
|
||||||
|
|
||||||
|
Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
|
||||||
|
|
||||||
if (!stopped_) {
|
if (!stopped_) {
|
||||||
CHECK_NE(env_, nullptr);
|
CHECK_NE(env_, nullptr);
|
||||||
stopped_ = true;
|
stopped_ = true;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user