MINOR: connections/mux: Add a new "subscribe" method.
Add a new "subscribe" method for connection, conn_stream and mux, so that upper layer can subscribe to them, to be called when the event happens. Right now, the only event implemented is "SUB_CAN_SEND", where the upper layer can register to be called back when it is possible to send data. The connection and conn_stream got a new "send_wait_list" entry, which required to move a few struct members around to maintain an efficient cache alignment (and actually this slightly improved performance).
This commit is contained in:
parent
e17c2d3e57
commit
6ff2039d13
@ -29,6 +29,7 @@
|
||||
#include <types/listener.h>
|
||||
#include <proto/fd.h>
|
||||
#include <proto/obj_type.h>
|
||||
#include <proto/task.h>
|
||||
|
||||
extern struct pool_head *pool_head_connection;
|
||||
extern struct pool_head *pool_head_connstream;
|
||||
@ -49,6 +50,7 @@ int make_proxy_line(char *buf, int buf_len, struct server *srv, struct connectio
|
||||
int make_proxy_line_v1(char *buf, int buf_len, struct sockaddr_storage *src, struct sockaddr_storage *dst);
|
||||
int make_proxy_line_v2(char *buf, int buf_len, struct server *srv, struct connection *remote);
|
||||
|
||||
int conn_subscribe(struct connection *conn, int event_type, void *param);
|
||||
/* receive a NetScaler Client IP insertion header over a connection */
|
||||
int conn_recv_netscaler_cip(struct connection *conn, int flag);
|
||||
|
||||
@ -596,6 +598,7 @@ static inline void cs_init(struct conn_stream *cs, struct connection *conn)
|
||||
{
|
||||
cs->obj_type = OBJ_TYPE_CS;
|
||||
cs->flags = CS_FL_NONE;
|
||||
LIST_INIT(&cs->send_wait_list);
|
||||
cs->conn = conn;
|
||||
}
|
||||
|
||||
@ -621,6 +624,7 @@ static inline void conn_init(struct connection *conn)
|
||||
conn->destroy_cb = NULL;
|
||||
conn->proxy_netns = NULL;
|
||||
LIST_INIT(&conn->list);
|
||||
LIST_INIT(&conn->send_wait_list);
|
||||
}
|
||||
|
||||
/* sets <owner> as the connection's owner */
|
||||
|
@ -44,6 +44,10 @@ struct buffer;
|
||||
struct server;
|
||||
struct pipe;
|
||||
|
||||
struct wait_list {
|
||||
struct tasklet *task;
|
||||
struct list list;
|
||||
};
|
||||
|
||||
/* A connection handle is how we differenciate two connections on the lower
|
||||
* layers. It usually is a file descriptor but can be a connection id.
|
||||
@ -85,6 +89,9 @@ enum cs_shw_mode {
|
||||
CS_SHW_SILENT = 1, /* imminent close, don't notify peer */
|
||||
};
|
||||
|
||||
enum sub_event_type {
|
||||
SUB_CAN_SEND = 0x00000001, /* Schedule the tasklet when we can send more */
|
||||
};
|
||||
/* For each direction, we have a CO_FL_{SOCK,DATA}_<DIR>_ENA flag, which
|
||||
* indicates if read or write is desired in that direction for the respective
|
||||
* layers. The current status corresponding to the current layer being used is
|
||||
@ -287,6 +294,7 @@ struct xprt_ops {
|
||||
void (*destroy_srv)(struct server *srv); /* destroy a server context */
|
||||
int (*get_alpn)(const struct connection *conn, const char **str, int *len); /* get application layer name */
|
||||
char name[8]; /* transport layer name, zero-terminated */
|
||||
int (*subscribe)(struct connection *conn, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
|
||||
};
|
||||
|
||||
/* mux_ops describes the mux operations, which are to be performed at the
|
||||
@ -312,6 +320,7 @@ struct mux_ops {
|
||||
struct conn_stream *(*attach)(struct connection *); /* Create and attach a conn_stream to an outgoing connection */
|
||||
void (*detach)(struct conn_stream *); /* Detach a conn_stream from an outgoing connection, when the request is done */
|
||||
void (*show_fd)(struct buffer *, struct connection *); /* append some data about connection into chunk for "show fd" */
|
||||
int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
|
||||
unsigned int flags; /* some flags characterizing the mux's capabilities (MX_FL_*) */
|
||||
char name[8]; /* mux layer name, zero-terminated */
|
||||
};
|
||||
@ -327,6 +336,7 @@ struct data_cb {
|
||||
void (*recv)(struct conn_stream *cs); /* data-layer recv callback */
|
||||
void (*send)(struct conn_stream *cs); /* data-layer send callback */
|
||||
int (*wake)(struct conn_stream *cs); /* data-layer callback to report activity */
|
||||
int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
|
||||
char name[8]; /* data layer name, zero-terminated */
|
||||
};
|
||||
|
||||
@ -358,8 +368,9 @@ struct conn_src {
|
||||
*/
|
||||
struct conn_stream {
|
||||
enum obj_type obj_type; /* differentiates connection from applet context */
|
||||
unsigned int flags; /* CS_FL_* */
|
||||
struct connection *conn; /* xprt-level connection */
|
||||
unsigned int flags; /* CS_FL_* */
|
||||
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
|
||||
void *data; /* pointer to upper layer's entity (eg: stream interface) */
|
||||
const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */
|
||||
void *ctx; /* mux-specific context */
|
||||
@ -376,6 +387,7 @@ struct conn_stream {
|
||||
* connection being instanciated. It must be removed once done.
|
||||
*/
|
||||
struct connection {
|
||||
/* first cache line */
|
||||
enum obj_type obj_type; /* differentiates connection from applet context */
|
||||
unsigned char err_code; /* CO_ER_* */
|
||||
signed short send_proxy_ofs; /* <0 = offset to (re)send from the end, >0 = send all */
|
||||
@ -386,15 +398,20 @@ struct connection {
|
||||
void *xprt_ctx; /* general purpose pointer, initialized to NULL */
|
||||
void *mux_ctx; /* mux-specific context, initialized to NULL */
|
||||
void *owner; /* pointer to the owner session for incoming connections, or NULL */
|
||||
enum obj_type *target; /* the target to connect to (server, proxy, applet, ...) */
|
||||
|
||||
/* second cache line */
|
||||
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
|
||||
struct list list; /* attach point to various connection lists (idle, ...) */
|
||||
int xprt_st; /* transport layer state, initialized to zero */
|
||||
int tmp_early_data; /* 1st byte of early data, if any */
|
||||
int sent_early_data; /* Amount of early data we sent so far */
|
||||
union conn_handle handle; /* connection handle at the socket layer */
|
||||
enum obj_type *target; /* the target to connect to (server, proxy, applet, ...) */
|
||||
struct list list; /* attach point to various connection lists (idle, ...) */
|
||||
int (*xprt_done_cb)(struct connection *conn); /* callback to notify of end of handshake */
|
||||
void (*destroy_cb)(struct connection *conn); /* callback to notify of imminent death of the connection */
|
||||
const struct netns_entry *proxy_netns;
|
||||
int (*xprt_done_cb)(struct connection *conn); /* callback to notify of end of handshake */
|
||||
|
||||
/* third cache line and beyond */
|
||||
void (*destroy_cb)(struct connection *conn); /* callback to notify of imminent death of the connection */
|
||||
struct {
|
||||
struct sockaddr_storage from; /* client address, or address to spoof when connecting to the server */
|
||||
struct sockaddr_storage to; /* address reached by the client, or address to connect to */
|
||||
|
@ -128,6 +128,13 @@ void conn_fd_handler(int fd)
|
||||
*/
|
||||
flags = 0;
|
||||
conn->mux->send(conn);
|
||||
while (!LIST_ISEMPTY(&conn->send_wait_list)) {
|
||||
struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
|
||||
struct wait_list *, list);
|
||||
LIST_DEL(&sw->list);
|
||||
LIST_INIT(&sw->list);
|
||||
tasklet_wakeup(sw->task);
|
||||
}
|
||||
}
|
||||
|
||||
/* The data transfer starts here and stops on error and handshakes. Note
|
||||
@ -323,6 +330,22 @@ int conn_sock_send(struct connection *conn, const void *buf, int len, int flags)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int conn_subscribe(struct connection *conn, int event_type, void *param)
|
||||
{
|
||||
struct wait_list *sw;
|
||||
|
||||
switch (event_type) {
|
||||
case SUB_CAN_SEND:
|
||||
sw = param;
|
||||
if (LIST_ISEMPTY(&sw->list))
|
||||
LIST_ADDQ(&conn->send_wait_list, &sw->list);
|
||||
return 0;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return (-1);
|
||||
}
|
||||
|
||||
/* Drains possibly pending incoming data on the file descriptor attached to the
|
||||
* connection and update the connection's flags accordingly. This is used to
|
||||
* know whether we need to disable lingering on close. Returns non-zero if it
|
||||
|
36
src/mux_h2.c
36
src/mux_h2.c
@ -120,6 +120,7 @@ struct h2c {
|
||||
struct list send_list; /* list of blocked streams requesting to send */
|
||||
struct list fctl_list; /* list of streams blocked by connection's fctl */
|
||||
struct buffer_wait buf_wait; /* wait list for buffer allocations */
|
||||
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
|
||||
};
|
||||
|
||||
/* H2 stream state, in h2s->st */
|
||||
@ -379,6 +380,7 @@ static int h2c_frt_init(struct connection *conn)
|
||||
if (t)
|
||||
task_queue(t);
|
||||
conn_xprt_want_recv(conn);
|
||||
LIST_INIT(&h2c->send_wait_list);
|
||||
|
||||
/* mux->wake will be called soon to complete the operation */
|
||||
return 0;
|
||||
@ -2228,6 +2230,19 @@ static void h2_send(struct connection *conn)
|
||||
/* output closed, nothing to send, clear the buffer to release it */
|
||||
b_reset(&h2c->mbuf);
|
||||
}
|
||||
/* We're not full anymore, so we can wake any task that are waiting
|
||||
* for us.
|
||||
*/
|
||||
if (!(h2c->flags & (H2_CF_MUX_MFULL | H2_CF_DEM_MROOM))) {
|
||||
while (!LIST_ISEMPTY(&h2c->send_wait_list)) {
|
||||
struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n,
|
||||
struct wait_list *, list);
|
||||
LIST_DEL(&sw->list);
|
||||
LIST_INIT(&sw->list);
|
||||
tasklet_wakeup(sw->task);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/* callback called on any event by the connection handler.
|
||||
@ -3369,6 +3384,26 @@ static size_t h2s_frt_make_resp_data(struct h2s *h2s, const struct buffer *buf,
|
||||
return total;
|
||||
}
|
||||
|
||||
/* Called from the upper layer, to subscribe to events, such as being able to send */
|
||||
static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
|
||||
{
|
||||
struct wait_list *sw;
|
||||
struct h2s *h2s = cs->ctx;
|
||||
|
||||
switch (event_type) {
|
||||
case SUB_CAN_SEND:
|
||||
sw = param;
|
||||
if (LIST_ISEMPTY(&h2s->list) && LIST_ISEMPTY(&sw->list))
|
||||
LIST_ADDQ(&h2s->h2c->send_wait_list, &sw->list);
|
||||
return 0;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return -1;
|
||||
|
||||
|
||||
}
|
||||
|
||||
/* Called from the upper layer, to send data */
|
||||
static size_t h2_snd_buf(struct conn_stream *cs, const struct buffer *buf, size_t count, int flags)
|
||||
{
|
||||
@ -3545,6 +3580,7 @@ const struct mux_ops h2_ops = {
|
||||
.update_poll = h2_update_poll,
|
||||
.rcv_buf = h2_rcv_buf,
|
||||
.snd_buf = h2_snd_buf,
|
||||
.subscribe = h2_subscribe,
|
||||
.attach = h2_attach,
|
||||
.detach = h2_detach,
|
||||
.shutr = h2_shutr,
|
||||
|
@ -177,6 +177,12 @@ static size_t mux_pt_snd_buf(struct conn_stream *cs, const struct buffer *buf, s
|
||||
return cs->conn->xprt->snd_buf(cs->conn, buf, count, flags);
|
||||
}
|
||||
|
||||
/* Called from the upper layer, to subscribe to events */
|
||||
static int mux_pt_subscribe(struct conn_stream *cs, int event_type, void *param)
|
||||
{
|
||||
return (cs->conn->xprt->subscribe(cs->conn, event_type, param));
|
||||
}
|
||||
|
||||
#if defined(CONFIG_HAP_LINUX_SPLICE)
|
||||
/* Send and get, using splicing */
|
||||
static int mux_pt_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count)
|
||||
@ -206,6 +212,7 @@ const struct mux_ops mux_pt_ops = {
|
||||
.update_poll = mux_pt_update_poll,
|
||||
.rcv_buf = mux_pt_rcv_buf,
|
||||
.snd_buf = mux_pt_snd_buf,
|
||||
.subscribe = mux_pt_subscribe,
|
||||
#if defined(CONFIG_HAP_LINUX_SPLICE)
|
||||
.rcv_pipe = mux_pt_rcv_pipe,
|
||||
.snd_pipe = mux_pt_snd_pipe,
|
||||
|
@ -424,6 +424,7 @@ static size_t raw_sock_from_buf(struct connection *conn, const struct buffer *bu
|
||||
static struct xprt_ops raw_sock = {
|
||||
.snd_buf = raw_sock_from_buf,
|
||||
.rcv_buf = raw_sock_to_buf,
|
||||
.subscribe = conn_subscribe,
|
||||
#if defined(CONFIG_HAP_LINUX_SPLICE)
|
||||
.rcv_pipe = raw_sock_to_pipe,
|
||||
.snd_pipe = raw_sock_from_pipe,
|
||||
|
@ -8895,6 +8895,7 @@ static struct cfg_kw_list cfg_kws = {ILH, {
|
||||
static struct xprt_ops ssl_sock = {
|
||||
.snd_buf = ssl_sock_from_buf,
|
||||
.rcv_buf = ssl_sock_to_buf,
|
||||
.subscribe = conn_subscribe,
|
||||
.rcv_pipe = NULL,
|
||||
.snd_pipe = NULL,
|
||||
.shutr = NULL,
|
||||
|
Loading…
x
Reference in New Issue
Block a user