MAJOR: mux-quic: support multiple QCS RX buffers

Implement support for multiple Rx buffers per QCS instance. This
requires several changes, mostly in qcc_recv() / qcc_decode_qcs(), which
deal with STREAM frame reception and decoding. These multiple buffers
are stored in the QCS rx.bufs tree, which was introduced in an earlier
patch.

On STREAM frame reception, a buffer is retrieved from the QCS bufs tree,
or allocated if necessary, based on the data starting offset. Each
buffer is aligned on bufsize for convenience. This ensures there is no
overlap between two contiguous buffers. Special care is taken when
dealing with a STREAM frame which must be split and stored in two
contiguous buffers.
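
As a rough sketch of the alignment logic (simplified from the
qcs_get_rxbuf() hunk below; BUFSZ stands in for qmux_stream_rx_bufsz()):

    /* Aligned start of the buffer covering <offset>: buffers never
     * overlap, each one spanning exactly [aligned_off, aligned_off + BUFSZ).
     */
    uint64_t aligned_off = offset - (offset % BUFSZ);

    /* Clamp the frame length to the end of this buffer; the caller loops
     * and stores the remainder in the next contiguous buffer.
     */
    if (offset + len > aligned_off + BUFSZ)
        len = aligned_off + BUFSZ - offset;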

When decoding input data, qcc_decode_qcs() is still invoked with a
single buffer as input. This requires a new while loop to ensure
decoding is performed across multiple contiguous buffers until all data
are decoded or the app stream buffer is full.
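
The loop shape, as taken from the qcc_io_recv() hunk below (tracing and
recv_list handling omitted):

    /* Decode as long as data is readable at the current offset and the
     * app stream buffer is not saturated; qcc_decode_qcs() returns <= 0
     * when no further progress is possible on this stream.
     */
    while (qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) {
        ret = qcc_decode_qcs(qcc, qcs);
        if (ret <= 0)
            break;
    }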

Also, after qcs_consume() has been performed, the stream Rx channel is
immediately closed if FIN was already received and the QCS now contains
only a single buffer with all remaining data. This is necessary because
qcc_recv() is unable to close the Rx channel if FIN is received for a
buffer different from the one at the current readable offset.
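
The corresponding check, as performed in the qcc_decode_qcs() hunk below
once the emptied buffer has been released:

    /* Final size is known: close the remote side once the readable
     * offset plus the data available in the remaining buffer reaches it.
     */
    if (qcs->flags & QC_SF_SIZE_KNOWN && !eb_is_empty(&qcs->rx.bufs)) {
        const ncb_sz_t avail = qcs_rx_avail_data(qcs);

        if (qcs->rx.offset + avail == qcs->rx.offset_max)
            qcs_close_remote(qcs);
    }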

Note that for now the stream flow-control limit is still too low to
fully utilize this new infrastructure and improve client upload
throughput. Indeed, flow-control max-stream-data initial values are set
to match bufsize. This ensures that each QCS will use 1 buffer, or at
most 2 if data is split. A future patch will increase this value to
remove this limitation.
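
For instance, with a hypothetical bufsize of 16384 bytes,
max-stream-data allows at most 16384 bytes in flight per stream; if
those bytes straddle an alignment boundary, say offsets 10000 to 26384,
they land in the two aligned buffers [0, 16384) and [16384, 32768),
hence the 2-buffer worst case.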
Amaury Denoyelle 2025-03-07 11:43:01 +01:00
parent 7b168e356f
commit 60f64449fb


@@ -1096,18 +1096,33 @@ static inline struct buffer qcs_b_dup(const struct qc_stream_rxbuf *b)
}
}
/* Returns the current Rx buffer instance for <qcs> stream. */
/* Returns the Rx buffer instance for <qcs> stream read offset. May be NULL if
* not already allocated.
*/
static struct qc_stream_rxbuf *qcs_get_curr_rxbuf(struct qcs *qcs)
{
struct eb64_node *node;
struct qc_stream_rxbuf *buf;
node = eb64_first(&qcs->rx.bufs);
if (!node)
return NULL;
buf = container_of(node, struct qc_stream_rxbuf, off_node);
if (qcs->rx.offset < buf->off_node.key) {
/* first buffer allocated for a future offset */
return NULL;
}
/* Ensures obsolete buffers are not kept inside QCS */
BUG_ON(buf->off_end < qcs->rx.offset);
return buf;
}
/* Returns the amount of data readable at <qcs> stream current offset. */
/* Returns the amount of data readable at <qcs> stream on the current buffer.
 * Note that this does not account for contiguous data stored in other
 * Rx buffer instances.
*/
static ncb_sz_t qcs_rx_avail_data(struct qcs *qcs)
{
struct qc_stream_rxbuf *b = qcs_get_curr_rxbuf(qcs);
@@ -1211,6 +1226,9 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
if (qcc->glitches != prev_glitches)
session_add_glitch_ctr(qcc->conn->owner, qcc->glitches - prev_glitches);
/* TODO not enough data in current rxbuf, merging required with next buffer */
BUG_ON(rxbuf && !ret && qcs->rx.offset + ncb_data(&rxbuf->ncb, 0) == rxbuf->off_end);
if (ret < 0) {
TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs);
goto err;
@@ -1232,8 +1250,20 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
if (ret)
qcs_consume(qcs, ret, rxbuf);
if (ncb_is_empty(&rxbuf->ncb))
if (ncb_is_empty(&rxbuf->ncb)) {
qcs_free_rxbuf(qcs, rxbuf);
/* Close QCS remotely if only one Rx buffer remains and
 * all remaining data up to FIN are already stored in it.
 * This must be done before app_ops rcv_buf to ensure FIN
 * is correctly signalled.
*/
if (qcs->flags & QC_SF_SIZE_KNOWN && !eb_is_empty(&qcs->rx.bufs)) {
const ncb_sz_t avail = qcs_rx_avail_data(qcs);
if (qcs->rx.offset + avail == qcs->rx.offset_max)
qcs_close_remote(qcs);
}
}
}
if (ret || (!b_data(&b) && fin))
@@ -1537,6 +1567,62 @@ int qcc_install_app_ops(struct qcc *qcc, const struct qcc_app_ops *app_ops)
return 1;
}
/* Retrieves the Rx buffer instance usable to store STREAM data starting at
* <offset>. It is dynamically allocated if not already instantiated. <len>
* must contain the size of the STREAM frame. It may be reduced by the
* function if data is too large relative to the buffer starting offset.
* Another buffer instance should be allocated to store the remaining data.
*
* Returns the buffer instance or NULL in case of error.
*/
static struct qc_stream_rxbuf *qcs_get_rxbuf(struct qcs *qcs, uint64_t offset,
uint64_t *len)
{
struct qcc *qcc = qcs->qcc;
struct eb64_node *node;
struct qc_stream_rxbuf *buf;
struct ncbuf *ncbuf;
TRACE_ENTER(QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs);
node = eb64_lookup_le(&qcs->rx.bufs, offset);
if (node)
buf = container_of(node, struct qc_stream_rxbuf, off_node);
if (!node || offset >= buf->off_end) {
const uint64_t aligned_off = offset - (offset % qmux_stream_rx_bufsz());
TRACE_DEVEL("allocating a new entry", QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs);
buf = pool_alloc(pool_head_qc_stream_rxbuf);
if (!buf) {
TRACE_ERROR("qcs rxbuf alloc error", QMUX_EV_QCC_RECV, qcc->conn, qcs);
goto err;
}
buf->ncb = NCBUF_NULL;
buf->off_node.key = aligned_off;
buf->off_end = aligned_off + qmux_stream_rx_bufsz();
eb64_insert(&qcs->rx.bufs, &buf->off_node);
}
ncbuf = &buf->ncb;
if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) {
TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
goto err;
}
if (offset + *len > buf->off_end)
*len = buf->off_end - offset;
TRACE_LEAVE(QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs);
return buf;
err:
TRACE_DEVEL("leaving on error", QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs);
return NULL;
}
/* Handle a new STREAM frame for stream with id <id>. Payload is pointed by
* <data> with length <len> and represents the offset <offset>. <fin> is set if
* the QUIC frame FIN bit is set.
@@ -1548,9 +1634,10 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
char fin, char *data)
{
const int fin_standalone = (!len && fin);
struct ncbuf *ncbuf;
struct qcs *qcs;
enum ncb_ret ret;
enum ncb_ret ncb_ret;
uint64_t left;
int ret;
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
@@ -1634,36 +1721,23 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
offset = qcs->rx.offset;
}
if (len) {
if (eb_is_empty(&qcs->rx.bufs)) {
struct qc_stream_rxbuf *buf;
buf = pool_alloc(pool_head_qc_stream_rxbuf);
if (!buf) {
TRACE_ERROR("rxbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
goto err;
}
left = len;
while (left) {
struct qc_stream_rxbuf *buf;
ncb_sz_t ncb_off;
buf->ncb = NCBUF_NULL;
buf->off_node.key = qcs->rx.offset;
buf->off_end = qcs->rx.offset + qmux_stream_rx_bufsz();
eb64_insert(&qcs->rx.bufs, &buf->off_node);
ncbuf = &buf->ncb;
}
else {
struct qc_stream_rxbuf *buf = qcs_get_curr_rxbuf(qcs);
ncbuf = &buf->ncb;
}
if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) {
TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
buf = qcs_get_rxbuf(qcs, offset, &len);
if (!buf) {
TRACE_ERROR("rxbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
goto err;
}
ret = ncb_add(ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE);
switch (ret) {
/* For the oldest buffer, ncb_advance() may already have been performed. */
ncb_off = offset - MAX(qcs->rx.offset, buf->off_node.key);
ncb_ret = ncb_add(&buf->ncb, ncb_off, data, len, NCB_ADD_COMPARE);
switch (ncb_ret) {
case NCB_RET_OK:
break;
@@ -1688,6 +1762,11 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
qcc->conn, qcs);
return 1;
}
offset += len;
data += len;
left -= len;
len = left;
}
if (fin)
@@ -1698,11 +1777,17 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
qcs_close_remote(qcs);
}
if ((qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) ||
unlikely(fin_standalone && qcs_is_close_remote(qcs))) {
qcc_decode_qcs(qcc, qcs);
while ((qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) ||
unlikely(fin_standalone && qcs_is_close_remote(qcs))) {
ret = qcc_decode_qcs(qcc, qcs);
LIST_DEL_INIT(&qcs->el_recv);
qcc_refresh_timeout(qcc);
if (ret <= 0)
break;
BUG_ON_HOT(fin_standalone); /* On fin_standalone, <ret> should be 0, which ensures no infinite loop. */
}
out:
@@ -2762,6 +2847,7 @@ static void qcc_wait_for_hs(struct qcc *qcc)
static int qcc_io_recv(struct qcc *qcc)
{
struct qcs *qcs;
int ret;
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
@@ -2778,8 +2864,14 @@ static int qcc_io_recv(struct qcc *qcc)
qcs = LIST_ELEM(qcc->recv_list.n, struct qcs *, el_recv);
/* No need to add a uni local stream in recv_list. */
BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_local(qcc, qcs->id));
qcc_decode_qcs(qcc, qcs);
LIST_DEL_INIT(&qcs->el_recv);
while (qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) {
ret = qcc_decode_qcs(qcc, qcs);
LIST_DEL_INIT(&qcs->el_recv);
if (ret <= 0)
break;
}
}
TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);