diff --git a/include/proto/connection.h b/include/proto/connection.h index 16103b7db..a2580f10b 100644 --- a/include/proto/connection.h +++ b/include/proto/connection.h @@ -598,6 +598,7 @@ static inline void cs_init(struct conn_stream *cs, struct connection *conn) { cs->obj_type = OBJ_TYPE_CS; cs->flags = CS_FL_NONE; + LIST_INIT(&cs->wait_list.list); LIST_INIT(&cs->send_wait_list); cs->conn = conn; } @@ -663,6 +664,8 @@ static inline struct connection *conn_new() /* Releases a conn_stream previously allocated by cs_new() */ static inline void cs_free(struct conn_stream *cs) { + if (cs->wait_list.task) + tasklet_free(cs->wait_list.task); pool_free(pool_head_connstream, cs); } @@ -681,6 +684,11 @@ static inline struct conn_stream *cs_new(struct connection *conn) if (!likely(cs)) return NULL; + cs->wait_list.task = tasklet_new(); + if (!likely(cs->wait_list.task)) { + cs_free(cs); + return NULL; + } if (!conn) { conn = conn_new(); if (!likely(conn)) { diff --git a/include/types/connection.h b/include/types/connection.h index de0c32a22..9a57fce89 100644 --- a/include/types/connection.h +++ b/include/types/connection.h @@ -307,7 +307,6 @@ struct xprt_ops { struct mux_ops { int (*init)(struct connection *conn); /* early initialization */ void (*recv)(struct connection *conn); /* mux-layer recv callback */ - void (*send)(struct connection *conn); /* mux-layer send callback */ int (*wake)(struct connection *conn); /* mux-layer callback to report activity, mandatory */ void (*update_poll)(struct conn_stream *cs); /* commit cs flags to mux/conn */ size_t (*rcv_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */ @@ -334,7 +333,6 @@ struct mux_ops { */ struct data_cb { void (*recv)(struct conn_stream *cs); /* data-layer recv callback */ - void (*send)(struct conn_stream *cs); /* data-layer send callback */ int (*wake)(struct conn_stream *cs); /* data-layer callback to report activity */ int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */ char name[8]; /* data layer name, zero-terminated */ @@ -370,6 +368,7 @@ struct conn_stream { enum obj_type obj_type; /* differentiates connection from applet context */ unsigned int flags; /* CS_FL_* */ struct connection *conn; /* xprt-level connection */ + struct wait_list wait_list; /* We're in a wait list for send */ struct list send_wait_list; /* list of tasks to wake when we're ready to send */ void *data; /* pointer to upper layer's entity (eg: stream interface) */ const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */ diff --git a/src/checks.c b/src/checks.c index c7b5c30a9..624a06594 100644 --- a/src/checks.c +++ b/src/checks.c @@ -69,6 +69,7 @@ static int httpchk_expect(struct server *s, int done); static int tcpcheck_get_step_id(struct check *); static char * tcpcheck_get_step_comment(struct check *, int); static int tcpcheck_main(struct check *); +static void __event_srv_chk_w(struct conn_stream *cs); static struct pool_head *pool_head_email_alert = NULL; static struct pool_head *pool_head_tcpcheck_rule = NULL; @@ -709,23 +710,42 @@ static void chk_report_conn_err(struct check *check, int errno_bck, int expired) * the connection acknowledgement. If the proxy requires L7 health-checks, * it sends the request. In other cases, it calls set_server_check_status() * to set check->status, check->duration and check->result. + */ +static struct task *event_srv_chk_w(struct task *task, void *ctx, unsigned short state) +{ + struct conn_stream *cs = ctx; + struct check __maybe_unused *check = cs->data; + + HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock); + __event_srv_chk_w(cs); + HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock); + return NULL; +} + +/* same as above but protected by the server lock. * * Please do NOT place any return statement in this function and only leave - * via the out_unlock label. + * via the out label. NOTE THAT THIS FUNCTION DOESN'T LOCK, YOU PROBABLY WANT + * TO USE event_srv_chk_w() instead. */ -static void event_srv_chk_w(struct conn_stream *cs) +static void __event_srv_chk_w(struct conn_stream *cs) { struct connection *conn = cs->conn; struct check *check = cs->data; struct server *s = check->server; struct task *t = check->task; - HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock); if (unlikely(check->result == CHK_RES_FAILED)) goto out_wakeup; - if (conn->flags & CO_FL_HANDSHAKE) - goto out_unlock; + if (conn->flags & CO_FL_HANDSHAKE) { + if (cs->wait_list.task->process != event_srv_chk_w) { + cs->wait_list.task->process = event_srv_chk_w; + cs->wait_list.task->context = cs; + } + LIST_ADDQ(&conn->send_wait_list, &cs->wait_list.list); + goto out; + } if (retrieve_errno_from_socket(conn)) { chk_report_conn_err(check, errno, 0); @@ -748,19 +768,24 @@ static void event_srv_chk_w(struct conn_stream *cs) /* wake() will take care of calling tcpcheck_main() */ if (check->type == PR_O2_TCPCHK_CHK) - goto out_unlock; + goto out; if (b_data(&check->bo)) { b_del(&check->bo, conn->mux->snd_buf(cs, &check->bo, b_data(&check->bo), 0)); b_realign_if_empty(&check->bo); - if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) { chk_report_conn_err(check, errno, 0); __cs_stop_both(cs); goto out_wakeup; } - if (b_data(&check->bo)) - goto out_unlock; + if (b_data(&check->bo)) { + if (!cs->wait_list.task->process) { + cs->wait_list.task->process = event_srv_chk_w; + cs->wait_list.task->context = cs; + } + conn->mux->subscribe(cs, SUB_CAN_SEND, &cs->wait_list); + goto out; + } } /* full request sent, we allow up to if nonzero for a response */ @@ -774,8 +799,8 @@ static void event_srv_chk_w(struct conn_stream *cs) task_wakeup(t, TASK_WOKEN_IO); out_nowake: __cs_stop_send(cs); /* nothing more to write */ - out_unlock: - HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock); + out: + return; } /* @@ -1390,7 +1415,8 @@ static int wake_srv_chk(struct conn_stream *cs) ret = tcpcheck_main(check); cs = check->cs; conn = cs_conn(cs); - } + } else if (LIST_ISEMPTY(&cs->wait_list.list)) + __event_srv_chk_w(cs); if (unlikely(conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) { /* We may get error reports bypassing the I/O handlers, typically @@ -1433,7 +1459,6 @@ static int wake_srv_chk(struct conn_stream *cs) struct data_cb check_conn_cb = { .recv = event_srv_chk_r, - .send = event_srv_chk_w, .wake = wake_srv_chk, .name = "CHCK", }; diff --git a/src/connection.c b/src/connection.c index 94e7209b3..e02129093 100644 --- a/src/connection.c +++ b/src/connection.c @@ -64,6 +64,7 @@ void conn_fd_handler(int fd) { struct connection *conn = fdtab[fd].owner; unsigned int flags; + int can_send = 0; if (unlikely(!conn)) { activity[tid].conn_dead++; @@ -127,7 +128,7 @@ void conn_fd_handler(int fd) * both of which will be detected below. */ flags = 0; - conn->mux->send(conn); + can_send = LIST_ISEMPTY(&conn->send_wait_list); while (!LIST_ISEMPTY(&conn->send_wait_list)) { struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n, struct wait_list *, list); @@ -195,9 +196,9 @@ void conn_fd_handler(int fd) * Note that the wake callback is allowed to release the connection and * the fd (and return < 0 in this case). */ - if ((((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) || + if ((can_send || (((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) || ((flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) != CO_FL_CONNECTED && - (conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED)) && + (conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED))) && conn->mux->wake(conn) < 0) return; diff --git a/src/mux_h2.c b/src/mux_h2.c index ba6bd8d8a..3dfb396a5 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -121,6 +121,7 @@ struct h2c { struct list fctl_list; /* list of streams blocked by connection's fctl */ struct buffer_wait buf_wait; /* wait list for buffer allocations */ struct list send_wait_list; /* list of tasks to wake when we're ready to send */ + struct wait_list wait_list; /* We're in a wait list, to send */ }; /* H2 stream state, in h2s->st */ @@ -217,6 +218,7 @@ static const struct h2s *h2_idle_stream = &(const struct h2s){ }; static struct task *h2_timeout_task(struct task *t, void *context, unsigned short state); +static struct task *h2_send(struct task *t, void *ctx, unsigned short state); /*****************************************************/ /* functions below are for dynamic buffer management */ @@ -347,6 +349,12 @@ static int h2c_frt_init(struct connection *conn) t->expire = tick_add(now_ms, h2c->timeout); } + h2c->wait_list.task = tasklet_new(); + if (!h2c->wait_list.task) + goto fail; + h2c->wait_list.task->process = h2_send; + h2c->wait_list.task->context = conn; + h2c->ddht = hpack_dht_alloc(h2_settings_header_table_size); if (!h2c->ddht) goto fail; @@ -381,12 +389,15 @@ static int h2c_frt_init(struct connection *conn) task_queue(t); conn_xprt_want_recv(conn); LIST_INIT(&h2c->send_wait_list); + LIST_INIT(&h2c->wait_list.list); /* mux->wake will be called soon to complete the operation */ return 0; fail: if (t) task_free(t); + if (h2c->wait_list.task) + tasklet_free(h2c->wait_list.task); pool_free(pool_head_h2c, h2c); return -1; } @@ -445,6 +456,8 @@ static void h2_release(struct connection *conn) task_wakeup(h2c->task, TASK_WOKEN_OTHER); h2c->task = NULL; } + if (h2c->wait_list.task) + tasklet_free(h2c->wait_list.task); pool_free(pool_head_h2c, h2c); } @@ -2049,7 +2062,6 @@ static int h2_process_mux(struct h2c *h2c) h2s->flags &= ~H2_SF_BLK_ANY; if (h2s->cs) { - h2s->cs->data_cb->send(h2s->cs); h2s->cs->data_cb->wake(h2s->cs); } else { h2s_send_rst_stream(h2c, h2s); @@ -2091,7 +2103,6 @@ static int h2_process_mux(struct h2c *h2c) h2s->flags &= ~H2_SF_BLK_ANY; if (h2s->cs) { - h2s->cs->data_cb->send(h2s->cs); h2s->cs->data_cb->wake(h2s->cs); } else { h2s_send_rst_stream(h2c, h2s); @@ -2167,18 +2178,19 @@ static void h2_recv(struct connection *conn) return; } -/* callback called on send event by the connection handler */ -static void h2_send(struct connection *conn) +/* Try to send data if possible */ +static struct task *h2_send(struct task *t, void *ctx, unsigned short state) { + struct connection *conn = ctx; struct h2c *h2c = conn->mux_ctx; int done; if (conn->flags & CO_FL_ERROR) - return; + return NULL; if (conn->flags & (CO_FL_HANDSHAKE|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) { /* a handshake was requested */ - return; + return NULL; } /* This loop is quite simple : it tries to fill as much as it can from @@ -2243,6 +2255,13 @@ static void h2_send(struct connection *conn) } } + /* We're done, no more to send */ + if (!b_data(&h2c->mbuf)) + return NULL; +schedule: + if (LIST_ISEMPTY(&h2c->wait_list.list)) + conn->xprt->subscribe(conn, SUB_CAN_SEND, &h2c->wait_list); + return NULL; } /* callback called on any event by the connection handler. @@ -2349,6 +2368,8 @@ static int h2_wake(struct connection *conn) else h2c->task->expire = TICK_ETERNITY; } + + h2_send(NULL, conn, 0); return 0; } @@ -3474,8 +3495,6 @@ static size_t h2_snd_buf(struct conn_stream *cs, const struct buffer *buf, size_ else if (LIST_ISEMPTY(&h2s->list)) { if (h2s->flags & H2_SF_BLK_MFCTL) LIST_ADDQ(&h2s->h2c->fctl_list, &h2s->list); - else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM)) - LIST_ADDQ(&h2s->h2c->send_list, &h2s->list); } return total; @@ -3575,7 +3594,6 @@ static int h2_parse_max_concurrent_streams(char **args, int section_type, struct const struct mux_ops h2_ops = { .init = h2_init, .recv = h2_recv, - .send = h2_send, .wake = h2_wake, .update_poll = h2_update_poll, .rcv_buf = h2_rcv_buf, diff --git a/src/mux_pt.c b/src/mux_pt.c index 059e4995c..7fb377955 100644 --- a/src/mux_pt.c +++ b/src/mux_pt.c @@ -97,19 +97,6 @@ static void mux_pt_recv(struct connection *conn) cs_update_mux_polling(cs); } -/* callback to be used by default for the pass-through mux. It simply calls the - * data layer send() callback which must be set. - */ -static void mux_pt_send(struct connection *conn) -{ - struct conn_stream *cs = conn->mux_ctx; - - if (conn->flags & CO_FL_ERROR) - cs->flags |= CS_FL_ERROR; - cs->data_cb->send(cs); - cs_update_mux_polling(cs); -} - /* * Attach a new stream to a connection * (Used for outgoing connections) @@ -207,7 +194,6 @@ static int mux_pt_snd_pipe(struct conn_stream *cs, struct pipe *pipe) const struct mux_ops mux_pt_ops = { .init = mux_pt_init, .recv = mux_pt_recv, - .send = mux_pt_send, .wake = mux_pt_wake, .update_poll = mux_pt_update_poll, .rcv_buf = mux_pt_rcv_buf, diff --git a/src/stream_interface.c b/src/stream_interface.c index 2fecb9403..6fb7b535f 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -52,10 +52,10 @@ static void stream_int_shutw_applet(struct stream_interface *si); static void stream_int_chk_rcv_applet(struct stream_interface *si); static void stream_int_chk_snd_applet(struct stream_interface *si); static void si_cs_recv_cb(struct conn_stream *cs); -static void si_cs_send_cb(struct conn_stream *cs); static int si_cs_wake_cb(struct conn_stream *cs); static int si_idle_conn_wake_cb(struct conn_stream *cs); static void si_idle_conn_null_cb(struct conn_stream *cs); +static struct task * si_cs_send(struct task *t, void *ctx, unsigned short state); /* stream-interface operations for embedded tasks */ struct si_ops si_embedded_ops = { @@ -85,14 +85,12 @@ struct si_ops si_applet_ops = { struct data_cb si_conn_cb = { .recv = si_cs_recv_cb, - .send = si_cs_send_cb, .wake = si_cs_wake_cb, .name = "STRM", }; struct data_cb si_idle_conn_cb = { .recv = si_idle_conn_null_cb, - .send = si_idle_conn_null_cb, .wake = si_idle_conn_wake_cb, .name = "IDLE", }; @@ -462,6 +460,10 @@ void stream_int_notify(struct stream_interface *si) struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); + /* If we have data to send, try it now */ + if (!channel_is_empty(oc) && objt_cs(si->end)) + si_cs_send(NULL, objt_cs(si->end), 0); + /* process consumer side */ if (channel_is_empty(oc)) { struct connection *conn = objt_cs(si->end) ? objt_cs(si->end)->conn : NULL; @@ -632,20 +634,42 @@ static int si_cs_wake_cb(struct conn_stream *cs) * caller to commit polling changes. The caller should check conn->flags * for errors. */ -static void si_cs_send(struct conn_stream *cs) +static struct task * si_cs_send(struct task *t, void *ctx, unsigned short state) { + struct conn_stream *cs = ctx; struct connection *conn = cs->conn; struct stream_interface *si = cs->data; struct channel *oc = si_oc(si); int ret; + int did_send = 0; + + /* We're already waiting to be able to send, give up */ + if (!LIST_ISEMPTY(&cs->wait_list.list)) + return NULL; + + if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) + return NULL; + + if (conn->flags & CO_FL_HANDSHAKE) { + /* a handshake was requested */ + /* Schedule ourself to be woken up once the handshake is done */ + LIST_ADDQ(&conn->send_wait_list, &cs->wait_list.list); + return NULL; + } + + /* we might have been called just after an asynchronous shutw */ + if (si_oc(si)->flags & CF_SHUTW) + return NULL; /* ensure it's only set if a write attempt has succeeded */ oc->flags &= ~CF_WRITE_PARTIAL; if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) { ret = conn->mux->snd_pipe(cs, oc->pipe); - if (ret > 0) + if (ret > 0) { oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA | CF_WRITE_EVENT; + did_send = 1; + } if (!oc->pipe->data) { put_pipe(oc->pipe); @@ -653,14 +677,14 @@ static void si_cs_send(struct conn_stream *cs) } if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) - return; + return NULL; } /* At this point, the pipe is empty, but we may still have data pending * in the normal buffer. */ if (!co_data(oc)) - return; + goto wake_others; /* when we're here, we already know that there is no spliced * data left, and that there are sendable buffered data. @@ -691,6 +715,7 @@ static void si_cs_send(struct conn_stream *cs) ret = conn->mux->snd_buf(cs, &oc->buf, co_data(oc), send_flag); if (ret > 0) { + did_send = 1; oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA | CF_WRITE_EVENT; co_set_data(oc, co_data(oc) - ret); @@ -706,6 +731,26 @@ static void si_cs_send(struct conn_stream *cs) */ } } + /* We couldn't send all of our data, let the mux know we'd like to send more */ + if (co_data(oc)) { + if (!cs->wait_list.task->process) { + cs->wait_list.task->process = si_cs_send; + cs->wait_list.task->context = ctx; + } + conn->mux->subscribe(cs, SUB_CAN_SEND, &cs->wait_list); + } +wake_others: + /* Maybe somebody was waiting for this conn_stream, wake them */ + if (did_send) { + while (!LIST_ISEMPTY(&cs->send_wait_list)) { + struct wait_list *sw = LIST_ELEM(cs->send_wait_list.n, + struct wait_list *, list); + LIST_DEL(&sw->list); + LIST_INIT(&sw->list); + tasklet_wakeup(sw->task); + } + } + return NULL; } /* This function is designed to be called from within the stream handler to @@ -995,7 +1040,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) __cs_want_send(cs); - si_cs_send(cs); + si_cs_send(NULL, cs, 0); if (cs->flags & CS_FL_ERROR || cs->conn->flags & CO_FL_ERROR) { /* Write error on the file descriptor */ __cs_stop_both(cs); @@ -1312,34 +1357,6 @@ static void si_cs_recv_cb(struct conn_stream *cs) return; } -/* - * This is the callback which is called by the connection layer to send data - * from the buffer to the connection. It iterates over the transport layer's - * snd_buf function. - */ -static void si_cs_send_cb(struct conn_stream *cs) -{ - struct connection *conn = cs->conn; - struct stream_interface *si = cs->data; - - if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) - return; - - if (conn->flags & CO_FL_HANDSHAKE) - /* a handshake was requested */ - return; - - /* we might have been called just after an asynchronous shutw */ - if (si_oc(si)->flags & CF_SHUTW) - return; - - /* OK there are data waiting to be sent */ - si_cs_send(cs); - - /* OK all done */ - return; -} - /* * This function propagates a null read received on a socket-based connection. * It updates the stream interface. If the stream interface has SI_FL_NOHALF,