diff --git a/src/ring.c b/src/ring.c index d798f27db..f56c8f168 100644 --- a/src/ring.c +++ b/src/ring.c @@ -402,7 +402,7 @@ int ring_attach_cli(struct ring *ring, struct appctx *appctx, uint flags) int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags, ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len)) { - size_t head_ofs, tail_ofs; + size_t head_ofs, tail_ofs, prev_ofs; size_t ring_size; uint8_t *ring_area; struct ist v1, v2; @@ -450,11 +450,10 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t head_ofs = *ofs_ptr; BUG_ON(head_ofs >= ring_size); - /* dec readers count */ - do { - readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); - } while ((readers > RING_MAX_READERS || - !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers - 1)) && __ha_cpu_relax()); + /* we keep track of where we were and we don't release it before + * we've protected the next place. + */ + prev_ofs = head_ofs; /* in this loop, head_ofs always points to the counter byte that precedes * the message so that we can take our reference there if we have to @@ -494,12 +493,18 @@ int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t vp_data_to_ring(v1, v2, (char *)ring_area, ring_size, &head_ofs, &tail_ofs); - /* inc readers count */ + /* inc readers count on new place */ do { readers = _HA_ATOMIC_LOAD(ring_area + head_ofs); } while ((readers > RING_MAX_READERS || !_HA_ATOMIC_CAS(ring_area + head_ofs, &readers, readers + 1)) && __ha_cpu_relax()); + /* dec readers count on old place */ + do { + readers = _HA_ATOMIC_LOAD(ring_area + prev_ofs); + } while ((readers > RING_MAX_READERS || + !_HA_ATOMIC_CAS(ring_area + prev_ofs, &readers, readers - 1)) && __ha_cpu_relax()); + if (last_ofs_ptr) *last_ofs_ptr = tail_ofs; *ofs_ptr = head_ofs;