MEDIUM: mux-quic: handle too short data split on multiple rxbuf

The previous commit introduces support for multiple Rx buffers per QCS
instance. Contiguous data may be split across multiple buffers
depending on their offset.

A particular issue could arise with this new model. Indeed, app_ops
rcv_buf callback can still deal with a single buffer at a time. This may
cause a deadlock in decoding if app_ops layer cannot proceed due to
partial data, but such data are precisely divided on two buffers. This
can for example intervene during HTTP/3 frame header parsing.

To deal with this, a new function is implemented to force data realignment
between two contiguous buffers. This is called only when app_ops rcv_buf
returned 0 but data is available in the next buffer after the current
one. In this case, data are transferred from the next into the current
buffer via qcs_transfer_rx_data(). Decoding is then restarted, which
should ensure that app_ops layer has enough data to advance.

During this operation, special care is taken to remove both
qc_stream_rxbuf entries, as their offsets are adjusted. The next buffer
is only reinserted if there is remaining data in it, else it can be
freed.

This case is not easily reproducible as it depends on the HTTP/3 framing
used by the client. It seems to be easily reproduced though with quiche.
$ quiche-client --http-version HTTP/3 --method POST --body /tmp/100m \
  "https://127.0.0.1:20443/post"
This commit is contained in:
Amaury Denoyelle 2025-03-05 15:29:24 +01:00
parent 60f64449fb
commit 75027692a3

View File

@ -1096,6 +1096,67 @@ static inline struct buffer qcs_b_dup(const struct qc_stream_rxbuf *b)
} }
} }
/* Transfer data into <qcs> stream <rxbuf> current Rx buffer from its directly
* following buffer. This is useful when parsing was interrupted due to partial
* data. If following buffer does not exists, nothing is done.
*
* Returns 0 if data transfer was performed.
*/
static int qcs_transfer_rx_data(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
{
struct qc_stream_rxbuf *rxbuf_next;
struct eb64_node *next;
struct buffer b, b_next;
enum ncb_ret ncb_ret;
size_t to_copy;
int ret = 1;
BUG_ON(ncb_is_full(&rxbuf->ncb));
next = eb64_next(&rxbuf->off_node);
if (!next)
goto out;
rxbuf_next = container_of(next, struct qc_stream_rxbuf, off_node);
if (rxbuf_next->off_node.key == rxbuf->off_end &&
ncb_data(&rxbuf_next->ncb, 0)) {
eb64_delete(&rxbuf->off_node);
eb64_delete(next);
b = qcs_b_dup(rxbuf);
b_next = qcs_b_dup(rxbuf_next);
to_copy = MIN(b_data(&b_next), ncb_size(&rxbuf->ncb) - b_data(&b));
ncb_ret = ncb_add(&rxbuf->ncb, ncb_data(&rxbuf->ncb, 0),
b_head(&b_next), to_copy, NCB_ADD_COMPARE);
BUG_ON(ncb_ret != NCB_RET_OK);
ncb_ret = ncb_advance(&rxbuf_next->ncb, to_copy);
BUG_ON(ncb_ret != NCB_RET_OK);
rxbuf->off_node.key = qcs->rx.offset;
rxbuf->off_end = qcs->rx.offset + b_data(&b) + to_copy;
eb64_insert(&qcs->rx.bufs, &rxbuf->off_node);
rxbuf_next->off_node.key += to_copy;
BUG_ON(rxbuf_next->off_node.key > rxbuf_next->off_end);
if (rxbuf_next->off_node.key == rxbuf_next->off_end) {
eb64_insert(&qcs->rx.bufs, &rxbuf_next->off_node);
}
else {
b_free(&b_next);
offer_buffers(NULL, 1);
pool_free(pool_head_qc_stream_rxbuf, rxbuf_next);
}
ret = 0;
}
out:
return ret;
}
/* Returns the Rx buffer instance for <qcs> stream read offset. May be NULL if /* Returns the Rx buffer instance for <qcs> stream read offset. May be NULL if
* not already allocated. * not already allocated.
*/ */
@ -1213,6 +1274,7 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs); TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
restart:
rxbuf = qcs_get_curr_rxbuf(qcs); rxbuf = qcs_get_curr_rxbuf(qcs);
b = qcs_b_dup(rxbuf); b = qcs_b_dup(rxbuf);
@ -1226,14 +1288,23 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
if (qcc->glitches != prev_glitches) if (qcc->glitches != prev_glitches)
session_add_glitch_ctr(qcc->conn->owner, qcc->glitches - prev_glitches); session_add_glitch_ctr(qcc->conn->owner, qcc->glitches - prev_glitches);
/* TODO not enough data in current rxbuf, merging required with next buffer */
BUG_ON(rxbuf && !ret && qcs->rx.offset + ncb_data(&rxbuf->ncb, 0) == rxbuf->off_end);
if (ret < 0) { if (ret < 0) {
TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs); TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs);
goto err; goto err;
} }
/* App layer cannot decode due to partial data, which is stored
* at a Rx buffer boundary. Try to realign data if possible and
* restart decoding.
*/
if (!ret && rxbuf && !(qcs->flags & QC_SF_DEM_FULL) &&
qcs->rx.offset + ncb_data(&rxbuf->ncb, 0) == rxbuf->off_end) {
if (!qcs_transfer_rx_data(qcs, rxbuf)) {
TRACE_DEVEL("restart parsing after data realignment", QMUX_EV_QCS_RECV, qcc->conn, qcs);
goto restart;
}
}
if (qcs->flags & QC_SF_TO_RESET) { if (qcs->flags & QC_SF_TO_RESET) {
if (qcs_sc(qcs) && !se_fl_test(qcs->sd, SE_FL_ERROR|SE_FL_ERR_PENDING)) { if (qcs_sc(qcs) && !se_fl_test(qcs->sd, SE_FL_ERROR|SE_FL_ERR_PENDING)) {
se_fl_set_error(qcs->sd); se_fl_set_error(qcs->sd);