diff --git a/include/haproxy/defaults.h b/include/haproxy/defaults.h
index 88ab35b46..f70bf4309 100644
--- a/include/haproxy/defaults.h
+++ b/include/haproxy/defaults.h
@@ -589,4 +589,8 @@
 # define DEBUG_MEMORY_POOLS 1
 #endif
 
+#ifndef MAX_SELF_USE_QUEUE
+#define MAX_SELF_USE_QUEUE 9
+#endif
+
 #endif /* _HAPROXY_DEFAULTS_H */
diff --git a/include/haproxy/proxy-t.h b/include/haproxy/proxy-t.h
index d6f9cef4d..d243d072e 100644
--- a/include/haproxy/proxy-t.h
+++ b/include/haproxy/proxy-t.h
@@ -362,7 +362,6 @@ struct proxy {
 	__decl_thread(HA_RWLOCK_T lock);        /* may be taken under the server's lock */
 
 	char *id, *desc;                        /* proxy id (name) and description */
-	struct queue queue;                     /* queued requests (pendconns) */
 	struct proxy_per_tgroup *per_tgrp;      /* array of per-tgroup stuff such as queues */
 	unsigned int queueslength;              /* Sum of the length of each queue */
 	int totpend;                            /* total number of pending connections on this instance (for stats) */
diff --git a/include/haproxy/server-t.h b/include/haproxy/server-t.h
index ee266c207..d75171d93 100644
--- a/include/haproxy/server-t.h
+++ b/include/haproxy/server-t.h
@@ -369,9 +369,7 @@ struct server {
 	unsigned int max_used_conns;            /* Max number of used connections (the counter is reset at each connection purges */
 	unsigned int est_need_conns;            /* Estimate on the number of needed connections (max of curr and previous max_used) */
 
-	struct queue queue;                     /* pending connections */
 	struct mt_list sess_conns;              /* list of private conns managed by a session on this server */
-	unsigned int dequeuing;                 /* non-zero = dequeuing in progress (atomic) */
 
 	/* Element below are usd by LB algorithms and must be doable in
 	 * parallel to other threads reusing connections above.
diff --git a/src/backend.c b/src/backend.c
index f0dba4d18..b1703e295 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -1029,6 +1029,7 @@ int assign_server_and_queue(struct stream *s)
 	 * not full, in which case we have to return FULL.
 	 */
 	if (srv->maxconn) {
+		struct queue *queue = &srv->per_tgrp[tgid - 1].queue;
 		int served;
 		int got_it = 0;
 
@@ -1037,7 +1038,7 @@ int assign_server_and_queue(struct stream *s)
 		 * Try to increment its served, while making sure
 		 * it is < maxconn.
 		 */
-		if (!srv->queue.length &&
+		if (!queue->length &&
 		    (served = srv->served) < srv_dynamic_maxconn(srv)) {
 			/*
 			 * Attempt to increment served, while
diff --git a/src/proxy.c b/src/proxy.c
index a1a4ee734..a1143fc68 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -1408,7 +1408,6 @@ void init_new_proxy(struct proxy *p)
 {
 	memset(p, 0, sizeof(struct proxy));
 	p->obj_type = OBJ_TYPE_PROXY;
-	queue_init(&p->queue, p, NULL);
 	LIST_INIT(&p->acl);
 	LIST_INIT(&p->http_req_rules);
 	LIST_INIT(&p->http_res_rules);
diff --git a/src/queue.c b/src/queue.c
index f5ea31de4..0bfdd09f0 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -254,7 +254,7 @@ static struct pendconn *pendconn_first(struct eb_root *pendconns)
  * When a pending connection is dequeued, this function returns 1 if a pendconn
  * is dequeued, otherwise 0.
  */
-static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int px_ok)
+static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int px_ok, int tgrp)
 {
 	struct pendconn *p = NULL;
 	struct pendconn *pp = NULL;
@@ -264,18 +264,18 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
 		int got_it = 0;
 
 		p = NULL;
-		if (srv->queue.length)
-			p = pendconn_first(&srv->queue.head);
+		if (srv->per_tgrp[tgrp - 1].queue.length)
+			p = pendconn_first(&srv->per_tgrp[tgrp - 1].queue.head);
 
 		pp = NULL;
-		if (px_ok && px->queue.length) {
+		if (px_ok && px->per_tgrp[tgrp - 1].queue.length) {
 			/* the lock only remains held as long as the pp is
 			 * in the proxy's queue.
 			 */
-			HA_SPIN_LOCK(QUEUE_LOCK, &px->queue.lock);
-			pp = pendconn_first(&px->queue.head);
+			HA_SPIN_LOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
+			pp = pendconn_first(&px->per_tgrp[tgrp - 1].queue.head);
 			if (!pp)
-				HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+				HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
 		}
 
 		if (!p && !pp)
@@ -290,7 +290,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
 		/* No more slot available, give up */
 		if (!got_it) {
 			if (pp)
-				HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+				HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
 			return 0;
 		}
 
@@ -332,7 +332,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
 
 		/* now the element won't go, we can release the proxy */
 		__pendconn_unlink_prx(pp);
-		HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+		HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
 
 		pp->strm_flags |= SF_ASSIGNED;
 		pp->target = srv;
@@ -346,15 +346,15 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
 		task_wakeup(pp->strm->task, TASK_WOKEN_RES);
 		HA_SPIN_UNLOCK(QUEUE_LOCK, &pp->del_lock);
 
-		_HA_ATOMIC_DEC(&px->queue.length);
-		_HA_ATOMIC_INC(&px->queue.idx);
+		_HA_ATOMIC_DEC(&px->per_tgrp[tgrp - 1].queue.length);
+		_HA_ATOMIC_INC(&px->per_tgrp[tgrp - 1].queue.idx);
 		_HA_ATOMIC_DEC(&px->queueslength);
 		return 1;
 
  use_p:
 	/* we don't need the px queue lock anymore, we have the server's lock */
 	if (pp)
-		HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+		HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
 
 	p->strm_flags |= SF_ASSIGNED;
 	p->target = srv;
@@ -368,8 +368,8 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
 	task_wakeup(p->strm->task, TASK_WOKEN_RES);
 
 	__pendconn_unlink_srv(p);
-	_HA_ATOMIC_DEC(&srv->queue.length);
-	_HA_ATOMIC_INC(&srv->queue.idx);
+	_HA_ATOMIC_DEC(&srv->per_tgrp[tgrp - 1].queue.length);
+	_HA_ATOMIC_INC(&srv->per_tgrp[tgrp - 1].queue.idx);
 	_HA_ATOMIC_DEC(&srv->queueslength);
 	return 1;
 }
@@ -381,10 +381,11 @@ int process_srv_queue(struct server *s)
 {
 	struct server *ref = s->track ? s->track : s;
 	struct proxy *p = s->proxy;
+	uint64_t non_empty_tgids = all_tgroups_mask;
 	int maxconn;
-	int stop = 0;
 	int done = 0;
 	int px_ok;
+	int cur_tgrp;
 	/* if a server is not usable or backup and must not be used
 	 * to dequeue backend requests.
 	 */
@@ -409,27 +410,82 @@ int process_srv_queue(struct server *s)
 	 * and would occasionally leave entries in the queue that are never
 	 * dequeued. Nobody else uses the dequeuing flag so when seeing it
 	 * non-null, we're certain that another thread is waiting on it.
+	 *
+	 * We'll dequeue MAX_SELF_USE_QUEUE items from the queue corresponding
+	 * to our thread group, then we'll get one from a different one, to
+	 * be sure those actually get processed too.
 	 */
-	while (!stop && (done < global.tune.maxpollevents || !s->served) &&
+	while (non_empty_tgids != 0
+	    && (done < global.tune.maxpollevents || !s->served) &&
 	       s->served < (maxconn = srv_dynamic_maxconn(s))) {
-		if (HA_ATOMIC_XCHG(&s->dequeuing, 1))
-			break;
+		int self_served;
+		int to_dequeue;
 
-		HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock);
-		while (s->served < maxconn) {
+		/*
+		 * self_served counts how many times we dequeued items from
+		 * our own thread-group queue (modulo MAX_SELF_USE_QUEUE + 1).
+		 */
+		self_served = _HA_ATOMIC_LOAD(&s->per_tgrp[tgid - 1].self_served) % (MAX_SELF_USE_QUEUE + 1);
+		if ((self_served == MAX_SELF_USE_QUEUE && non_empty_tgids != (1UL << (tgid - 1))) ||
+		    !(non_empty_tgids & (1UL << (tgid - 1)))) {
+			int old_served, new_served;
+
+			/*
+			 * We want to dequeue from another queue. The last
+			 * one we used is stored in last_other_tgrp_served.
+			 */
+			old_served = _HA_ATOMIC_LOAD(&s->per_tgrp[tgid - 1].last_other_tgrp_served);
+			do {
+				new_served = old_served + 1;
+
+				/*
+				 * Find the next tgrp to dequeue from.
+				 * If we're here then we know there is
+				 * at least one tgrp other than the current
+				 * one that we can dequeue from, so the
+				 * loop will end eventually.
+				 */
+				while (new_served == tgid ||
+				       new_served == global.nbtgroups + 1 ||
+				       !(non_empty_tgids & (1UL << (new_served - 1)))) {
+					if (new_served == global.nbtgroups + 1)
+						new_served = 1;
+					else
+						new_served++;
+				}
+			} while (!_HA_ATOMIC_CAS(&s->per_tgrp[tgid - 1].last_other_tgrp_served, &old_served, new_served));
+			cur_tgrp = new_served;
+			to_dequeue = 1;
+		} else {
+			cur_tgrp = tgid;
+			if (self_served == MAX_SELF_USE_QUEUE)
+				self_served = 0;
+			to_dequeue = MAX_SELF_USE_QUEUE - self_served;
+		}
+		if (HA_ATOMIC_XCHG(&s->per_tgrp[cur_tgrp - 1].dequeuing, 1)) {
+			non_empty_tgids &= ~(1UL << (cur_tgrp - 1));
+			continue;
+		}
+
+		HA_SPIN_LOCK(QUEUE_LOCK, &s->per_tgrp[cur_tgrp - 1].queue.lock);
+		while (to_dequeue > 0 && s->served < maxconn) {
 			/*
 			 * pendconn_process_next_strm() will increment
 			 * the served field, only if it is < maxconn.
 			 */
-			stop = !pendconn_process_next_strm(s, p, px_ok);
-			if (stop)
+			if (!pendconn_process_next_strm(s, p, px_ok, cur_tgrp)) {
+				non_empty_tgids &= ~(1UL << (cur_tgrp - 1));
 				break;
+			}
+			to_dequeue--;
+			if (cur_tgrp == tgid)
+				_HA_ATOMIC_INC(&s->per_tgrp[tgid - 1].self_served);
 			done++;
 			if (done >= global.tune.maxpollevents)
 				break;
 		}
-		HA_ATOMIC_STORE(&s->dequeuing, 0);
-		HA_SPIN_UNLOCK(QUEUE_LOCK, &s->queue.lock);
+		HA_ATOMIC_STORE(&s->per_tgrp[cur_tgrp - 1].dequeuing, 0);
+		HA_SPIN_UNLOCK(QUEUE_LOCK, &s->per_tgrp[cur_tgrp - 1].queue.lock);
 	}
 
 	if (done) {
@@ -440,6 +496,8 @@ int process_srv_queue(struct server *s)
 			p->lbprm.server_take_conn(s);
 	}
 	if (s->served == 0 && p->served == 0 && !HA_ATOMIC_LOAD(&p->ready_srv)) {
+		int i;
+
 		/*
 		 * If there is no task running on the server, and the proxy,
 		 * let it known that we are ready, there is a small race
@@ -454,10 +512,13 @@ int process_srv_queue(struct server *s)
 		 * checked, but before we set ready_srv so it would not see it,
 		 * just in case try to run one more stream.
 		 */
-		if (pendconn_process_next_strm(s, p, px_ok)) {
-			_HA_ATOMIC_SUB(&p->totpend, 1);
-			_HA_ATOMIC_ADD(&p->served, 1);
-			done++;
+		for (i = 0; i < global.nbtgroups; i++) {
+			if (pendconn_process_next_strm(s, p, px_ok, i + 1)) {
+				_HA_ATOMIC_SUB(&p->totpend, 1);
+				_HA_ATOMIC_ADD(&p->served, 1);
+				done++;
+				break;
+			}
 		}
 	}
 	return done;
@@ -510,12 +571,12 @@ struct pendconn *pendconn_add(struct stream *strm)
 		srv = NULL;
 
 	if (srv) {
-		q = &srv->queue;
+		q = &srv->per_tgrp[tgid - 1].queue;
 		max_ptr = &srv->counters.nbpend_max;
 		queueslength = &srv->queueslength;
 	}
 	else {
-		q = &px->queue;
+		q = &px->per_tgrp[tgid - 1].queue;
 		max_ptr = &px->be_counters.nbpend_max;
 		queueslength = &px->queueslength;
 	}
@@ -550,6 +611,7 @@ int pendconn_redistribute(struct server *s)
 	struct proxy *px = s->proxy;
 	int px_xferred = 0;
 	int xferred = 0;
+	int i;
 
 	/* The REDISP option was specified. We will ignore cookie and force to
 	 * balance or use the dispatcher.
@@ -558,26 +620,33 @@ int pendconn_redistribute(struct server *s)
 	    (s->proxy->options & (PR_O_REDISP|PR_O_PERSIST)) != PR_O_REDISP)
 		goto skip_srv_queue;
 
-	HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock);
-	for (node = eb32_first(&s->queue.head); node; node = nodeb) {
-		nodeb = eb32_next(node);
+	for (i = 0; i < global.nbtgroups; i++) {
+		struct queue *queue = &s->per_tgrp[i].queue;
+		int local_xferred = 0;
 
-		p = eb32_entry(node, struct pendconn, node);
-		if (p->strm_flags & SF_FORCE_PRST)
-			continue;
+		HA_SPIN_LOCK(QUEUE_LOCK, &queue->lock);
+		for (node = eb32_first(&queue->head); node; node = nodeb) {
+			nodeb = eb32_next(node);
 
-		/* it's left to the dispatcher to choose a server */
-		__pendconn_unlink_srv(p);
-		if (!(s->proxy->options & PR_O_REDISP))
-			p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED);
+			p = eb32_entry(node, struct pendconn, node);
+			if (p->strm_flags & SF_FORCE_PRST)
+				continue;
 
-		task_wakeup(p->strm->task, TASK_WOKEN_RES);
-		xferred++;
+			/* it's left to the dispatcher to choose a server */
+			__pendconn_unlink_srv(p);
+			if (!(s->proxy->options & PR_O_REDISP))
+				p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED);
+
+			task_wakeup(p->strm->task, TASK_WOKEN_RES);
+			local_xferred++;
+		}
+		HA_SPIN_UNLOCK(QUEUE_LOCK, &queue->lock);
+		xferred += local_xferred;
+		if (local_xferred)
+			_HA_ATOMIC_SUB(&queue->length, local_xferred);
 	}
-	HA_SPIN_UNLOCK(QUEUE_LOCK, &s->queue.lock);
 
 	if (xferred) {
-		_HA_ATOMIC_SUB(&s->queue.length, xferred);
 		_HA_ATOMIC_SUB(&s->queueslength, xferred);
 		_HA_ATOMIC_SUB(&s->proxy->totpend, xferred);
 	}
@@ -586,24 +655,31 @@ int pendconn_redistribute(struct server *s)
 	if (px->lbprm.tot_wact || px->lbprm.tot_wbck)
 		goto done;
 
-	HA_SPIN_LOCK(QUEUE_LOCK, &px->queue.lock);
-	for (node = eb32_first(&px->queue.head); node; node = nodeb) {
-		nodeb = eb32_next(node);
-		p = eb32_entry(node, struct pendconn, node);
+	for (i = 0; i < global.nbtgroups; i++) {
+		struct queue *queue = &px->per_tgrp[i].queue;
+		int local_xferred = 0;
 
-		/* force-persist streams may occasionally appear in the
-		 * proxy's queue, and we certainly don't want them here!
-		 */
-		p->strm_flags &= ~SF_FORCE_PRST;
-		__pendconn_unlink_prx(p);
+		HA_SPIN_LOCK(QUEUE_LOCK, &queue->lock);
+		for (node = eb32_first(&queue->head); node; node = nodeb) {
+			nodeb = eb32_next(node);
+			p = eb32_entry(node, struct pendconn, node);
 
-		task_wakeup(p->strm->task, TASK_WOKEN_RES);
-		px_xferred++;
+			/* force-persist streams may occasionally appear in the
+			 * proxy's queue, and we certainly don't want them here!
+			 */
+			p->strm_flags &= ~SF_FORCE_PRST;
+			__pendconn_unlink_prx(p);
+
+			task_wakeup(p->strm->task, TASK_WOKEN_RES);
+			local_xferred++;
+		}
+		HA_SPIN_UNLOCK(QUEUE_LOCK, &queue->lock);
+		if (local_xferred)
+			_HA_ATOMIC_SUB(&queue->length, local_xferred);
+		px_xferred += local_xferred;
 	}
-	HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
 
 	if (px_xferred) {
-		_HA_ATOMIC_SUB(&px->queue.length, px_xferred);
 		_HA_ATOMIC_SUB(&px->queueslength, px_xferred);
 		_HA_ATOMIC_SUB(&px->totpend, px_xferred);
 	}
diff --git a/src/server.c b/src/server.c
index 6d15de371..82f237dc8 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2961,7 +2961,6 @@ struct server *new_server(struct proxy *proxy)
 
 	srv->obj_type = OBJ_TYPE_SERVER;
 	srv->proxy = proxy;
-	queue_init(&srv->queue, proxy, srv);
 	MT_LIST_APPEND(&servers_list, &srv->global_list);
 	LIST_INIT(&srv->srv_rec_item);
 	LIST_INIT(&srv->ip_rec_item);
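
Note: the per-tgroup fields the patch references (per_tgrp[].queue, .dequeuing,
.self_served, .last_other_tgrp_served) are not defined in this diff. The sketch
below is a minimal, self-contained illustration of the selection policy that
process_srv_queue() implements above: up to MAX_SELF_USE_QUEUE pendconns are
dequeued from the caller's own thread-group queue, after which the other
groups' non-empty queues are served round-robin so they cannot starve. The
struct and function names (per_tgrp_sketch, pick_tgrp) are hypothetical, the
demo is single-threaded (no atomics or CAS loop as in the real code), and the
non-empty bitmask is kept constant for simplicity.

#include <stdio.h>
#include <stdint.h>

#define MAX_SELF_USE_QUEUE 9	/* same default as in defaults.h above */

/* Hypothetical stand-in for the per-tgroup bookkeeping the patch adds:
 * how many pendconns we dequeued from our own group's queue, and the
 * last foreign group we served.
 */
struct per_tgrp_sketch {
	unsigned int self_served;
	int last_other_tgrp_served;
};

/* Return the 1-based thread group to dequeue from next, or 0 if all queues
 * are empty. <tgid> is the caller's group, <nbtgroups> the group count,
 * <non_empty> a bitmask of groups whose queues hold pendconns. Mirrors the
 * patch's policy: stay local until MAX_SELF_USE_QUEUE local dequeues were
 * done (or the local queue is empty), then rotate over the other groups.
 */
static int pick_tgrp(struct per_tgrp_sketch *pt, int tgid, int nbtgroups,
                     uint64_t non_empty)
{
	unsigned int self = pt->self_served % (MAX_SELF_USE_QUEUE + 1);
	int next;

	if (!non_empty)
		return 0;

	if ((non_empty & (1ULL << (tgid - 1))) &&
	    !(self == MAX_SELF_USE_QUEUE && non_empty != (1ULL << (tgid - 1))))
		return tgid;	/* our own queue still gets this turn */

	/* walk round-robin from the last foreign group served, skipping our
	 * own group and empty queues; at least one foreign non-empty group
	 * exists when we get here, so this loop terminates
	 */
	next = pt->last_other_tgrp_served;
	do {
		next = (next % nbtgroups) + 1;
	} while (next == tgid || !(non_empty & (1ULL << (next - 1))));

	pt->last_other_tgrp_served = next;
	return next;
}

int main(void)
{
	struct per_tgrp_sketch pt = { 0, 0 };
	uint64_t non_empty = 0x7;	/* groups 1..3 all hold pendconns */
	int i;

	/* from group 1's point of view: nine local picks, then the other
	 * groups are served in rotation (2, 3, 2, ...)
	 */
	for (i = 0; i < 12; i++) {
		int t = pick_tgrp(&pt, 1, 3, non_empty);

		if (t == 1)
			pt.self_served++;	/* only local picks count */
		printf("pick %2d -> tgrp %d\n", i + 1, t);
	}
	return 0;
}

Compiled standalone, this prints nine picks for group 1 followed by groups
2, 3, 2: the same preference ratio the patch gives the local queue, with the
foreign rotation continuing until the other queues drain, since only local
dequeues advance self_served.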