MINOR: mux-quic: define rxbuf wrapper
Define a new type qc_stream_rxbuf. This is used as a wrapper around QCS Rx buffer with encapsulation of the ncbuf storage. It is allocated via a new pool. Several functions are adapted to be able to deal with qc_stream_rxbuf as a wrapper instead of the previous plain ncbuf instance. No functional change should happen with this patch. For now, only a single qc_stream_rxbuf can be instantiated per QCS. However, this new type will be useful to implement multiple Rx buffer storage in a future commit.
This commit is contained in:
parent
4b1e63d191
commit
cc3c2d1f12
@ -134,6 +134,13 @@ enum qcs_state {
|
|||||||
QC_SS_CLO, /* closed */
|
QC_SS_CLO, /* closed */
|
||||||
} __attribute__((packed));
|
} __attribute__((packed));
|
||||||
|
|
||||||
|
/* STREAM receive buffer. Can handle out-of-order storage. */
|
||||||
|
struct qc_stream_rxbuf {
|
||||||
|
struct ncbuf ncb; /* data storage with support for out of order offset */
|
||||||
|
uint64_t off; /* base offset of current buffer */
|
||||||
|
uint64_t off_end; /* first offset directly outside of current buffer */
|
||||||
|
};
|
||||||
|
|
||||||
struct qcs {
|
struct qcs {
|
||||||
struct qcc *qcc;
|
struct qcc *qcc;
|
||||||
struct sedesc *sd;
|
struct sedesc *sd;
|
||||||
@ -142,9 +149,9 @@ struct qcs {
|
|||||||
void *ctx; /* app-ops context */
|
void *ctx; /* app-ops context */
|
||||||
|
|
||||||
struct {
|
struct {
|
||||||
uint64_t offset; /* absolute current base offset of ncbuf */
|
uint64_t offset; /* read offset */
|
||||||
uint64_t offset_max; /* maximum absolute offset received */
|
uint64_t offset_max; /* maximum absolute offset received */
|
||||||
struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */
|
struct qc_stream_rxbuf *buf; /* single receive buffer */
|
||||||
struct buffer app_buf; /* receive buffer used by stconn layer */
|
struct buffer app_buf; /* receive buffer used by stconn layer */
|
||||||
uint64_t msd; /* current max-stream-data limit to enforce */
|
uint64_t msd; /* current max-stream-data limit to enforce */
|
||||||
uint64_t msd_init; /* initial max-stream-data */
|
uint64_t msd_init; /* initial max-stream-data */
|
||||||
|
@ -34,6 +34,7 @@
|
|||||||
|
|
||||||
DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
|
DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
|
||||||
DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
|
DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
|
||||||
|
DECLARE_STATIC_POOL(pool_head_qc_stream_rxbuf, "qc_stream_rxbuf", sizeof(struct qc_stream_rxbuf));
|
||||||
|
|
||||||
static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
|
static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
|
||||||
static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room);
|
static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room);
|
||||||
@ -44,23 +45,30 @@ static int qcc_is_pacing_active(const struct connection *conn)
|
|||||||
return !(quic_tune.options & QUIC_TUNE_NO_PACING);
|
return !(quic_tune.options & QUIC_TUNE_NO_PACING);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
|
/* Free <rxbuf> instance and its inner data storage attached to <qcs> stream. */
|
||||||
|
static void qcs_free_rxbuf(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
|
||||||
{
|
{
|
||||||
|
struct ncbuf *ncbuf;
|
||||||
struct buffer buf;
|
struct buffer buf;
|
||||||
|
|
||||||
if (ncb_is_null(ncbuf))
|
if (!rxbuf)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
|
ncbuf = &rxbuf->ncb;
|
||||||
b_free(&buf);
|
|
||||||
offer_buffers(NULL, 1);
|
|
||||||
|
|
||||||
*ncbuf = NCBUF_NULL;
|
if (!ncb_is_null(ncbuf)) {
|
||||||
|
buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
|
||||||
|
b_free(&buf);
|
||||||
|
offer_buffers(NULL, 1);
|
||||||
|
}
|
||||||
|
|
||||||
/* Reset DEM_FULL as buffer is released. This ensures mux is not woken
|
/* Reset DEM_FULL as buffer is released. This ensures mux is not woken
|
||||||
* up from rcv_buf stream callback when demux was previously blocked.
|
* up from rcv_buf stream callback when demux was previously blocked.
|
||||||
*/
|
*/
|
||||||
qcs->flags &= ~QC_SF_DEM_FULL;
|
qcs->flags &= ~QC_SF_DEM_FULL;
|
||||||
|
|
||||||
|
pool_free(pool_head_qc_stream_rxbuf, rxbuf);
|
||||||
|
qcs->rx.buf = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Free <qcs> instance. This function is reserved for internal usage : it must
|
/* Free <qcs> instance. This function is reserved for internal usage : it must
|
||||||
@ -97,7 +105,7 @@ static void qcs_free(struct qcs *qcs)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Free Rx buffer. */
|
/* Free Rx buffer. */
|
||||||
qcs_free_ncbuf(qcs, &qcs->rx.ncbuf);
|
qcs_free_rxbuf(qcs, qcs->rx.buf);
|
||||||
|
|
||||||
/* Remove qcs from qcc tree. */
|
/* Remove qcs from qcc tree. */
|
||||||
eb64_delete(&qcs->by_id);
|
eb64_delete(&qcs->by_id);
|
||||||
@ -152,7 +160,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
|
|||||||
qfctl_init(&qcs->tx.fc, 0);
|
qfctl_init(&qcs->tx.fc, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
qcs->rx.ncbuf = NCBUF_NULL;
|
qcs->rx.buf = NULL;
|
||||||
qcs->rx.app_buf = BUF_NULL;
|
qcs->rx.app_buf = BUF_NULL;
|
||||||
qcs->rx.offset = qcs->rx.offset_max = 0;
|
qcs->rx.offset = qcs->rx.offset_max = 0;
|
||||||
|
|
||||||
@ -1074,10 +1082,22 @@ int qcc_get_qcs(struct qcc *qcc, uint64_t id, int receive_only, int send_only,
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Simple function to duplicate a buffer */
|
/* Convert <b> out-of-order storage into a contiguous buffer. */
|
||||||
static inline struct buffer qcs_b_dup(const struct ncbuf *b)
|
static inline struct buffer qcs_b_dup(const struct qc_stream_rxbuf *b)
|
||||||
{
|
{
|
||||||
return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
|
if (b) {
|
||||||
|
const struct ncbuf *ncb = &b->ncb;
|
||||||
|
return b_make(ncb_orig(ncb), ncb->size, ncb->head, ncb_data(ncb, 0));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return BUF_NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Returns the amount of data readable at <qcs> stream current offset. */
|
||||||
|
static ncb_sz_t qcs_rx_avail_data(struct qcs *qcs)
|
||||||
|
{
|
||||||
|
return qcs->rx.buf ? ncb_data(&qcs->rx.buf->ncb, 0) : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Remove <bytes> from <qcs> Rx buffer. Flow-control for received offsets may
|
/* Remove <bytes> from <qcs> Rx buffer. Flow-control for received offsets may
|
||||||
@ -1087,18 +1107,18 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes)
|
|||||||
{
|
{
|
||||||
struct qcc *qcc = qcs->qcc;
|
struct qcc *qcc = qcs->qcc;
|
||||||
struct quic_frame *frm;
|
struct quic_frame *frm;
|
||||||
struct ncbuf *buf = &qcs->rx.ncbuf;
|
struct ncbuf *ncbuf = &qcs->rx.buf->ncb;
|
||||||
enum ncb_ret ret;
|
enum ncb_ret ret;
|
||||||
|
|
||||||
TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
||||||
|
|
||||||
ret = ncb_advance(buf, bytes);
|
ret = ncb_advance(ncbuf, bytes);
|
||||||
if (ret) {
|
if (ret) {
|
||||||
ABORT_NOW(); /* should not happens because removal only in data */
|
ABORT_NOW(); /* should not happens because removal only in data */
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ncb_is_empty(buf))
|
if (ncb_is_empty(ncbuf))
|
||||||
qcs_free_ncbuf(qcs, buf);
|
qcs_free_rxbuf(qcs, qcs->rx.buf);
|
||||||
|
|
||||||
qcs->rx.offset += bytes;
|
qcs->rx.offset += bytes;
|
||||||
/* Not necessary to emit a MAX_STREAM_DATA if all data received. */
|
/* Not necessary to emit a MAX_STREAM_DATA if all data received. */
|
||||||
@ -1157,7 +1177,7 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
|
|||||||
|
|
||||||
TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
||||||
|
|
||||||
b = qcs_b_dup(&qcs->rx.ncbuf);
|
b = qcs_b_dup(qcs->rx.buf);
|
||||||
|
|
||||||
/* Signal FIN to application if STREAM FIN received with all data. */
|
/* Signal FIN to application if STREAM FIN received with all data. */
|
||||||
if (qcs_is_close_remote(qcs))
|
if (qcs_is_close_remote(qcs))
|
||||||
@ -1500,6 +1520,7 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
|||||||
char fin, char *data)
|
char fin, char *data)
|
||||||
{
|
{
|
||||||
const int fin_standalone = (!len && fin);
|
const int fin_standalone = (!len && fin);
|
||||||
|
struct ncbuf *ncbuf;
|
||||||
struct qcs *qcs;
|
struct qcs *qcs;
|
||||||
enum ncb_ret ret;
|
enum ncb_ret ret;
|
||||||
|
|
||||||
@ -1586,13 +1607,32 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (len) {
|
if (len) {
|
||||||
if (!qcs_get_ncbuf(qcs, &qcs->rx.ncbuf) || ncb_is_null(&qcs->rx.ncbuf)) {
|
if (!qcs->rx.buf) {
|
||||||
|
struct qc_stream_rxbuf *buf;
|
||||||
|
buf = pool_alloc(pool_head_qc_stream_rxbuf);
|
||||||
|
if (!buf) {
|
||||||
|
TRACE_ERROR("rxbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
||||||
|
qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
|
||||||
|
buf->ncb = NCBUF_NULL;
|
||||||
|
buf->off = qcs->rx.offset;
|
||||||
|
buf->off_end = buf->off + qmux_stream_rx_bufsz();
|
||||||
|
qcs->rx.buf = buf;
|
||||||
|
ncbuf = &buf->ncb;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ncbuf = &qcs->rx.buf->ncb;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) {
|
||||||
TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
|
||||||
qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
|
qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
|
||||||
goto err;
|
goto err;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = ncb_add(&qcs->rx.ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE);
|
ret = ncb_add(ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE);
|
||||||
switch (ret) {
|
switch (ret) {
|
||||||
case NCB_RET_OK:
|
case NCB_RET_OK:
|
||||||
break;
|
break;
|
||||||
@ -1624,11 +1664,11 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
|
|||||||
qcs->flags |= QC_SF_SIZE_KNOWN;
|
qcs->flags |= QC_SF_SIZE_KNOWN;
|
||||||
|
|
||||||
if (qcs->flags & QC_SF_SIZE_KNOWN &&
|
if (qcs->flags & QC_SF_SIZE_KNOWN &&
|
||||||
qcs->rx.offset_max == qcs->rx.offset + ncb_data(&qcs->rx.ncbuf, 0)) {
|
qcs->rx.offset_max == qcs->rx.offset + qcs_rx_avail_data(qcs)) {
|
||||||
qcs_close_remote(qcs);
|
qcs_close_remote(qcs);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((ncb_data(&qcs->rx.ncbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) ||
|
if ((qcs_rx_avail_data(qcs) && !(qcs->flags & QC_SF_DEM_FULL)) ||
|
||||||
unlikely(fin_standalone && qcs_is_close_remote(qcs))) {
|
unlikely(fin_standalone && qcs_is_close_remote(qcs))) {
|
||||||
qcc_decode_qcs(qcc, qcs);
|
qcc_decode_qcs(qcc, qcs);
|
||||||
LIST_DEL_INIT(&qcs->el_recv);
|
LIST_DEL_INIT(&qcs->el_recv);
|
||||||
@ -1805,7 +1845,7 @@ int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t f
|
|||||||
*/
|
*/
|
||||||
qcs->flags |= QC_SF_SIZE_KNOWN|QC_SF_RECV_RESET;
|
qcs->flags |= QC_SF_SIZE_KNOWN|QC_SF_RECV_RESET;
|
||||||
qcs_close_remote(qcs);
|
qcs_close_remote(qcs);
|
||||||
qcs_free_ncbuf(qcs, &qcs->rx.ncbuf);
|
qcs_free_rxbuf(qcs, qcs->rx.buf);
|
||||||
|
|
||||||
out:
|
out:
|
||||||
if (qcc->glitches != prev_glitches)
|
if (qcc->glitches != prev_glitches)
|
||||||
@ -3345,7 +3385,7 @@ static size_t qmux_strm_rcv_buf(struct stconn *sc, struct buffer *buf,
|
|||||||
/* Ensure DEM_FULL is only set if there is available data to
|
/* Ensure DEM_FULL is only set if there is available data to
|
||||||
* ensure we never do unnecessary wakeup here.
|
* ensure we never do unnecessary wakeup here.
|
||||||
*/
|
*/
|
||||||
BUG_ON(!ncb_data(&qcs->rx.ncbuf, 0));
|
BUG_ON(!qcs_rx_avail_data(qcs));
|
||||||
|
|
||||||
qcs->flags &= ~QC_SF_DEM_FULL;
|
qcs->flags &= ~QC_SF_DEM_FULL;
|
||||||
if (!(qcc->flags & QC_CF_ERRL)) {
|
if (!(qcc->flags & QC_CF_ERRL)) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user