MINOR: queue: store the queue index in the stream when enqueuing

We store the queue index in the stream and check it on dequeueing to
figure how many entries were processed in between. This way we'll be
able to count the elements that may later be added before ours.
This commit is contained in:
Patrick Hemmer 2018-05-11 12:52:31 -04:00 committed by Willy Tarreau
parent ffe5e8c638
commit da282f4a8f
4 changed files with 18 additions and 6 deletions

View File

@ -325,6 +325,7 @@ struct proxy {
struct list pendconns; /* pending connections with no server assigned yet */
int nbpend; /* number of pending connections with no server assigned yet */
int totpend; /* total number of pending connections on this instance (for stats) */
unsigned int queue_idx; /* number of pending connections which have been de-queued */
unsigned int feconn, beconn; /* # of active frontend and backends streams */
struct freq_ctr fe_req_per_sec; /* HTTP requests per second on the frontend */
struct freq_ctr fe_conn_per_sec; /* received connections per second on the frontend */

View File

@ -32,6 +32,7 @@ struct stream;
struct pendconn {
int strm_flags; /* stream flags */
unsigned int queue_idx; /* value of proxy/server queue_idx at time of enqueue */
struct stream *strm;
struct proxy *px;
struct server *srv; /* the server we are waiting for, may be NULL if don't care */

View File

@ -210,6 +210,7 @@ struct server {
int cur_sess; /* number of currently active sessions (including syn_sent) */
unsigned maxconn, minconn; /* max # of active sessions (0 = unlimited), min# for dynamic limit. */
int nbpend; /* number of pending connections */
unsigned int queue_idx; /* count of pending connections which have been de-queued */
int maxqueue; /* maximum number of pending connections allowed */
struct freq_ctr sess_per_sec; /* sessions per second on this server */
struct be_counters counters; /* statistics counters */

View File

@ -129,10 +129,13 @@ unsigned int srv_dynamic_maxconn(const struct server *s)
*/
static void __pendconn_unlink(struct pendconn *p)
{
if (p->srv)
if (p->srv) {
p->strm->logs.srv_queue_pos += p->srv->queue_idx - p->queue_idx;
p->srv->nbpend--;
else
} else {
p->strm->logs.prx_queue_pos += p->px->queue_idx - p->queue_idx;
p->px->nbpend--;
}
HA_ATOMIC_SUB(&p->px->totpend, 1);
LIST_DEL(&p->list);
LIST_INIT(&p->list);
@ -199,6 +202,7 @@ void pendconn_unlink(struct pendconn *p)
static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
{
struct pendconn *p = NULL;
struct pendconn *pp = NULL;
struct server *rsrv;
rsrv = srv->track;
@ -213,8 +217,6 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
(!(srv->flags & SRV_F_BACKUP) ||
(!px->srv_act &&
(srv == px->lbprm.fbck || (px->options & PR_O_USE_ALL_BK))))) {
struct pendconn *pp;
pp = LIST_ELEM(px->pendconns.n, struct pendconn *, list);
/* If the server pendconn is older than the proxy one,
@ -236,6 +238,11 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
p->strm_flags |= SF_ASSIGNED;
p->target = srv;
if (p != pp)
srv->queue_idx++;
else
px->queue_idx++;
HA_ATOMIC_ADD(&srv->served, 1);
HA_ATOMIC_ADD(&srv->proxy->served, 1);
if (px->lbprm.server_take_conn)
@ -272,6 +279,8 @@ void process_srv_queue(struct server *s)
* are updated accordingly. Returns NULL if no memory is available, otherwise the
* pendconn itself. If the stream was already marked as served, its flag is
* cleared. It is illegal to call this function with a non-NULL strm->srv_conn.
* The stream's queue position is counted with an offset of -1 because we want
* to make sure that being at the first position in the queue reports 1.
*
* This function must be called by the stream itself, so in the context of
* process_stream.
@ -302,16 +311,16 @@ struct pendconn *pendconn_add(struct stream *strm)
if (srv) {
srv->nbpend++;
strm->logs.srv_queue_pos += srv->nbpend;
if (srv->nbpend > srv->counters.nbpend_max)
srv->counters.nbpend_max = srv->nbpend;
p->queue_idx = srv->queue_idx - 1; // for increment
LIST_ADDQ(&srv->pendconns, &p->list);
}
else {
px->nbpend++;
strm->logs.prx_queue_pos += px->nbpend;
if (px->nbpend > px->be_counters.nbpend_max)
px->be_counters.nbpend_max = px->nbpend;
p->queue_idx = px->queue_idx - 1; // for increment
LIST_ADDQ(&px->pendconns, &p->list);
}
strm->pend_pos = p;