MINOR: proxies/servers: Calculate queueslength and use it.

For both proxies and servers, properly calculates queueslength, which is
the total number of element in each queues (as they currently are only
using one queue, it is equivalent to the number of element of that
queue), and use it instead of the queue's length.
This commit is contained in:
Olivier Houchard 2025-01-15 16:37:35 +01:00 committed by Olivier Houchard
parent 59eddabe16
commit 583303c48b
13 changed files with 37 additions and 29 deletions

View File

@ -86,7 +86,7 @@ static inline int server_has_room(const struct server *s) {
* for and if/else usage.
*/
static inline int may_dequeue_tasks(const struct server *s, const struct proxy *p) {
return (s && (s->queue.length || (p->queue.length && srv_currently_usable(s))) &&
return (s && (s->queueslength || (p->queueslength && srv_currently_usable(s))) &&
(!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s)));
}

View File

@ -584,7 +584,7 @@ struct server *get_server_rnd(struct stream *s, const struct server *avoid)
* the backend's queue instead.
*/
if (curr &&
(curr->queue.length || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr))))
(curr->queueslength || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr))))
curr = NULL;
return curr;
@ -654,7 +654,7 @@ int assign_server(struct stream *s)
((s->sess->flags & SESS_FL_PREFER_LAST) ||
(!s->be->max_ka_queue ||
server_has_room(tmpsrv) || (
tmpsrv->queue.length + 1 < s->be->max_ka_queue))) &&
tmpsrv->queueslength + 1 < s->be->max_ka_queue))) &&
srv_currently_usable(tmpsrv)) {
list_for_each_entry(conn, &pconns->conn_list, sess_el) {
if (!(conn->flags & CO_FL_WAIT_XPRT)) {
@ -681,7 +681,7 @@ int assign_server(struct stream *s)
/* if there's some queue on the backend, with certain algos we
* know it's because all servers are full.
*/
if (s->be->queue.length && s->be->served && s->be->queue.length != s->be->beconn &&
if (s->be->queueslength && s->be->served && s->be->queueslength != s->be->beconn &&
(((s->be->lbprm.algo & (BE_LB_KIND|BE_LB_NEED|BE_LB_PARM)) == BE_LB_ALGO_FAS)|| // first
((s->be->lbprm.algo & (BE_LB_KIND|BE_LB_NEED|BE_LB_PARM)) == BE_LB_ALGO_RR) || // roundrobin
((s->be->lbprm.algo & (BE_LB_KIND|BE_LB_NEED|BE_LB_PARM)) == BE_LB_ALGO_SRR))) { // static-rr
@ -1051,7 +1051,7 @@ int assign_server_and_queue(struct stream *s)
__ha_cpu_relax());
}
if (!got_it) {
if (srv->maxqueue > 0 && srv->queue.length >= srv->maxqueue)
if (srv->maxqueue > 0 && srv->queueslength >= srv->maxqueue)
return SRV_STATUS_FULL;
p = pendconn_add(s);
@ -3063,7 +3063,7 @@ smp_fetch_connslots(const struct arg *args, struct sample *smp, const char *kw,
}
smp->data.u.sint += (iterator->maxconn - iterator->cur_sess)
+ (iterator->maxqueue - iterator->queue.length);
+ (iterator->maxqueue - iterator->queueslength);
}
return 1;
@ -3340,7 +3340,7 @@ smp_fetch_srv_queue(const struct arg *args, struct sample *smp, const char *kw,
{
smp->flags = SMP_F_VOL_TEST;
smp->data.type = SMP_T_SINT;
smp->data.u.sint = args->data.srv->queue.length;
smp->data.u.sint = args->data.srv->queueslength;
return 1;
}
@ -3482,7 +3482,7 @@ sample_conv_srv_queue(const struct arg *args, struct sample *smp, void *private)
return 0;
smp->data.type = SMP_T_SINT;
smp->data.u.sint = srv->queue.length;
smp->data.u.sint = srv->queueslength;
return 1;
}

View File

@ -1027,8 +1027,8 @@ int httpchk_build_status_header(struct server *s, struct buffer *buf)
global.node,
(s->cur_eweight * s->proxy->lbprm.wmult + s->proxy->lbprm.wdiv - 1) / s->proxy->lbprm.wdiv,
(s->proxy->lbprm.tot_weight * s->proxy->lbprm.wmult + s->proxy->lbprm.wdiv - 1) / s->proxy->lbprm.wdiv,
s->cur_sess, s->proxy->beconn - s->proxy->queue.length,
s->queue.length);
s->cur_sess, s->proxy->beconn - s->proxy->queueslength,
s->queueslength);
if ((s->cur_state == SRV_ST_STARTING) &&
ns_to_sec(now_ns) < s->counters.last_change + s->slowstart &&

View File

@ -748,7 +748,7 @@ static void sig_dump_state(struct sig_handler *sh)
"SIGHUP: Server %s/%s is %s. Conn: %d act, %d pend, %lld tot.",
p->id, s->id,
(s->cur_state != SRV_ST_STOPPED) ? "UP" : "DOWN",
s->cur_sess, s->queue.length, s->counters.cum_sess);
s->cur_sess, s->queueslength, s->counters.cum_sess);
ha_warning("%s\n", trash.area);
send_log(p, LOG_NOTICE, "%s\n", trash.area);
s = s->next;
@ -759,19 +759,19 @@ static void sig_dump_state(struct sig_handler *sh)
chunk_printf(&trash,
"SIGHUP: Proxy %s has no servers. Conn: act(FE+BE): %d+%d, %d pend (%d unass), tot(FE+BE): %lld+%lld.",
p->id,
p->feconn, p->beconn, p->totpend, p->queue.length, p->fe_counters.cum_conn, p->be_counters.cum_sess);
p->feconn, p->beconn, p->totpend, p->queueslength, p->fe_counters.cum_conn, p->be_counters.cum_sess);
} else if (p->srv_act == 0) {
chunk_printf(&trash,
"SIGHUP: Proxy %s %s ! Conn: act(FE+BE): %d+%d, %d pend (%d unass), tot(FE+BE): %lld+%lld.",
p->id,
(p->srv_bck) ? "is running on backup servers" : "has no server available",
p->feconn, p->beconn, p->totpend, p->queue.length, p->fe_counters.cum_conn, p->be_counters.cum_sess);
p->feconn, p->beconn, p->totpend, p->queueslength, p->fe_counters.cum_conn, p->be_counters.cum_sess);
} else {
chunk_printf(&trash,
"SIGHUP: Proxy %s has %d active servers and %d backup servers available."
" Conn: act(FE+BE): %d+%d, %d pend (%d unass), tot(FE+BE): %lld+%lld.",
p->id, p->srv_act, p->srv_bck,
p->feconn, p->beconn, p->totpend, p->queue.length, p->fe_counters.cum_conn, p->be_counters.cum_sess);
p->feconn, p->beconn, p->totpend, p->queueslength, p->fe_counters.cum_conn, p->be_counters.cum_sess);
}
ha_warning("%s\n", trash.area);
send_log(p, LOG_NOTICE, "%s\n", trash.area);

View File

@ -1421,7 +1421,7 @@ int hlua_server_get_pend_conn(lua_State *L)
return 1;
}
lua_pushinteger(L, srv->queue.length);
lua_pushinteger(L, srv->queueslength);
return 1;
}

View File

@ -521,7 +521,7 @@ struct server *chash_get_next_server(struct proxy *p, struct server *srvtoavoid)
* case we simply remember it for later use if needed.
*/
s = eb32_entry(node, struct tree_occ, node)->server;
if (!s->maxconn || (!s->queue.length && s->served < srv_dynamic_maxconn(s))) {
if (!s->maxconn || (!s->queueslength && s->served < srv_dynamic_maxconn(s))) {
if (s != srvtoavoid) {
srv = s;
break;

View File

@ -322,7 +322,7 @@ struct server *fas_get_next_server(struct proxy *p, struct server *srvtoavoid)
struct server *s;
s = eb32_entry(node, struct server, lb_node);
if (!s->maxconn || (!s->queue.length && s->served < srv_dynamic_maxconn(s))) {
if (!s->maxconn || (!s->queueslength && s->served < srv_dynamic_maxconn(s))) {
if (s != srvtoavoid) {
srv = s;
break;

View File

@ -57,7 +57,7 @@ static inline void fwlc_dequeue_srv(struct server *s)
*/
static inline void fwlc_queue_srv(struct server *s, unsigned int eweight)
{
unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queue.length);
unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queueslength);
s->lb_node.key = inflight ? (inflight + 1) * SRV_EWGHT_MAX / eweight : 0;
eb32_insert(s->lb_tree, &s->lb_node);
@ -70,7 +70,7 @@ static inline void fwlc_queue_srv(struct server *s, unsigned int eweight)
*/
static void fwlc_srv_reposition(struct server *s)
{
unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queue.length);
unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queueslength);
unsigned int eweight = _HA_ATOMIC_LOAD(&s->cur_eweight);
unsigned int new_key = inflight ? (inflight + 1) * SRV_EWGHT_MAX / (eweight ? eweight : 1) : 0;
@ -87,7 +87,7 @@ static void fwlc_srv_reposition(struct server *s)
* likely to have released a connection or taken one leading
* to our target value (50% of the case in measurements).
*/
inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queue.length);
inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queueslength);
eweight = _HA_ATOMIC_LOAD(&s->cur_eweight);
new_key = inflight ? (inflight + 1) * SRV_EWGHT_MAX / (eweight ? eweight : 1) : 0;
if (!s->lb_node.node.leaf_p || s->lb_node.key != new_key) {
@ -349,7 +349,7 @@ struct server *fwlc_get_next_server(struct proxy *p, struct server *srvtoavoid)
struct server *s;
s = eb32_entry(node, struct server, lb_node);
if (!s->maxconn || s->served + s->queue.length < srv_dynamic_maxconn(s) + s->maxqueue) {
if (!s->maxconn || s->served + s->queueslength < srv_dynamic_maxconn(s) + s->maxqueue) {
if (s != srvtoavoid) {
srv = s;
break;

View File

@ -564,7 +564,7 @@ struct server *fwrr_get_next_server(struct proxy *p, struct server *srvtoavoid)
fwrr_update_position(grp, srv);
fwrr_dequeue_srv(srv);
grp->curr_pos++;
if (!srv->maxconn || (!srv->queue.length && srv->served < srv_dynamic_maxconn(srv))) {
if (!srv->maxconn || (!srv->queueslength && srv->served < srv_dynamic_maxconn(srv))) {
/* make sure it is not the server we are trying to exclude... */
if (srv != srvtoavoid || avoided)
break;

View File

@ -230,7 +230,7 @@ struct server *map_get_server_rr(struct proxy *px, struct server *srvtoavoid)
avoididx = 0; /* shut a gcc warning */
do {
srv = px->lbprm.map.srv[newidx++];
if (!srv->maxconn || (!srv->queue.length && srv->served < srv_dynamic_maxconn(srv))) {
if (!srv->maxconn || (!srv->queueslength && srv->served < srv_dynamic_maxconn(srv))) {
/* make sure it is not the server we are try to exclude... */
/* ...but remember that is was selected yet avoided */
avoided = srv;

View File

@ -348,6 +348,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
_HA_ATOMIC_DEC(&px->queue.length);
_HA_ATOMIC_INC(&px->queue.idx);
_HA_ATOMIC_DEC(&px->queueslength);
return 1;
use_p:
@ -369,6 +370,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
_HA_ATOMIC_DEC(&srv->queue.length);
_HA_ATOMIC_INC(&srv->queue.idx);
_HA_ATOMIC_DEC(&srv->queueslength);
return 1;
}
@ -487,6 +489,7 @@ struct pendconn *pendconn_add(struct stream *strm)
struct server *srv;
struct queue *q;
unsigned int *max_ptr;
unsigned int *queueslength;
unsigned int old_max, new_max;
p = pool_alloc(pool_head_pendconn);
@ -509,15 +512,18 @@ struct pendconn *pendconn_add(struct stream *strm)
if (srv) {
q = &srv->queue;
max_ptr = &srv->counters.nbpend_max;
queueslength = &srv->queueslength;
}
else {
q = &px->queue;
max_ptr = &px->be_counters.nbpend_max;
queueslength = &px->queueslength;
}
p->queue = q;
p->queue_idx = _HA_ATOMIC_LOAD(&q->idx) - 1; // for logging only
new_max = _HA_ATOMIC_ADD_FETCH(&q->length, 1);
new_max = _HA_ATOMIC_ADD_FETCH(queueslength, 1);
_HA_ATOMIC_INC(&q->length);
old_max = _HA_ATOMIC_LOAD(max_ptr);
while (new_max > old_max) {
if (likely(_HA_ATOMIC_CAS(max_ptr, &old_max, new_max)))
@ -572,6 +578,7 @@ int pendconn_redistribute(struct server *s)
if (xferred) {
_HA_ATOMIC_SUB(&s->queue.length, xferred);
_HA_ATOMIC_SUB(&s->queueslength, xferred);
_HA_ATOMIC_SUB(&s->proxy->totpend, xferred);
}
@ -597,6 +604,7 @@ int pendconn_redistribute(struct server *s)
if (px_xferred) {
_HA_ATOMIC_SUB(&px->queue.length, px_xferred);
_HA_ATOMIC_SUB(&px->queueslength, px_xferred);
_HA_ATOMIC_SUB(&px->totpend, px_xferred);
}
done:

View File

@ -2092,13 +2092,13 @@ static void srv_append_more(struct buffer *msg, struct server *s,
" %d sessions active, %d requeued, %d remaining in queue",
s->proxy->srv_act, s->proxy->srv_bck,
(s->proxy->srv_bck && !s->proxy->srv_act) ? " Running on backup." : "",
s->cur_sess, xferred, s->queue.length);
s->cur_sess, xferred, s->queueslength);
else
chunk_appendf(msg, ". %d active and %d backup servers online.%s"
" %d sessions requeued, %d total in queue",
s->proxy->srv_act, s->proxy->srv_bck,
(s->proxy->srv_bck && !s->proxy->srv_act) ? " Running on backup." : "",
xferred, s->queue.length);
xferred, s->queueslength);
}
}
@ -6044,7 +6044,7 @@ int srv_check_for_deletion(const char *bename, const char *svname, struct proxy
/* Ensure that there is no active/pending connection on the server. */
if (srv->curr_used_conns ||
!eb_is_empty(&srv->queue.head) || srv_has_streams(srv)) {
srv->queueslength || srv_has_streams(srv)) {
msg = "Server still has connections attached to it, cannot remove it.";
goto leave;
}

View File

@ -786,7 +786,7 @@ int stats_fill_sv_line(struct proxy *px, struct server *sv, int flags,
field = mkf_str(FO_CONFIG|FS_SERVICE, proxy_mode_str(px->mode));
break;
case ST_I_PX_QCUR:
field = mkf_u32(0, sv->queue.length);
field = mkf_u32(0, sv->queueslength);
break;
case ST_I_PX_QMAX:
field = mkf_u32(FN_MAX, sv->counters.nbpend_max);
@ -1165,7 +1165,7 @@ int stats_fill_be_line(struct proxy *px, int flags, struct field *line, int len,
field = mkf_str(FO_CONFIG|FS_SERVICE, proxy_mode_str(px->mode));
break;
case ST_I_PX_QCUR:
field = mkf_u32(0, px->queue.length);
field = mkf_u32(0, px->queueslength);
break;
case ST_I_PX_QMAX:
field = mkf_u32(FN_MAX, px->be_counters.nbpend_max);