BUG/MEDIUM: mux-spop: Remove frame parsing states from the SPOP connection state
SPOP_CS_FRAME_H and SPOP_CS_FRAME_P states, that were used to handle frame parsing, were removed. The demux process now relies on the demux stream ID to know if it is waiting for the frame header or the frame payload. Concretly, when the demux stream ID is not set (dsi == -1), the demuxer is waiting for the next frame header. Otherwise (dsi >= 0), it is waiting for the frame payload. It is especially important to be able to properly handle DISCONNECT frames sent by the agents. SPOP_CS_RUNNING state is introduced to know the hello handshake was finished and the SPOP connection is able to open SPOP streams and exchange NOTIFY/ACK frames with the agents. It depends on the following fixes: * MINOR: mux-spop: Don't set SPOP connection state to FRAME_H after ACK parsing * BUG/MINOR: mux-spop: Make the demux stream ID a signed integer This change will be mandatory for the next fix. It must be backported to 3.1 with the commits above.
This commit is contained in:
parent
6b0f7de4e3
commit
a3940614c2
@ -87,10 +87,9 @@ static forceinline char *spop_strm_show_flags(char *buf, size_t len, const char
|
||||
|
||||
/* SPOP connection state (spop_conn->state) */
|
||||
enum spop_conn_st {
|
||||
SPOP_CS_HA_HELLO = 0, /* init done, waiting for sending HELLO frame */
|
||||
SPOP_CS_AGENT_HELLO, /* HELLO frame sent, waiting for agent HELLO frame to define the connection settings */
|
||||
SPOP_CS_FRAME_H, /* HELLO handshake finished, waiting for a frame header */
|
||||
SPOP_CS_FRAME_P, /* Frame header received, waiting for a frame data */
|
||||
SPOP_CS_HA_HELLO = 0, /* init done, waiting for sending HELLO frame */
|
||||
SPOP_CS_AGENT_HELLO, /* HELLO frame sent, waiting for agent HELLO frame to define the connection settings */
|
||||
SPOP_CS_RUNNING, /* HELLO handshake finished, exchange NOTIFY/ACK frames */
|
||||
SPOP_CS_ERROR, /* send DISCONNECT frame to be able ti close the connection */
|
||||
SPOP_CS_CLOSING, /* DISCONNECT frame sent, waiting for the agent DISCONNECT frame before closing */
|
||||
SPOP_CS_CLOSED, /* Agent DISCONNECT frame received and close the connection ASAP */
|
||||
@ -103,8 +102,7 @@ static inline const char *spop_conn_st_to_str(enum spop_conn_st st)
|
||||
switch (st) {
|
||||
case SPOP_CS_HA_HELLO : return "HHL";
|
||||
case SPOP_CS_AGENT_HELLO: return "AHL";
|
||||
case SPOP_CS_FRAME_H : return "FRH";
|
||||
case SPOP_CS_FRAME_P : return "FRP";
|
||||
case SPOP_CS_RUNNING : return "RUN";
|
||||
case SPOP_CS_ERROR : return "ERR";
|
||||
case SPOP_CS_CLOSING : return "CLI";
|
||||
case SPOP_CS_CLOSED : return "CLO";
|
||||
|
@ -834,7 +834,7 @@ static inline int spop_conn_is_dead(struct spop_conn *spop_conn)
|
||||
{
|
||||
if (eb_is_empty(&spop_conn->streams_by_id) && /* don't close if streams exist */
|
||||
((spop_conn->flags & SPOP_CF_ERROR) || /* errors close immediately */
|
||||
(spop_conn->flags & SPOP_CF_ERR_PENDING && spop_conn->state < SPOP_CS_FRAME_H) || /* early error during connect */
|
||||
(spop_conn->flags & SPOP_CF_ERR_PENDING && spop_conn->state < SPOP_CS_RUNNING) || /* early error during connect */
|
||||
(spop_conn->state == SPOP_CS_CLOSED && !spop_conn->task) ||/* a timeout stroke earlier */
|
||||
(!(spop_conn->conn->owner)) || /* Nobody's left to take care of the connection, drop it now */
|
||||
(!br_data(spop_conn->mbuf) && /* mux buffer empty, also process clean events below */
|
||||
@ -1782,6 +1782,7 @@ static int spop_conn_handle_hello(struct spop_conn *spop_conn)
|
||||
TRACE_PROTO("SPOP AGENT HELLO frame rcvd", SPOP_EV_RX_FRAME|SPOP_EV_RX_HELLO, spop_conn->conn, 0, 0, (size_t[]){spop_conn->dfl});
|
||||
b_del(&spop_conn->dbuf, spop_conn->dfl);
|
||||
spop_conn->dfl = 0;
|
||||
spop_conn->state = SPOP_CS_RUNNING;
|
||||
spop_wake_unassigned_streams(spop_conn);
|
||||
TRACE_LEAVE(SPOP_EV_RX_FRAME|SPOP_EV_RX_HELLO, spop_conn->conn);
|
||||
return 1;
|
||||
@ -2064,7 +2065,7 @@ static void spop_process_demux(struct spop_conn *spop_conn)
|
||||
if (spop_conn->state >= SPOP_CS_ERROR)
|
||||
goto out;
|
||||
|
||||
if (unlikely(spop_conn->state < SPOP_CS_FRAME_H)) {
|
||||
if (unlikely(spop_conn->state < SPOP_CS_RUNNING)) {
|
||||
if (spop_conn->state == SPOP_CS_HA_HELLO) {
|
||||
TRACE_STATE("waiting AGENT HELLO frame to be sent", SPOP_EV_RX_FRAME|SPOP_EV_RX_FHDR|SPOP_EV_RX_HELLO, spop_conn->conn);
|
||||
goto out;
|
||||
@ -2113,7 +2114,7 @@ static void spop_process_demux(struct spop_conn *spop_conn)
|
||||
break;
|
||||
}
|
||||
|
||||
if (spop_conn->state == SPOP_CS_FRAME_H) {
|
||||
if (spop_conn->dsi == -1) {
|
||||
TRACE_PROTO("receiving SPOP frame header", SPOP_EV_RX_FRAME|SPOP_EV_RX_FHDR, spop_conn->conn);
|
||||
if (!spop_get_frame_hdr(&spop_conn->dbuf, &hdr)) {
|
||||
spop_conn->flags |= SPOP_CF_DEM_SHORT_READ;
|
||||
@ -2133,8 +2134,7 @@ static void spop_process_demux(struct spop_conn *spop_conn)
|
||||
spop_conn->dft = hdr.type;
|
||||
spop_conn->dfl = hdr.len;
|
||||
spop_conn->dff = hdr.flags;
|
||||
spop_conn->state = SPOP_CS_FRAME_P;
|
||||
TRACE_STATE("SPOP frame header rcvd, switching to FRAME_P", SPOP_EV_RX_FRAME|SPOP_EV_RX_FHDR, spop_conn->conn);
|
||||
TRACE_STATE("SPOP frame header rcvd", SPOP_EV_RX_FRAME|SPOP_EV_RX_FHDR, spop_conn->conn);
|
||||
|
||||
/* Perform sanity check on the frame header */
|
||||
if (!(spop_conn->dff & SPOP_FRM_FL_FIN)) {
|
||||
@ -2211,15 +2211,15 @@ static void spop_process_demux(struct spop_conn *spop_conn)
|
||||
|
||||
switch (spop_conn->dft) {
|
||||
case SPOP_FRM_T_AGENT_HELLO:
|
||||
if (spop_conn->state == SPOP_CS_FRAME_P)
|
||||
if (spop_conn->dsi == 0)
|
||||
ret = spop_conn_handle_hello(spop_conn);
|
||||
break;
|
||||
case SPOP_FRM_T_AGENT_DISCON:
|
||||
if (spop_conn->state == SPOP_CS_FRAME_P)
|
||||
if (spop_conn->dsi == 0)
|
||||
ret = spop_conn_handle_disconnect(spop_conn);
|
||||
break;
|
||||
case SPOP_FRM_T_AGENT_ACK:
|
||||
if (spop_conn->state == SPOP_CS_FRAME_P)
|
||||
if (spop_conn->dsi > 0)
|
||||
ret = spop_conn_handle_ack(spop_conn, spop_strm);
|
||||
break;
|
||||
default:
|
||||
@ -2244,7 +2244,7 @@ static void spop_process_demux(struct spop_conn *spop_conn)
|
||||
if (ret <= 0)
|
||||
break;
|
||||
|
||||
if (spop_conn->state != SPOP_CS_FRAME_H) {
|
||||
if (spop_conn->dsi >= 0) {
|
||||
if (spop_conn->dfl) {
|
||||
TRACE_DEVEL("skipping remaining frame payload", SPOP_EV_RX_FRAME, spop_conn->conn, spop_strm);
|
||||
ret = MIN(b_data(&spop_conn->dbuf), spop_conn->dfl);
|
||||
@ -2252,8 +2252,8 @@ static void spop_process_demux(struct spop_conn *spop_conn)
|
||||
spop_conn->dfl -= ret;
|
||||
}
|
||||
if (!spop_conn->dfl) {
|
||||
TRACE_STATE("switching to FRAME_H", SPOP_EV_RX_FRAME|SPOP_EV_RX_FHDR, spop_conn->conn);
|
||||
spop_conn->state = SPOP_CS_FRAME_H;
|
||||
TRACE_STATE("reset dsi", SPOP_EV_RX_FRAME|SPOP_EV_RX_FHDR, spop_conn->conn);
|
||||
spop_conn->dsi = -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2265,11 +2265,11 @@ static void spop_process_demux(struct spop_conn *spop_conn)
|
||||
}
|
||||
|
||||
if (spop_conn->flags & SPOP_CF_ERROR)
|
||||
spop_conn_report_term_evt(spop_conn, ((eb_is_empty(&spop_conn->streams_by_id) && (spop_conn->state == SPOP_CS_FRAME_H))
|
||||
spop_conn_report_term_evt(spop_conn, ((eb_is_empty(&spop_conn->streams_by_id) && (spop_conn->state == SPOP_CS_RUNNING) && spop_conn->dsi == -1)
|
||||
? muxc_tevt_type_rcv_err
|
||||
: muxc_tevt_type_truncated_rcv_err));
|
||||
else if (spop_conn->flags & SPOP_CF_END_REACHED)
|
||||
spop_conn_report_term_evt(spop_conn, ((eb_is_empty(&spop_conn->streams_by_id) && (spop_conn->state == SPOP_CS_FRAME_H))
|
||||
spop_conn_report_term_evt(spop_conn, ((eb_is_empty(&spop_conn->streams_by_id) && (spop_conn->state == SPOP_CS_RUNNING) && spop_conn->dsi == -1)
|
||||
? muxc_tevt_type_shutr
|
||||
: muxc_tevt_type_truncated_shutr));
|
||||
|
||||
@ -2299,7 +2299,7 @@ static int spop_process_mux(struct spop_conn *spop_conn)
|
||||
{
|
||||
TRACE_ENTER(SPOP_EV_SPOP_CONN_WAKE, spop_conn->conn);
|
||||
|
||||
if (unlikely(spop_conn->state < SPOP_CS_FRAME_H)) {
|
||||
if (unlikely(spop_conn->state < SPOP_CS_RUNNING)) {
|
||||
if (unlikely(spop_conn->state == SPOP_CS_HA_HELLO)) {
|
||||
TRACE_PROTO("sending SPOP HAPROXY HELLO fraame", SPOP_EV_TX_FRAME, spop_conn->conn);
|
||||
if (unlikely(!spop_conn_send_hello(spop_conn)))
|
||||
@ -2308,7 +2308,7 @@ static int spop_process_mux(struct spop_conn *spop_conn)
|
||||
TRACE_STATE("waiting for SPOP AGENT HELLO reply", SPOP_EV_TX_FRAME|SPOP_EV_RX_FRAME, spop_conn->conn);
|
||||
}
|
||||
/* need to wait for the other side */
|
||||
if (spop_conn->state < SPOP_CS_FRAME_H)
|
||||
if (spop_conn->state < SPOP_CS_RUNNING)
|
||||
goto done;
|
||||
}
|
||||
|
||||
@ -2490,7 +2490,7 @@ static int spop_send(struct spop_conn *spop_conn)
|
||||
/* We're not full anymore, so we can wake any task that are waiting
|
||||
* for us.
|
||||
*/
|
||||
if (!(spop_conn->flags & (SPOP_CF_MUX_MFULL | SPOP_CF_DEM_MROOM)) && spop_conn->state >= SPOP_CS_FRAME_H) {
|
||||
if (!(spop_conn->flags & (SPOP_CF_MUX_MFULL | SPOP_CF_DEM_MROOM)) && spop_conn->state >= SPOP_CS_RUNNING) {
|
||||
spop_conn->flags &= ~SPOP_CF_WAIT_INLIST;
|
||||
spop_resume_each_sending_spop_strm(spop_conn, &spop_conn->send_list);
|
||||
}
|
||||
@ -2665,7 +2665,7 @@ static int spop_ctl(struct connection *conn, enum mux_ctl_type mux_ctl, void *ou
|
||||
|
||||
switch (mux_ctl) {
|
||||
case MUX_CTL_STATUS:
|
||||
if ((spop_conn->state >= SPOP_CS_FRAME_H && spop_conn->state < SPOP_CS_ERROR) &&
|
||||
if (spop_conn->state == SPOP_CS_RUNNING &&
|
||||
!(spop_conn->flags & (SPOP_CF_ERROR|SPOP_CF_ERR_PENDING|SPOP_CF_END_REACHED|SPOP_CF_RCVD_SHUT|SPOP_CF_DISCO_SENT|SPOP_CF_DISCO_FAILED)))
|
||||
ret |= MUX_STATUS_READY;
|
||||
return ret;
|
||||
@ -3328,7 +3328,7 @@ static size_t spop_snd_buf(struct stconn *sc, struct buffer *buf, size_t count,
|
||||
}
|
||||
spop_strm->flags &= ~SPOP_SF_NOTIFIED;
|
||||
|
||||
if (spop_conn->state < SPOP_CS_FRAME_H) {
|
||||
if (spop_conn->state < SPOP_CS_RUNNING) {
|
||||
TRACE_DEVEL("connection not ready, leaving", SPOP_EV_STRM_SEND|SPOP_EV_SPOP_STRM_BLK, spop_conn->conn, spop_strm);
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user