From b0e7712fb2a207e116e58252cf471d50aebc728f Mon Sep 17 00:00:00 2001
From: Willy Tarreau <w@1wt.eu>
Date: Thu, 7 Jul 2022 15:22:55 +0200
Subject: [PATCH] MEDIUM: task/thread: move the task shared wait queues per
 thread group

Their migration was postponed for convenience only, but now it's time
to have the shared wait queues per thread group and not just per
process, otherwise the WQ lock alone uses a huge amount of CPU.
---
 include/haproxy/task.h    | 22 +++++---------
 include/haproxy/tinfo-t.h |  4 +++
 src/task.c                | 61 +++++++++++++++++----------------
 3 files changed, 39 insertions(+), 48 deletions(-)

diff --git a/include/haproxy/task.h b/include/haproxy/task.h
index a79319eb5..9bb09a486 100644
--- a/include/haproxy/task.h
+++ b/include/haproxy/task.h
@@ -94,12 +94,6 @@ extern struct pool_head *pool_head_task;
 extern struct pool_head *pool_head_tasklet;
 extern struct pool_head *pool_head_notification;
 
-#ifdef USE_THREAD
-extern struct eb_root timers;                /* sorted timers tree, global */
-#endif
-
-__decl_thread(extern HA_RWLOCK_T wq_lock);   /* RW lock related to the wait queue */
-
 void __tasklet_wakeup_on(struct tasklet *tl, int thr);
 struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl);
 void task_kill(struct task *t);
@@ -273,10 +267,10 @@ static inline struct task *task_unlink_wq(struct task *t)
 		locked = t->tid < 0;
 		BUG_ON(t->tid >= 0 && t->tid != tid);
 		if (locked)
-			HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
+			HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 		__task_unlink_wq(t);
 		if (locked)
-			HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
+			HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 	}
 	return t;
 }
@@ -303,10 +297,10 @@ static inline void task_queue(struct task *task)
 
 #ifdef USE_THREAD
 	if (task->tid < 0) {
-		HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
+		HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-			__task_queue(task, &timers);
-		HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
+			__task_queue(task, &tg_ctx->timers);
+		HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 	} else
 #endif
 	{
@@ -666,14 +660,14 @@ static inline void task_schedule(struct task *task, int when)
 
 #ifdef USE_THREAD
 	if (task->tid < 0) {
 		/* FIXME: is it really needed to lock the WQ during the check ? */
-		HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
+		HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 		if (task_in_wq(task))
 			when = tick_first(when, task->expire);
 
 		task->expire = when;
 		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-			__task_queue(task, &timers);
-		HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &wq_lock);
+			__task_queue(task, &tg_ctx->timers);
+		HA_RWLOCK_WRUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 	} else
 #endif
 	{
diff --git a/include/haproxy/tinfo-t.h b/include/haproxy/tinfo-t.h
index 16baffd60..6df31f117 100644
--- a/include/haproxy/tinfo-t.h
+++ b/include/haproxy/tinfo-t.h
@@ -69,6 +69,10 @@ struct tgroup_ctx {
 	ulong threads_harmless;           /* mask of threads that are not modifying anything */
 	ulong threads_idle;               /* mask of threads idling in the poller */
 	ulong stopping_threads;           /* mask of threads currently stopping */
+
+	HA_RWLOCK_T wq_lock;              /* RW lock related to the wait queue below */
+	struct eb_root timers;            /* wait queue (sorted timers tree, global, accessed under wq_lock) */
+
 	/* pad to cache line (64B) */
 	char __pad[0];                    /* unused except to check remaining room */
 	char __end[0] __attribute__((aligned(64)));
diff --git a/src/task.c b/src/task.c
index 00b2887c0..11a1b2ccd 100644
--- a/src/task.c
+++ b/src/task.c
@@ -36,13 +36,6 @@ DECLARE_POOL(pool_head_notification, "notification", sizeof(struct notification)
 
 unsigned int niced_tasks = 0;       /* number of niced tasks in the run queue */
 
-__decl_aligned_rwlock(wq_lock);     /* RW lock related to the wait queue */
-
-#ifdef USE_THREAD
-struct eb_root timers;              /* sorted timers tree, global, accessed under wq_lock */
-#endif
-
-
 /* Flags the task for immediate destruction and puts it into its first
  * thread's shared tasklet list if not yet queued/running. This will bypass
@@ -277,9 +270,9 @@ void __task_wakeup(struct task *t)
 void __task_queue(struct task *task, struct eb_root *wq)
 {
 #ifdef USE_THREAD
-	BUG_ON((wq == &timers && task->tid >= 0) ||
+	BUG_ON((wq == &tg_ctx->timers && task->tid >= 0) ||
 	       (wq == &th_ctx->timers && task->tid < 0) ||
-	       (wq != &timers && wq != &th_ctx->timers));
+	       (wq != &tg_ctx->timers && wq != &th_ctx->timers));
 #endif
 	/* if this happens the process is doomed anyway, so better catch it now
 	 * so that we have the caller in the stack.
@@ -367,32 +360,32 @@ void wake_expired_tasks()
 	}
 
 #ifdef USE_THREAD
-	if (eb_is_empty(&timers))
+	if (eb_is_empty(&tg_ctx->timers))
 		goto leave;
 
-	HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &wq_lock);
-	eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
+	HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
+	eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
 	if (!eb) {
-		eb = eb32_first(&timers);
+		eb = eb32_first(&tg_ctx->timers);
 		if (likely(!eb)) {
-			HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
+			HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 			goto leave;
 		}
 	}
 	key = eb->key;
 
 	if (tick_is_lt(now_ms, key)) {
-		HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
+		HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 		goto leave;
 	}
 
 	/* There's really something of interest here, let's visit the queue */
-	if (HA_RWLOCK_TRYRDTOSK(TASK_WQ_LOCK, &wq_lock)) {
+	if (HA_RWLOCK_TRYRDTOSK(TASK_WQ_LOCK, &tg_ctx->wq_lock)) {
 		/* if we failed to grab the lock it means another thread is
 		 * already doing the same here, so let it do the job.
		 */
-		HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
+		HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 		goto leave;
 	}
 
@@ -400,13 +393,13 @@ void wake_expired_tasks()
   lookup_next:
 		if (max_processed-- <= 0)
 			break;
-		eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
+		eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
 		if (!eb) {
 			/* we might have reached the end of the tree, typically because
 			 * <now_ms> is in the first half and we're first scanning the last
 			 * half. Let's loop back to the beginning of the tree now.
 			 */
-			eb = eb32_first(&timers);
+			eb = eb32_first(&tg_ctx->timers);
 			if (likely(!eb))
 				break;
 		}
@@ -431,20 +424,20 @@ void wake_expired_tasks()
 
 		if (tick_is_expired(task->expire, now_ms)) {
 			/* expired task, wake it up */
-			HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
+			HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 			__task_unlink_wq(task);
-			HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
+			HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 			task_drop_running(task, TASK_WOKEN_TIMER);
 		}
 		else if (task->expire != eb->key) {
 			/* task is not expired but its key doesn't match so let's
 			 * update it and skip to next apparently expired task.
 			 */
-			HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
+			HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 			__task_unlink_wq(task);
 			if (tick_isset(task->expire))
-				__task_queue(task, &timers);
-			HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
+				__task_queue(task, &tg_ctx->timers);
+			HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 			task_drop_running(task, 0);
 			goto lookup_next;
 		}
@@ -456,7 +449,7 @@ void wake_expired_tasks()
 		}
 	}
 
-	HA_RWLOCK_SKUNLOCK(TASK_WQ_LOCK, &wq_lock);
+	HA_RWLOCK_SKUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 #endif
 leave:
 	return;
@@ -487,14 +480,14 @@ int next_timer_expiry()
 		ret = eb->key;
 
 #ifdef USE_THREAD
-	if (!eb_is_empty(&timers)) {
-		HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &wq_lock);
-		eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
+	if (!eb_is_empty(&tg_ctx->timers)) {
+		HA_RWLOCK_RDLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
+		eb = eb32_lookup_ge(&tg_ctx->timers, now_ms - TIMER_LOOK_BACK);
 		if (!eb)
-			eb = eb32_first(&timers);
+			eb = eb32_first(&tg_ctx->timers);
 		if (eb)
 			key = eb->key;
-		HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &wq_lock);
+		HA_RWLOCK_RDUNLOCK(TASK_WQ_LOCK, &tg_ctx->wq_lock);
 		if (eb)
 			ret = tick_first(ret, key);
 	}
@@ -914,7 +907,7 @@ void mworker_cleantasks()
 		task_destroy(t);
 	}
 	/* cleanup the timers queue */
-	tmp_wq = eb32_first(&timers);
+	tmp_wq = eb32_first(&tg_ctx->timers);
 	while (tmp_wq) {
 		t = eb32_entry(tmp_wq, struct task, wq);
 		tmp_wq = eb32_next(tmp_wq);
@@ -944,9 +937,9 @@ static void init_task()
 {
 	int i, q;
 
-#ifdef USE_THREAD
-	memset(&timers, 0, sizeof(timers));
-#endif
+	for (i = 0; i < MAX_TGROUPS; i++)
+		memset(&ha_tgroup_ctx[i].timers, 0, sizeof(ha_tgroup_ctx[i].timers));
+
 	for (i = 0; i < MAX_THREADS; i++) {
 		for (q = 0; q < TL_CLASSES; q++)
 			LIST_INIT(&ha_thread_ctx[i].tasklets[q]);
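
For readers who want to experiment with the pattern outside of HAProxy,
here is a minimal standalone C sketch of what this patch does: shard a
single process-wide wait queue and its rwlock into one pair per thread
group, so timer operations only contend with threads of the same group.
This is not HAProxy code: the names (tgroup_wq, my_tgroup, wq_bind_group,
wq_queue_timer, wq_next_expiry) are made up for the example, a linked
list stands in for the eb32 tree, and plain pthread rwlocks replace
HA_RWLOCK_T and its seek-lock upgrade operations.

#include <pthread.h>
#include <stddef.h>

#define MAX_TGROUPS 16

struct timer_node {
	unsigned int key;             /* expiration tick */
	struct timer_node *next;      /* list stand-in for the eb32 tree */
};

/* one wait queue and one lock per thread group, like struct tgroup_ctx */
static struct tgroup_wq {
	pthread_rwlock_t wq_lock;     /* protects <timers> below */
	struct timer_node *timers;    /* this group's shared wait queue */
} tgroup_wq[MAX_TGROUPS];

/* each thread keeps a pointer to its own group, like HAProxy's tg_ctx */
static __thread struct tgroup_wq *my_tgroup;

static void wq_init_all(void)
{
	int i;

	for (i = 0; i < MAX_TGROUPS; i++)
		pthread_rwlock_init(&tgroup_wq[i].wq_lock, NULL);
}

/* each thread calls this once at startup to join its group */
static void wq_bind_group(int grp)
{
	my_tgroup = &tgroup_wq[grp];
}

static void wq_queue_timer(struct timer_node *node)
{
	/* writers now only contend with threads of the same group */
	pthread_rwlock_wrlock(&my_tgroup->wq_lock);
	node->next = my_tgroup->timers;
	my_tgroup->timers = node;
	pthread_rwlock_unlock(&my_tgroup->wq_lock);
}

static unsigned int wq_next_expiry(void)
{
	unsigned int key = 0;

	/* cheap unlocked emptiness check first, as the patch also does */
	if (!my_tgroup->timers)
		return 0;

	pthread_rwlock_rdlock(&my_tgroup->wq_lock);
	if (my_tgroup->timers)
		key = my_tgroup->timers->key;
	pthread_rwlock_unlock(&my_tgroup->wq_lock);
	return key;
}

The gain is purely a contention split: with 64 threads in 4 groups, a
wait queue operation competes with at most 15 peers instead of 63, which
is why the per-group sharding stops the WQ lock from dominating CPU usage.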