From 2192983ffd6ca0dbfae245ba1fdd90d768f1b89c Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Wed, 28 Feb 2024 17:18:34 +0100 Subject: [PATCH] MEDIUM: ring: protect the reader's positions against writers The reader now needs to protect the positions it's reading. This is already done via the readers counter at the beginning of messages, but as long as the lock is present, this counter is decremented before starting to parse messages, and incremented at the end. We must now do that in reverse, first protect the end of the messages, and only then remove ourselves from the already processed messages, so that at no point could a writer pass over and possibly overwrite data we're currently watching. --- src/ring.c | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/ring.c b/src/ring.c index d798f27db..f56c8f168 100644 --- a/src/ring.c +++ b/src/ring.c @@ -402,7 +402,7 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags) int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags, ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len)) { - size_t head_ofs, tail_ofs; + size_t head_ofs, tail_ofs, prev_ofs; size_t ring_size; uint8_t *ring_area; struct ist v1, v2; @@ -450,11 +450,10 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t head_ofs = *ofs_ptr; BUG_ON(head_ofs >= ring_size); - /* dec readers count */ - do { - readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); - } while ((readers > RING_MAX_READERS || - !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers - 1)) && __ha_cpu_relax()); + /* we keep track of where we were and we don't release it before + * we've protected the next place. + */ + prev_ofs = head_ofs; /* in this loop, head_ofs always points to the counter byte that precedes * the message so that we can take our reference there if we have to @@ -494,12 +493,18 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs); - /* inc readers count */ + /* inc readers count on new place */ do { readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); } while ((readers > RING_MAX_READERS || !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax()); + /* dec readers count on old place */ + do { + readers = _HA_ATOMIC_LOAD(ring_area + prev_ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(ring_area + prev_ofs, &readers, readers - 1)) && __ha_cpu_relax()); + if (last_ofs_ptr) *last_ofs_ptr = tail_ofs; *ofs_ptr = head_ofs;