diff --git a/addons/ot/src/filter.c b/addons/ot/src/filter.c index cf67fd207..20d61d981 100644 --- a/addons/ot/src/filter.c +++ b/addons/ot/src/filter.c @@ -718,7 +718,7 @@ static void flt_ot_check_timeouts(struct stream *s, struct filter *f) if (flt_ot_is_disabled(f FLT_OT_DBG_ARGS(, -1))) FLT_OT_RETURN(); - s->pending_events |= TASK_WOKEN_MSG; + s->pending_events |= STRM_EVT_MSG; flt_ot_return_void(f, &err); diff --git a/include/haproxy/stream-t.h b/include/haproxy/stream-t.h index 5da8101ed..0211adaae 100644 --- a/include/haproxy/stream-t.h +++ b/include/haproxy/stream-t.h @@ -167,6 +167,18 @@ enum { STRM_ENTITY_WREQ_BODY = 0x0003, }; +/* All possible stream events handled by process_stream(). First ones are mapped + * from TASK_WOKEN_*. + */ +enum { + STRM_EVT_NONE = 0x00000000, /* No events */ + STRM_EVT_TIMER = 0x00000001, /* A timer has expired */ + STRM_EVT_MSG = 0x00000002, /* A message event was triggered */ + STRM_EVT_SHUT_SRV_DOWN = 0x00000004, /* Must be shut because the selected server became available */ + STRM_EVT_SHUT_SRV_UP = 0x00000008, /* Must be shut because a preferred server became available */ + STRM_EVT_KILLED = 0x00000010, /* Must be shut for external reason */ +}; + /* This function is used to report flags in debugging tools. Please reflect * below any single-bit flag addition above in the same order via the * __APPEND_FLAG macro. The new end of the buffer is returned. @@ -241,8 +253,8 @@ struct stream { struct http_txn *txn; /* current HTTP transaction being processed. Should become a list. */ struct task *task; /* the task associated with this stream */ - unsigned int pending_events; /* the pending events not yet processed by the stream. - * This is a bit field of TASK_WOKEN_* */ + unsigned int pending_events; /* the pending events not yet processed by the stream but handled by process_stream() */ + unsigned int new_events; /* the new events added since the previous wakeup (never seen by process_stream()). It is atomic field */ int conn_retries; /* number of connect retries performed */ unsigned int conn_exp; /* wake up time for connect, queue, turn-around, ... */ unsigned int conn_err_type; /* first error detected, one of STRM_ET_* */ diff --git a/include/haproxy/stream.h b/include/haproxy/stream.h index a14a4b17a..3ca799912 100644 --- a/include/haproxy/stream.h +++ b/include/haproxy/stream.h @@ -403,6 +403,21 @@ static inline void stream_shutdown(struct stream *s, int why) 0)); } +/* Map task states to stream events. TASK_WOKEN_* and TASK_F_UEVT* are mapped on + * STRM_EVT_*. Not all states/flags are mapped, only those explicitly used by + * the stream. + */ +static inline unsigned int stream_map_task_state(unsigned int state) +{ + return ((state & TASK_WOKEN_TIMER) ? STRM_EVT_TIMER : 0) | + ((state & TASK_WOKEN_MSG) ? STRM_EVT_MSG : 0) | + ((state & TASK_F_UEVT1) ? STRM_EVT_SHUT_SRV_DOWN : 0) | + ((state & TASK_F_UEVT3) ? STRM_EVT_SHUT_SRV_UP : 0) | + ((state & TASK_F_UEVT2) ? STRM_EVT_KILLED : 0) | + 0; +} + + int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout); void stream_retnclose(struct stream *s, const struct buffer *msg); void sess_set_term_flags(struct stream *s); diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 8f81e5494..7aa354dcb 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1346,7 +1346,7 @@ static void spoe_check_timeouts(struct stream *s, struct filter *filter) struct spoe_context *ctx = filter->ctx; if (tick_is_expired(ctx->process_exp, now_ms)) - s->pending_events |= TASK_WOKEN_MSG; + s->pending_events |= STRM_EVT_MSG; } /* Called when we are ready to filter data on a channel */ diff --git a/src/stream.c b/src/stream.c index 0961ec9ad..f5a3f806e 100644 --- a/src/stream.c +++ b/src/stream.c @@ -432,7 +432,7 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer goto out_fail_alloc; s->task = t; - s->pending_events = 0; + s->pending_events = s->new_events = STRM_EVT_NONE; s->conn_retries = 0; s->max_retries = 0; s->conn_exp = TICK_ETERNITY; @@ -1729,9 +1729,15 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) activity[tid].stream_calls++; stream_cond_update_cpu_latency(s); - if ((state & TASK_WOKEN_OTHER) && (state & (TASK_F_UEVT1 | TASK_F_UEVT2 | TASK_F_UEVT3))) { + /* update pending events */ + s->pending_events |= stream_map_task_state(state); + s->pending_events |= HA_ATOMIC_XCHG(&s->new_events, STRM_EVT_NONE); + + if (s->pending_events & (STRM_EVT_SHUT_SRV_DOWN|STRM_EVT_SHUT_SRV_UP|STRM_EVT_KILLED)) { /* that an instant kill message, the reason is in _UEVT* */ - stream_shutdown_self(s, (state & TASK_F_UEVT3) ? SF_ERR_UP : (state & TASK_F_UEVT2) ? SF_ERR_KILLED : SF_ERR_DOWN); + stream_shutdown_self(s, ((s->pending_events & STRM_EVT_SHUT_SRV_DOWN) ? SF_ERR_DOWN : + (s->pending_events & STRM_EVT_SHUT_SRV_UP) ? SF_ERR_UP: + SF_ERR_KILLED)); } req = &s->req; @@ -1774,24 +1780,23 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) scf_flags = scf->flags; scb_flags = scb->flags; - /* update pending events */ - s->pending_events |= (state & TASK_WOKEN_ANY); - /* 1a: Check for low level timeouts if needed. We just set a flag on * stream connectors when their timeouts have expired. */ - if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) { + if (unlikely(s->pending_events & STRM_EVT_TIMER)) { stream_handle_timeouts(s); /* Once in a while we're woken up because the task expires. But - * this does not necessarily mean that a timeout has been reached. - * So let's not run a whole stream processing if only an expiration - * timeout needs to be refreshed. + * this does not necessarily mean that a timeout has been + * reached. So let's not run a whole stream processing if only + * an expiration timeout needs to be refreshed. To do so, we + * must be sure only the TIMER event was triggered and not + * error/timeout/abort/shut occurred. on both sides. */ if (!((scf->flags | scb->flags) & (SC_FL_ERROR|SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_SHUT_DONE)) && !((req->flags | res->flags) & (CF_READ_EVENT|CF_READ_TIMEOUT|CF_WRITE_EVENT|CF_WRITE_TIMEOUT)) && !(s->flags & SF_CONN_EXP) && - ((s->pending_events & TASK_WOKEN_ANY) == TASK_WOKEN_TIMER)) { + (s->pending_events == STRM_EVT_TIMER)) { scf->flags &= ~SC_FL_DONT_WAKE; scb->flags &= ~SC_FL_DONT_WAKE; goto update_exp_and_leave; @@ -1952,7 +1957,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) (req->analysers && (scb->flags & SC_FL_SHUT_DONE)) || scf->state != rq_prod_last || scb->state != rq_cons_last || - s->pending_events & TASK_WOKEN_MSG) { + s->pending_events & STRM_EVT_MSG) { unsigned int scf_flags_ana = scf->flags; unsigned int scb_flags_ana = scb->flags; @@ -2058,7 +2063,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) (res->analysers && (scf->flags & SC_FL_SHUT_DONE)) || scf->state != rp_cons_last || scb->state != rp_prod_last || - s->pending_events & TASK_WOKEN_MSG) { + s->pending_events & STRM_EVT_MSG) { unsigned int scb_flags_ana = scb->flags; unsigned int scf_flags_ana = scf->flags; @@ -2524,7 +2529,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) stream_update_both_sc(s); /* Reset pending events now */ - s->pending_events = 0; + s->pending_events = STRM_EVT_NONE; update_exp_and_leave: /* Note: please ensure that if you branch here you disable SC_FL_DONT_WAKE */ @@ -2554,7 +2559,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) goto resync_stconns; } leave: - s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES); + s->pending_events &= ~STRM_EVT_TIMER; stream_release_buffers(s); DBG_TRACE_DEVEL("queuing", STRM_EV_STRM_PROC, s);