diff --git a/src/mux_quic.c b/src/mux_quic.c
index 1fd44612d..f1d41d42a 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -1096,6 +1096,67 @@ static inline struct buffer qcs_b_dup(const struct qc_stream_rxbuf *b)
 	}
 }
 
+/* Transfer data into the stream's current Rx buffer from the buffer which
+ * directly follows it. This is useful when parsing was interrupted due to
+ * partial data. If no such following buffer exists, nothing is done.
+ *
+ * Returns 0 if a data transfer was performed, non-zero otherwise.
+ */
+static int qcs_transfer_rx_data(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
+{
+	struct qc_stream_rxbuf *rxbuf_next;
+	struct eb64_node *next;
+	struct buffer b, b_next;
+	enum ncb_ret ncb_ret;
+	size_t to_copy;
+	int ret = 1;
+
+	BUG_ON(ncb_is_full(&rxbuf->ncb));
+
+	next = eb64_next(&rxbuf->off_node);
+	if (!next)
+		goto out;
+
+	rxbuf_next = container_of(next, struct qc_stream_rxbuf, off_node);
+	if (rxbuf_next->off_node.key == rxbuf->off_end &&
+	    ncb_data(&rxbuf_next->ncb, 0)) {
+		eb64_delete(&rxbuf->off_node);
+		eb64_delete(next);
+
+		b = qcs_b_dup(rxbuf);
+		b_next = qcs_b_dup(rxbuf_next);
+		to_copy = MIN(b_data(&b_next), ncb_size(&rxbuf->ncb) - b_data(&b));
+
+		ncb_ret = ncb_add(&rxbuf->ncb, ncb_data(&rxbuf->ncb, 0),
+		                  b_head(&b_next), to_copy, NCB_ADD_COMPARE);
+		BUG_ON(ncb_ret != NCB_RET_OK);
+
+		ncb_ret = ncb_advance(&rxbuf_next->ncb, to_copy);
+		BUG_ON(ncb_ret != NCB_RET_OK);
+
+		rxbuf->off_node.key = qcs->rx.offset;
+		rxbuf->off_end = qcs->rx.offset + b_data(&b) + to_copy;
+		eb64_insert(&qcs->rx.bufs, &rxbuf->off_node);
+
+		rxbuf_next->off_node.key += to_copy;
+		BUG_ON(rxbuf_next->off_node.key > rxbuf_next->off_end);
+
+		if (rxbuf_next->off_node.key == rxbuf_next->off_end) {
+			b_free(&b_next);
+			offer_buffers(NULL, 1);
+			pool_free(pool_head_qc_stream_rxbuf, rxbuf_next);
+		}
+		else {
+			eb64_insert(&qcs->rx.bufs, &rxbuf_next->off_node);
+		}
+
+		ret = 0;
+	}
+
+ out:
+	return ret;
+}
+
 /* Returns the Rx buffer instance for stream read offset. May be NULL if
  * not already allocated.
  */
@@ -1213,6 +1274,7 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
 
 	TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
 
+ restart:
 	rxbuf = qcs_get_curr_rxbuf(qcs);
 	b = qcs_b_dup(rxbuf);
 
@@ -1226,14 +1288,23 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
 	if (qcc->glitches != prev_glitches)
 		session_add_glitch_ctr(qcc->conn->owner, qcc->glitches - prev_glitches);
 
-	/* TODO not enough data in current rxbuf, merging required with next buffer */
-	BUG_ON(rxbuf && !ret && qcs->rx.offset + ncb_data(&rxbuf->ncb, 0) == rxbuf->off_end);
-
 	if (ret < 0) {
 		TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs);
 		goto err;
 	}
 
+	/* The app layer could not decode due to partial data stored at
+	 * an Rx buffer boundary. Try to realign data if possible and
+	 * restart decoding.
+	 */
+	if (!ret && rxbuf && !(qcs->flags & QC_SF_DEM_FULL) &&
+	    qcs->rx.offset + ncb_data(&rxbuf->ncb, 0) == rxbuf->off_end) {
+		if (!qcs_transfer_rx_data(qcs, rxbuf)) {
+			TRACE_DEVEL("restart parsing after data realignment", QMUX_EV_QCS_RECV, qcc->conn, qcs);
+			goto restart;
+		}
+	}
+
 	if (qcs->flags & QC_SF_TO_RESET) {
 		if (qcs_sc(qcs) && !se_fl_test(qcs->sd, SE_FL_ERROR|SE_FL_ERR_PENDING)) {
 			se_fl_set_error(qcs->sd);
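
Note: for readers less familiar with haproxy's ncbuf and eb64 primitives, below is a minimal standalone sketch of the realignment technique that qcs_transfer_rx_data() implements. It deliberately uses plain contiguous buffers instead of non-contiguous ncbufs indexed by an offset tree, and every name in it (rxchunk, merge_partial, CHUNK_SZ) is hypothetical, chosen only for illustration: when the decoder stalls on partial data ending exactly at the current buffer's boundary, bytes are moved from the head of the directly following buffer into the tail of the current one, after which decoding is retried.

/* Hypothetical, simplified model: a fixed-size chunk holding contiguous
 * stream data, standing in for haproxy's qc_stream_rxbuf + ncbuf pair.
 */
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define CHUNK_SZ 16

struct rxchunk {
	size_t off;           /* stream offset of data[0] */
	size_t len;           /* bytes currently stored */
	char data[CHUNK_SZ];
};

/* Move as many bytes as possible from the head of <next> into the tail of
 * <cur>, provided <next> starts exactly where <cur>'s data ends. Mirrors
 * the contract of qcs_transfer_rx_data(): returns 0 if bytes were moved.
 */
static int merge_partial(struct rxchunk *cur, struct rxchunk *next)
{
	size_t room = CHUNK_SZ - cur->len;
	size_t to_copy;

	if (!room || !next->len || next->off != cur->off + cur->len)
		return 1;

	to_copy = next->len < room ? next->len : room;
	memcpy(cur->data + cur->len, next->data, to_copy);
	cur->len += to_copy;

	/* advance the following chunk past the transferred bytes, as
	 * ncb_advance() does on rxbuf_next in the patch
	 */
	memmove(next->data, next->data + to_copy, next->len - to_copy);
	next->off += to_copy;
	next->len -= to_copy;
	return 0;
}

int main(void)
{
	/* an 8-byte record split across two chunks: 5 bytes + 3 bytes */
	struct rxchunk cur  = { .off = 0, .len = 5 };
	struct rxchunk next = { .off = 5, .len = 3 };
	memcpy(cur.data,  "ABCDE", 5);
	memcpy(next.data, "FGH", 3);

	/* a decoder would report partial data here: only 5 of the 8 bytes
	 * are reachable, and they end exactly at the chunk boundary
	 */
	if (merge_partial(&cur, &next) == 0)
		printf("restart decode on %zu bytes: %.8s\n", cur.len, cur.data);

	assert(cur.len == 8 && next.len == 0);
	return 0;
}

The patch applies the same retry through the restart label in qcc_decode_qcs(): when qcs_transfer_rx_data() returns 0, decoding is re-entered on the realigned buffer, which is what allows the former BUG_ON() on this condition to be dropped. The extra complexity in the real function comes from keeping the offset tree consistent: both nodes are removed up front, the current buffer is reinserted with its updated range, and the following buffer is either reinserted at its advanced offset or released once fully drained.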