MEDIUM: stream: Map task wake up reasons to dedicated stream events
To fix thread-safety issues when a stream must be shut, three new task states were added. These states are generic (UEVT1, UEVT2 and UEVT3), the task callback function is responsible to know what to do with them. However, it is not really scalable. The best is to use an atomic field in the stream structure itself to deal with these dedicated events. There is already the "pending_events" field that save wake up reasons (TASK_WOKEN_*) to not loose them if process_stream() is interrupted before it had a chance to handle them. So the idea is to introduce a new field to handle streams dedicated events and merged them with the task's wake up reasons used by the stream. This means a mapping must be performed between some task wake up reasons and streams events. Note that not all task wake up reasons will be mapped. In this patch, the "new_events" field is introduced. It is an atomic bit-field. Streams events (STRM_EVT_*) are also introduced to map the task wake up reasons used by process_stream(). Only TASK_WOKEN_TIMER and TASK_WOKEN_MSG are mapped, in addition to TASK_F_UEVT* flags. In process_stream(), "pending_events" field is now filled with new stream events and the mapping of the wake up reasons.
This commit is contained in:
parent
0a52a75ef7
commit
6048460102
@ -718,7 +718,7 @@ static void flt_ot_check_timeouts(struct stream *s, struct filter *f)
|
||||
if (flt_ot_is_disabled(f FLT_OT_DBG_ARGS(, -1)))
|
||||
FLT_OT_RETURN();
|
||||
|
||||
s->pending_events |= TASK_WOKEN_MSG;
|
||||
s->pending_events |= STRM_EVT_MSG;
|
||||
|
||||
flt_ot_return_void(f, &err);
|
||||
|
||||
|
@ -167,6 +167,18 @@ enum {
|
||||
STRM_ENTITY_WREQ_BODY = 0x0003,
|
||||
};
|
||||
|
||||
/* All possible stream events handled by process_stream(). First ones are mapped
|
||||
* from TASK_WOKEN_*.
|
||||
*/
|
||||
enum {
|
||||
STRM_EVT_NONE = 0x00000000, /* No events */
|
||||
STRM_EVT_TIMER = 0x00000001, /* A timer has expired */
|
||||
STRM_EVT_MSG = 0x00000002, /* A message event was triggered */
|
||||
STRM_EVT_SHUT_SRV_DOWN = 0x00000004, /* Must be shut because the selected server became available */
|
||||
STRM_EVT_SHUT_SRV_UP = 0x00000008, /* Must be shut because a preferred server became available */
|
||||
STRM_EVT_KILLED = 0x00000010, /* Must be shut for external reason */
|
||||
};
|
||||
|
||||
/* This function is used to report flags in debugging tools. Please reflect
|
||||
* below any single-bit flag addition above in the same order via the
|
||||
* __APPEND_FLAG macro. The new end of the buffer is returned.
|
||||
@ -241,8 +253,8 @@ struct stream {
|
||||
struct http_txn *txn; /* current HTTP transaction being processed. Should become a list. */
|
||||
|
||||
struct task *task; /* the task associated with this stream */
|
||||
unsigned int pending_events; /* the pending events not yet processed by the stream.
|
||||
* This is a bit field of TASK_WOKEN_* */
|
||||
unsigned int pending_events; /* the pending events not yet processed by the stream but handled by process_stream() */
|
||||
unsigned int new_events; /* the new events added since the previous wakeup (never seen by process_stream()). It is atomic field */
|
||||
int conn_retries; /* number of connect retries performed */
|
||||
unsigned int conn_exp; /* wake up time for connect, queue, turn-around, ... */
|
||||
unsigned int conn_err_type; /* first error detected, one of STRM_ET_* */
|
||||
|
@ -403,6 +403,21 @@ static inline void stream_shutdown(struct stream *s, int why)
|
||||
0));
|
||||
}
|
||||
|
||||
/* Map task states to stream events. TASK_WOKEN_* and TASK_F_UEVT* are mapped on
|
||||
* STRM_EVT_*. Not all states/flags are mapped, only those explicitly used by
|
||||
* the stream.
|
||||
*/
|
||||
static inline unsigned int stream_map_task_state(unsigned int state)
|
||||
{
|
||||
return ((state & TASK_WOKEN_TIMER) ? STRM_EVT_TIMER : 0) |
|
||||
((state & TASK_WOKEN_MSG) ? STRM_EVT_MSG : 0) |
|
||||
((state & TASK_F_UEVT1) ? STRM_EVT_SHUT_SRV_DOWN : 0) |
|
||||
((state & TASK_F_UEVT3) ? STRM_EVT_SHUT_SRV_UP : 0) |
|
||||
((state & TASK_F_UEVT2) ? STRM_EVT_KILLED : 0) |
|
||||
0;
|
||||
}
|
||||
|
||||
|
||||
int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout);
|
||||
void stream_retnclose(struct stream *s, const struct buffer *msg);
|
||||
void sess_set_term_flags(struct stream *s);
|
||||
|
@ -1346,7 +1346,7 @@ static void spoe_check_timeouts(struct stream *s, struct filter *filter)
|
||||
struct spoe_context *ctx = filter->ctx;
|
||||
|
||||
if (tick_is_expired(ctx->process_exp, now_ms))
|
||||
s->pending_events |= TASK_WOKEN_MSG;
|
||||
s->pending_events |= STRM_EVT_MSG;
|
||||
}
|
||||
|
||||
/* Called when we are ready to filter data on a channel */
|
||||
|
35
src/stream.c
35
src/stream.c
@ -432,7 +432,7 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer
|
||||
goto out_fail_alloc;
|
||||
|
||||
s->task = t;
|
||||
s->pending_events = 0;
|
||||
s->pending_events = s->new_events = STRM_EVT_NONE;
|
||||
s->conn_retries = 0;
|
||||
s->max_retries = 0;
|
||||
s->conn_exp = TICK_ETERNITY;
|
||||
@ -1729,9 +1729,15 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
|
||||
activity[tid].stream_calls++;
|
||||
stream_cond_update_cpu_latency(s);
|
||||
|
||||
if ((state & TASK_WOKEN_OTHER) && (state & (TASK_F_UEVT1 | TASK_F_UEVT2 | TASK_F_UEVT3))) {
|
||||
/* update pending events */
|
||||
s->pending_events |= stream_map_task_state(state);
|
||||
s->pending_events |= HA_ATOMIC_XCHG(&s->new_events, STRM_EVT_NONE);
|
||||
|
||||
if (s->pending_events & (STRM_EVT_SHUT_SRV_DOWN|STRM_EVT_SHUT_SRV_UP|STRM_EVT_KILLED)) {
|
||||
/* that an instant kill message, the reason is in _UEVT* */
|
||||
stream_shutdown_self(s, (state & TASK_F_UEVT3) ? SF_ERR_UP : (state & TASK_F_UEVT2) ? SF_ERR_KILLED : SF_ERR_DOWN);
|
||||
stream_shutdown_self(s, ((s->pending_events & STRM_EVT_SHUT_SRV_DOWN) ? SF_ERR_DOWN :
|
||||
(s->pending_events & STRM_EVT_SHUT_SRV_UP) ? SF_ERR_UP:
|
||||
SF_ERR_KILLED));
|
||||
}
|
||||
|
||||
req = &s->req;
|
||||
@ -1774,24 +1780,23 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
|
||||
scf_flags = scf->flags;
|
||||
scb_flags = scb->flags;
|
||||
|
||||
/* update pending events */
|
||||
s->pending_events |= (state & TASK_WOKEN_ANY);
|
||||
|
||||
/* 1a: Check for low level timeouts if needed. We just set a flag on
|
||||
* stream connectors when their timeouts have expired.
|
||||
*/
|
||||
if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) {
|
||||
if (unlikely(s->pending_events & STRM_EVT_TIMER)) {
|
||||
stream_handle_timeouts(s);
|
||||
|
||||
/* Once in a while we're woken up because the task expires. But
|
||||
* this does not necessarily mean that a timeout has been reached.
|
||||
* So let's not run a whole stream processing if only an expiration
|
||||
* timeout needs to be refreshed.
|
||||
* this does not necessarily mean that a timeout has been
|
||||
* reached. So let's not run a whole stream processing if only
|
||||
* an expiration timeout needs to be refreshed. To do so, we
|
||||
* must be sure only the TIMER event was triggered and not
|
||||
* error/timeout/abort/shut occurred. on both sides.
|
||||
*/
|
||||
if (!((scf->flags | scb->flags) & (SC_FL_ERROR|SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_SHUT_DONE)) &&
|
||||
!((req->flags | res->flags) & (CF_READ_EVENT|CF_READ_TIMEOUT|CF_WRITE_EVENT|CF_WRITE_TIMEOUT)) &&
|
||||
!(s->flags & SF_CONN_EXP) &&
|
||||
((s->pending_events & TASK_WOKEN_ANY) == TASK_WOKEN_TIMER)) {
|
||||
(s->pending_events == STRM_EVT_TIMER)) {
|
||||
scf->flags &= ~SC_FL_DONT_WAKE;
|
||||
scb->flags &= ~SC_FL_DONT_WAKE;
|
||||
goto update_exp_and_leave;
|
||||
@ -1952,7 +1957,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
|
||||
(req->analysers && (scb->flags & SC_FL_SHUT_DONE)) ||
|
||||
scf->state != rq_prod_last ||
|
||||
scb->state != rq_cons_last ||
|
||||
s->pending_events & TASK_WOKEN_MSG) {
|
||||
s->pending_events & STRM_EVT_MSG) {
|
||||
unsigned int scf_flags_ana = scf->flags;
|
||||
unsigned int scb_flags_ana = scb->flags;
|
||||
|
||||
@ -2058,7 +2063,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
|
||||
(res->analysers && (scf->flags & SC_FL_SHUT_DONE)) ||
|
||||
scf->state != rp_cons_last ||
|
||||
scb->state != rp_prod_last ||
|
||||
s->pending_events & TASK_WOKEN_MSG) {
|
||||
s->pending_events & STRM_EVT_MSG) {
|
||||
unsigned int scb_flags_ana = scb->flags;
|
||||
unsigned int scf_flags_ana = scf->flags;
|
||||
|
||||
@ -2524,7 +2529,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
|
||||
stream_update_both_sc(s);
|
||||
|
||||
/* Reset pending events now */
|
||||
s->pending_events = 0;
|
||||
s->pending_events = STRM_EVT_NONE;
|
||||
|
||||
update_exp_and_leave:
|
||||
/* Note: please ensure that if you branch here you disable SC_FL_DONT_WAKE */
|
||||
@ -2554,7 +2559,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
|
||||
goto resync_stconns;
|
||||
}
|
||||
leave:
|
||||
s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
|
||||
s->pending_events &= ~STRM_EVT_TIMER;
|
||||
stream_release_buffers(s);
|
||||
|
||||
DBG_TRACE_DEVEL("queuing", STRM_EV_STRM_PROC, s);
|
||||
|
Loading…
x
Reference in New Issue
Block a user