src: implement v8::TaskRunner API in NodePlatform

V8 is switching APIs for scheduling tasks. Implement the new APIs.

Fixes: https://github.com/nodejs/node-v8/issues/24
Refs: c690f54d95

PR-URL: https://github.com/nodejs/node/pull/17134
Fixes: https://github.com/nodejs/node-v8/issues/24
Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Anna Henningsen 2017-11-19 14:29:08 +01:00 committed by James M Snell
parent 2540e3dacf
commit f005656719
2 changed files with 119 additions and 44 deletions

View File

@ -24,6 +24,43 @@ static void BackgroundRunner(void* data) {
} }
} }
BackgroundTaskRunner::BackgroundTaskRunner(int thread_pool_size) {
for (int i = 0; i < thread_pool_size; i++) {
std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
if (uv_thread_create(t.get(), BackgroundRunner, &background_tasks_) != 0)
break;
threads_.push_back(std::move(t));
}
}
void BackgroundTaskRunner::PostTask(std::unique_ptr<Task> task) {
background_tasks_.Push(std::move(task));
}
void BackgroundTaskRunner::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
UNREACHABLE();
}
void BackgroundTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) {
UNREACHABLE();
}
void BackgroundTaskRunner::BlockingDrain() {
background_tasks_.BlockingDrain();
}
void BackgroundTaskRunner::Shutdown() {
background_tasks_.Stop();
for (size_t i = 0; i < threads_.size(); i++) {
CHECK_EQ(0, uv_thread_join(threads_[i].get()));
}
}
size_t BackgroundTaskRunner::NumberOfAvailableBackgroundThreads() const {
return threads_.size();
}
PerIsolatePlatformData::PerIsolatePlatformData( PerIsolatePlatformData::PerIsolatePlatformData(
v8::Isolate* isolate, uv_loop_t* loop) v8::Isolate* isolate, uv_loop_t* loop)
: isolate_(isolate), loop_(loop) { : isolate_(isolate), loop_(loop) {
@ -38,17 +75,20 @@ void PerIsolatePlatformData::FlushTasks(uv_async_t* handle) {
platform_data->FlushForegroundTasksInternal(); platform_data->FlushForegroundTasksInternal();
} }
void PerIsolatePlatformData::CallOnForegroundThread( void PerIsolatePlatformData::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
std::unique_ptr<Task> task) { UNREACHABLE();
}
void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
foreground_tasks_.Push(std::move(task)); foreground_tasks_.Push(std::move(task));
uv_async_send(flush_tasks_); uv_async_send(flush_tasks_);
} }
void PerIsolatePlatformData::CallDelayedOnForegroundThread( void PerIsolatePlatformData::PostDelayedTask(
std::unique_ptr<Task> task, double delay_in_seconds) { std::unique_ptr<Task> task, double delay_in_seconds) {
std::unique_ptr<DelayedTask> delayed(new DelayedTask()); std::unique_ptr<DelayedTask> delayed(new DelayedTask());
delayed->task = std::move(task); delayed->task = std::move(task);
delayed->platform_data = this; delayed->platform_data = shared_from_this();
delayed->timeout = delay_in_seconds; delayed->timeout = delay_in_seconds;
foreground_delayed_tasks_.Push(std::move(delayed)); foreground_delayed_tasks_.Push(std::move(delayed));
uv_async_send(flush_tasks_); uv_async_send(flush_tasks_);
@ -80,49 +120,43 @@ NodePlatform::NodePlatform(int thread_pool_size,
TracingController* controller = new TracingController(); TracingController* controller = new TracingController();
tracing_controller_.reset(controller); tracing_controller_.reset(controller);
} }
for (int i = 0; i < thread_pool_size; i++) { background_task_runner_ =
uv_thread_t* t = new uv_thread_t(); std::make_shared<BackgroundTaskRunner>(thread_pool_size);
if (uv_thread_create(t, BackgroundRunner, &background_tasks_) != 0) {
delete t;
break;
}
threads_.push_back(std::unique_ptr<uv_thread_t>(t));
}
} }
void NodePlatform::RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) { void NodePlatform::RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) {
Isolate* isolate = isolate_data->isolate(); Isolate* isolate = isolate_data->isolate();
Mutex::ScopedLock lock(per_isolate_mutex_); Mutex::ScopedLock lock(per_isolate_mutex_);
PerIsolatePlatformData* existing = per_isolate_[isolate]; std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
if (existing != nullptr) if (existing) {
existing->ref(); existing->ref();
else } else {
per_isolate_[isolate] = new PerIsolatePlatformData(isolate, loop); per_isolate_[isolate] =
std::make_shared<PerIsolatePlatformData>(isolate, loop);
}
} }
void NodePlatform::UnregisterIsolate(IsolateData* isolate_data) { void NodePlatform::UnregisterIsolate(IsolateData* isolate_data) {
Isolate* isolate = isolate_data->isolate(); Isolate* isolate = isolate_data->isolate();
Mutex::ScopedLock lock(per_isolate_mutex_); Mutex::ScopedLock lock(per_isolate_mutex_);
PerIsolatePlatformData* existing = per_isolate_[isolate]; std::shared_ptr<PerIsolatePlatformData> existing = per_isolate_[isolate];
CHECK_NE(existing, nullptr); CHECK(existing);
if (existing->unref() == 0) { if (existing->unref() == 0) {
delete existing;
per_isolate_.erase(isolate); per_isolate_.erase(isolate);
} }
} }
void NodePlatform::Shutdown() { void NodePlatform::Shutdown() {
background_tasks_.Stop(); background_task_runner_->Shutdown();
for (size_t i = 0; i < threads_.size(); i++) {
CHECK_EQ(0, uv_thread_join(threads_[i].get())); {
Mutex::ScopedLock lock(per_isolate_mutex_);
per_isolate_.clear();
} }
Mutex::ScopedLock lock(per_isolate_mutex_);
for (const auto& pair : per_isolate_)
delete pair.second;
} }
size_t NodePlatform::NumberOfAvailableBackgroundThreads() { size_t NodePlatform::NumberOfAvailableBackgroundThreads() {
return threads_.size(); return background_task_runner_->NumberOfAvailableBackgroundThreads();
} }
void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) { void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
@ -155,14 +189,14 @@ void PerIsolatePlatformData::CancelPendingDelayedTasks() {
} }
void NodePlatform::DrainBackgroundTasks(Isolate* isolate) { void NodePlatform::DrainBackgroundTasks(Isolate* isolate) {
PerIsolatePlatformData* per_isolate = ForIsolate(isolate); std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);
do { do {
// Right now, there is no way to drain only background tasks associated // Right now, there is no way to drain only background tasks associated
// with a specific isolate, so this sometimes does more work than // with a specific isolate, so this sometimes does more work than
// necessary. In the long run, that functionality is probably going to // necessary. In the long run, that functionality is probably going to
// be available anyway, though. // be available anyway, though.
background_tasks_.BlockingDrain(); background_task_runner_->BlockingDrain();
} while (per_isolate->FlushForegroundTasksInternal()); } while (per_isolate->FlushForegroundTasksInternal());
} }
@ -198,24 +232,25 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
void NodePlatform::CallOnBackgroundThread(Task* task, void NodePlatform::CallOnBackgroundThread(Task* task,
ExpectedRuntime expected_runtime) { ExpectedRuntime expected_runtime) {
background_tasks_.Push(std::unique_ptr<Task>(task)); background_task_runner_->PostTask(std::unique_ptr<Task>(task));
} }
PerIsolatePlatformData* NodePlatform::ForIsolate(Isolate* isolate) { std::shared_ptr<PerIsolatePlatformData>
NodePlatform::ForIsolate(Isolate* isolate) {
Mutex::ScopedLock lock(per_isolate_mutex_); Mutex::ScopedLock lock(per_isolate_mutex_);
PerIsolatePlatformData* data = per_isolate_[isolate]; std::shared_ptr<PerIsolatePlatformData> data = per_isolate_[isolate];
CHECK_NE(data, nullptr); CHECK(data);
return data; return data;
} }
void NodePlatform::CallOnForegroundThread(Isolate* isolate, Task* task) { void NodePlatform::CallOnForegroundThread(Isolate* isolate, Task* task) {
ForIsolate(isolate)->CallOnForegroundThread(std::unique_ptr<Task>(task)); ForIsolate(isolate)->PostTask(std::unique_ptr<Task>(task));
} }
void NodePlatform::CallDelayedOnForegroundThread(Isolate* isolate, void NodePlatform::CallDelayedOnForegroundThread(Isolate* isolate,
Task* task, Task* task,
double delay_in_seconds) { double delay_in_seconds) {
ForIsolate(isolate)->CallDelayedOnForegroundThread( ForIsolate(isolate)->PostDelayedTask(
std::unique_ptr<Task>(task), delay_in_seconds); std::unique_ptr<Task>(task), delay_in_seconds);
} }
@ -229,6 +264,16 @@ void NodePlatform::CancelPendingDelayedTasks(v8::Isolate* isolate) {
bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; } bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }
std::shared_ptr<v8::TaskRunner>
NodePlatform::GetBackgroundTaskRunner(Isolate* isolate) {
return background_task_runner_;
}
std::shared_ptr<v8::TaskRunner>
NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
return ForIsolate(isolate);
}
double NodePlatform::MonotonicallyIncreasingTime() { double NodePlatform::MonotonicallyIncreasingTime() {
// Convert nanos to seconds. // Convert nanos to seconds.
return uv_hrtime() / 1e9; return uv_hrtime() / 1e9;

View File

@ -43,17 +43,22 @@ struct DelayedTask {
std::unique_ptr<v8::Task> task; std::unique_ptr<v8::Task> task;
uv_timer_t timer; uv_timer_t timer;
double timeout; double timeout;
PerIsolatePlatformData* platform_data; std::shared_ptr<PerIsolatePlatformData> platform_data;
}; };
class PerIsolatePlatformData { // This acts as the foreground task runner for a given Isolate.
class PerIsolatePlatformData :
public v8::TaskRunner,
public std::enable_shared_from_this<PerIsolatePlatformData> {
public: public:
PerIsolatePlatformData(v8::Isolate* isolate, uv_loop_t* loop); PerIsolatePlatformData(v8::Isolate* isolate, uv_loop_t* loop);
~PerIsolatePlatformData(); ~PerIsolatePlatformData();
void CallOnForegroundThread(std::unique_ptr<v8::Task> task); void PostTask(std::unique_ptr<v8::Task> task) override;
void CallDelayedOnForegroundThread(std::unique_ptr<v8::Task> task, void PostIdleTask(std::unique_ptr<v8::IdleTask> task) override;
double delay_in_seconds); void PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) override;
bool IdleTasksEnabled() override { return false; };
void Shutdown(); void Shutdown();
@ -84,6 +89,26 @@ class PerIsolatePlatformData {
std::vector<DelayedTaskPointer> scheduled_delayed_tasks_; std::vector<DelayedTaskPointer> scheduled_delayed_tasks_;
}; };
// This acts as the single background task runner for all Isolates.
class BackgroundTaskRunner : public v8::TaskRunner {
public:
explicit BackgroundTaskRunner(int thread_pool_size);
void PostTask(std::unique_ptr<v8::Task> task) override;
void PostIdleTask(std::unique_ptr<v8::IdleTask> task) override;
void PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) override;
bool IdleTasksEnabled() override { return false; };
void BlockingDrain();
void Shutdown();
size_t NumberOfAvailableBackgroundThreads() const;
private:
TaskQueue<v8::Task> background_tasks_;
std::vector<std::unique_ptr<uv_thread_t>> threads_;
};
class NodePlatform : public MultiIsolatePlatform { class NodePlatform : public MultiIsolatePlatform {
public: public:
NodePlatform(int thread_pool_size, v8::TracingController* tracing_controller); NodePlatform(int thread_pool_size, v8::TracingController* tracing_controller);
@ -109,15 +134,20 @@ class NodePlatform : public MultiIsolatePlatform {
void RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) override; void RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) override;
void UnregisterIsolate(IsolateData* isolate_data) override; void UnregisterIsolate(IsolateData* isolate_data) override;
std::shared_ptr<v8::TaskRunner> GetBackgroundTaskRunner(
v8::Isolate* isolate) override;
std::shared_ptr<v8::TaskRunner> GetForegroundTaskRunner(
v8::Isolate* isolate) override;
private: private:
PerIsolatePlatformData* ForIsolate(v8::Isolate* isolate); std::shared_ptr<PerIsolatePlatformData> ForIsolate(v8::Isolate* isolate);
Mutex per_isolate_mutex_; Mutex per_isolate_mutex_;
std::unordered_map<v8::Isolate*, PerIsolatePlatformData*> per_isolate_; std::unordered_map<v8::Isolate*,
TaskQueue<v8::Task> background_tasks_; std::shared_ptr<PerIsolatePlatformData>> per_isolate_;
std::vector<std::unique_ptr<uv_thread_t>> threads_;
std::unique_ptr<v8::TracingController> tracing_controller_; std::unique_ptr<v8::TracingController> tracing_controller_;
std::shared_ptr<BackgroundTaskRunner> background_task_runner_;
}; };
} // namespace node } // namespace node