From cc3c2d1f12ca2fbf91f599383488b1f20e8ac727 Mon Sep 17 00:00:00 2001 From: Amaury Denoyelle Date: Mon, 24 Feb 2025 16:22:22 +0100 Subject: [PATCH] MINOR: mux-quic: define rxbuf wrapper Define a new type qc_stream_rxbuf. This is used as a wrapper around QCS Rx buffer with encapsulation of the ncbuf storage. It is allocated via a new pool. Several functions are adapted to be able to deal with qc_stream_rxbuf as a wrapper instead of the previous plain ncbuf instance. No functional change should happen with this patch. For now, only a single qc_stream_rxbuf can be instantiated per QCS. However, this new type will be useful to implement multiple Rx buffer storage in a future commit. --- include/haproxy/mux_quic-t.h | 11 ++++- src/mux_quic.c | 84 ++++++++++++++++++++++++++---------- 2 files changed, 71 insertions(+), 24 deletions(-) diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index c92ed3644..567e04025 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -134,6 +134,13 @@ enum qcs_state { QC_SS_CLO, /* closed */ } __attribute__((packed)); +/* STREAM receive buffer. Can handle out-of-order storage. */ +struct qc_stream_rxbuf { + struct ncbuf ncb; /* data storage with support for out of order offset */ + uint64_t off; /* base offset of current buffer */ + uint64_t off_end; /* first offset directly outside of current buffer */ +}; + struct qcs { struct qcc *qcc; struct sedesc *sd; @@ -142,9 +149,9 @@ struct qcs { void *ctx; /* app-ops context */ struct { - uint64_t offset; /* absolute current base offset of ncbuf */ + uint64_t offset; /* read offset */ uint64_t offset_max; /* maximum absolute offset received */ - struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */ + struct qc_stream_rxbuf *buf; /* single receive buffer */ struct buffer app_buf; /* receive buffer used by stconn layer */ uint64_t msd; /* current max-stream-data limit to enforce */ uint64_t msd_init; /* initial max-stream-data */ diff --git a/src/mux_quic.c b/src/mux_quic.c index ba6b446e5..859880f50 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -34,6 +34,7 @@ DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc)); DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs)); +DECLARE_STATIC_POOL(pool_head_qc_stream_rxbuf, "qc_stream_rxbuf", sizeof(struct qc_stream_rxbuf)); static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset); static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room); @@ -44,23 +45,30 @@ static int qcc_is_pacing_active(const struct connection *conn) return !(quic_tune.options & QUIC_TUNE_NO_PACING); } -static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf) +/* Free instance and its inner data storage attached to stream. */ +static void qcs_free_rxbuf(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf) { + struct ncbuf *ncbuf; struct buffer buf; - if (ncb_is_null(ncbuf)) + if (!rxbuf) return; - buf = b_make(ncbuf->area, ncbuf->size, 0, 0); - b_free(&buf); - offer_buffers(NULL, 1); + ncbuf = &rxbuf->ncb; - *ncbuf = NCBUF_NULL; + if (!ncb_is_null(ncbuf)) { + buf = b_make(ncbuf->area, ncbuf->size, 0, 0); + b_free(&buf); + offer_buffers(NULL, 1); + } /* Reset DEM_FULL as buffer is released. This ensures mux is not woken * up from rcv_buf stream callback when demux was previously blocked. */ qcs->flags &= ~QC_SF_DEM_FULL; + + pool_free(pool_head_qc_stream_rxbuf, rxbuf); + qcs->rx.buf = NULL; } /* Free instance. This function is reserved for internal usage : it must @@ -97,7 +105,7 @@ static void qcs_free(struct qcs *qcs) } /* Free Rx buffer. */ - qcs_free_ncbuf(qcs, &qcs->rx.ncbuf); + qcs_free_rxbuf(qcs, qcs->rx.buf); /* Remove qcs from qcc tree. */ eb64_delete(&qcs->by_id); @@ -152,7 +160,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) qfctl_init(&qcs->tx.fc, 0); } - qcs->rx.ncbuf = NCBUF_NULL; + qcs->rx.buf = NULL; qcs->rx.app_buf = BUF_NULL; qcs->rx.offset = qcs->rx.offset_max = 0; @@ -1074,10 +1082,22 @@ int qcc_get_qcs(struct qcc *qcc, uint64_t id, int receive_only, int send_only, return 1; } -/* Simple function to duplicate a buffer */ -static inline struct buffer qcs_b_dup(const struct ncbuf *b) +/* Convert out-of-order storage into a contiguous buffer. */ +static inline struct buffer qcs_b_dup(const struct qc_stream_rxbuf *b) { - return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0)); + if (b) { + const struct ncbuf *ncb = &b->ncb; + return b_make(ncb_orig(ncb), ncb->size, ncb->head, ncb_data(ncb, 0)); + } + else { + return BUF_NULL; + } +} + +/* Returns the amount of data readable at stream current offset. */ +static ncb_sz_t qcs_rx_avail_data(struct qcs *qcs) +{ + return qcs->rx.buf ? ncb_data(&qcs->rx.buf->ncb, 0) : 0; } /* Remove from Rx buffer. Flow-control for received offsets may @@ -1087,18 +1107,18 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes) { struct qcc *qcc = qcs->qcc; struct quic_frame *frm; - struct ncbuf *buf = &qcs->rx.ncbuf; + struct ncbuf *ncbuf = &qcs->rx.buf->ncb; enum ncb_ret ret; TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs); - ret = ncb_advance(buf, bytes); + ret = ncb_advance(ncbuf, bytes); if (ret) { ABORT_NOW(); /* should not happens because removal only in data */ } - if (ncb_is_empty(buf)) - qcs_free_ncbuf(qcs, buf); + if (ncb_is_empty(ncbuf)) + qcs_free_rxbuf(qcs, qcs->rx.buf); qcs->rx.offset += bytes; /* Not necessary to emit a MAX_STREAM_DATA if all data received. */ @@ -1157,7 +1177,7 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs); - b = qcs_b_dup(&qcs->rx.ncbuf); + b = qcs_b_dup(qcs->rx.buf); /* Signal FIN to application if STREAM FIN received with all data. */ if (qcs_is_close_remote(qcs)) @@ -1500,6 +1520,7 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, char fin, char *data) { const int fin_standalone = (!len && fin); + struct ncbuf *ncbuf; struct qcs *qcs; enum ncb_ret ret; @@ -1586,13 +1607,32 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, } if (len) { - if (!qcs_get_ncbuf(qcs, &qcs->rx.ncbuf) || ncb_is_null(&qcs->rx.ncbuf)) { + if (!qcs->rx.buf) { + struct qc_stream_rxbuf *buf; + buf = pool_alloc(pool_head_qc_stream_rxbuf); + if (!buf) { + TRACE_ERROR("rxbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); + qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0); + goto err; + } + + buf->ncb = NCBUF_NULL; + buf->off = qcs->rx.offset; + buf->off_end = buf->off + qmux_stream_rx_bufsz(); + qcs->rx.buf = buf; + ncbuf = &buf->ncb; + } + else { + ncbuf = &qcs->rx.buf->ncb; + } + + if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) { TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0); goto err; } - ret = ncb_add(&qcs->rx.ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE); + ret = ncb_add(ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE); switch (ret) { case NCB_RET_OK: break; @@ -1624,11 +1664,11 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, qcs->flags |= QC_SF_SIZE_KNOWN; if (qcs->flags & QC_SF_SIZE_KNOWN && - qcs->rx.offset_max == qcs->rx.offset + ncb_data(&qcs->rx.ncbuf, 0)) { + qcs->rx.offset_max == qcs->rx.offset + qcs_rx_avail_data(qcs)) { qcs_close_remote(qcs); } - if ((ncb_data(&qcs->rx.ncbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) || + if ((qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) || unlikely(fin_standalone && qcs_is_close_remote(qcs))) { qcc_decode_qcs(qcc, qcs); LIST_DEL_INIT(&qcs->el_recv); @@ -1805,7 +1845,7 @@ int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t f */ qcs->flags |= QC_SF_SIZE_KNOWN|QC_SF_RECV_RESET; qcs_close_remote(qcs); - qcs_free_ncbuf(qcs, &qcs->rx.ncbuf); + qcs_free_rxbuf(qcs, qcs->rx.buf); out: if (qcc->glitches != prev_glitches) @@ -3345,7 +3385,7 @@ static size_t qmux_strm_rcv_buf(struct stconn *sc, struct buffer *buf, /* Ensure DEM_FULL is only set if there is available data to * ensure we never do unnecessary wakeup here. */ - BUG_ON(!ncb_data(&qcs->rx.ncbuf, 0)); + BUG_ON(!qcs_rx_avail_data(qcs)); qcs->flags &= ~QC_SF_DEM_FULL; if (!(qcc->flags & QC_CF_ERRL)) {