MEDIUM: servers/proxies: Switch to using per-tgroup queues.

For both servers and proxies, use one connection queue per thread group
instead of a single shared one. Having only one queue can lead to severe
performance issues on NUMA machines; it is actually trivial to get the
watchdog to trigger on an AMD machine with a server whose maxconn is 96
and an injector that uses 160 concurrent connections.
We now have one queue per thread group; when dequeuing, we take up to
MAX_SELF_USE_QUEUE (currently 9) pendconns from our own queue before
dequeuing one from another thread group, if available, to make sure
every group keeps making progress.
Olivier Houchard 2025-01-15 16:44:05 +01:00 committed by Olivier Houchard
parent 583303c48b
commit 26b3e5236f
7 changed files with 139 additions and 63 deletions
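
As a rough illustration of the dequeuing policy described in the commit
message, here is a minimal, self-contained toy simulation (made-up queue
lengths and group count; the real logic lives in process_srv_queue() in
src/queue.c below):

#include <stdio.h>

#define MAX_SELF_USE_QUEUE 9
#define NBTGROUPS 4

/* Toy model: each thread group's pending queue is just a counter. A dequeuer
 * running in group 1 (index 0) serves up to MAX_SELF_USE_QUEUE entries from
 * its own queue, then one entry from the next non-empty group, so that
 * entries queued by other groups are never starved.
 */
static int qlen[NBTGROUPS] = { 30, 3, 0, 5 };

static int next_non_empty(int self)
{
	for (int i = 1; i < NBTGROUPS; i++) {
		int g = (self + i) % NBTGROUPS;

		if (qlen[g] > 0)
			return g;
	}
	return -1;
}

int main(void)
{
	int self = 0, self_served = 0;

	while (qlen[0] + qlen[1] + qlen[2] + qlen[3] > 0) {
		int g;

		if (self_served < MAX_SELF_USE_QUEUE && qlen[self] > 0) {
			g = self;
			self_served++;
		} else {
			g = next_non_empty(self);
			if (g < 0)
				g = self;	/* only our own queue is left */
			self_served = 0;	/* reset the self-serve budget */
		}
		qlen[g]--;
		printf("dequeued one pendconn from tgroup %d\n", g + 1);
	}
	return 0;
}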

include/haproxy/defaults.h

@ -589,4 +589,8 @@
# define DEBUG_MEMORY_POOLS 1
#endif
#ifndef MAX_SELF_USE_QUEUE
#define MAX_SELF_USE_QUEUE 9
#endif
#endif /* _HAPROXY_DEFAULTS_H */

include/haproxy/proxy-t.h

@ -362,7 +362,6 @@ struct proxy {
__decl_thread(HA_RWLOCK_T lock); /* may be taken under the server's lock */
char *id, *desc; /* proxy id (name) and description */
struct queue queue; /* queued requests (pendconns) */
struct proxy_per_tgroup *per_tgrp; /* array of per-tgroup stuff such as queues */
unsigned int queueslength; /* Sum of the length of each queue */
int totpend; /* total number of pending connections on this instance (for stats) */

include/haproxy/server-t.h

@ -369,9 +369,7 @@ struct server {
unsigned int max_used_conns; /* Max number of used connections (the counter is reset at each connection purges */
unsigned int est_need_conns; /* Estimate on the number of needed connections (max of curr and previous max_used) */
struct queue queue; /* pending connections */
struct mt_list sess_conns; /* list of private conns managed by a session on this server */
unsigned int dequeuing; /* non-zero = dequeuing in progress (atomic) */
/* Element below are usd by LB algorithms and must be doable in
* parallel to other threads reusing connections above.
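
The per-tgroup container types themselves are not shown in these hunks;
judging from the fields accessed in src/queue.c below, they presumably look
roughly like this (the field sets are inferred from this diff; the server-side
structure name and the exact layout are assumptions):

/* Sketch only: inferred from the accesses below (per_tgrp[...].queue,
 * .dequeuing, .self_served, .last_other_tgrp_served); the real definitions
 * in the headers may differ in names, types and ordering.
 */
struct proxy_per_tgroup {
	struct queue queue;                   /* per thread-group proxy queue */
};

struct srv_per_tgroup {
	struct queue queue;                   /* per thread-group server queue */
	unsigned int dequeuing;               /* non-zero = dequeuing in progress (atomic) */
	unsigned int self_served;             /* dequeues done from our own group's queue */
	unsigned int last_other_tgrp_served;  /* last foreign tgroup we borrowed from */
};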

src/backend.c

@ -1029,6 +1029,7 @@ int assign_server_and_queue(struct stream *s)
* not full, in which case we have to return FULL.
*/
if (srv->maxconn) {
struct queue *queue = &srv->per_tgrp[tgid - 1].queue;
int served;
int got_it = 0;
@ -1037,7 +1038,7 @@ int assign_server_and_queue(struct stream *s)
* Try to increment its served, while making sure
* it is < maxconn.
*/
if (!srv->queue.length &&
if (!queue->length &&
(served = srv->served) < srv_dynamic_maxconn(srv)) {
/*
* Attempt to increment served, while

src/proxy.c

@ -1408,7 +1408,6 @@ void init_new_proxy(struct proxy *p)
{
memset(p, 0, sizeof(struct proxy));
p->obj_type = OBJ_TYPE_PROXY;
queue_init(&p->queue, p, NULL);
LIST_INIT(&p->acl);
LIST_INIT(&p->http_req_rules);
LIST_INIT(&p->http_res_rules);

src/queue.c

@ -254,7 +254,7 @@ static struct pendconn *pendconn_first(struct eb_root *pendconns)
* When a pending connection is dequeued, this function returns 1 if a pendconn
* is dequeued, otherwise 0.
*/
static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int px_ok)
static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int px_ok, int tgrp)
{
struct pendconn *p = NULL;
struct pendconn *pp = NULL;
@ -264,18 +264,18 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
int got_it = 0;
p = NULL;
if (srv->queue.length)
p = pendconn_first(&srv->queue.head);
if (srv->per_tgrp[tgrp - 1].queue.length)
p = pendconn_first(&srv->per_tgrp[tgrp - 1].queue.head);
pp = NULL;
if (px_ok && px->queue.length) {
if (px_ok && px->per_tgrp[tgrp - 1].queue.length) {
/* the lock only remains held as long as the pp is
* in the proxy's queue.
*/
HA_SPIN_LOCK(QUEUE_LOCK, &px->queue.lock);
pp = pendconn_first(&px->queue.head);
HA_SPIN_LOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
pp = pendconn_first(&px->per_tgrp[tgrp - 1].queue.head);
if (!pp)
HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
}
if (!p && !pp)
@ -290,7 +290,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
/* No more slot available, give up */
if (!got_it) {
if (pp)
HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
return 0;
}
@ -332,7 +332,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
/* now the element won't go, we can release the proxy */
__pendconn_unlink_prx(pp);
HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
pp->strm_flags |= SF_ASSIGNED;
pp->target = srv;
@ -346,15 +346,15 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
task_wakeup(pp->strm->task, TASK_WOKEN_RES);
HA_SPIN_UNLOCK(QUEUE_LOCK, &pp->del_lock);
_HA_ATOMIC_DEC(&px->queue.length);
_HA_ATOMIC_INC(&px->queue.idx);
_HA_ATOMIC_DEC(&px->per_tgrp[tgrp - 1].queue.length);
_HA_ATOMIC_INC(&px->per_tgrp[tgrp - 1].queue.idx);
_HA_ATOMIC_DEC(&px->queueslength);
return 1;
use_p:
/* we don't need the px queue lock anymore, we have the server's lock */
if (pp)
HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
p->strm_flags |= SF_ASSIGNED;
p->target = srv;
@ -368,8 +368,8 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
task_wakeup(p->strm->task, TASK_WOKEN_RES);
__pendconn_unlink_srv(p);
_HA_ATOMIC_DEC(&srv->queue.length);
_HA_ATOMIC_INC(&srv->queue.idx);
_HA_ATOMIC_DEC(&srv->per_tgrp[tgrp - 1].queue.length);
_HA_ATOMIC_INC(&srv->per_tgrp[tgrp - 1].queue.idx);
_HA_ATOMIC_DEC(&srv->queueslength);
return 1;
}
@ -381,10 +381,11 @@ int process_srv_queue(struct server *s)
{
struct server *ref = s->track ? s->track : s;
struct proxy *p = s->proxy;
uint64_t non_empty_tgids = all_tgroups_mask;
int maxconn;
int stop = 0;
int done = 0;
int px_ok;
int cur_tgrp;
/* if a server is not usable or backup and must not be used
* to dequeue backend requests.
@ -409,27 +410,82 @@ int process_srv_queue(struct server *s)
* and would occasionally leave entries in the queue that are never
* dequeued. Nobody else uses the dequeuing flag so when seeing it
* non-null, we're certain that another thread is waiting on it.
*
* We'll dequeue MAX_SELF_USE_QUEUE items from the queue corresponding
* to our thread group, then we'll get one from a different one, to
be sure those actually get processed too.
*/
while (!stop && (done < global.tune.maxpollevents || !s->served) &&
while (non_empty_tgids != 0
&& (done < global.tune.maxpollevents || !s->served) &&
s->served < (maxconn = srv_dynamic_maxconn(s))) {
if (HA_ATOMIC_XCHG(&s->dequeuing, 1))
break;
int self_served;
int to_dequeue;
HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock);
while (s->served < maxconn) {
/*
* self_served contains the number of times we dequeued items
* from our own thread-group queue.
*/
self_served = _HA_ATOMIC_LOAD(&s->per_tgrp[tgid - 1].self_served) % (MAX_SELF_USE_QUEUE + 1);
if ((self_served == MAX_SELF_USE_QUEUE && non_empty_tgids != (1UL << (tgid - 1))) ||
!(non_empty_tgids & (1UL << (tgid - 1)))) {
int old_served, new_served;
/*
* We want to dequeue from another queue. The last
* one we used is stored in last_other_tgrp_served.
*/
old_served = _HA_ATOMIC_LOAD(&s->per_tgrp[tgid - 1].last_other_tgrp_served);
do {
new_served = old_served + 1;
/*
* Find the next tgrp to dequeue from.
* If we're here then we know there is
* at least one tgrp that is not the current
* tgrp that we can dequeue from, so that
* loop will end eventually.
*/
while (new_served == tgid ||
new_served == global.nbtgroups + 1 ||
!(non_empty_tgids & (1UL << (new_served - 1)))) {
if (new_served == global.nbtgroups + 1)
new_served = 1;
else
new_served++;
}
} while (!_HA_ATOMIC_CAS(&s->per_tgrp[tgid - 1].last_other_tgrp_served, &old_served, new_served));
cur_tgrp = new_served;
to_dequeue = 1;
} else {
cur_tgrp = tgid;
if (self_served == MAX_SELF_USE_QUEUE)
self_served = 0;
to_dequeue = MAX_SELF_USE_QUEUE - self_served;
}
if (HA_ATOMIC_XCHG(&s->per_tgrp[cur_tgrp - 1].dequeuing, 1)) {
non_empty_tgids &= ~(1UL << (cur_tgrp - 1));
continue;
}
HA_SPIN_LOCK(QUEUE_LOCK, &s->per_tgrp[cur_tgrp - 1].queue.lock);
while (to_dequeue > 0 && s->served < maxconn) {
/*
* pendconn_process_next_strm() will increment
* the served field, only if it is < maxconn.
*/
stop = !pendconn_process_next_strm(s, p, px_ok);
if (stop)
if (!pendconn_process_next_strm(s, p, px_ok, cur_tgrp)) {
non_empty_tgids &= ~(1UL << (cur_tgrp - 1));
break;
}
to_dequeue--;
if (cur_tgrp == tgid)
_HA_ATOMIC_INC(&s->per_tgrp[tgid - 1].self_served);
done++;
if (done >= global.tune.maxpollevents)
break;
}
HA_ATOMIC_STORE(&s->dequeuing, 0);
HA_SPIN_UNLOCK(QUEUE_LOCK, &s->queue.lock);
HA_ATOMIC_STORE(&s->per_tgrp[cur_tgrp - 1].dequeuing, 0);
HA_SPIN_UNLOCK(QUEUE_LOCK, &s->per_tgrp[cur_tgrp - 1].queue.lock);
}
if (done) {
@ -440,6 +496,8 @@ int process_srv_queue(struct server *s)
p->lbprm.server_take_conn(s);
}
if (s->served == 0 && p->served == 0 && !HA_ATOMIC_LOAD(&p->ready_srv)) {
int i;
/*
* If there is no task running on the server, and the proxy,
* let it known that we are ready, there is a small race
@ -454,10 +512,13 @@ int process_srv_queue(struct server *s)
* checked, but before we set ready_srv so it would not see it,
* just in case try to run one more stream.
*/
if (pendconn_process_next_strm(s, p, px_ok)) {
_HA_ATOMIC_SUB(&p->totpend, 1);
_HA_ATOMIC_ADD(&p->served, 1);
done++;
for (i = 0; i < global.nbtgroups; i++) {
if (pendconn_process_next_strm(s, p, px_ok, i + 1)) {
_HA_ATOMIC_SUB(&p->totpend, 1);
_HA_ATOMIC_ADD(&p->served, 1);
done++;
break;
}
}
}
return done;
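
The CAS loop on last_other_tgrp_served above is the subtle part: concurrent
dequeuers have to agree on which foreign group is served next, skipping their
own group and any group whose bit in non_empty_tgids is already clear. The
rotation itself, stripped of the atomics, amounts to something like this
(hypothetical helper, 1-based group ids as in the code above):

/* Pick the next thread group to borrow from, starting after 'last' (1-based),
 * skipping our own group 'self' and any group whose bit is clear in
 * 'non_empty'. The caller guarantees that at least one eligible group exists,
 * so the loop terminates (this mirrors the inner while loop above).
 */
static int next_foreign_tgrp(int last, int self, unsigned long non_empty, int nbtgroups)
{
	int next = last + 1;

	while (next == self || next == nbtgroups + 1 ||
	       !(non_empty & (1UL << (next - 1)))) {
		if (next == nbtgroups + 1)
			next = 1;	/* wrap around past the last group */
		else
			next++;
	}
	return next;
}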
@ -510,12 +571,12 @@ struct pendconn *pendconn_add(struct stream *strm)
srv = NULL;
if (srv) {
q = &srv->queue;
q = &srv->per_tgrp[tgid - 1].queue;
max_ptr = &srv->counters.nbpend_max;
queueslength = &srv->queueslength;
}
else {
q = &px->queue;
q = &px->per_tgrp[tgid - 1].queue;
max_ptr = &px->be_counters.nbpend_max;
queueslength = &px->queueslength;
}
@ -550,6 +611,7 @@ int pendconn_redistribute(struct server *s)
struct proxy *px = s->proxy;
int px_xferred = 0;
int xferred = 0;
int i;
/* The REDISP option was specified. We will ignore cookie and force to
* balance or use the dispatcher.
@ -558,26 +620,33 @@ int pendconn_redistribute(struct server *s)
(s->proxy->options & (PR_O_REDISP|PR_O_PERSIST)) != PR_O_REDISP)
goto skip_srv_queue;
HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock);
for (node = eb32_first(&s->queue.head); node; node = nodeb) {
nodeb = eb32_next(node);
for (i = 0; i < global.nbtgroups; i++) {
struct queue *queue = &s->per_tgrp[i].queue;
int local_xferred = 0;
p = eb32_entry(node, struct pendconn, node);
if (p->strm_flags & SF_FORCE_PRST)
continue;
HA_SPIN_LOCK(QUEUE_LOCK, &queue->lock);
for (node = eb32_first(&queue->head); node; node = nodeb) {
nodeb = eb32_next(node);
/* it's left to the dispatcher to choose a server */
__pendconn_unlink_srv(p);
if (!(s->proxy->options & PR_O_REDISP))
p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED);
p = eb32_entry(node, struct pendconn, node);
if (p->strm_flags & SF_FORCE_PRST)
continue;
task_wakeup(p->strm->task, TASK_WOKEN_RES);
xferred++;
/* it's left to the dispatcher to choose a server */
__pendconn_unlink_srv(p);
if (!(s->proxy->options & PR_O_REDISP))
p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED);
task_wakeup(p->strm->task, TASK_WOKEN_RES);
local_xferred++;
}
HA_SPIN_UNLOCK(QUEUE_LOCK, &queue->lock);
xferred += local_xferred;
if (local_xferred)
_HA_ATOMIC_SUB(&queue->length, local_xferred);
}
HA_SPIN_UNLOCK(QUEUE_LOCK, &s->queue.lock);
if (xferred) {
_HA_ATOMIC_SUB(&s->queue.length, xferred);
_HA_ATOMIC_SUB(&s->queueslength, xferred);
_HA_ATOMIC_SUB(&s->proxy->totpend, xferred);
}
@ -586,24 +655,31 @@ int pendconn_redistribute(struct server *s)
if (px->lbprm.tot_wact || px->lbprm.tot_wbck)
goto done;
HA_SPIN_LOCK(QUEUE_LOCK, &px->queue.lock);
for (node = eb32_first(&px->queue.head); node; node = nodeb) {
nodeb = eb32_next(node);
p = eb32_entry(node, struct pendconn, node);
for (i = 0; i < global.nbtgroups; i++) {
struct queue *queue = &px->per_tgrp[i].queue;
int local_xferred = 0;
/* force-persist streams may occasionally appear in the
* proxy's queue, and we certainly don't want them here!
*/
p->strm_flags &= ~SF_FORCE_PRST;
__pendconn_unlink_prx(p);
HA_SPIN_LOCK(QUEUE_LOCK, &queue->lock);
for (node = eb32_first(&queue->head); node; node = nodeb) {
nodeb = eb32_next(node);
p = eb32_entry(node, struct pendconn, node);
task_wakeup(p->strm->task, TASK_WOKEN_RES);
px_xferred++;
/* force-persist streams may occasionally appear in the
* proxy's queue, and we certainly don't want them here!
*/
p->strm_flags &= ~SF_FORCE_PRST;
__pendconn_unlink_prx(p);
task_wakeup(p->strm->task, TASK_WOKEN_RES);
local_xferred++;
}
HA_SPIN_UNLOCK(QUEUE_LOCK, &queue->lock);
if (local_xferred)
_HA_ATOMIC_SUB(&queue->length, local_xferred);
px_xferred += local_xferred;
}
HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
if (px_xferred) {
_HA_ATOMIC_SUB(&px->queue.length, px_xferred);
_HA_ATOMIC_SUB(&px->queueslength, px_xferred);
_HA_ATOMIC_SUB(&px->totpend, px_xferred);
}
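
One consequence of the split visible in these hunks is that two levels of
counters now have to be kept consistent: each per-tgroup queue keeps its own
length, while queueslength on the server/proxy and totpend on the proxy remain
aggregates. Ignoring the short windows where one counter has been updated and
the other not yet, the relationship could be checked with a debug helper along
these lines (hypothetical, not part of the patch):

#include <haproxy/atomic.h>
#include <haproxy/bug.h>
#include <haproxy/server-t.h>

/* Hypothetical sanity check: the server's aggregate queueslength should match
 * the sum of its per-tgroup queue lengths, which is why the code above always
 * decrements both the per-queue length and the aggregate counters.
 */
static void check_srv_queue_counters(struct server *s, int nbtgroups)
{
	unsigned int total = 0;
	int i;

	for (i = 0; i < nbtgroups; i++)
		total += HA_ATOMIC_LOAD(&s->per_tgrp[i].queue.length);
	BUG_ON(total != HA_ATOMIC_LOAD(&s->queueslength));
}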

src/server.c

@ -2961,7 +2961,6 @@ struct server *new_server(struct proxy *proxy)
srv->obj_type = OBJ_TYPE_SERVER;
srv->proxy = proxy;
queue_init(&srv->queue, proxy, srv);
MT_LIST_APPEND(&servers_list, &srv->global_list);
LIST_INIT(&srv->srv_rec_item);
LIST_INIT(&srv->ip_rec_item);