runtime, internal/synctest, sync: associate WaitGroups with bubbles

Add support to internal/synctest for managing associations between
arbitrary pointers and synctest bubbles. (Implemented internally to
the runtime package by attaching a special to the pointer.)

Associate WaitGroups with bubbles.
Since WaitGroups don't have a constructor,
perform the association when Add is called.
All Add calls must be made from within the same bubble,
or outside any bubble.

When a bubbled goroutine calls WaitGroup.Wait,
the wait is durably blocking iff the WaitGroup is associated
with the current bubble.

Change-Id: I77e2701e734ac2fa2b32b28d5b0c853b7b2825c9
Reviewed-on: https://go-review.googlesource.com/c/go/+/676656
Reviewed-by: Michael Knyszek <mknyszek@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Auto-Submit: Damien Neil <dneil@google.com>
This commit is contained in:
Damien Neil 2025-05-20 15:56:43 -07:00 committed by Gopher Robot
parent 3b77085b40
commit b170c7e94c
10 changed files with 324 additions and 31 deletions

View File

@ -103,6 +103,7 @@ var depsRules = `
< sync/atomic
< internal/sync
< weak
< internal/synctest
< sync
< internal/bisect
< internal/godebug
@ -136,9 +137,6 @@ var depsRules = `
unicode !< path;
RUNTIME
< internal/synctest;
# SYSCALL is RUNTIME plus the packages necessary for basic system calls.
RUNTIME, unicode/utf8, unicode/utf16, internal/synctest
< internal/syscall/windows/sysdll, syscall/js

View File

@ -8,7 +8,7 @@
package synctest
import (
_ "unsafe" // for go:linkname
"unsafe"
)
//go:linkname Run
@ -17,6 +17,36 @@ func Run(f func())
//go:linkname Wait
func Wait()
// IsInBubble reports whether the current goroutine is in a bubble.
//
//go:linkname IsInBubble
func IsInBubble() bool
// Associate associates p with the current bubble.
// It returns false if p has an existing association with a different bubble.
func Associate[T any](p *T) (ok bool) {
return associate(unsafe.Pointer(p))
}
//go:linkname associate
func associate(p unsafe.Pointer) bool
// Disassociate disassociates p from any bubble.
func Disassociate[T any](p *T) {
disassociate(unsafe.Pointer(p))
}
//go:linkname disassociate
func disassociate(b unsafe.Pointer)
// IsAssociated reports whether p is associated with the current bubble.
func IsAssociated[T any](p *T) bool {
return isAssociated(unsafe.Pointer(p))
}
//go:linkname isAssociated
func isAssociated(p unsafe.Pointer) bool
//go:linkname acquire
func acquire() any

View File

@ -7,11 +7,14 @@ package synctest_test
import (
"fmt"
"internal/synctest"
"internal/testenv"
"iter"
"os"
"reflect"
"runtime"
"slices"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -523,7 +526,7 @@ func TestReflectFuncOf(t *testing.T) {
})
}
func TestWaitGroup(t *testing.T) {
func TestWaitGroupInBubble(t *testing.T) {
synctest.Run(func() {
var wg sync.WaitGroup
wg.Add(1)
@ -540,6 +543,83 @@ func TestWaitGroup(t *testing.T) {
})
}
func TestWaitGroupOutOfBubble(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
donec := make(chan struct{})
go synctest.Run(func() {
// Since wg.Add was called outside the bubble, Wait is not durably blocking
// and this waits until wg.Done is called below.
wg.Wait()
close(donec)
})
select {
case <-donec:
t.Fatalf("synctest.Run finished before WaitGroup.Done called")
case <-time.After(1 * time.Millisecond):
}
wg.Done()
<-donec
}
func TestWaitGroupMovedIntoBubble(t *testing.T) {
wantFatal(t, "fatal error: sync: WaitGroup.Add called from inside and outside synctest bubble", func() {
var wg sync.WaitGroup
wg.Add(1)
synctest.Run(func() {
wg.Add(1)
})
})
}
func TestWaitGroupMovedOutOfBubble(t *testing.T) {
wantFatal(t, "fatal error: sync: WaitGroup.Add called from inside and outside synctest bubble", func() {
var wg sync.WaitGroup
synctest.Run(func() {
wg.Add(1)
})
wg.Add(1)
})
}
func TestWaitGroupMovedBetweenBubblesWithNonZeroCount(t *testing.T) {
wantFatal(t, "fatal error: sync: WaitGroup.Add called from multiple synctest bubbles", func() {
var wg sync.WaitGroup
synctest.Run(func() {
wg.Add(1)
})
synctest.Run(func() {
wg.Add(1)
})
})
}
func TestWaitGroupMovedBetweenBubblesWithZeroCount(t *testing.T) {
var wg sync.WaitGroup
synctest.Run(func() {
wg.Add(1)
wg.Done()
})
synctest.Run(func() {
// Reusing the WaitGroup is safe, because its count is zero.
wg.Add(1)
wg.Done()
})
}
func TestWaitGroupMovedBetweenBubblesAfterWait(t *testing.T) {
var wg sync.WaitGroup
synctest.Run(func() {
wg.Go(func() {})
wg.Wait()
})
synctest.Run(func() {
// Reusing the WaitGroup is safe, because its count is zero.
wg.Go(func() {})
wg.Wait()
})
}
func TestHappensBefore(t *testing.T) {
// Use two parallel goroutines accessing different vars to ensure that
// we correctly account for multiple goroutines in the bubble.
@ -647,3 +727,23 @@ func wantPanic(t *testing.T, want string) {
t.Errorf("got no panic, want one")
}
}
func wantFatal(t *testing.T, want string, f func()) {
t.Helper()
if os.Getenv("GO_WANT_HELPER_PROCESS") == "1" {
f()
return
}
cmd := testenv.Command(t, testenv.Executable(t), "-test.run=^"+t.Name()+"$")
cmd = testenv.CleanCmdEnv(cmd)
cmd.Env = append(cmd.Env, "GO_WANT_HELPER_PROCESS=1")
out, err := cmd.CombinedOutput()
if err == nil {
t.Errorf("expected test function to panic, but test returned successfully")
}
if !strings.Contains(string(out), want) {
t.Errorf("wanted test output contaiing %q; got %q", want, string(out))
}
}

View File

@ -223,6 +223,7 @@ type mheap struct {
specialReachableAlloc fixalloc // allocator for specialReachable
specialPinCounterAlloc fixalloc // allocator for specialPinCounter
specialWeakHandleAlloc fixalloc // allocator for specialWeakHandle
specialBubbleAlloc fixalloc // allocator for specialBubble
speciallock mutex // lock for special record allocators.
arenaHintAlloc fixalloc // allocator for arenaHints
@ -799,6 +800,7 @@ func (h *mheap) init() {
h.specialReachableAlloc.init(unsafe.Sizeof(specialReachable{}), nil, nil, &memstats.other_sys)
h.specialPinCounterAlloc.init(unsafe.Sizeof(specialPinCounter{}), nil, nil, &memstats.other_sys)
h.specialWeakHandleAlloc.init(unsafe.Sizeof(specialWeakHandle{}), nil, nil, &memstats.gcMiscSys)
h.specialBubbleAlloc.init(unsafe.Sizeof(specialBubble{}), nil, nil, &memstats.other_sys)
h.arenaHintAlloc.init(unsafe.Sizeof(arenaHint{}), nil, nil, &memstats.other_sys)
// Don't zero mspan allocations. Background sweeping can
@ -2003,6 +2005,8 @@ const (
// _KindSpecialCheckFinalizer adds additional context to a finalizer or cleanup.
// Used only if debug.checkfinalizers != 0.
_KindSpecialCheckFinalizer = 8
// _KindSpecialBubble is used to associate objects with synctest bubbles.
_KindSpecialBubble = 9
)
type special struct {
@ -2839,6 +2843,11 @@ func freeSpecial(s *special, p unsafe.Pointer, size uintptr) {
lock(&mheap_.speciallock)
mheap_.specialTinyBlockAlloc.free(unsafe.Pointer(st))
unlock(&mheap_.speciallock)
case _KindSpecialBubble:
st := (*specialBubble)(unsafe.Pointer(s))
lock(&mheap_.speciallock)
mheap_.specialBubbleAlloc.free(unsafe.Pointer(st))
unlock(&mheap_.speciallock)
default:
throw("bad special kind")
panic("not reached")

View File

@ -1096,6 +1096,7 @@ const (
waitReasonSynctestChanReceive // "chan receive (synctest)"
waitReasonSynctestChanSend // "chan send (synctest)"
waitReasonSynctestSelect // "select (synctest)"
waitReasonSynctestWaitGroupWait // "sync.WaitGroup.Wait (synctest)"
waitReasonCleanupWait // "cleanup wait"
)
@ -1145,6 +1146,7 @@ var waitReasonStrings = [...]string{
waitReasonSynctestChanReceive: "chan receive (synctest)",
waitReasonSynctestChanSend: "chan send (synctest)",
waitReasonSynctestSelect: "select (synctest)",
waitReasonSynctestWaitGroupWait: "sync.WaitGroup.Wait (synctest)",
waitReasonCleanupWait: "cleanup wait",
}
@ -1190,18 +1192,18 @@ func (w waitReason) isIdleInSynctest() bool {
// isIdleInSynctest indicates that a goroutine is considered idle by synctest.Wait.
var isIdleInSynctest = [len(waitReasonStrings)]bool{
waitReasonChanReceiveNilChan: true,
waitReasonChanSendNilChan: true,
waitReasonSelectNoCases: true,
waitReasonSleep: true,
waitReasonSyncCondWait: true,
waitReasonSyncWaitGroupWait: true,
waitReasonCoroutine: true,
waitReasonSynctestRun: true,
waitReasonSynctestWait: true,
waitReasonSynctestChanReceive: true,
waitReasonSynctestChanSend: true,
waitReasonSynctestSelect: true,
waitReasonChanReceiveNilChan: true,
waitReasonChanSendNilChan: true,
waitReasonSelectNoCases: true,
waitReasonSleep: true,
waitReasonSyncCondWait: true,
waitReasonSynctestWaitGroupWait: true,
waitReasonCoroutine: true,
waitReasonSynctestRun: true,
waitReasonSynctestWait: true,
waitReasonSynctestChanReceive: true,
waitReasonSynctestChanSend: true,
waitReasonSynctestSelect: true,
}
var (

View File

@ -106,8 +106,12 @@ func sync_runtime_SemacquireRWMutex(addr *uint32, lifo bool, skipframes int) {
}
//go:linkname sync_runtime_SemacquireWaitGroup sync.runtime_SemacquireWaitGroup
func sync_runtime_SemacquireWaitGroup(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0, waitReasonSyncWaitGroupWait)
func sync_runtime_SemacquireWaitGroup(addr *uint32, synctestDurable bool) {
reason := waitReasonSyncWaitGroupWait
if synctestDurable {
reason = waitReasonSynctestWaitGroupWait
}
semacquire1(addr, false, semaBlockProfile, 0, reason)
}
//go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease

View File

@ -5,6 +5,7 @@
package runtime
import (
"internal/runtime/atomic"
"internal/runtime/sys"
"unsafe"
)
@ -13,12 +14,13 @@ import (
type synctestBubble struct {
mu mutex
timers timers
now int64 // current fake time
root *g // caller of synctest.Run
waiter *g // caller of synctest.Wait
main *g // goroutine started by synctest.Run
waiting bool // true if a goroutine is calling synctest.Wait
done bool // true if main has exited
id uint64 // unique id
now int64 // current fake time
root *g // caller of synctest.Run
waiter *g // caller of synctest.Wait
main *g // goroutine started by synctest.Run
waiting bool // true if a goroutine is calling synctest.Wait
done bool // true if main has exited
// The bubble is active (not blocked) so long as running > 0 || active > 0.
//
@ -163,6 +165,8 @@ func (bubble *synctestBubble) raceaddr() unsafe.Pointer {
return unsafe.Pointer(bubble)
}
var bubbleGen atomic.Uint64 // bubble ID counter
//go:linkname synctestRun internal/synctest.Run
func synctestRun(f func()) {
if debug.asynctimerchan.Load() != 0 {
@ -174,6 +178,7 @@ func synctestRun(f func()) {
panic("synctest.Run called from within a synctest bubble")
}
bubble := &synctestBubble{
id: bubbleGen.Add(1),
total: 1,
running: 1,
root: gp,
@ -313,6 +318,11 @@ func synctestwait_c(gp *g, _ unsafe.Pointer) bool {
return true
}
//go:linkname synctest_isInBubble internal/synctest.IsInBubble
func synctest_isInBubble() bool {
return getg().bubble != nil
}
//go:linkname synctest_acquire internal/synctest.acquire
func synctest_acquire() any {
if bubble := getg().bubble; bubble != nil {
@ -339,3 +349,85 @@ func synctest_inBubble(bubble any, f func()) {
}()
f()
}
// specialBubble is a special used to associate objects with bubbles.
type specialBubble struct {
_ sys.NotInHeap
special special
bubbleid uint64
}
// getOrSetBubbleSpecial checks the special record for p's bubble membership.
//
// If add is true and p is not associated with any bubble,
// it adds a special record for p associating it with bubbleid.
//
// It returns ok==true if p is associated with bubbleid
// (including if a new association was added),
// and ok==false if not.
func getOrSetBubbleSpecial(p unsafe.Pointer, bubbleid uint64, add bool) (ok bool) {
span := spanOfHeap(uintptr(p))
if span == nil {
throw("getOrSetBubbleSpecial on invalid pointer")
}
// Ensure that the span is swept.
// Sweeping accesses the specials list w/o locks, so we have
// to synchronize with it. And it's just much safer.
mp := acquirem()
span.ensureSwept()
offset := uintptr(p) - span.base()
lock(&span.speciallock)
// Find splice point, check for existing record.
iter, exists := span.specialFindSplicePoint(offset, _KindSpecialBubble)
if exists {
// p is already associated with a bubble.
// Return true iff it's the same bubble.
s := (*specialBubble)((unsafe.Pointer)(*iter))
ok = s.bubbleid == bubbleid
} else if add {
// p is not associated with a bubble,
// and we've been asked to add an association.
s := (*specialBubble)(mheap_.specialBubbleAlloc.alloc())
s.bubbleid = bubbleid
s.special.kind = _KindSpecialBubble
s.special.offset = offset
s.special.next = *iter
*iter = (*special)(unsafe.Pointer(s))
spanHasSpecials(span)
ok = true
} else {
// p is not associated with a bubble.
ok = false
}
unlock(&span.speciallock)
releasem(mp)
return ok
}
// synctest_associate associates p with the current bubble.
// It returns false if p is already associated with a different bubble.
//
//go:linkname synctest_associate internal/synctest.associate
func synctest_associate(p unsafe.Pointer) (ok bool) {
return getOrSetBubbleSpecial(p, getg().bubble.id, true)
}
// synctest_disassociate disassociates p from its bubble.
//
//go:linkname synctest_disassociate internal/synctest.disassociate
func synctest_disassociate(p unsafe.Pointer) {
removespecial(p, _KindSpecialBubble)
}
// synctest_isAssociated reports whether p is associated with the current bubble.
//
//go:linkname synctest_isAssociated internal/synctest.isAssociated
func synctest_isAssociated(p unsafe.Pointer) bool {
return getOrSetBubbleSpecial(p, getg().bubble.id, false)
}

View File

@ -14,7 +14,7 @@ import "unsafe"
func runtime_Semacquire(s *uint32)
// SemacquireWaitGroup is like Semacquire, but for WaitGroup.Wait.
func runtime_SemacquireWaitGroup(s *uint32)
func runtime_SemacquireWaitGroup(s *uint32, synctestDurable bool)
// Semacquire(RW)Mutex(R) is like Semacquire, but for profiling contended
// Mutexes and RWMutexes.

View File

@ -6,6 +6,7 @@ package sync
import (
"internal/race"
"internal/synctest"
"sync/atomic"
"unsafe"
)
@ -47,10 +48,17 @@ import (
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
// Bits (high to low):
// bits[0:32] counter
// bits[32] flag: synctest bubble membership
// bits[33:64] wait count
state atomic.Uint64
sema uint32
}
// waitGroupBubbleFlag indicates that a WaitGroup is associated with a synctest bubble.
const waitGroupBubbleFlag = 0x8000_0000
// Add adds delta, which may be negative, to the [WaitGroup] task counter.
// If the counter becomes zero, all goroutines blocked on [WaitGroup.Wait] are released.
// If the counter goes negative, Add panics.
@ -75,9 +83,27 @@ func (wg *WaitGroup) Add(delta int) {
race.Disable()
defer race.Enable()
}
if synctest.IsInBubble() {
// If Add is called from within a bubble, then all Add calls must be made
// from the same bubble.
if !synctest.Associate(wg) {
// wg is already associated with a different bubble.
fatal("sync: WaitGroup.Add called from multiple synctest bubbles")
} else {
state := wg.state.Or(waitGroupBubbleFlag)
if state != 0 && state&waitGroupBubbleFlag == 0 {
// Add has been called from outside this bubble.
fatal("sync: WaitGroup.Add called from inside and outside synctest bubble")
}
}
}
state := wg.state.Add(uint64(delta) << 32)
if state&waitGroupBubbleFlag != 0 && !synctest.IsInBubble() {
// Add has been called from within a synctest bubble (and we aren't in one).
fatal("sync: WaitGroup.Add called from inside and outside synctest bubble")
}
v := int32(state >> 32)
w := uint32(state)
w := uint32(state & 0x7fffffff)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
@ -90,6 +116,13 @@ func (wg *WaitGroup) Add(delta int) {
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v == 0 && state&waitGroupBubbleFlag != 0 {
// Disassociate the WaitGroup from its bubble.
synctest.Disassociate(wg)
if w == 0 {
wg.state.Store(0)
}
}
if v > 0 || w == 0 {
return
}
@ -147,7 +180,21 @@ func (wg *WaitGroup) Wait() {
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(&wg.sema))
}
runtime_SemacquireWaitGroup(&wg.sema)
synctestDurable := false
if state&waitGroupBubbleFlag != 0 && synctest.IsInBubble() {
if race.Enabled {
race.Enable()
}
if synctest.IsAssociated(wg) {
// Add was called within the current bubble,
// so this Wait is durably blocking.
synctestDurable = true
}
if race.Enabled {
race.Disable()
}
}
runtime_SemacquireWaitGroup(&wg.sema, synctestDurable)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}

View File

@ -72,10 +72,17 @@
// - a blocking select statement where every case is a channel created
// within the bubble
// - [sync.Cond.Wait]
// - [sync.WaitGroup.Wait]
// - [sync.WaitGroup.Wait], when [sync.WaitGroup.Add] was called within the bubble
// - [time.Sleep]
//
// Locking a [sync.Mutex] or [sync.RWMutex] is not durably blocking.
// Operations not in the above list are not durably blocking.
// In particular, the following operations may block a goroutine,
// but are not durably blocking because the goroutine can be unblocked
// by an event occurring outside its bubble:
//
// - locking a [sync.Mutex] or [sync.RWMutex]
// - blocking on I/O, such as reading from a network socket
// - system calls
//
// # Isolation
//
@ -83,6 +90,10 @@
// is associated with it. Operating on a bubbled channel, timer, or
// ticker from outside the bubble panics.
//
// A [sync.WaitGroup] becomes associated with a bubble on the first
// call to Add or Go. Once a WaitGroup is associated with a bubble,
// calling Add or Go from outside that bubble panics.
//
// Cleanup functions and finalizers registered with
// [runtime.AddCleanup] and [runtime.SetFinalizer]
// run outside of any bubble.