src: use modern v8::Platform worker threads APIs

Precursor to removing deprecated APIs on the v8 side @
https://chromium-review.googlesource.com/c/v8/v8/+/1045310

PR-URL: https://github.com/nodejs/node/pull/21079
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Yang Guo <yangguo@chromium.org>
This commit is contained in:
Gabriel Charette 2018-05-09 12:20:04 -04:00 committed by Michaël Zasso
parent 8bcb12848f
commit 0f3c2c64d2
No known key found for this signature in database
GPG Key ID: 770F7A9A5AE15600
5 changed files with 56 additions and 58 deletions

View File

@ -404,7 +404,7 @@ static struct {
} }
void DrainVMTasks(Isolate* isolate) { void DrainVMTasks(Isolate* isolate) {
platform_->DrainBackgroundTasks(isolate); platform_->DrainTasks(isolate);
} }
void CancelVMTasks(Isolate* isolate) { void CancelVMTasks(Isolate* isolate) {

View File

@ -229,7 +229,7 @@ class MultiIsolatePlatform : public v8::Platform {
// posted during flushing of the queue are postponed until the next // posted during flushing of the queue are postponed until the next
// flushing. // flushing.
virtual bool FlushForegroundTasks(v8::Isolate* isolate) = 0; virtual bool FlushForegroundTasks(v8::Isolate* isolate) = 0;
virtual void DrainBackgroundTasks(v8::Isolate* isolate) = 0; virtual void DrainTasks(v8::Isolate* isolate) = 0;
virtual void CancelPendingDelayedTasks(v8::Isolate* isolate) = 0; virtual void CancelPendingDelayedTasks(v8::Isolate* isolate) = 0;
// These will be called by the `IsolateData` creation/destruction functions. // These will be called by the `IsolateData` creation/destruction functions.

View File

@ -15,50 +15,52 @@ using v8::Platform;
using v8::Task; using v8::Task;
using v8::TracingController; using v8::TracingController;
static void BackgroundRunner(void* data) { namespace {
static void WorkerThreadMain(void* data) {
TRACE_EVENT_METADATA1("__metadata", "thread_name", "name", TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
"BackgroundTaskRunner"); "BackgroundTaskRunner");
TaskQueue<Task> *background_tasks = static_cast<TaskQueue<Task> *>(data); TaskQueue<Task>* pending_worker_tasks = static_cast<TaskQueue<Task>*>(data);
while (std::unique_ptr<Task> task = background_tasks->BlockingPop()) { while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
task->Run(); task->Run();
background_tasks->NotifyOfCompletion(); pending_worker_tasks->NotifyOfCompletion();
} }
} }
BackgroundTaskRunner::BackgroundTaskRunner(int thread_pool_size) { } // namespace
WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
for (int i = 0; i < thread_pool_size; i++) { for (int i = 0; i < thread_pool_size; i++) {
std::unique_ptr<uv_thread_t> t { new uv_thread_t() }; std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
if (uv_thread_create(t.get(), BackgroundRunner, &background_tasks_) != 0) if (uv_thread_create(t.get(), WorkerThreadMain,
&pending_worker_tasks_) != 0) {
break; break;
}
threads_.push_back(std::move(t)); threads_.push_back(std::move(t));
} }
} }
void BackgroundTaskRunner::PostTask(std::unique_ptr<Task> task) { void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
background_tasks_.Push(std::move(task)); pending_worker_tasks_.Push(std::move(task));
} }
void BackgroundTaskRunner::PostIdleTask(std::unique_ptr<v8::IdleTask> task) { void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) {
UNREACHABLE(); UNREACHABLE();
} }
void BackgroundTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task, void WorkerThreadsTaskRunner::BlockingDrain() {
double delay_in_seconds) { pending_worker_tasks_.BlockingDrain();
UNREACHABLE();
} }
void BackgroundTaskRunner::BlockingDrain() { void WorkerThreadsTaskRunner::Shutdown() {
background_tasks_.BlockingDrain(); pending_worker_tasks_.Stop();
}
void BackgroundTaskRunner::Shutdown() {
background_tasks_.Stop();
for (size_t i = 0; i < threads_.size(); i++) { for (size_t i = 0; i < threads_.size(); i++) {
CHECK_EQ(0, uv_thread_join(threads_[i].get())); CHECK_EQ(0, uv_thread_join(threads_[i].get()));
} }
} }
size_t BackgroundTaskRunner::NumberOfAvailableBackgroundThreads() const { int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
return threads_.size(); return threads_.size();
} }
@ -131,8 +133,8 @@ NodePlatform::NodePlatform(int thread_pool_size,
TracingController* controller = new TracingController(); TracingController* controller = new TracingController();
tracing_controller_.reset(controller); tracing_controller_.reset(controller);
} }
background_task_runner_ = worker_thread_task_runner_ =
std::make_shared<BackgroundTaskRunner>(thread_pool_size); std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
} }
void NodePlatform::RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) { void NodePlatform::RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) {
@ -160,7 +162,7 @@ void NodePlatform::UnregisterIsolate(IsolateData* isolate_data) {
} }
void NodePlatform::Shutdown() { void NodePlatform::Shutdown() {
background_task_runner_->Shutdown(); worker_thread_task_runner_->Shutdown();
{ {
Mutex::ScopedLock lock(per_isolate_mutex_); Mutex::ScopedLock lock(per_isolate_mutex_);
@ -168,8 +170,8 @@ void NodePlatform::Shutdown() {
} }
} }
size_t NodePlatform::NumberOfAvailableBackgroundThreads() { int NodePlatform::NumberOfWorkerThreads() {
return background_task_runner_->NumberOfAvailableBackgroundThreads(); return worker_thread_task_runner_->NumberOfWorkerThreads();
} }
void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) { void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
@ -201,15 +203,12 @@ void PerIsolatePlatformData::CancelPendingDelayedTasks() {
scheduled_delayed_tasks_.clear(); scheduled_delayed_tasks_.clear();
} }
void NodePlatform::DrainBackgroundTasks(Isolate* isolate) { void NodePlatform::DrainTasks(Isolate* isolate) {
std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate); std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);
do { do {
// Right now, there is no way to drain only background tasks associated // Worker tasks aren't associated with an Isolate.
// with a specific isolate, so this sometimes does more work than worker_thread_task_runner_->BlockingDrain();
// necessary. In the long run, that functionality is probably going to
// be available anyway, though.
background_task_runner_->BlockingDrain();
} while (per_isolate->FlushForegroundTasksInternal()); } while (per_isolate->FlushForegroundTasksInternal());
} }
@ -249,11 +248,17 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
return did_work; return did_work;
} }
void NodePlatform::CallOnBackgroundThread(Task* task, void NodePlatform::CallOnWorkerThread(std::unique_ptr<v8::Task> task) {
ExpectedRuntime expected_runtime) { worker_thread_task_runner_->PostTask(std::move(task));
background_task_runner_->PostTask(std::unique_ptr<Task>(task));
} }
void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<v8::Task> task,
double delay_in_seconds) {
worker_thread_task_runner_->PostDelayedTask(std::move(task),
delay_in_seconds);
}
std::shared_ptr<PerIsolatePlatformData> std::shared_ptr<PerIsolatePlatformData>
NodePlatform::ForIsolate(Isolate* isolate) { NodePlatform::ForIsolate(Isolate* isolate) {
Mutex::ScopedLock lock(per_isolate_mutex_); Mutex::ScopedLock lock(per_isolate_mutex_);
@ -283,11 +288,6 @@ void NodePlatform::CancelPendingDelayedTasks(v8::Isolate* isolate) {
bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; } bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }
std::shared_ptr<v8::TaskRunner>
NodePlatform::GetBackgroundTaskRunner(Isolate* isolate) {
return background_task_runner_;
}
std::shared_ptr<v8::TaskRunner> std::shared_ptr<v8::TaskRunner>
NodePlatform::GetForegroundTaskRunner(Isolate* isolate) { NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
return ForIsolate(isolate); return ForIsolate(isolate);

View File

@ -93,23 +93,22 @@ class PerIsolatePlatformData :
std::vector<DelayedTaskPointer> scheduled_delayed_tasks_; std::vector<DelayedTaskPointer> scheduled_delayed_tasks_;
}; };
// This acts as the single background task runner for all Isolates. // This acts as the single worker thread task runner for all Isolates.
class BackgroundTaskRunner : public v8::TaskRunner { class WorkerThreadsTaskRunner {
public: public:
explicit BackgroundTaskRunner(int thread_pool_size); explicit WorkerThreadsTaskRunner(int thread_pool_size);
void PostTask(std::unique_ptr<v8::Task> task) override; void PostTask(std::unique_ptr<v8::Task> task);
void PostIdleTask(std::unique_ptr<v8::IdleTask> task) override;
void PostDelayedTask(std::unique_ptr<v8::Task> task, void PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) override; double delay_in_seconds);
bool IdleTasksEnabled() override { return false; };
void BlockingDrain(); void BlockingDrain();
void Shutdown(); void Shutdown();
size_t NumberOfAvailableBackgroundThreads() const; int NumberOfWorkerThreads() const;
private: private:
TaskQueue<v8::Task> background_tasks_; TaskQueue<v8::Task> pending_worker_tasks_;
std::vector<std::unique_ptr<uv_thread_t>> threads_; std::vector<std::unique_ptr<uv_thread_t>> threads_;
}; };
@ -118,14 +117,15 @@ class NodePlatform : public MultiIsolatePlatform {
NodePlatform(int thread_pool_size, v8::TracingController* tracing_controller); NodePlatform(int thread_pool_size, v8::TracingController* tracing_controller);
virtual ~NodePlatform() {} virtual ~NodePlatform() {}
void DrainBackgroundTasks(v8::Isolate* isolate) override; void DrainTasks(v8::Isolate* isolate) override;
void CancelPendingDelayedTasks(v8::Isolate* isolate) override; void CancelPendingDelayedTasks(v8::Isolate* isolate) override;
void Shutdown(); void Shutdown();
// v8::Platform implementation. // v8::Platform implementation.
size_t NumberOfAvailableBackgroundThreads() override; int NumberOfWorkerThreads() override;
void CallOnBackgroundThread(v8::Task* task, void CallOnWorkerThread(std::unique_ptr<v8::Task> task) override;
ExpectedRuntime expected_runtime) override; void CallDelayedOnWorkerThread(std::unique_ptr<v8::Task> task,
double delay_in_seconds) override;
void CallOnForegroundThread(v8::Isolate* isolate, v8::Task* task) override; void CallOnForegroundThread(v8::Isolate* isolate, v8::Task* task) override;
void CallDelayedOnForegroundThread(v8::Isolate* isolate, v8::Task* task, void CallDelayedOnForegroundThread(v8::Isolate* isolate, v8::Task* task,
double delay_in_seconds) override; double delay_in_seconds) override;
@ -138,8 +138,6 @@ class NodePlatform : public MultiIsolatePlatform {
void RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) override; void RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) override;
void UnregisterIsolate(IsolateData* isolate_data) override; void UnregisterIsolate(IsolateData* isolate_data) override;
std::shared_ptr<v8::TaskRunner> GetBackgroundTaskRunner(
v8::Isolate* isolate) override;
std::shared_ptr<v8::TaskRunner> GetForegroundTaskRunner( std::shared_ptr<v8::TaskRunner> GetForegroundTaskRunner(
v8::Isolate* isolate) override; v8::Isolate* isolate) override;
@ -151,7 +149,7 @@ class NodePlatform : public MultiIsolatePlatform {
std::shared_ptr<PerIsolatePlatformData>> per_isolate_; std::shared_ptr<PerIsolatePlatformData>> per_isolate_;
std::unique_ptr<v8::TracingController> tracing_controller_; std::unique_ptr<v8::TracingController> tracing_controller_;
std::shared_ptr<BackgroundTaskRunner> background_task_runner_; std::shared_ptr<WorkerThreadsTaskRunner> worker_thread_task_runner_;
}; };
} // namespace node } // namespace node

View File

@ -173,7 +173,7 @@ void Worker::Run() {
uv_run(&loop_, UV_RUN_DEFAULT); uv_run(&loop_, UV_RUN_DEFAULT);
if (is_stopped()) break; if (is_stopped()) break;
platform->DrainBackgroundTasks(isolate_); platform->DrainTasks(isolate_);
more = uv_loop_alive(&loop_); more = uv_loop_alive(&loop_);
if (more && !is_stopped()) if (more && !is_stopped())
@ -232,7 +232,7 @@ void Worker::Run() {
// This call needs to be made while the `Environment` is still alive // This call needs to be made while the `Environment` is still alive
// because we assume that it is available for async tracking in the // because we assume that it is available for async tracking in the
// NodePlatform implementation. // NodePlatform implementation.
platform->DrainBackgroundTasks(isolate_); platform->DrainTasks(isolate_);
} }
env_.reset(); env_.reset();