worker: set up child Isolate inside Worker thread

Refs: https://github.com/nodejs/node/issues/24016

PR-URL: https://github.com/nodejs/node/pull/26011
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: Joyee Cheung <joyeec9h3@gmail.com>
This commit is contained in:
Anna Henningsen 2019-02-08 19:23:09 +01:00
parent 58ba8bfc46
commit 5bc6e493c0
No known key found for this signature in database
GPG Key ID: 9C63F3A6CD2AD8F9
4 changed files with 203 additions and 174 deletions

View File

@ -885,12 +885,14 @@ bool Agent::IsActive() {
return io_ != nullptr || client_->IsActive();
}
void Agent::AddWorkerInspector(int thread_id,
const std::string& url,
Agent* agent) {
CHECK_NOT_NULL(client_);
agent->parent_handle_ =
client_->getWorkerManager()->NewParentHandle(thread_id, url);
void Agent::SetParentHandle(
std::unique_ptr<ParentInspectorHandle> parent_handle) {
parent_handle_ = std::move(parent_handle);
}
std::unique_ptr<ParentInspectorHandle> Agent::GetParentHandle(
int thread_id, const std::string& url) {
return client_->getWorkerManager()->NewParentHandle(thread_id, url);
}
void Agent::WaitForConnect() {

View File

@ -85,7 +85,9 @@ class Agent {
void EnableAsyncHook();
void DisableAsyncHook();
void AddWorkerInspector(int thread_id, const std::string& url, Agent* agent);
void SetParentHandle(std::unique_ptr<ParentInspectorHandle> parent_handle);
std::unique_ptr<ParentInspectorHandle> GetParentHandle(
int thread_id, const std::string& url);
// Called to create inspector sessions that can be used from the main thread.
// The inspector responds by using the delegate to send messages back.

View File

@ -8,6 +8,10 @@
#include "async_wrap.h"
#include "async_wrap-inl.h"
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
#include "inspector/worker_inspector.h" // ParentInspectorHandle
#endif
#include <string>
#include <vector>
@ -35,34 +39,21 @@ namespace worker {
namespace {
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
void StartWorkerInspector(Environment* child, const std::string& url) {
void StartWorkerInspector(
Environment* child,
std::unique_ptr<inspector::ParentInspectorHandle> parent_handle,
const std::string& url) {
child->inspector_agent()->SetParentHandle(std::move(parent_handle));
child->inspector_agent()->Start(url,
child->options()->debug_options(),
child->inspector_host_port(),
false);
}
void AddWorkerInspector(Environment* parent,
Environment* child,
int id,
const std::string& url) {
parent->inspector_agent()->AddWorkerInspector(id, url,
child->inspector_agent());
}
void WaitForWorkerInspectorToStop(Environment* child) {
child->inspector_agent()->WaitForDisconnect();
child->inspector_agent()->Stop();
}
#else
// No-ops
void StartWorkerInspector(Environment* child, const std::string& url) {}
void AddWorkerInspector(Environment* parent,
Environment* child,
int id,
const std::string& url) {}
void WaitForWorkerInspectorToStop(Environment* child) {}
#endif
} // anonymous namespace
@ -71,9 +62,13 @@ Worker::Worker(Environment* env,
Local<Object> wrap,
const std::string& url,
std::shared_ptr<PerIsolateOptions> per_isolate_opts)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER), url_(url),
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
url_(url),
per_isolate_opts_(per_isolate_opts),
platform_(env->isolate_data()->platform()),
profiler_idle_notifier_started_(env->profiler_idle_notifier_started()),
thread_id_(Environment::AllocateThreadId()) {
Debug(this, "Creating new worker instance at %p", static_cast<void*>(this));
Debug(this, "Creating new worker instance with thread id %llu", thread_id_);
// Set up everything that needs to be set up in the parent environment.
parent_port_ = MessagePort::New(env, env->context());
@ -89,57 +84,17 @@ Worker::Worker(Environment* env,
env->message_port_string(),
parent_port_->object()).FromJust();
array_buffer_allocator_.reset(CreateArrayBufferAllocator());
CHECK_EQ(uv_loop_init(&loop_), 0);
isolate_ = NewIsolate(array_buffer_allocator_.get(), &loop_);
CHECK_NOT_NULL(isolate_);
{
// Enter an environment capable of executing code in the child Isolate
// (and only in it).
Locker locker(isolate_);
Isolate::Scope isolate_scope(isolate_);
HandleScope handle_scope(isolate_);
isolate_data_.reset(CreateIsolateData(isolate_,
&loop_,
env->isolate_data()->platform(),
array_buffer_allocator_.get()));
if (per_isolate_opts != nullptr) {
isolate_data_->set_options(per_isolate_opts);
}
CHECK(isolate_data_);
Local<Context> context = NewContext(isolate_);
Context::Scope context_scope(context);
// TODO(addaleax): Use CreateEnvironment(), or generally another public API.
env_.reset(new Environment(
isolate_data_.get(), context, Flags::kNoFlags, thread_id_));
CHECK_NOT_NULL(env_);
env_->set_abort_on_uncaught_exception(false);
env_->set_worker_context(this);
env_->Start(env->profiler_idle_notifier_started());
env_->ProcessCliArgs(std::vector<std::string>{},
std::vector<std::string>{});
// Done while on the parent thread
AddWorkerInspector(env, env_.get(), thread_id_, url_);
}
// The new isolate won't be bothered on this thread again.
isolate_->DiscardThreadSpecificMetadata();
wrap->Set(env->context(),
env->thread_id_string(),
Number::New(env->isolate(), static_cast<double>(thread_id_)))
object()->Set(env->context(),
env->thread_id_string(),
Number::New(env->isolate(), static_cast<double>(thread_id_)))
.FromJust();
Debug(this,
"Set up worker at %p with id %llu",
static_cast<void*>(this),
thread_id_);
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
inspector_parent_handle_ =
env->inspector_agent()->GetParentHandle(thread_id_, url);
#endif
Debug(this, "Preparation for worker %llu finished", thread_id_);
}
bool Worker::is_stopped() const {
@ -147,14 +102,79 @@ bool Worker::is_stopped() const {
return stopped_;
}
// This class contains data that is only relevant to the child thread itself,
// and only while it is running.
// (Eventually, the Environment instance should probably also be moved here.)
class WorkerThreadData {
public:
explicit WorkerThreadData(Worker* w)
: w_(w),
array_buffer_allocator_(CreateArrayBufferAllocator()) {
CHECK_EQ(uv_loop_init(&loop_), 0);
Isolate* isolate = NewIsolate(array_buffer_allocator_.get(), &loop_);
CHECK_NOT_NULL(isolate);
{
Locker locker(isolate);
Isolate::Scope isolate_scope(isolate);
HandleScope handle_scope(isolate);
isolate_data_.reset(CreateIsolateData(isolate,
&loop_,
w_->platform_,
array_buffer_allocator_.get()));
CHECK(isolate_data_);
if (w_->per_isolate_opts_)
isolate_data_->set_options(std::move(w_->per_isolate_opts_));
}
Mutex::ScopedLock lock(w_->mutex_);
w_->isolate_ = isolate;
}
~WorkerThreadData() {
Debug(w_, "Worker %llu dispose isolate", w_->thread_id_);
Isolate* isolate;
{
Mutex::ScopedLock lock(w_->mutex_);
isolate = w_->isolate_;
w_->isolate_ = nullptr;
}
w_->platform_->CancelPendingDelayedTasks(isolate);
isolate_data_.reset();
w_->platform_->UnregisterIsolate(isolate);
isolate->Dispose();
// Need to run the loop one more time to close the platform's uv_async_t
uv_run(&loop_, UV_RUN_ONCE);
CheckedUvLoopClose(&loop_);
}
private:
Worker* const w_;
uv_loop_t loop_;
DeleteFnPtr<ArrayBufferAllocator, FreeArrayBufferAllocator>
array_buffer_allocator_;
DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
friend class Worker;
};
void Worker::Run() {
std::string name = "WorkerThread ";
name += std::to_string(thread_id_);
TRACE_EVENT_METADATA1(
"__metadata", "thread_name", "name",
TRACE_STR_COPY(name.c_str()));
MultiIsolatePlatform* platform = isolate_data_->platform();
CHECK_NOT_NULL(platform);
CHECK_NOT_NULL(platform_);
Debug(this, "Creating isolate for worker with id %llu", thread_id_);
WorkerThreadData data(this);
Debug(this, "Starting worker with id %llu", thread_id_);
{
@ -163,10 +183,73 @@ void Worker::Run() {
SealHandleScope outer_seal(isolate_);
bool inspector_started = false;
{
Context::Scope context_scope(env_->context());
HandleScope handle_scope(isolate_);
DeleteFnPtr<Environment, FreeEnvironment> env_;
OnScopeLeave cleanup_env([&]() {
if (!env_) return;
env_->set_can_call_into_js(false);
Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
// Grab the parent-to-child channel and render is unusable.
MessagePort* child_port;
{
Mutex::ScopedLock lock(mutex_);
child_port = child_port_;
child_port_ = nullptr;
}
{
Context::Scope context_scope(env_->context());
if (child_port != nullptr)
child_port->Close();
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
if (inspector_started)
WaitForWorkerInspectorToStop(env_.get());
#endif
{
Mutex::ScopedLock stopped_lock(stopped_mutex_);
stopped_ = true;
}
env_->RunCleanup();
// This call needs to be made while the `Environment` is still alive
// because we assume that it is available for async tracking in the
// NodePlatform implementation.
platform_->DrainTasks(isolate_);
}
});
{
HandleScope handle_scope(isolate_);
Local<Context> context = NewContext(isolate_);
if (is_stopped()) return;
CHECK(!context.IsEmpty());
Context::Scope context_scope(context);
{
// TODO(addaleax): Use CreateEnvironment(), or generally another
// public API.
env_.reset(new Environment(data.isolate_data_.get(),
context,
Environment::kNoFlags,
thread_id_));
CHECK_NOT_NULL(env_);
env_->set_abort_on_uncaught_exception(false);
env_->set_worker_context(this);
env_->Start(profiler_idle_notifier_started_);
env_->ProcessCliArgs(std::vector<std::string>{},
std::vector<std::string>{});
}
Debug(this, "Created Environment for worker with id %llu", thread_id_);
if (is_stopped()) return;
{
HandleScope handle_scope(isolate_);
Mutex::ScopedLock lock(mutex_);
@ -182,8 +265,13 @@ void Worker::Run() {
Debug(this, "Created message port for worker %llu", thread_id_);
}
if (!is_stopped()) {
StartWorkerInspector(env_.get(), url_);
if (is_stopped()) return;
{
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
StartWorkerInspector(env_.get(),
std::move(inspector_parent_handle_),
url_);
#endif
inspector_started = true;
HandleScope handle_scope(isolate_);
@ -198,6 +286,7 @@ void Worker::Run() {
Debug(this, "Loaded environment for worker %llu", thread_id_);
}
if (is_stopped()) return;
{
SealHandleScope seal(isolate_);
bool more;
@ -205,12 +294,12 @@ void Worker::Run() {
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
do {
if (is_stopped()) break;
uv_run(&loop_, UV_RUN_DEFAULT);
uv_run(&data.loop_, UV_RUN_DEFAULT);
if (is_stopped()) break;
platform->DrainTasks(isolate_);
platform_->DrainTasks(isolate_);
more = uv_loop_alive(&loop_);
more = uv_loop_alive(&data.loop_);
if (more && !is_stopped())
continue;
@ -218,7 +307,7 @@ void Worker::Run() {
// Emit `beforeExit` if the loop became alive either after emitting
// event, or after running some callbacks.
more = uv_loop_alive(&loop_);
more = uv_loop_alive(&data.loop_);
} while (more == true);
env_->performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
@ -237,79 +326,11 @@ void Worker::Run() {
Debug(this, "Exiting thread for worker %llu with exit code %d",
thread_id_, exit_code_);
}
env_->set_can_call_into_js(false);
Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
// Grab the parent-to-child channel and render is unusable.
MessagePort* child_port;
{
Mutex::ScopedLock lock(mutex_);
child_port = child_port_;
child_port_ = nullptr;
}
{
Context::Scope context_scope(env_->context());
child_port->Close();
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
if (inspector_started)
WaitForWorkerInspectorToStop(env_.get());
{
Mutex::ScopedLock stopped_lock(stopped_mutex_);
stopped_ = true;
}
env_->RunCleanup();
// This call needs to be made while the `Environment` is still alive
// because we assume that it is available for async tracking in the
// NodePlatform implementation.
platform->DrainTasks(isolate_);
}
env_.reset();
}
DisposeIsolate();
{
Mutex::ScopedLock lock(mutex_);
CHECK(thread_exit_async_);
scheduled_on_thread_stopped_ = true;
uv_async_send(thread_exit_async_.get());
}
Debug(this, "Worker %llu thread stops", thread_id_);
}
void Worker::DisposeIsolate() {
if (env_) {
CHECK_NOT_NULL(isolate_);
Locker locker(isolate_);
Isolate::Scope isolate_scope(isolate_);
env_.reset();
}
if (isolate_ == nullptr)
return;
Debug(this, "Worker %llu dispose isolate", thread_id_);
CHECK(isolate_data_);
MultiIsolatePlatform* platform = isolate_data_->platform();
platform->CancelPendingDelayedTasks(isolate_);
isolate_data_.reset();
platform->UnregisterIsolate(isolate_);
isolate_->Dispose();
isolate_ = nullptr;
}
void Worker::JoinThread() {
if (thread_joined_)
return;
@ -340,7 +361,6 @@ void Worker::OnThreadStopped() {
CHECK(stopped_);
}
CHECK_NULL(child_port_);
parent_port_ = nullptr;
}
@ -370,16 +390,9 @@ Worker::~Worker() {
CHECK(stopped_);
CHECK(thread_joined_);
CHECK_NULL(child_port_);
// This has most likely already happened within the worker thread -- this
// is just in case Worker creation failed early.
DisposeIsolate();
// Need to run the loop one more time to close the platform's uv_async_t
uv_run(&loop_, UV_RUN_ONCE);
CheckedUvLoopClose(&loop_);
Debug(this, "Worker %llu destroyed", thread_id_);
}
@ -476,7 +489,13 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
}), 0);
CHECK_EQ(uv_thread_create(&w->tid_, [](void* arg) {
static_cast<Worker*>(arg)->Run();
Worker* w = static_cast<Worker*>(arg);
w->Run();
Mutex::ScopedLock lock(w->mutex_);
CHECK(w->thread_exit_async_);
w->scheduled_on_thread_stopped_ = true;
uv_async_send(w->thread_exit_async_.get());
}, static_cast<void*>(w)), 0);
}
@ -510,12 +529,12 @@ void Worker::Exit(int code) {
Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
if (!stopped_) {
CHECK_NOT_NULL(env_);
stopped_ = true;
exit_code_ = code;
if (child_port_ != nullptr)
child_port_->StopEventLoop();
isolate_->TerminateExecution();
if (isolate_ != nullptr)
isolate_->TerminateExecution();
}
}

View File

@ -9,6 +9,8 @@
namespace node {
namespace worker {
class WorkerThreadData;
// A worker thread, as represented in its parent thread.
class Worker : public AsyncWrap {
public:
@ -49,17 +51,19 @@ class Worker : public AsyncWrap {
private:
void OnThreadStopped();
void DisposeIsolate();
uv_loop_t loop_;
DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
DeleteFnPtr<Environment, FreeEnvironment> env_;
const std::string url_;
std::shared_ptr<PerIsolateOptions> per_isolate_opts_;
MultiIsolatePlatform* platform_;
v8::Isolate* isolate_ = nullptr;
DeleteFnPtr<ArrayBufferAllocator, FreeArrayBufferAllocator>
array_buffer_allocator_;
bool profiler_idle_notifier_started_;
uv_thread_t tid_;
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
std::unique_ptr<inspector::ParentInspectorHandle> inspector_parent_handle_;
#endif
// This mutex protects access to all variables listed below it.
mutable Mutex mutex_;
@ -79,12 +83,14 @@ class Worker : public AsyncWrap {
std::unique_ptr<MessagePortData> child_port_data_;
// The child port is always kept alive by the child Environment's persistent
// handle to it.
// The child port is kept alive by the child Environment's persistent
// handle to it, as long as that child Environment exists.
MessagePort* child_port_ = nullptr;
// This is always kept alive because the JS object associated with the Worker
// instance refers to it via its [kPort] property.
MessagePort* parent_port_ = nullptr;
friend class WorkerThreadData;
};
} // namespace worker