MDEV-24295 Reduce wakeups by tpool maintenance timer, when server is idle
If maintenance timer does not do much for prolonged time, it will wake up less frequently, once every 4 seconds instead of once every 0.4 second. It will wakeup more often if thread creation is throttled, to avoid stalls.
This commit is contained in:
parent
111963477b
commit
5bb5d4ad3a
@ -212,6 +212,17 @@ class thread_pool_generic : public thread_pool
|
|||||||
/** True, if threadpool is being shutdown, false otherwise */
|
/** True, if threadpool is being shutdown, false otherwise */
|
||||||
bool m_in_shutdown;
|
bool m_in_shutdown;
|
||||||
|
|
||||||
|
/** Maintenance timer state : true = active(ON),false = inactive(OFF)*/
|
||||||
|
enum class timer_state_t
|
||||||
|
{
|
||||||
|
OFF, ON
|
||||||
|
};
|
||||||
|
timer_state_t m_timer_state= timer_state_t::OFF;
|
||||||
|
void switch_timer(timer_state_t state);
|
||||||
|
|
||||||
|
/* Updates idle_since, and maybe switches the timer off */
|
||||||
|
void check_idle(std::chrono::system_clock::time_point now);
|
||||||
|
|
||||||
/** time point when timer last ran, used as a coarse clock. */
|
/** time point when timer last ran, used as a coarse clock. */
|
||||||
std::chrono::system_clock::time_point m_timestamp;
|
std::chrono::system_clock::time_point m_timestamp;
|
||||||
|
|
||||||
@ -233,7 +244,6 @@ class thread_pool_generic : public thread_pool
|
|||||||
/* maintenance related statistics (see maintenance()) */
|
/* maintenance related statistics (see maintenance()) */
|
||||||
size_t m_last_thread_count;
|
size_t m_last_thread_count;
|
||||||
unsigned long long m_last_activity;
|
unsigned long long m_last_activity;
|
||||||
std::unique_ptr<timer> m_maintenance_timer_task;
|
|
||||||
|
|
||||||
void worker_main(worker_data *thread_data);
|
void worker_main(worker_data *thread_data);
|
||||||
void worker_end(worker_data* thread_data);
|
void worker_end(worker_data* thread_data);
|
||||||
@ -357,6 +367,22 @@ public:
|
|||||||
thr_timer_settime(this, 1000ULL * initial_delay_ms);
|
thr_timer_settime(this, 1000ULL * initial_delay_ms);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Change only period of a periodic timer
|
||||||
|
(after the next execution). Workarounds
|
||||||
|
mysys timer deadlocks
|
||||||
|
*/
|
||||||
|
void set_period(int period_ms)
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(m_mtx);
|
||||||
|
if (!m_on)
|
||||||
|
return;
|
||||||
|
if (!m_pool)
|
||||||
|
thr_timer_set_period(this, 1000ULL * period_ms);
|
||||||
|
else
|
||||||
|
m_period = period_ms;
|
||||||
|
}
|
||||||
|
|
||||||
void disarm() override
|
void disarm() override
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lk(m_mtx);
|
std::unique_lock<std::mutex> lk(m_mtx);
|
||||||
@ -380,7 +406,7 @@ public:
|
|||||||
disarm();
|
disarm();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
timer_generic m_maintenance_timer;
|
||||||
virtual timer* create_timer(callback_func func, void *data) override
|
virtual timer* create_timer(callback_func func, void *data) override
|
||||||
{
|
{
|
||||||
return new timer_generic(func, data, this);
|
return new timer_generic(func, data, this);
|
||||||
@ -526,6 +552,52 @@ void thread_pool_generic::worker_main(worker_data *thread_var)
|
|||||||
worker_end(thread_var);
|
worker_end(thread_var);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Check if threadpool had been idle for a while
|
||||||
|
Switch off maintenance timer if it is in idle state
|
||||||
|
for too long.
|
||||||
|
|
||||||
|
Helper function, to be used inside maintenance callback,
|
||||||
|
before m_last_activity is updated
|
||||||
|
*/
|
||||||
|
constexpr auto invalid_timestamp= std::chrono::system_clock::time_point::max();
|
||||||
|
constexpr auto max_idle_time= std::chrono::minutes(1);
|
||||||
|
|
||||||
|
/* Time since maintenance timer had nothing to do */
|
||||||
|
static std::chrono::system_clock::time_point idle_since= invalid_timestamp;
|
||||||
|
void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
|
||||||
|
{
|
||||||
|
DBUG_ASSERT(m_task_queue.empty());
|
||||||
|
|
||||||
|
/*
|
||||||
|
We think that there is no activity, if there were at most 2 tasks
|
||||||
|
since last time, and there is a spare thread.
|
||||||
|
The 2 tasks (and not 0) is to account for some periodic timers.
|
||||||
|
*/
|
||||||
|
bool idle= m_standby_threads.m_count > 0;
|
||||||
|
|
||||||
|
if (!idle)
|
||||||
|
{
|
||||||
|
idle_since= invalid_timestamp;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (idle_since == invalid_timestamp)
|
||||||
|
{
|
||||||
|
idle_since= now;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Switch timer off after 1 minute of idle time */
|
||||||
|
if (now - idle_since > max_idle_time)
|
||||||
|
{
|
||||||
|
idle_since= invalid_timestamp;
|
||||||
|
switch_timer(timer_state_t::OFF);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Periodic job to fix thread count and concurrency,
|
Periodic job to fix thread count and concurrency,
|
||||||
in case of long tasks, etc
|
in case of long tasks, etc
|
||||||
@ -555,6 +627,7 @@ void thread_pool_generic::maintenance()
|
|||||||
|
|
||||||
if (m_task_queue.empty())
|
if (m_task_queue.empty())
|
||||||
{
|
{
|
||||||
|
check_idle(m_timestamp);
|
||||||
m_last_activity = m_tasks_dequeued + m_wakeups;
|
m_last_activity = m_tasks_dequeued + m_wakeups;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -622,7 +695,12 @@ bool thread_pool_generic::add_thread()
|
|||||||
if (now - m_last_thread_creation <
|
if (now - m_last_thread_creation <
|
||||||
std::chrono::milliseconds(throttling_interval_ms(n_threads, m_concurrency)))
|
std::chrono::milliseconds(throttling_interval_ms(n_threads, m_concurrency)))
|
||||||
{
|
{
|
||||||
/* Throttle thread creation.*/
|
/*
|
||||||
|
Throttle thread creation and wakeup deadlock detection timer,
|
||||||
|
if is it off.
|
||||||
|
*/
|
||||||
|
switch_timer(timer_state_t::ON);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -693,7 +771,7 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
|
|||||||
m_max_threads(max_threads),
|
m_max_threads(max_threads),
|
||||||
m_last_thread_count(),
|
m_last_thread_count(),
|
||||||
m_last_activity(),
|
m_last_activity(),
|
||||||
m_maintenance_timer_task()
|
m_maintenance_timer(thread_pool_generic::maintenance_func, this, nullptr)
|
||||||
{
|
{
|
||||||
|
|
||||||
if (m_max_threads < m_concurrency)
|
if (m_max_threads < m_concurrency)
|
||||||
@ -703,11 +781,8 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
|
|||||||
if (!m_concurrency)
|
if (!m_concurrency)
|
||||||
m_concurrency = 1;
|
m_concurrency = 1;
|
||||||
|
|
||||||
if (min_threads < max_threads)
|
// start the timer
|
||||||
{
|
m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
|
||||||
m_maintenance_timer_task.reset(new timer_generic(thread_pool_generic::maintenance_func, this, nullptr));
|
|
||||||
m_maintenance_timer_task->set_time((int)m_timer_interval.count(), (int)m_timer_interval.count());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -780,6 +855,30 @@ void thread_pool_generic::wait_end()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void thread_pool_generic::switch_timer(timer_state_t state)
|
||||||
|
{
|
||||||
|
if (m_timer_state == state)
|
||||||
|
return;
|
||||||
|
/*
|
||||||
|
We can't use timer::set_time, because mysys timers are deadlock
|
||||||
|
prone.
|
||||||
|
|
||||||
|
Instead, to switch off we increase the timer period
|
||||||
|
and decrease period to switch on.
|
||||||
|
|
||||||
|
This might introduce delays in thread creation when needed,
|
||||||
|
as period will only be changed when timer fires next time.
|
||||||
|
For this reason, we can't use very long periods for the "off" state.
|
||||||
|
*/
|
||||||
|
m_timer_state= state;
|
||||||
|
long long period= (state == timer_state_t::OFF) ?
|
||||||
|
m_timer_interval.count()*10: m_timer_interval.count();
|
||||||
|
|
||||||
|
m_maintenance_timer.set_period((int)period);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Wake up all workers, and wait until they are gone
|
Wake up all workers, and wait until they are gone
|
||||||
Stop the timer.
|
Stop the timer.
|
||||||
@ -794,7 +893,7 @@ thread_pool_generic::~thread_pool_generic()
|
|||||||
m_aio.reset();
|
m_aio.reset();
|
||||||
|
|
||||||
/* Also stop the maintanence task early. */
|
/* Also stop the maintanence task early. */
|
||||||
m_maintenance_timer_task.reset();
|
m_maintenance_timer.disarm();
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lk(m_mtx);
|
std::unique_lock<std::mutex> lk(m_mtx);
|
||||||
m_in_shutdown= true;
|
m_in_shutdown= true;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user