diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 96b26e3ce..528543bf8 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -240,6 +240,7 @@ struct qcc_app_ops { #define QC_CF_ERR_CONN 0x00000020 /* fatal error reported by transport layer */ #define QC_CF_WAIT_HS 0x00000040 /* MUX init before QUIC handshake completed (0-RTT) */ #define QC_CF_QOS 0x00000080 +#define QC_CF_QSTP_SENT 0x00000100 /* This function is used to report flags in debugging tools. Please reflect * below any single-bit flag addition above in the same order via the diff --git a/src/mux_quic.c b/src/mux_quic.c index 15ce50cb1..4c60c5e1b 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -38,6 +38,7 @@ DECLARE_STATIC_POOL(pool_head_qc_stream_rxbuf, "qc_stream_rxbuf", sizeof(struct static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset); static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room); +static void qmux_qos_ctrl_send(struct qcs *qcs, uint64_t data, uint64_t offset); int qmux_is_quic(const struct qcc *qcc) { @@ -2350,7 +2351,7 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin, goto err; } - frm->stream.stream = qcs->stream; + frm->stream.stream = qmux_is_quic(qcc) ? qcs->stream : (struct qc_stream_desc *)qcs; frm->stream.id = qcs->id; frm->stream.offset = 0; frm->stream.dup = 0; @@ -2519,7 +2520,8 @@ static int _qcc_qos_send_frames(struct qcc *qcc, struct list *frms, int stream) } if (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_E) { - /* TODO notify MUX */ + qmux_qos_ctrl_send((struct qcs *)frm->stream.stream, + frm->stream.len, frm->stream.offset); } LIST_DEL_INIT(&frm->list); @@ -4181,3 +4183,487 @@ static struct mux_proto_list mux_proto_quic = { .token = IST("quic"), .mode = PROTO_MODE_HTTP, .side = PROTO_SIDE_FE, .mux = &qmux_ops }; INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_quic); + +static int qcc_qos_recv(struct qcc *qcc) +{ + struct connection *conn = qcc->conn; + struct quic_frame frm; + const unsigned char *pos, *end; + int ret; + + TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); + + chunk_reset(&trash); + ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &trash, trash.size, 0); + BUG_ON(ret < 0); + + if (ret) { + b_add(&trash, ret); + + pos = (unsigned char *)b_head(&trash); + end = (unsigned char *)b_tail(&trash); + ret = qc_parse_frm(&frm, NULL, &pos, end, NULL); + BUG_ON(!ret); + + if (frm.type == QUIC_FT_QS_TP) { + struct qf_qs_tp *qs_tp_frm = &frm.qs_tp; + fprintf(stderr, "got qs_transport_parameters frame\n"); + fprintf(stderr, " max_idle_timeout=%llu\n", (ullong)qs_tp_frm->tps.max_idle_timeout); + fprintf(stderr, " initial_max_data=%llu\n", (ullong)qs_tp_frm->tps.initial_max_data); + qfctl_set_max(&qcc->tx.fc, qs_tp_frm->tps.initial_max_data, NULL, NULL); + fprintf(stderr, " initial_max_stream_data_bidi_local=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_bidi_local); + qcc->rfctl.msd_bidi_l = qs_tp_frm->tps.initial_max_stream_data_bidi_local; + fprintf(stderr, " initial_max_stream_data_bidi_remote=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_bidi_remote); + qcc->rfctl.msd_bidi_r = qs_tp_frm->tps.initial_max_stream_data_bidi_remote; + fprintf(stderr, " initial_max_stream_data_uni=%llu\n", (ullong)qs_tp_frm->tps.initial_max_stream_data_uni); + qcc->rfctl.msd_uni_l = qs_tp_frm->tps.initial_max_stream_data_uni; + fprintf(stderr, " initial_max_streams_bidi=%llu\n", (ullong)qs_tp_frm->tps.initial_max_streams_bidi); + fprintf(stderr, " initial_max_streams_uni=%llu\n", (ullong)qs_tp_frm->tps.initial_max_streams_uni); + } + else if (frm.type >= QUIC_FT_STREAM_8 && + frm.type <= QUIC_FT_STREAM_F) { + struct qf_stream *strm_frm = &frm.stream; + + qcc_recv(qcc, strm_frm->id, strm_frm->len, strm_frm->offset, + (frm.type & QUIC_STREAM_FRAME_TYPE_FIN_BIT), (char *)strm_frm->data); + } + else if (frm.type == QUIC_FT_RESET_STREAM) { + struct qf_reset_stream *rst_frm = &frm.reset_stream; + qcc_recv_reset_stream(qcc, rst_frm->id, rst_frm->app_error_code, rst_frm->final_size); + } + else { + ABORT_NOW(); + } + + } + else { + BUG_ON(!trash.size); + if (!conn_xprt_read0_pending(qcc->conn)) { + conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, + &qcc->wait_event); + } + } + + TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); + return ret; + + err: + return -1; +} + +static int qcc_qos_io_recv(struct qcc *qcc) +{ + struct qcs *qcs; + + TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); + + if (qcc->flags & QC_CF_ERRL) { + TRACE_DATA("connection on error", QMUX_EV_QCC_RECV, qcc->conn); + TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); + return 0; + } + + if ((qcc->flags & QC_CF_WAIT_HS) && !(qcc->wait_event.events & SUB_RETRY_RECV)) + qcc_wait_for_hs(qcc); + + if (!(qcc->wait_event.events & SUB_RETRY_RECV)) + qcc_qos_recv(qcc); + + while (!LIST_ISEMPTY(&qcc->recv_list)) { + qcs = LIST_ELEM(qcc->recv_list.n, struct qcs *, el_recv); + /* no need to add an uni local stream in recv_list. */ + BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_local(qcc, qcs->id)); + qcc_decode_qcs(qcc, qcs); + LIST_DEL_INIT(&qcs->el_recv); + } + + TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); + return 0; +} + +/* Used as a callback for qc_stream_desc layer to notify about emission of a + * STREAM frame of length starting at . + */ +static void qmux_qos_ctrl_send(struct qcs *qcs, uint64_t data, uint64_t offset) +{ + struct qcc *qcc = qcs->qcc; + uint64_t diff; + + TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs); + + /* Real off MUST always be the greatest offset sent. */ + BUG_ON(offset > qcs->tx.fc.off_real); + + /* Check if the STREAM frame has already been notified. An empty FIN + * frame must not be considered retransmitted. + */ + if (data && offset + data <= qcs->tx.fc.off_real) { + TRACE_DEVEL("offset already notified", QMUX_EV_QCS_SEND, qcc->conn, qcs); + goto out; + } + + /* An empty STREAM frame is only used to notify FIN. A retransmitted + * empty FIN cannot be notified as QCS will be unsubscribed first. + */ + BUG_ON(!data && !(qcs->flags & QC_SF_FIN_STREAM)); + + qcs_idle_open(qcs); + + diff = offset + data - qcs->tx.fc.off_real; + if (diff) { + struct quic_fctl *fc_conn = &qcc->tx.fc; + struct quic_fctl *fc_strm = &qcs->tx.fc; + + /* Ensure real offset never exceeds soft value. */ + BUG_ON(fc_conn->off_real + diff > fc_conn->off_soft); + BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft); + + /* increase offset sum on connection */ + if (qfctl_rinc(fc_conn, diff)) { + TRACE_STATE("connection flow-control reached", + QMUX_EV_QCS_SEND, qcc->conn); + } + + /* increase offset on stream */ + if (qfctl_rinc(fc_strm, diff)) { + TRACE_STATE("stream flow-control reached", + QMUX_EV_QCS_SEND, qcc->conn, qcs); + } + + b_del(&qcs->qos_buf, diff); + /* Release buffer if everything sent and buf is full or stream is waiting for room. */ + if (!qcs_prep_bytes(qcs) && qcs->flags & QC_SF_BLK_MROOM) { + qcs->flags &= ~QC_SF_BLK_MROOM; + qcs_notify_send(qcs); + } + + /* Add measurement for send rate. This is done at the MUX layer + * to account only for STREAM frames without retransmission. + */ + increment_send_rate(diff, 0); + } + + if (!qcs_prep_bytes(qcs)) { + /* Remove stream from send_list if all was sent. */ + LIST_DEL_INIT(&qcs->el_send); + TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs); + + if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) { + /* Close stream locally. */ + qcs_close_local(qcs); + + if (qcs->flags & QC_SF_FIN_STREAM) { + /* Reset flag to not emit multiple FIN STREAM frames. */ + qcs->flags &= ~QC_SF_FIN_STREAM; + } + + if (qcs_is_completed(qcs)) { + TRACE_STATE("add stream in purg_list", QMUX_EV_QCS_SEND, qcc->conn, qcs); + LIST_APPEND(&qcc->purg_list, &qcs->el_send); + } + } + } + + out: + TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs); +} + +static int qcc_qos_send_tp(struct qcc *qcc) +{ + struct quic_frame *frm; + struct list list = LIST_HEAD_INIT(list); + + TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); + + frm = qc_frm_alloc(QUIC_FT_QS_TP); + if (!frm) { + TRACE_ERROR("frame alloc failure", QMUX_EV_QCC_SEND, qcc->conn); + goto err; + } + + LIST_APPEND(&list, &frm->list); + if (qcc_send_frames(qcc, &list, 0)) { + TRACE_DEVEL("QoS frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn); + goto err; + } + + TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); + return 0; + + err: + TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn); + return -1; +} + +static int qcc_qos_io_send(struct qcc *qcc) +{ + struct list *frms = &qcc->tx.frms; + /* Temporary list for QCS on error. */ + struct list qcs_failed = LIST_HEAD_INIT(qcs_failed); + struct qcs *qcs, *qcs_tmp; + uint64_t window_conn __maybe_unused = qfctl_rcap(&qcc->tx.fc); + int ret __maybe_unused = 0, total = 0, resent __maybe_unused; + + TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn); + + if (!(qcc->flags & QC_CF_QSTP_SENT)) { + if (qcc_qos_send_tp(qcc)) + return 0; + qcc->flags |= QC_CF_QSTP_SENT; + } + + /* Check for transport error. */ + if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) { + TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn); + goto out; + } + + /* Check for locally detected connection error. */ + if (qcc->flags & QC_CF_ERRL) { + /* Prepare a CONNECTION_CLOSE if not already done. */ + if (!(qcc->flags & QC_CF_ERRL_DONE)) { + TRACE_DATA("report a connection error", QMUX_EV_QCC_SEND|QMUX_EV_QCC_ERR, qcc->conn); + quic_set_connection_close(qcc->conn->handle.qc, qcc->err); + qcc->flags |= QC_CF_ERRL_DONE; + } + goto out; + } + + if (qcc->app_st < QCC_APP_ST_INIT) { + if (qcc_app_init(qcc)) + goto out; + } + + if (qcc->conn->flags & CO_FL_SOCK_WR_SH) { + qcc->conn->flags |= CO_FL_ERROR; + TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn); + goto out; + } + + if (!LIST_ISEMPTY(&qcc->lfctl.frms)) { + if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) { + TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn); + goto out; + } + } + + if (qcc_emit_rs_ss(qcc)) { + TRACE_DEVEL("emission interrupted on STOP_SENDING/RESET_STREAM send error", QMUX_EV_QCC_SEND, qcc->conn); + goto out; + } + + /* Encode new STREAM frames if list has been previously cleared. */ + if (LIST_ISEMPTY(frms) && !LIST_ISEMPTY(&qcc->send_list)) { + total = qcc_build_frms(qcc, &qcs_failed); + if (LIST_ISEMPTY(frms)) + goto out; + } + + ret = qcc_send_frames(qcc, frms, 1); + + out: + /* Re-insert on-error QCS at the end of the send-list. */ + if (!LIST_ISEMPTY(&qcs_failed)) { + list_for_each_entry_safe(qcs, qcs_tmp, &qcs_failed, el_send) { + LIST_DEL_INIT(&qcs->el_send); + LIST_APPEND(&qcc->send_list, &qcs->el_send); + } + + if (!qfctl_rblocked(&qcc->tx.fc)) + tasklet_wakeup(qcc->wait_event.tasklet); + } + + if (qcc->conn->flags & CO_FL_ERROR && !(qcc->flags & QC_CF_ERR_CONN)) { + TRACE_ERROR("error reported by transport layer", + QMUX_EV_QCC_SEND, qcc->conn); + qcc->flags |= QC_CF_ERR_CONN; + } + + TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn); + return total; +} + +struct task *qcc_qos_io_cb(struct task *t, void *ctx, unsigned int status) +{ + struct qcc *qcc = ctx; + + TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn); + + qcc_qos_io_recv(qcc); + + if (!(qcc->wait_event.events & SUB_RETRY_SEND)) + qcc_qos_io_send(qcc); + + qcc_qos_io_recv(qcc); + + if (qcc_io_process(qcc)) { + TRACE_STATE("releasing dead connection", QMUX_EV_QCC_WAKE, qcc->conn); + goto release; + } + + qcc_refresh_timeout(qcc); + + TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn); + + return NULL; + + release: + qcc_shutdown(qcc); + qcc_release(qcc); + + TRACE_LEAVE(QMUX_EV_QCC_WAKE); + + return NULL; +} + +static int qmux_qos_init(struct connection *conn, struct proxy *prx, + struct session *sess, struct buffer *input) +{ + struct qcc *qcc; + + TRACE_ENTER(QMUX_EV_QCC_NEW); + + qcc = pool_alloc(pool_head_qcc); + if (!qcc) { + TRACE_ERROR("alloc failure", QMUX_EV_QCC_NEW); + goto err; + } + + _qcc_init(qcc); + conn->ctx = qcc; + qcc->nb_hreq = qcc->nb_sc = 0; + qcc->flags = QC_CF_QOS; + qcc->app_st = QCC_APP_ST_NULL; + qcc->glitches = 0; + qcc->err = quic_err_transport(QC_ERR_NO_ERROR); + + /* hardcoded inital TP values. Is this really necessary? */ + qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = 16384; + qcc->lfctl.ms_uni = 3; + qcc->lfctl.msd_bidi_l = 16384; + qcc->lfctl.msd_bidi_r = 16384; + qcc->lfctl.msd_uni_r = 16384; + qcc->lfctl.cl_bidi_r = 0; + + qcc->lfctl.md = qcc->lfctl.md_init = 16384; + qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0; + + qfctl_init(&qcc->tx.fc, 0); + + qcc->tx.buf_in_flight = 0; + + if (conn_is_back(conn)) { + qcc->next_bidi_l = 0x00; + qcc->largest_bidi_r = 0x01; + qcc->next_uni_l = 0x02; + qcc->largest_uni_r = 0x03; + } + else { + qcc->largest_bidi_r = 0x00; + qcc->next_bidi_l = 0x01; + qcc->largest_uni_r = 0x02; + qcc->next_uni_l = 0x03; + } + + qcc->wait_event.tasklet = tasklet_new(); + if (!qcc->wait_event.tasklet) { + TRACE_ERROR("taslket alloc failure", QMUX_EV_QCC_NEW); + goto err; + } + + LIST_INIT(&qcc->recv_list); + LIST_INIT(&qcc->send_list); + LIST_INIT(&qcc->fctl_list); + LIST_INIT(&qcc->buf_wait_list); + LIST_INIT(&qcc->purg_list); + + qcc->wait_event.tasklet->process = qcc_qos_io_cb; + qcc->wait_event.tasklet->context = qcc; + qcc->wait_event.tasklet->state |= TASK_F_WANTS_TIME; + qcc->wait_event.events = 0; + + qcc->proxy = prx; + /* haproxy timeouts */ + if (conn_is_back(conn)) { + qcc->timeout = prx->timeout.server; + qcc->shut_timeout = tick_isset(prx->timeout.serverfin) ? + prx->timeout.serverfin : prx->timeout.server; + } + else { + qcc->timeout = prx->timeout.client; + qcc->shut_timeout = tick_isset(prx->timeout.clientfin) ? + prx->timeout.clientfin : prx->timeout.client; + } + + /* Always allocate task even if timeout is unset. In MUX code, if task + * is NULL, it indicates that a timeout has stroke earlier. + */ + qcc->task = task_new_here(); + if (!qcc->task) { + TRACE_ERROR("timeout task alloc failure", QMUX_EV_QCC_NEW); + goto err; + } + qcc->task->process = qcc_timeout_task; + qcc->task->context = qcc; + qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout); + + qcc_reset_idle_start(qcc); + LIST_INIT(&qcc->opening_list); + + /* Register conn as app_ops may use it. */ + qcc->conn = conn; + + /* TODO hardcoded HTTP/3 ops */ + if (qcc_install_app_ops(qcc, &h3_ops)) { + TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, conn); + goto err; + } + + if (qcc->app_ops == &h3_ops) + proxy_inc_fe_cum_sess_ver_ctr(sess->listener, prx, 3); + + /* Register conn for idle front closing. This is done once everything is allocated. */ + if (!conn_is_back(conn)) + LIST_APPEND(&mux_stopping_data[tid].list, &conn->stopping_list); + + /* init read cycle */ + tasklet_wakeup(qcc->wait_event.tasklet); + + TRACE_LEAVE(QMUX_EV_QCC_NEW, conn); + return 0; + + err: + if (qcc) { + /* In case of MUX init failure, session will ensure connection is freed. */ + qcc->conn = NULL; + qcc_release(qcc); + } + + TRACE_DEVEL("leaving on error", QMUX_EV_QCC_NEW, conn); + return -1; +} + +static const struct mux_ops qmux_qos_ops = { + .init = qmux_qos_init, + .destroy = qmux_destroy, + .detach = qmux_strm_detach, + .rcv_buf = qmux_strm_rcv_buf, + .snd_buf = qmux_strm_snd_buf, + .nego_fastfwd = qmux_strm_nego_ff, + .done_fastfwd = qmux_strm_done_ff, + .resume_fastfwd = qmux_strm_resume_ff, + .subscribe = qmux_strm_subscribe, + .unsubscribe = qmux_strm_unsubscribe, + .wake = qmux_wake, + .shut = qmux_strm_shut, + .ctl = qmux_ctl, + .sctl = qmux_sctl, + .show_sd = qmux_strm_show_sd, + .flags = MX_FL_HTX|MX_FL_NO_UPG, + .name = "QOS", +}; + +static struct mux_proto_list mux_proto_qos = + { .token = IST("qos"), .mode = PROTO_MODE_HTTP, .side = PROTO_SIDE_FE, .mux = &qmux_qos_ops }; + +INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_qos);