From 26b3e5236fc93baeec0df7c6c4c23a20d1985749 Mon Sep 17 00:00:00 2001 From: Olivier Houchard Date: Wed, 15 Jan 2025 16:44:05 +0100 Subject: [PATCH] MEDIUM: servers/proxies: Switch to using per-tgroup queues. For both servers and proxies, use one connection queue per thread-group, instead of only one. Having only one can lead to severe performance issues on NUMA machines, it is actually trivial to get the watchdog to trigger on an AMD machine, having a server with a maxconn of 96, and an injector that uses 160 concurrent connections. We now have one queue per thread-group, however when dequeueing, we're dequeuing MAX_SELF_USE_QUEUE (currently 9) pendconns from our own queue, before dequeueing one from another thread group, if available, to make sure everybody is still running. --- include/haproxy/defaults.h | 4 + include/haproxy/proxy-t.h | 1 - include/haproxy/server-t.h | 2 - src/backend.c | 3 +- src/proxy.c | 1 - src/queue.c | 190 ++++++++++++++++++++++++++----------- src/server.c | 1 - 7 files changed, 139 insertions(+), 63 deletions(-) diff --git a/include/haproxy/defaults.h b/include/haproxy/defaults.h index 88ab35b46..f70bf4309 100644 --- a/include/haproxy/defaults.h +++ b/include/haproxy/defaults.h @@ -589,4 +589,8 @@ # define DEBUG_MEMORY_POOLS 1 #endif +#ifndef MAX_SELF_USE_QUEUE +#define MAX_SELF_USE_QUEUE 9 +#endif + #endif /* _HAPROXY_DEFAULTS_H */ diff --git a/include/haproxy/proxy-t.h b/include/haproxy/proxy-t.h index d6f9cef4d..d243d072e 100644 --- a/include/haproxy/proxy-t.h +++ b/include/haproxy/proxy-t.h @@ -362,7 +362,6 @@ struct proxy { __decl_thread(HA_RWLOCK_T lock); /* may be taken under the server's lock */ char *id, *desc; /* proxy id (name) and description */ - struct queue queue; /* queued requests (pendconns) */ struct proxy_per_tgroup *per_tgrp; /* array of per-tgroup stuff such as queues */ unsigned int queueslength; /* Sum of the length of each queue */ int totpend; /* total number of pending connections on this instance (for stats) */ diff --git a/include/haproxy/server-t.h b/include/haproxy/server-t.h index ee266c207..d75171d93 100644 --- a/include/haproxy/server-t.h +++ b/include/haproxy/server-t.h @@ -369,9 +369,7 @@ struct server { unsigned int max_used_conns; /* Max number of used connections (the counter is reset at each connection purges */ unsigned int est_need_conns; /* Estimate on the number of needed connections (max of curr and previous max_used) */ - struct queue queue; /* pending connections */ struct mt_list sess_conns; /* list of private conns managed by a session on this server */ - unsigned int dequeuing; /* non-zero = dequeuing in progress (atomic) */ /* Element below are usd by LB algorithms and must be doable in * parallel to other threads reusing connections above. diff --git a/src/backend.c b/src/backend.c index f0dba4d18..b1703e295 100644 --- a/src/backend.c +++ b/src/backend.c @@ -1029,6 +1029,7 @@ int assign_server_and_queue(struct stream *s) * not full, in which case we have to return FULL. */ if (srv->maxconn) { + struct queue *queue = &srv->per_tgrp[tgid - 1].queue; int served; int got_it = 0; @@ -1037,7 +1038,7 @@ int assign_server_and_queue(struct stream *s) * Try to increment its served, while making sure * it is < maxconn. */ - if (!srv->queue.length && + if (!queue->length && (served = srv->served) < srv_dynamic_maxconn(srv)) { /* * Attempt to increment served, while diff --git a/src/proxy.c b/src/proxy.c index a1a4ee734..a1143fc68 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -1408,7 +1408,6 @@ void init_new_proxy(struct proxy *p) { memset(p, 0, sizeof(struct proxy)); p->obj_type = OBJ_TYPE_PROXY; - queue_init(&p->queue, p, NULL); LIST_INIT(&p->acl); LIST_INIT(&p->http_req_rules); LIST_INIT(&p->http_res_rules); diff --git a/src/queue.c b/src/queue.c index f5ea31de4..0bfdd09f0 100644 --- a/src/queue.c +++ b/src/queue.c @@ -254,7 +254,7 @@ static struct pendconn *pendconn_first(struct eb_root *pendconns) * When a pending connection is dequeued, this function returns 1 if a pendconn * is dequeued, otherwise 0. */ -static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int px_ok) +static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int px_ok, int tgrp) { struct pendconn *p = NULL; struct pendconn *pp = NULL; @@ -264,18 +264,18 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int int got_it = 0; p = NULL; - if (srv->queue.length) - p = pendconn_first(&srv->queue.head); + if (srv->per_tgrp[tgrp - 1].queue.length) + p = pendconn_first(&srv->per_tgrp[tgrp - 1].queue.head); pp = NULL; - if (px_ok && px->queue.length) { + if (px_ok && px->per_tgrp[tgrp - 1].queue.length) { /* the lock only remains held as long as the pp is * in the proxy's queue. */ - HA_SPIN_LOCK(QUEUE_LOCK, &px->queue.lock); - pp = pendconn_first(&px->queue.head); + HA_SPIN_LOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock); + pp = pendconn_first(&px->per_tgrp[tgrp - 1].queue.head); if (!pp) - HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock); + HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock); } if (!p && !pp) @@ -290,7 +290,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int /* No more slot available, give up */ if (!got_it) { if (pp) - HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock); + HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock); return 0; } @@ -332,7 +332,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int /* now the element won't go, we can release the proxy */ __pendconn_unlink_prx(pp); - HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock); + HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock); pp->strm_flags |= SF_ASSIGNED; pp->target = srv; @@ -346,15 +346,15 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int task_wakeup(pp->strm->task, TASK_WOKEN_RES); HA_SPIN_UNLOCK(QUEUE_LOCK, &pp->del_lock); - _HA_ATOMIC_DEC(&px->queue.length); - _HA_ATOMIC_INC(&px->queue.idx); + _HA_ATOMIC_DEC(&px->per_tgrp[tgrp - 1].queue.length); + _HA_ATOMIC_INC(&px->per_tgrp[tgrp - 1].queue.idx); _HA_ATOMIC_DEC(&px->queueslength); return 1; use_p: /* we don't need the px queue lock anymore, we have the server's lock */ if (pp) - HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock); + HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock); p->strm_flags |= SF_ASSIGNED; p->target = srv; @@ -368,8 +368,8 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int task_wakeup(p->strm->task, TASK_WOKEN_RES); __pendconn_unlink_srv(p); - _HA_ATOMIC_DEC(&srv->queue.length); - _HA_ATOMIC_INC(&srv->queue.idx); + _HA_ATOMIC_DEC(&srv->per_tgrp[tgrp - 1].queue.length); + _HA_ATOMIC_INC(&srv->per_tgrp[tgrp - 1].queue.idx); _HA_ATOMIC_DEC(&srv->queueslength); return 1; } @@ -381,10 +381,11 @@ int process_srv_queue(struct server *s) { struct server *ref = s->track ? s->track : s; struct proxy *p = s->proxy; + uint64_t non_empty_tgids = all_tgroups_mask; int maxconn; - int stop = 0; int done = 0; int px_ok; + int cur_tgrp; /* if a server is not usable or backup and must not be used * to dequeue backend requests. @@ -409,27 +410,82 @@ int process_srv_queue(struct server *s) * and would occasionally leave entries in the queue that are never * dequeued. Nobody else uses the dequeuing flag so when seeing it * non-null, we're certain that another thread is waiting on it. + * + * We'll dequeue MAX_SELF_USE_QUEUE items from the queue corresponding + * to our thread group, then we'll get one from a different one, to + * be sure those actually get processsed too. */ - while (!stop && (done < global.tune.maxpollevents || !s->served) && + while (non_empty_tgids != 0 + && (done < global.tune.maxpollevents || !s->served) && s->served < (maxconn = srv_dynamic_maxconn(s))) { - if (HA_ATOMIC_XCHG(&s->dequeuing, 1)) - break; + int self_served; + int to_dequeue; - HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock); - while (s->served < maxconn) { + /* + * self_served contains the number of times we dequeued items + * from our own thread-group queue. + */ + self_served = _HA_ATOMIC_LOAD(&s->per_tgrp[tgid - 1].self_served) % (MAX_SELF_USE_QUEUE + 1); + if ((self_served == MAX_SELF_USE_QUEUE && non_empty_tgids != (1UL << (tgid - 1))) || + !(non_empty_tgids & (1UL << (tgid - 1)))) { + int old_served, new_served; + + /* + * We want to dequeue from another queue. The last + * one we used is stored in last_other_tgrp_served. + */ + old_served = _HA_ATOMIC_LOAD(&s->per_tgrp[tgid - 1].last_other_tgrp_served); + do { + new_served = old_served + 1; + + /* + * Find the next tgrp to dequeue from. + * If we're here then we know there is + * at least one tgrp that is not the current + * tgrp that we can dequeue from, so that + * loop will end eventually. + */ + while (new_served == tgid || + new_served == global.nbtgroups + 1 || + !(non_empty_tgids & (1UL << (new_served - 1)))) { + if (new_served == global.nbtgroups + 1) + new_served = 1; + else + new_served++; + } + } while (!_HA_ATOMIC_CAS(&s->per_tgrp[tgid - 1].last_other_tgrp_served, &old_served, new_served)); + cur_tgrp = new_served; + to_dequeue = 1; + } else { + cur_tgrp = tgid; + if (self_served == MAX_SELF_USE_QUEUE) + self_served = 0; + to_dequeue = MAX_SELF_USE_QUEUE - self_served; + } + if (HA_ATOMIC_XCHG(&s->per_tgrp[cur_tgrp - 1].dequeuing, 1)) { + non_empty_tgids &= ~(1UL << (cur_tgrp - 1)); + continue; + } + + HA_SPIN_LOCK(QUEUE_LOCK, &s->per_tgrp[cur_tgrp - 1].queue.lock); + while (to_dequeue > 0 && s->served < maxconn) { /* * pendconn_process_next_strm() will increment * the served field, only if it is < maxconn. */ - stop = !pendconn_process_next_strm(s, p, px_ok); - if (stop) + if (!pendconn_process_next_strm(s, p, px_ok, cur_tgrp)) { + non_empty_tgids &= ~(1UL << (cur_tgrp - 1)); break; + } + to_dequeue--; + if (cur_tgrp == tgid) + _HA_ATOMIC_INC(&s->per_tgrp[tgid - 1].self_served); done++; if (done >= global.tune.maxpollevents) break; } - HA_ATOMIC_STORE(&s->dequeuing, 0); - HA_SPIN_UNLOCK(QUEUE_LOCK, &s->queue.lock); + HA_ATOMIC_STORE(&s->per_tgrp[cur_tgrp - 1].dequeuing, 0); + HA_SPIN_UNLOCK(QUEUE_LOCK, &s->per_tgrp[cur_tgrp - 1].queue.lock); } if (done) { @@ -440,6 +496,8 @@ int process_srv_queue(struct server *s) p->lbprm.server_take_conn(s); } if (s->served == 0 && p->served == 0 && !HA_ATOMIC_LOAD(&p->ready_srv)) { + int i; + /* * If there is no task running on the server, and the proxy, * let it known that we are ready, there is a small race @@ -454,10 +512,13 @@ int process_srv_queue(struct server *s) * checked, but before we set ready_srv so it would not see it, * just in case try to run one more stream. */ - if (pendconn_process_next_strm(s, p, px_ok)) { - _HA_ATOMIC_SUB(&p->totpend, 1); - _HA_ATOMIC_ADD(&p->served, 1); - done++; + for (i = 0; i < global.nbtgroups; i++) { + if (pendconn_process_next_strm(s, p, px_ok, i + 1)) { + _HA_ATOMIC_SUB(&p->totpend, 1); + _HA_ATOMIC_ADD(&p->served, 1); + done++; + break; + } } } return done; @@ -510,12 +571,12 @@ struct pendconn *pendconn_add(struct stream *strm) srv = NULL; if (srv) { - q = &srv->queue; + q = &srv->per_tgrp[tgid - 1].queue; max_ptr = &srv->counters.nbpend_max; queueslength = &srv->queueslength; } else { - q = &px->queue; + q = &px->per_tgrp[tgid - 1].queue; max_ptr = &px->be_counters.nbpend_max; queueslength = &px->queueslength; } @@ -550,6 +611,7 @@ int pendconn_redistribute(struct server *s) struct proxy *px = s->proxy; int px_xferred = 0; int xferred = 0; + int i; /* The REDISP option was specified. We will ignore cookie and force to * balance or use the dispatcher. @@ -558,26 +620,33 @@ int pendconn_redistribute(struct server *s) (s->proxy->options & (PR_O_REDISP|PR_O_PERSIST)) != PR_O_REDISP) goto skip_srv_queue; - HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock); - for (node = eb32_first(&s->queue.head); node; node = nodeb) { - nodeb = eb32_next(node); + for (i = 0; i < global.nbtgroups; i++) { + struct queue *queue = &s->per_tgrp[i].queue; + int local_xferred = 0; - p = eb32_entry(node, struct pendconn, node); - if (p->strm_flags & SF_FORCE_PRST) - continue; + HA_SPIN_LOCK(QUEUE_LOCK, &queue->lock); + for (node = eb32_first(&queue->head); node; node = nodeb) { + nodeb = eb32_next(node); - /* it's left to the dispatcher to choose a server */ - __pendconn_unlink_srv(p); - if (!(s->proxy->options & PR_O_REDISP)) - p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED); + p = eb32_entry(node, struct pendconn, node); + if (p->strm_flags & SF_FORCE_PRST) + continue; - task_wakeup(p->strm->task, TASK_WOKEN_RES); - xferred++; + /* it's left to the dispatcher to choose a server */ + __pendconn_unlink_srv(p); + if (!(s->proxy->options & PR_O_REDISP)) + p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED); + + task_wakeup(p->strm->task, TASK_WOKEN_RES); + local_xferred++; + } + HA_SPIN_UNLOCK(QUEUE_LOCK, &queue->lock); + xferred += local_xferred; + if (local_xferred) + _HA_ATOMIC_SUB(&queue->length, local_xferred); } - HA_SPIN_UNLOCK(QUEUE_LOCK, &s->queue.lock); if (xferred) { - _HA_ATOMIC_SUB(&s->queue.length, xferred); _HA_ATOMIC_SUB(&s->queueslength, xferred); _HA_ATOMIC_SUB(&s->proxy->totpend, xferred); } @@ -586,24 +655,31 @@ int pendconn_redistribute(struct server *s) if (px->lbprm.tot_wact || px->lbprm.tot_wbck) goto done; - HA_SPIN_LOCK(QUEUE_LOCK, &px->queue.lock); - for (node = eb32_first(&px->queue.head); node; node = nodeb) { - nodeb = eb32_next(node); - p = eb32_entry(node, struct pendconn, node); + for (i = 0; i < global.nbtgroups; i++) { + struct queue *queue = &px->per_tgrp[i].queue; + int local_xferred = 0; - /* force-persist streams may occasionally appear in the - * proxy's queue, and we certainly don't want them here! - */ - p->strm_flags &= ~SF_FORCE_PRST; - __pendconn_unlink_prx(p); + HA_SPIN_LOCK(QUEUE_LOCK, &queue->lock); + for (node = eb32_first(&queue->head); node; node = nodeb) { + nodeb = eb32_next(node); + p = eb32_entry(node, struct pendconn, node); - task_wakeup(p->strm->task, TASK_WOKEN_RES); - px_xferred++; + /* force-persist streams may occasionally appear in the + * proxy's queue, and we certainly don't want them here! + */ + p->strm_flags &= ~SF_FORCE_PRST; + __pendconn_unlink_prx(p); + + task_wakeup(p->strm->task, TASK_WOKEN_RES); + local_xferred++; + } + HA_SPIN_UNLOCK(QUEUE_LOCK, &queue->lock); + if (local_xferred) + _HA_ATOMIC_SUB(&queue->length, local_xferred); + px_xferred += local_xferred; } - HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock); if (px_xferred) { - _HA_ATOMIC_SUB(&px->queue.length, px_xferred); _HA_ATOMIC_SUB(&px->queueslength, px_xferred); _HA_ATOMIC_SUB(&px->totpend, px_xferred); } diff --git a/src/server.c b/src/server.c index 6d15de371..82f237dc8 100644 --- a/src/server.c +++ b/src/server.c @@ -2961,7 +2961,6 @@ struct server *new_server(struct proxy *proxy) srv->obj_type = OBJ_TYPE_SERVER; srv->proxy = proxy; - queue_init(&srv->queue, proxy, srv); MT_LIST_APPEND(&servers_list, &srv->global_list); LIST_INIT(&srv->srv_rec_item); LIST_INIT(&srv->ip_rec_item);