diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 567e04025..d87e2fecd 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -134,11 +134,13 @@ enum qcs_state { QC_SS_CLO, /* closed */ } __attribute__((packed)); -/* STREAM receive buffer. Can handle out-of-order storage. */ +/* STREAM receive buffer. Can handle out-of-order storage. + * Can be used as a tree node to allocate multiple entries ordered by offsets. + */ struct qc_stream_rxbuf { - struct ncbuf ncb; /* data storage with support for out of order offset */ - uint64_t off; /* base offset of current buffer */ - uint64_t off_end; /* first offset directly outside of current buffer */ + struct eb64_node off_node; /* base offset of current buffer, node for QCS rx.bufs */ + struct ncbuf ncb; /* data storage with support for out of order offset */ + uint64_t off_end; /* first offset directly outside of current buffer */ }; struct qcs { @@ -151,7 +153,7 @@ struct qcs { struct { uint64_t offset; /* read offset */ uint64_t offset_max; /* maximum absolute offset received */ - struct qc_stream_rxbuf *buf; /* single receive buffer */ + struct eb_root bufs; /* receive buffers tree ordered by offset */ struct buffer app_buf; /* receive buffer used by stconn layer */ uint64_t msd; /* current max-stream-data limit to enforce */ uint64_t msd_init; /* initial max-stream-data */ diff --git a/src/mux_quic.c b/src/mux_quic.c index 859880f50..7639a6665 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -51,24 +51,21 @@ static void qcs_free_rxbuf(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf) struct ncbuf *ncbuf; struct buffer buf; - if (!rxbuf) - return; - ncbuf = &rxbuf->ncb; - if (!ncb_is_null(ncbuf)) { buf = b_make(ncbuf->area, ncbuf->size, 0, 0); b_free(&buf); offer_buffers(NULL, 1); } + rxbuf->ncb = NCBUF_NULL; /* Reset DEM_FULL as buffer is released. This ensures mux is not woken * up from rcv_buf stream callback when demux was previously blocked. */ qcs->flags &= ~QC_SF_DEM_FULL; + eb64_delete(&rxbuf->off_node); pool_free(pool_head_qc_stream_rxbuf, rxbuf); - qcs->rx.buf = NULL; } /* Free instance. This function is reserved for internal usage : it must @@ -78,6 +75,7 @@ static void qcs_free_rxbuf(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf) static void qcs_free(struct qcs *qcs) { struct qcc *qcc = qcs->qcc; + struct qc_stream_rxbuf *b; TRACE_ENTER(QMUX_EV_QCS_END, qcc->conn, qcs); TRACE_STATE("releasing QUIC stream", QMUX_EV_QCS_END, qcc->conn, qcs); @@ -105,7 +103,11 @@ static void qcs_free(struct qcs *qcs) } /* Free Rx buffer. */ - qcs_free_rxbuf(qcs, qcs->rx.buf); + while (!eb_is_empty(&qcs->rx.bufs)) { + b = container_of(eb64_first(&qcs->rx.bufs), + struct qc_stream_rxbuf, off_node); + qcs_free_rxbuf(qcs, b); + } /* Remove qcs from qcc tree. */ eb64_delete(&qcs->by_id); @@ -160,7 +162,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) qfctl_init(&qcs->tx.fc, 0); } - qcs->rx.buf = NULL; + qcs->rx.bufs = EB_ROOT_UNIQUE; qcs->rx.app_buf = BUF_NULL; qcs->rx.offset = qcs->rx.offset_max = 0; @@ -1094,10 +1096,22 @@ static inline struct buffer qcs_b_dup(const struct qc_stream_rxbuf *b) } } +/* Returns the current Rx buffer instance for stream. */ +static struct qc_stream_rxbuf *qcs_get_curr_rxbuf(struct qcs *qcs) +{ + struct eb64_node *node; + struct qc_stream_rxbuf *buf; + + node = eb64_first(&qcs->rx.bufs); + buf = container_of(node, struct qc_stream_rxbuf, off_node); + return buf; +} + /* Returns the amount of data readable at stream current offset. */ static ncb_sz_t qcs_rx_avail_data(struct qcs *qcs) { - return qcs->rx.buf ? ncb_data(&qcs->rx.buf->ncb, 0) : 0; + struct qc_stream_rxbuf *b = qcs_get_curr_rxbuf(qcs); + return b ? ncb_data(&b->ncb, 0) : 0; } /* Remove from Rx buffer. Flow-control for received offsets may @@ -1107,18 +1121,19 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes) { struct qcc *qcc = qcs->qcc; struct quic_frame *frm; - struct ncbuf *ncbuf = &qcs->rx.buf->ncb; + struct qc_stream_rxbuf *rxbuf; enum ncb_ret ret; TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs); - ret = ncb_advance(ncbuf, bytes); + rxbuf = qcs_get_curr_rxbuf(qcs); + ret = ncb_advance(&rxbuf->ncb, bytes); if (ret) { ABORT_NOW(); /* should not happens because removal only in data */ } - if (ncb_is_empty(ncbuf)) - qcs_free_rxbuf(qcs, qcs->rx.buf); + if (ncb_is_empty(&rxbuf->ncb)) + qcs_free_rxbuf(qcs, rxbuf); qcs->rx.offset += bytes; /* Not necessary to emit a MAX_STREAM_DATA if all data received. */ @@ -1170,6 +1185,7 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes) */ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) { + struct qc_stream_rxbuf *rxbuf; struct buffer b; ssize_t ret; int fin = 0; @@ -1177,7 +1193,8 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs) TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs); - b = qcs_b_dup(qcs->rx.buf); + rxbuf = qcs_get_curr_rxbuf(qcs); + b = qcs_b_dup(rxbuf); /* Signal FIN to application if STREAM FIN received with all data. */ if (qcs_is_close_remote(qcs)) @@ -1607,7 +1624,7 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, } if (len) { - if (!qcs->rx.buf) { + if (eb_is_empty(&qcs->rx.bufs)) { struct qc_stream_rxbuf *buf; buf = pool_alloc(pool_head_qc_stream_rxbuf); if (!buf) { @@ -1617,13 +1634,15 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, } buf->ncb = NCBUF_NULL; - buf->off = qcs->rx.offset; - buf->off_end = buf->off + qmux_stream_rx_bufsz(); - qcs->rx.buf = buf; + buf->off_node.key = qcs->rx.offset; + buf->off_end = qcs->rx.offset + qmux_stream_rx_bufsz(); + eb64_insert(&qcs->rx.bufs, &buf->off_node); + ncbuf = &buf->ncb; } else { - ncbuf = &qcs->rx.buf->ncb; + struct qc_stream_rxbuf *buf = qcs_get_curr_rxbuf(qcs); + ncbuf = &buf->ncb; } if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) { @@ -1789,6 +1808,7 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max) int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t final_size) { struct qcs *qcs; + struct qc_stream_rxbuf *b; int prev_glitches = qcc->glitches; TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); @@ -1845,7 +1865,11 @@ int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t f */ qcs->flags |= QC_SF_SIZE_KNOWN|QC_SF_RECV_RESET; qcs_close_remote(qcs); - qcs_free_rxbuf(qcs, qcs->rx.buf); + while (!eb_is_empty(&qcs->rx.bufs)) { + b = container_of(eb64_first(&qcs->rx.bufs), + struct qc_stream_rxbuf, off_node); + qcs_free_rxbuf(qcs, b); + } out: if (qcc->glitches != prev_glitches)