MEDIUM: quic: implement multi-buffered Tx streams
Complete the qc_stream_desc type to support multiple buffers on emission. The main objective is to increase the transfer throughput. The MUX is now able to transfer more data without having to wait ACKs. To implement this feature, a new type qc_stream_buf is declared. it encapsulates a buffer with a list element. New functions are defined to retrieve the current buffer, release it or allocate a new one. Each buffer is kept in the qc_stream_desc list until all of its data is acknowledged. On the MUX side, a qcs uses the current stream buffer to transfer data. Once the buffer is full, it is released and a new one will be allocated on a future qc_send() invocation.
This commit is contained in:
parent
b22c0460d6
commit
a456920491
@ -6,6 +6,18 @@
|
||||
#include <import/ebtree-t.h>
|
||||
|
||||
#include <haproxy/buf-t.h>
|
||||
#include <haproxy/list-t.h>
|
||||
|
||||
/* A QUIC STREAM buffer used for Tx.
|
||||
*
|
||||
* Currently, no offset is associated with an offset. The qc_stream_desc must
|
||||
* store them in order and keep the offset of the oldest buffer. The buffers
|
||||
* can be freed in strict order.
|
||||
*/
|
||||
struct qc_stream_buf {
|
||||
struct buffer buf; /* STREAM payload */
|
||||
struct list list; /* element for qc_stream_desc list */
|
||||
};
|
||||
|
||||
/* QUIC STREAM descriptor.
|
||||
*
|
||||
@ -20,7 +32,10 @@ struct qc_stream_desc {
|
||||
struct eb64_node by_id; /* node for quic_conn tree */
|
||||
struct quic_conn *qc;
|
||||
|
||||
struct buffer buf; /* buffer for STREAM data on Tx, emptied on acknowledge */
|
||||
struct list buf_list; /* buffers waiting for ACK, oldest offset first */
|
||||
struct qc_stream_buf *buf; /* current buffer used by the MUX */
|
||||
uint64_t buf_offset; /* base offset of current buffer */
|
||||
|
||||
uint64_t ack_offset; /* last acknowledged offset */
|
||||
struct eb_root acked_frms; /* ACK frames tree for non-contiguous ACK ranges */
|
||||
|
||||
|
@ -10,7 +10,13 @@ struct quic_conn;
|
||||
struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx,
|
||||
struct quic_conn *qc);
|
||||
void qc_stream_desc_release(struct qc_stream_desc *stream);
|
||||
int qc_stream_desc_free(struct qc_stream_desc *stream);
|
||||
void qc_stream_desc_free(struct qc_stream_desc *stream);
|
||||
|
||||
struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
|
||||
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
|
||||
uint64_t offset);
|
||||
void qc_stream_buf_release(struct qc_stream_desc *stream);
|
||||
int qc_stream_desc_free_buf(struct qc_stream_desc *stream);
|
||||
|
||||
#endif /* USE_QUIC */
|
||||
#endif /* _HAPROXY_QUIC_STREAM_H_ */
|
||||
|
@ -627,16 +627,24 @@ static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
|
||||
struct qcc *qcc = qcs->qcc;
|
||||
struct quic_frame *frm;
|
||||
int head, total;
|
||||
uint64_t base_off;
|
||||
|
||||
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
|
||||
|
||||
/* cf buffer schema in qcs_xfer_data */
|
||||
head = qcs->tx.sent_offset - qcs->stream->ack_offset;
|
||||
/* if ack_offset < buf_offset, it points to an older buffer. */
|
||||
base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset);
|
||||
BUG_ON(qcs->tx.sent_offset < base_off);
|
||||
|
||||
head = qcs->tx.sent_offset - base_off;
|
||||
total = b_data(out) - head;
|
||||
BUG_ON(total < 0);
|
||||
|
||||
if (!total) {
|
||||
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
|
||||
return 0;
|
||||
}
|
||||
BUG_ON(qcs->tx.sent_offset >= qcs->tx.offset);
|
||||
BUG_ON(qcs->tx.sent_offset + total > qcs->tx.offset);
|
||||
|
||||
frm = pool_zalloc(pool_head_quic_frame);
|
||||
if (!frm)
|
||||
@ -689,6 +697,7 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
|
||||
uint64_t diff;
|
||||
|
||||
BUG_ON(offset > qcs->tx.sent_offset);
|
||||
BUG_ON(offset >= qcs->tx.offset);
|
||||
|
||||
/* check if the STREAM frame has already been notified. It can happen
|
||||
* for retransmission.
|
||||
@ -707,8 +716,14 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
|
||||
/* increase offset on stream */
|
||||
qcs->tx.sent_offset += diff;
|
||||
BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd);
|
||||
BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.offset);
|
||||
if (qcs->tx.sent_offset == qcs->tx.msd)
|
||||
qcs->flags |= QC_SF_BLK_SFCTL;
|
||||
|
||||
if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) {
|
||||
qc_stream_buf_release(qcs->stream);
|
||||
tasklet_wakeup(qcc->wait_event.tasklet);
|
||||
}
|
||||
}
|
||||
|
||||
/* Wrapper for send on transport layer. Send a list of frames <frms> for the
|
||||
@ -847,7 +862,7 @@ static int qc_send(struct qcc *qcc)
|
||||
while (node) {
|
||||
struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
|
||||
struct buffer *buf = &qcs->tx.buf;
|
||||
struct buffer *out = &qcs->stream->buf;
|
||||
struct buffer *out = qc_stream_buf_get(qcs->stream);
|
||||
|
||||
/* TODO
|
||||
* for the moment, unidirectional streams have their own
|
||||
@ -864,6 +879,24 @@ static int qc_send(struct qcc *qcc)
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!b_data(buf) && !out) {
|
||||
node = eb64_next(node);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!out) {
|
||||
struct connection *conn = qcc->conn;
|
||||
|
||||
out = qc_stream_buf_alloc(qcs->stream,
|
||||
qcs->tx.offset);
|
||||
if (!out) {
|
||||
conn->xprt->subscribe(conn, conn->xprt_ctx,
|
||||
SUB_RETRY_SEND, &qcc->wait_event);
|
||||
node = eb64_next(node);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
/* Prepare <out> buffer with data from <buf>. */
|
||||
if (b_data(buf)) {
|
||||
int ret = qcs_xfer_data(qcs, out, buf,
|
||||
@ -887,7 +920,7 @@ static int qc_send(struct qcc *qcc)
|
||||
}
|
||||
|
||||
/* Build a new STREAM frame with <out> buffer. */
|
||||
if (b_data(out)) {
|
||||
if (b_data(out) && qcs->tx.sent_offset != qcs->tx.offset) {
|
||||
int ret;
|
||||
char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
|
||||
|
||||
|
@ -11,6 +11,9 @@
|
||||
|
||||
DECLARE_STATIC_POOL(pool_head_quic_conn_stream, "qc_stream_desc",
|
||||
sizeof(struct qc_stream_desc));
|
||||
DECLARE_STATIC_POOL(pool_head_quic_conn_stream_buf, "qc_stream_buf",
|
||||
sizeof(struct qc_stream_buf));
|
||||
|
||||
|
||||
/* Allocate a new stream descriptor with id <id>. The caller is responsible to
|
||||
* store the stream in the appropriate tree.
|
||||
@ -30,7 +33,10 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx,
|
||||
eb64_insert(&qc->streams_by_id, &stream->by_id);
|
||||
stream->qc = qc;
|
||||
|
||||
stream->buf = BUF_NULL;
|
||||
stream->buf = NULL;
|
||||
LIST_INIT(&stream->buf_list);
|
||||
stream->buf_offset = 0;
|
||||
|
||||
stream->acked_frms = EB_ROOT;
|
||||
stream->ack_offset = 0;
|
||||
stream->release = 0;
|
||||
@ -50,46 +56,144 @@ void qc_stream_desc_release(struct qc_stream_desc *stream)
|
||||
stream->release = 1;
|
||||
stream->ctx = NULL;
|
||||
|
||||
if (!b_data(&stream->buf))
|
||||
if (LIST_ISEMPTY(&stream->buf_list)) {
|
||||
/* if no buffer left we can free the stream. */
|
||||
qc_stream_desc_free(stream);
|
||||
}
|
||||
else {
|
||||
/* A released stream does not use <stream.buf>. */
|
||||
stream->buf = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* Free the stream descriptor <stream> buffer. This function should be used
|
||||
* when all its data have been acknowledged. If the stream was released by the
|
||||
* upper layer, the stream descriptor will be freed.
|
||||
*
|
||||
* Returns 0 if the stream was not freed else non-zero.
|
||||
/* Free the stream descriptor <stream> content. This function should be used
|
||||
* when all its data have been acknowledged or on full connection closing. It
|
||||
* must only be called after the stream is released.
|
||||
*/
|
||||
int qc_stream_desc_free(struct qc_stream_desc *stream)
|
||||
void qc_stream_desc_free(struct qc_stream_desc *stream)
|
||||
{
|
||||
b_free(&stream->buf);
|
||||
struct qc_stream_buf *buf, *buf_back;
|
||||
struct eb64_node *frm_node;
|
||||
unsigned int free_count = 0;
|
||||
|
||||
/* This function only deals with released streams. */
|
||||
BUG_ON(!stream->release);
|
||||
|
||||
/* free remaining stream buffers */
|
||||
list_for_each_entry_safe(buf, buf_back, &stream->buf_list, list) {
|
||||
if (!(b_data(&buf->buf))) {
|
||||
b_free(&buf->buf);
|
||||
LIST_DELETE(&buf->list);
|
||||
pool_free(pool_head_quic_conn_stream_buf, buf);
|
||||
|
||||
++free_count;
|
||||
}
|
||||
}
|
||||
|
||||
if (free_count)
|
||||
offer_buffers(NULL, free_count);
|
||||
|
||||
/* qc_stream_desc might be freed before having received all its ACKs.
|
||||
* This is the case if some frames were retransmitted.
|
||||
*/
|
||||
frm_node = eb64_first(&stream->acked_frms);
|
||||
while (frm_node) {
|
||||
struct quic_stream *strm;
|
||||
struct quic_frame *frm;
|
||||
|
||||
strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
|
||||
|
||||
frm_node = eb64_next(frm_node);
|
||||
eb64_delete(&strm->offset);
|
||||
|
||||
frm = container_of(strm, struct quic_frame, stream);
|
||||
LIST_DELETE(&frm->list);
|
||||
quic_tx_packet_refdec(frm->pkt);
|
||||
pool_free(pool_head_quic_frame, frm);
|
||||
}
|
||||
|
||||
eb64_delete(&stream->by_id);
|
||||
pool_free(pool_head_quic_conn_stream, stream);
|
||||
}
|
||||
|
||||
/* Return the current buffer of <stream>. May be NULL if not allocated. */
|
||||
struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream)
|
||||
{
|
||||
if (!stream->buf)
|
||||
return NULL;
|
||||
|
||||
return &stream->buf->buf;
|
||||
}
|
||||
|
||||
/* Allocate a new current buffer for <stream>. This function is not allowed if
|
||||
* current buffer is not NULL prior to this call. The new buffer represents
|
||||
* stream payload at offset <offset>.
|
||||
*
|
||||
* Returns the buffer or NULL.
|
||||
*/
|
||||
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
|
||||
uint64_t offset)
|
||||
{
|
||||
/* current buffer must be released first before allocate a new one. */
|
||||
BUG_ON(stream->buf);
|
||||
|
||||
stream->buf_offset = offset;
|
||||
stream->buf = pool_alloc(pool_head_quic_conn_stream_buf);
|
||||
if (!stream->buf)
|
||||
return NULL;
|
||||
|
||||
stream->buf->buf = BUF_NULL;
|
||||
LIST_APPEND(&stream->buf_list, &stream->buf->list);
|
||||
|
||||
return &stream->buf->buf;
|
||||
}
|
||||
|
||||
/* Release the current buffer of <stream>. It will be kept internally by
|
||||
* the <stream>. The current buffer cannot be NULL.
|
||||
*/
|
||||
void qc_stream_buf_release(struct qc_stream_desc *stream)
|
||||
{
|
||||
/* current buffer already released */
|
||||
BUG_ON(!stream->buf);
|
||||
|
||||
stream->buf = NULL;
|
||||
stream->buf_offset = 0;
|
||||
}
|
||||
|
||||
/* Free the oldest buffer of <stream>. If the stream was already released and
|
||||
* does not references any buffers, it is freed. This function must only be
|
||||
* called if at least one buffer is present. Use qc_stream_desc_free() to free
|
||||
* a released stream.
|
||||
*
|
||||
* Returns 0 if the stream is not yet freed else 1.
|
||||
*/
|
||||
int qc_stream_desc_free_buf(struct qc_stream_desc *stream)
|
||||
{
|
||||
struct qc_stream_buf *stream_buf;
|
||||
|
||||
BUG_ON(LIST_ISEMPTY(&stream->buf_list) && !stream->buf);
|
||||
|
||||
if (!LIST_ISEMPTY(&stream->buf_list)) {
|
||||
/* get first buffer from buf_list and remove it */
|
||||
stream_buf = LIST_NEXT(&stream->buf_list, struct qc_stream_buf *,
|
||||
list);
|
||||
LIST_DELETE(&stream_buf->list);
|
||||
}
|
||||
else {
|
||||
stream_buf = stream->buf;
|
||||
stream->buf = NULL;
|
||||
}
|
||||
|
||||
b_free(&stream_buf->buf);
|
||||
pool_free(pool_head_quic_conn_stream_buf, stream_buf);
|
||||
|
||||
offer_buffers(NULL, 1);
|
||||
|
||||
if (stream->release) {
|
||||
/* Free frames still waiting for an ACK. Even if the stream buf
|
||||
* is NULL, some frames could still be not acknowledged. This
|
||||
* is notably the case for retransmission where multiple frames
|
||||
* points to the same buffer content.
|
||||
*/
|
||||
struct eb64_node *frm_node = eb64_first(&stream->acked_frms);
|
||||
while (frm_node) {
|
||||
struct quic_stream *strm;
|
||||
struct quic_frame *frm;
|
||||
|
||||
strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
|
||||
|
||||
frm_node = eb64_next(frm_node);
|
||||
eb64_delete(&strm->offset);
|
||||
|
||||
frm = container_of(strm, struct quic_frame, stream);
|
||||
LIST_DELETE(&frm->list);
|
||||
quic_tx_packet_refdec(frm->pkt);
|
||||
pool_free(pool_head_quic_frame, frm);
|
||||
}
|
||||
|
||||
eb64_delete(&stream->by_id);
|
||||
pool_free(pool_head_quic_conn_stream, stream);
|
||||
|
||||
/* If qc_stream_desc is released and does not contains any buffers, we
|
||||
* can free it now.
|
||||
*/
|
||||
if (stream->release && LIST_ISEMPTY(&stream->buf_list)) {
|
||||
qc_stream_desc_free(stream);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -1457,6 +1457,12 @@ static int quic_stream_try_to_consume(struct quic_conn *qc,
|
||||
stream->ack_offset;
|
||||
stream->ack_offset += diff;
|
||||
b_del(strm->buf, diff);
|
||||
if (!b_data(strm->buf)) {
|
||||
if (qc_stream_desc_free_buf(stream)) {
|
||||
/* stream is freed here */
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
ret = 1;
|
||||
}
|
||||
|
||||
@ -1469,11 +1475,6 @@ static int quic_stream_try_to_consume(struct quic_conn *qc,
|
||||
pool_free(pool_head_quic_frame, frm);
|
||||
}
|
||||
|
||||
if (!b_data(&stream->buf)) {
|
||||
if (qc_stream_desc_free(stream))
|
||||
TRACE_PROTO("stream released and freed", QUIC_EV_CONN_ACKSTRM, qc);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1521,7 +1522,7 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc,
|
||||
stream_acked = 1;
|
||||
|
||||
if (!b_data(strm_frm->buf)) {
|
||||
if (qc_stream_desc_free(stream)) {
|
||||
if (qc_stream_desc_free_buf(stream)) {
|
||||
/* stream is freed at this stage,
|
||||
* no need to continue.
|
||||
*/
|
||||
|
Loading…
x
Reference in New Issue
Block a user