MINOR: mux-quic: store QCS Rx buf in a single-entry tree

Convert QCS rx buffer pointer to a tree container. Additionnaly, offset
field of qc_stream_rxbuf is thus transformed into a node tree.

For now, only a single Rx buffer is stored at most in QCS tree. Multiple
Rx buffers will be implemented in a future patch to improve QUIC clients
upload throughput.
This commit is contained in:
Amaury Denoyelle 2025-02-24 16:28:50 +01:00
parent cc3c2d1f12
commit a4f31ffeeb
2 changed files with 50 additions and 24 deletions

View File

@ -134,11 +134,13 @@ enum qcs_state {
QC_SS_CLO, /* closed */ QC_SS_CLO, /* closed */
} __attribute__((packed)); } __attribute__((packed));
/* STREAM receive buffer. Can handle out-of-order storage. */ /* STREAM receive buffer. Can handle out-of-order storage.
* Can be used as a tree node to allocate multiple entries ordered by offsets.
*/
struct qc_stream_rxbuf { struct qc_stream_rxbuf {
struct ncbuf ncb; /* data storage with support for out of order offset */ struct eb64_node off_node; /* base offset of current buffer, node for QCS rx.bufs */
uint64_t off; /* base offset of current buffer */ struct ncbuf ncb; /* data storage with support for out of order offset */
uint64_t off_end; /* first offset directly outside of current buffer */ uint64_t off_end; /* first offset directly outside of current buffer */
}; };
struct qcs { struct qcs {
@ -151,7 +153,7 @@ struct qcs {
struct { struct {
uint64_t offset; /* read offset */ uint64_t offset; /* read offset */
uint64_t offset_max; /* maximum absolute offset received */ uint64_t offset_max; /* maximum absolute offset received */
struct qc_stream_rxbuf *buf; /* single receive buffer */ struct eb_root bufs; /* receive buffers tree ordered by offset */
struct buffer app_buf; /* receive buffer used by stconn layer */ struct buffer app_buf; /* receive buffer used by stconn layer */
uint64_t msd; /* current max-stream-data limit to enforce */ uint64_t msd; /* current max-stream-data limit to enforce */
uint64_t msd_init; /* initial max-stream-data */ uint64_t msd_init; /* initial max-stream-data */

View File

@ -51,24 +51,21 @@ static void qcs_free_rxbuf(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
struct ncbuf *ncbuf; struct ncbuf *ncbuf;
struct buffer buf; struct buffer buf;
if (!rxbuf)
return;
ncbuf = &rxbuf->ncb; ncbuf = &rxbuf->ncb;
if (!ncb_is_null(ncbuf)) { if (!ncb_is_null(ncbuf)) {
buf = b_make(ncbuf->area, ncbuf->size, 0, 0); buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
b_free(&buf); b_free(&buf);
offer_buffers(NULL, 1); offer_buffers(NULL, 1);
} }
rxbuf->ncb = NCBUF_NULL;
/* Reset DEM_FULL as buffer is released. This ensures mux is not woken /* Reset DEM_FULL as buffer is released. This ensures mux is not woken
* up from rcv_buf stream callback when demux was previously blocked. * up from rcv_buf stream callback when demux was previously blocked.
*/ */
qcs->flags &= ~QC_SF_DEM_FULL; qcs->flags &= ~QC_SF_DEM_FULL;
eb64_delete(&rxbuf->off_node);
pool_free(pool_head_qc_stream_rxbuf, rxbuf); pool_free(pool_head_qc_stream_rxbuf, rxbuf);
qcs->rx.buf = NULL;
} }
/* Free <qcs> instance. This function is reserved for internal usage : it must /* Free <qcs> instance. This function is reserved for internal usage : it must
@ -78,6 +75,7 @@ static void qcs_free_rxbuf(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
static void qcs_free(struct qcs *qcs) static void qcs_free(struct qcs *qcs)
{ {
struct qcc *qcc = qcs->qcc; struct qcc *qcc = qcs->qcc;
struct qc_stream_rxbuf *b;
TRACE_ENTER(QMUX_EV_QCS_END, qcc->conn, qcs); TRACE_ENTER(QMUX_EV_QCS_END, qcc->conn, qcs);
TRACE_STATE("releasing QUIC stream", QMUX_EV_QCS_END, qcc->conn, qcs); TRACE_STATE("releasing QUIC stream", QMUX_EV_QCS_END, qcc->conn, qcs);
@ -105,7 +103,11 @@ static void qcs_free(struct qcs *qcs)
} }
/* Free Rx buffer. */ /* Free Rx buffer. */
qcs_free_rxbuf(qcs, qcs->rx.buf); while (!eb_is_empty(&qcs->rx.bufs)) {
b = container_of(eb64_first(&qcs->rx.bufs),
struct qc_stream_rxbuf, off_node);
qcs_free_rxbuf(qcs, b);
}
/* Remove qcs from qcc tree. */ /* Remove qcs from qcc tree. */
eb64_delete(&qcs->by_id); eb64_delete(&qcs->by_id);
@ -160,7 +162,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
qfctl_init(&qcs->tx.fc, 0); qfctl_init(&qcs->tx.fc, 0);
} }
qcs->rx.buf = NULL; qcs->rx.bufs = EB_ROOT_UNIQUE;
qcs->rx.app_buf = BUF_NULL; qcs->rx.app_buf = BUF_NULL;
qcs->rx.offset = qcs->rx.offset_max = 0; qcs->rx.offset = qcs->rx.offset_max = 0;
@ -1094,10 +1096,22 @@ static inline struct buffer qcs_b_dup(const struct qc_stream_rxbuf *b)
} }
} }
/* Returns the current Rx buffer instance for <qcs> stream. */
static struct qc_stream_rxbuf *qcs_get_curr_rxbuf(struct qcs *qcs)
{
struct eb64_node *node;
struct qc_stream_rxbuf *buf;
node = eb64_first(&qcs->rx.bufs);
buf = container_of(node, struct qc_stream_rxbuf, off_node);
return buf;
}
/* Returns the amount of data readable at <qcs> stream current offset. */ /* Returns the amount of data readable at <qcs> stream current offset. */
static ncb_sz_t qcs_rx_avail_data(struct qcs *qcs) static ncb_sz_t qcs_rx_avail_data(struct qcs *qcs)
{ {
return qcs->rx.buf ? ncb_data(&qcs->rx.buf->ncb, 0) : 0; struct qc_stream_rxbuf *b = qcs_get_curr_rxbuf(qcs);
return b ? ncb_data(&b->ncb, 0) : 0;
} }
/* Remove <bytes> from <qcs> Rx buffer. Flow-control for received offsets may /* Remove <bytes> from <qcs> Rx buffer. Flow-control for received offsets may
@ -1107,18 +1121,19 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes)
{ {
struct qcc *qcc = qcs->qcc; struct qcc *qcc = qcs->qcc;
struct quic_frame *frm; struct quic_frame *frm;
struct ncbuf *ncbuf = &qcs->rx.buf->ncb; struct qc_stream_rxbuf *rxbuf;
enum ncb_ret ret; enum ncb_ret ret;
TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs); TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
ret = ncb_advance(ncbuf, bytes); rxbuf = qcs_get_curr_rxbuf(qcs);
ret = ncb_advance(&rxbuf->ncb, bytes);
if (ret) { if (ret) {
ABORT_NOW(); /* should not happens because removal only in data */ ABORT_NOW(); /* should not happens because removal only in data */
} }
if (ncb_is_empty(ncbuf)) if (ncb_is_empty(&rxbuf->ncb))
qcs_free_rxbuf(qcs, qcs->rx.buf); qcs_free_rxbuf(qcs, rxbuf);
qcs->rx.offset += bytes; qcs->rx.offset += bytes;
/* Not necessary to emit a MAX_STREAM_DATA if all data received. */ /* Not necessary to emit a MAX_STREAM_DATA if all data received. */
@ -1170,6 +1185,7 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes)
*/ */
static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
{ {
struct qc_stream_rxbuf *rxbuf;
struct buffer b; struct buffer b;
ssize_t ret; ssize_t ret;
int fin = 0; int fin = 0;
@ -1177,7 +1193,8 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs); TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
b = qcs_b_dup(qcs->rx.buf); rxbuf = qcs_get_curr_rxbuf(qcs);
b = qcs_b_dup(rxbuf);
/* Signal FIN to application if STREAM FIN received with all data. */ /* Signal FIN to application if STREAM FIN received with all data. */
if (qcs_is_close_remote(qcs)) if (qcs_is_close_remote(qcs))
@ -1607,7 +1624,7 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
} }
if (len) { if (len) {
if (!qcs->rx.buf) { if (eb_is_empty(&qcs->rx.bufs)) {
struct qc_stream_rxbuf *buf; struct qc_stream_rxbuf *buf;
buf = pool_alloc(pool_head_qc_stream_rxbuf); buf = pool_alloc(pool_head_qc_stream_rxbuf);
if (!buf) { if (!buf) {
@ -1617,13 +1634,15 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
} }
buf->ncb = NCBUF_NULL; buf->ncb = NCBUF_NULL;
buf->off = qcs->rx.offset; buf->off_node.key = qcs->rx.offset;
buf->off_end = buf->off + qmux_stream_rx_bufsz(); buf->off_end = qcs->rx.offset + qmux_stream_rx_bufsz();
qcs->rx.buf = buf; eb64_insert(&qcs->rx.bufs, &buf->off_node);
ncbuf = &buf->ncb; ncbuf = &buf->ncb;
} }
else { else {
ncbuf = &qcs->rx.buf->ncb; struct qc_stream_rxbuf *buf = qcs_get_curr_rxbuf(qcs);
ncbuf = &buf->ncb;
} }
if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) { if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) {
@ -1789,6 +1808,7 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t final_size) int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t final_size)
{ {
struct qcs *qcs; struct qcs *qcs;
struct qc_stream_rxbuf *b;
int prev_glitches = qcc->glitches; int prev_glitches = qcc->glitches;
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
@ -1845,7 +1865,11 @@ int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t f
*/ */
qcs->flags |= QC_SF_SIZE_KNOWN|QC_SF_RECV_RESET; qcs->flags |= QC_SF_SIZE_KNOWN|QC_SF_RECV_RESET;
qcs_close_remote(qcs); qcs_close_remote(qcs);
qcs_free_rxbuf(qcs, qcs->rx.buf); while (!eb_is_empty(&qcs->rx.bufs)) {
b = container_of(eb64_first(&qcs->rx.bufs),
struct qc_stream_rxbuf, off_node);
qcs_free_rxbuf(qcs, b);
}
out: out:
if (qcc->glitches != prev_glitches) if (qcc->glitches != prev_glitches)