Revert "MDEV-33840 tpool : switch off maintenance timer when not needed."
This reverts commit 09bae92c16f9c37c931ab3f7932664f55bb9a842.
This commit is contained in:
parent
3a3fe3005d
commit
2ba79aba2b
@ -270,10 +270,10 @@ class thread_pool_generic : public thread_pool
|
|||||||
OFF, ON
|
OFF, ON
|
||||||
};
|
};
|
||||||
timer_state_t m_timer_state= timer_state_t::OFF;
|
timer_state_t m_timer_state= timer_state_t::OFF;
|
||||||
void switch_timer(timer_state_t state,std::unique_lock<std::mutex> &lk);
|
void switch_timer(timer_state_t state);
|
||||||
|
|
||||||
/* Updates idle_since, and maybe switches the timer off */
|
/* Updates idle_since, and maybe switches the timer off */
|
||||||
void check_idle(std::chrono::system_clock::time_point now, std::unique_lock<std::mutex> &lk);
|
void check_idle(std::chrono::system_clock::time_point now);
|
||||||
|
|
||||||
/** time point when timer last ran, used as a coarse clock. */
|
/** time point when timer last ran, used as a coarse clock. */
|
||||||
std::chrono::system_clock::time_point m_timestamp;
|
std::chrono::system_clock::time_point m_timestamp;
|
||||||
@ -306,9 +306,9 @@ class thread_pool_generic : public thread_pool
|
|||||||
{
|
{
|
||||||
((thread_pool_generic *)arg)->maintenance();
|
((thread_pool_generic *)arg)->maintenance();
|
||||||
}
|
}
|
||||||
bool add_thread(std::unique_lock<std::mutex> &lk);
|
bool add_thread();
|
||||||
bool wake(worker_wake_reason reason, task *t = nullptr);
|
bool wake(worker_wake_reason reason, task *t = nullptr);
|
||||||
void maybe_wake_or_create_thread(std::unique_lock<std::mutex> &lk);
|
void maybe_wake_or_create_thread();
|
||||||
bool too_many_active_threads();
|
bool too_many_active_threads();
|
||||||
bool get_task(worker_data *thread_var, task **t);
|
bool get_task(worker_data *thread_var, task **t);
|
||||||
bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
|
bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
|
||||||
@ -616,11 +616,11 @@ void thread_pool_generic::worker_main(worker_data *thread_var)
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
static const auto invalid_timestamp= std::chrono::system_clock::time_point::max();
|
static const auto invalid_timestamp= std::chrono::system_clock::time_point::max();
|
||||||
constexpr auto max_idle_time= std::chrono::seconds(20);
|
constexpr auto max_idle_time= std::chrono::minutes(1);
|
||||||
|
|
||||||
/* Time since maintenance timer had nothing to do */
|
/* Time since maintenance timer had nothing to do */
|
||||||
static std::chrono::system_clock::time_point idle_since= invalid_timestamp;
|
static std::chrono::system_clock::time_point idle_since= invalid_timestamp;
|
||||||
void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now, std::unique_lock<std::mutex> &lk)
|
void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
|
||||||
{
|
{
|
||||||
DBUG_ASSERT(m_task_queue.empty());
|
DBUG_ASSERT(m_task_queue.empty());
|
||||||
|
|
||||||
@ -647,7 +647,7 @@ void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now,
|
|||||||
if (now - idle_since > max_idle_time)
|
if (now - idle_since > max_idle_time)
|
||||||
{
|
{
|
||||||
idle_since= invalid_timestamp;
|
idle_since= invalid_timestamp;
|
||||||
switch_timer(timer_state_t::OFF,lk);
|
switch_timer(timer_state_t::OFF);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -681,7 +681,7 @@ void thread_pool_generic::maintenance()
|
|||||||
|
|
||||||
if (m_task_queue.empty())
|
if (m_task_queue.empty())
|
||||||
{
|
{
|
||||||
check_idle(m_timestamp, lk);
|
check_idle(m_timestamp);
|
||||||
m_last_activity = m_tasks_dequeued + m_wakeups;
|
m_last_activity = m_tasks_dequeued + m_wakeups;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -701,7 +701,7 @@ void thread_pool_generic::maintenance()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
maybe_wake_or_create_thread(lk);
|
maybe_wake_or_create_thread();
|
||||||
|
|
||||||
size_t thread_cnt = (int)thread_count();
|
size_t thread_cnt = (int)thread_count();
|
||||||
if (m_last_activity == m_tasks_dequeued + m_wakeups &&
|
if (m_last_activity == m_tasks_dequeued + m_wakeups &&
|
||||||
@ -709,7 +709,7 @@ void thread_pool_generic::maintenance()
|
|||||||
{
|
{
|
||||||
// no progress made since last iteration. create new
|
// no progress made since last iteration. create new
|
||||||
// thread
|
// thread
|
||||||
add_thread(lk);
|
add_thread();
|
||||||
}
|
}
|
||||||
m_last_activity = m_tasks_dequeued + m_wakeups;
|
m_last_activity = m_tasks_dequeued + m_wakeups;
|
||||||
m_last_thread_count= thread_cnt;
|
m_last_thread_count= thread_cnt;
|
||||||
@ -736,14 +736,14 @@ static int throttling_interval_ms(size_t n_threads,size_t concurrency)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Create a new worker.*/
|
/* Create a new worker.*/
|
||||||
bool thread_pool_generic::add_thread(std::unique_lock<std::mutex> &lk)
|
bool thread_pool_generic::add_thread()
|
||||||
{
|
{
|
||||||
size_t n_threads = thread_count();
|
size_t n_threads = thread_count();
|
||||||
|
|
||||||
if (n_threads >= m_max_threads)
|
if (n_threads >= m_max_threads)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
if (n_threads >= m_min_threads && m_min_threads != m_max_threads)
|
if (n_threads >= m_min_threads)
|
||||||
{
|
{
|
||||||
auto now = std::chrono::system_clock::now();
|
auto now = std::chrono::system_clock::now();
|
||||||
if (now - m_last_thread_creation <
|
if (now - m_last_thread_creation <
|
||||||
@ -753,7 +753,7 @@ bool thread_pool_generic::add_thread(std::unique_lock<std::mutex> &lk)
|
|||||||
Throttle thread creation and wakeup deadlock detection timer,
|
Throttle thread creation and wakeup deadlock detection timer,
|
||||||
if is it off.
|
if is it off.
|
||||||
*/
|
*/
|
||||||
switch_timer(timer_state_t::ON, lk);
|
switch_timer(timer_state_t::ON);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -835,10 +835,12 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
|
|||||||
if (!m_concurrency)
|
if (!m_concurrency)
|
||||||
m_concurrency = 1;
|
m_concurrency = 1;
|
||||||
|
|
||||||
|
// start the timer
|
||||||
|
m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void thread_pool_generic::maybe_wake_or_create_thread(std::unique_lock<std::mutex> &lk)
|
void thread_pool_generic::maybe_wake_or_create_thread()
|
||||||
{
|
{
|
||||||
if (m_task_queue.empty())
|
if (m_task_queue.empty())
|
||||||
return;
|
return;
|
||||||
@ -851,7 +853,7 @@ void thread_pool_generic::maybe_wake_or_create_thread(std::unique_lock<std::mute
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
add_thread(lk);
|
add_thread();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -870,7 +872,7 @@ void thread_pool_generic::submit_task(task* task)
|
|||||||
task->add_ref();
|
task->add_ref();
|
||||||
m_tasks_enqueued++;
|
m_tasks_enqueued++;
|
||||||
m_task_queue.push(task);
|
m_task_queue.push(task);
|
||||||
maybe_wake_or_create_thread(lk);
|
maybe_wake_or_create_thread();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -893,7 +895,7 @@ void thread_pool_generic::wait_begin()
|
|||||||
m_waiting_task_count++;
|
m_waiting_task_count++;
|
||||||
|
|
||||||
/* Maintain concurrency */
|
/* Maintain concurrency */
|
||||||
maybe_wake_or_create_thread(lk);
|
maybe_wake_or_create_thread();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -908,30 +910,26 @@ void thread_pool_generic::wait_end()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void thread_pool_generic::switch_timer(timer_state_t state, std::unique_lock<std::mutex> &lk)
|
void thread_pool_generic::switch_timer(timer_state_t state)
|
||||||
{
|
{
|
||||||
if (m_timer_state == state)
|
if (m_timer_state == state)
|
||||||
return;
|
return;
|
||||||
/* No maintenance timer for fixed threadpool size.*/
|
/*
|
||||||
DBUG_ASSERT(m_min_threads != m_max_threads);
|
We can't use timer::set_time, because mysys timers are deadlock
|
||||||
DBUG_ASSERT(lk.owns_lock());
|
prone.
|
||||||
|
|
||||||
|
Instead, to switch off we increase the timer period
|
||||||
|
and decrease period to switch on.
|
||||||
|
|
||||||
|
This might introduce delays in thread creation when needed,
|
||||||
|
as period will only be changed when timer fires next time.
|
||||||
|
For this reason, we can't use very long periods for the "off" state.
|
||||||
|
*/
|
||||||
m_timer_state= state;
|
m_timer_state= state;
|
||||||
if(state == timer_state_t::OFF)
|
long long period= (state == timer_state_t::OFF) ?
|
||||||
{
|
m_timer_interval.count()*10: m_timer_interval.count();
|
||||||
m_maintenance_timer.set_period(0);
|
|
||||||
}
|
m_maintenance_timer.set_period((int)period);
|
||||||
else
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
It is necessary to unlock the thread_pool::m_mtx
|
|
||||||
to avoid the deadlock with thr_timer's LOCK_timer.
|
|
||||||
Otherwise, lock order would be violated.
|
|
||||||
*/
|
|
||||||
lk.unlock();
|
|
||||||
m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
|
|
||||||
lk.lock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user