diff --git a/src/go/build/deps_test.go b/src/go/build/deps_test.go index b2668a3d7d..6d92542e31 100644 --- a/src/go/build/deps_test.go +++ b/src/go/build/deps_test.go @@ -103,6 +103,7 @@ var depsRules = ` < sync/atomic < internal/sync < weak + < internal/synctest < sync < internal/bisect < internal/godebug @@ -136,9 +137,6 @@ var depsRules = ` unicode !< path; - RUNTIME - < internal/synctest; - # SYSCALL is RUNTIME plus the packages necessary for basic system calls. RUNTIME, unicode/utf8, unicode/utf16, internal/synctest < internal/syscall/windows/sysdll, syscall/js diff --git a/src/internal/synctest/synctest.go b/src/internal/synctest/synctest.go index 19190d30f1..4d7fa3730c 100644 --- a/src/internal/synctest/synctest.go +++ b/src/internal/synctest/synctest.go @@ -8,7 +8,7 @@ package synctest import ( - _ "unsafe" // for go:linkname + "unsafe" ) //go:linkname Run @@ -17,6 +17,36 @@ func Run(f func()) //go:linkname Wait func Wait() +// IsInBubble reports whether the current goroutine is in a bubble. +// +//go:linkname IsInBubble +func IsInBubble() bool + +// Associate associates p with the current bubble. +// It returns false if p has an existing association with a different bubble. +func Associate[T any](p *T) (ok bool) { + return associate(unsafe.Pointer(p)) +} + +//go:linkname associate +func associate(p unsafe.Pointer) bool + +// Disassociate disassociates p from any bubble. +func Disassociate[T any](p *T) { + disassociate(unsafe.Pointer(p)) +} + +//go:linkname disassociate +func disassociate(b unsafe.Pointer) + +// IsAssociated reports whether p is associated with the current bubble. +func IsAssociated[T any](p *T) bool { + return isAssociated(unsafe.Pointer(p)) +} + +//go:linkname isAssociated +func isAssociated(p unsafe.Pointer) bool + //go:linkname acquire func acquire() any diff --git a/src/internal/synctest/synctest_test.go b/src/internal/synctest/synctest_test.go index 7f71df1710..8b2ade5630 100644 --- a/src/internal/synctest/synctest_test.go +++ b/src/internal/synctest/synctest_test.go @@ -7,11 +7,14 @@ package synctest_test import ( "fmt" "internal/synctest" + "internal/testenv" "iter" + "os" "reflect" "runtime" "slices" "strconv" + "strings" "sync" "testing" "time" @@ -523,7 +526,7 @@ func TestReflectFuncOf(t *testing.T) { }) } -func TestWaitGroup(t *testing.T) { +func TestWaitGroupInBubble(t *testing.T) { synctest.Run(func() { var wg sync.WaitGroup wg.Add(1) @@ -540,6 +543,83 @@ func TestWaitGroup(t *testing.T) { }) } +func TestWaitGroupOutOfBubble(t *testing.T) { + var wg sync.WaitGroup + wg.Add(1) + donec := make(chan struct{}) + go synctest.Run(func() { + // Since wg.Add was called outside the bubble, Wait is not durably blocking + // and this waits until wg.Done is called below. + wg.Wait() + close(donec) + }) + select { + case <-donec: + t.Fatalf("synctest.Run finished before WaitGroup.Done called") + case <-time.After(1 * time.Millisecond): + } + wg.Done() + <-donec +} + +func TestWaitGroupMovedIntoBubble(t *testing.T) { + wantFatal(t, "fatal error: sync: WaitGroup.Add called from inside and outside synctest bubble", func() { + var wg sync.WaitGroup + wg.Add(1) + synctest.Run(func() { + wg.Add(1) + }) + }) +} + +func TestWaitGroupMovedOutOfBubble(t *testing.T) { + wantFatal(t, "fatal error: sync: WaitGroup.Add called from inside and outside synctest bubble", func() { + var wg sync.WaitGroup + synctest.Run(func() { + wg.Add(1) + }) + wg.Add(1) + }) +} + +func TestWaitGroupMovedBetweenBubblesWithNonZeroCount(t *testing.T) { + wantFatal(t, "fatal error: sync: WaitGroup.Add called from multiple synctest bubbles", func() { + var wg sync.WaitGroup + synctest.Run(func() { + wg.Add(1) + }) + synctest.Run(func() { + wg.Add(1) + }) + }) +} + +func TestWaitGroupMovedBetweenBubblesWithZeroCount(t *testing.T) { + var wg sync.WaitGroup + synctest.Run(func() { + wg.Add(1) + wg.Done() + }) + synctest.Run(func() { + // Reusing the WaitGroup is safe, because its count is zero. + wg.Add(1) + wg.Done() + }) +} + +func TestWaitGroupMovedBetweenBubblesAfterWait(t *testing.T) { + var wg sync.WaitGroup + synctest.Run(func() { + wg.Go(func() {}) + wg.Wait() + }) + synctest.Run(func() { + // Reusing the WaitGroup is safe, because its count is zero. + wg.Go(func() {}) + wg.Wait() + }) +} + func TestHappensBefore(t *testing.T) { // Use two parallel goroutines accessing different vars to ensure that // we correctly account for multiple goroutines in the bubble. @@ -647,3 +727,23 @@ func wantPanic(t *testing.T, want string) { t.Errorf("got no panic, want one") } } + +func wantFatal(t *testing.T, want string, f func()) { + t.Helper() + + if os.Getenv("GO_WANT_HELPER_PROCESS") == "1" { + f() + return + } + + cmd := testenv.Command(t, testenv.Executable(t), "-test.run=^"+t.Name()+"$") + cmd = testenv.CleanCmdEnv(cmd) + cmd.Env = append(cmd.Env, "GO_WANT_HELPER_PROCESS=1") + out, err := cmd.CombinedOutput() + if err == nil { + t.Errorf("expected test function to panic, but test returned successfully") + } + if !strings.Contains(string(out), want) { + t.Errorf("wanted test output contaiing %q; got %q", want, string(out)) + } +} diff --git a/src/runtime/mheap.go b/src/runtime/mheap.go index 3612d71e66..9361089b80 100644 --- a/src/runtime/mheap.go +++ b/src/runtime/mheap.go @@ -223,6 +223,7 @@ type mheap struct { specialReachableAlloc fixalloc // allocator for specialReachable specialPinCounterAlloc fixalloc // allocator for specialPinCounter specialWeakHandleAlloc fixalloc // allocator for specialWeakHandle + specialBubbleAlloc fixalloc // allocator for specialBubble speciallock mutex // lock for special record allocators. arenaHintAlloc fixalloc // allocator for arenaHints @@ -799,6 +800,7 @@ func (h *mheap) init() { h.specialReachableAlloc.init(unsafe.Sizeof(specialReachable{}), nil, nil, &memstats.other_sys) h.specialPinCounterAlloc.init(unsafe.Sizeof(specialPinCounter{}), nil, nil, &memstats.other_sys) h.specialWeakHandleAlloc.init(unsafe.Sizeof(specialWeakHandle{}), nil, nil, &memstats.gcMiscSys) + h.specialBubbleAlloc.init(unsafe.Sizeof(specialBubble{}), nil, nil, &memstats.other_sys) h.arenaHintAlloc.init(unsafe.Sizeof(arenaHint{}), nil, nil, &memstats.other_sys) // Don't zero mspan allocations. Background sweeping can @@ -2003,6 +2005,8 @@ const ( // _KindSpecialCheckFinalizer adds additional context to a finalizer or cleanup. // Used only if debug.checkfinalizers != 0. _KindSpecialCheckFinalizer = 8 + // _KindSpecialBubble is used to associate objects with synctest bubbles. + _KindSpecialBubble = 9 ) type special struct { @@ -2839,6 +2843,11 @@ func freeSpecial(s *special, p unsafe.Pointer, size uintptr) { lock(&mheap_.speciallock) mheap_.specialTinyBlockAlloc.free(unsafe.Pointer(st)) unlock(&mheap_.speciallock) + case _KindSpecialBubble: + st := (*specialBubble)(unsafe.Pointer(s)) + lock(&mheap_.speciallock) + mheap_.specialBubbleAlloc.free(unsafe.Pointer(st)) + unlock(&mheap_.speciallock) default: throw("bad special kind") panic("not reached") diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go index 94ab87f6db..cd40586bc2 100644 --- a/src/runtime/runtime2.go +++ b/src/runtime/runtime2.go @@ -1096,6 +1096,7 @@ const ( waitReasonSynctestChanReceive // "chan receive (synctest)" waitReasonSynctestChanSend // "chan send (synctest)" waitReasonSynctestSelect // "select (synctest)" + waitReasonSynctestWaitGroupWait // "sync.WaitGroup.Wait (synctest)" waitReasonCleanupWait // "cleanup wait" ) @@ -1145,6 +1146,7 @@ var waitReasonStrings = [...]string{ waitReasonSynctestChanReceive: "chan receive (synctest)", waitReasonSynctestChanSend: "chan send (synctest)", waitReasonSynctestSelect: "select (synctest)", + waitReasonSynctestWaitGroupWait: "sync.WaitGroup.Wait (synctest)", waitReasonCleanupWait: "cleanup wait", } @@ -1190,18 +1192,18 @@ func (w waitReason) isIdleInSynctest() bool { // isIdleInSynctest indicates that a goroutine is considered idle by synctest.Wait. var isIdleInSynctest = [len(waitReasonStrings)]bool{ - waitReasonChanReceiveNilChan: true, - waitReasonChanSendNilChan: true, - waitReasonSelectNoCases: true, - waitReasonSleep: true, - waitReasonSyncCondWait: true, - waitReasonSyncWaitGroupWait: true, - waitReasonCoroutine: true, - waitReasonSynctestRun: true, - waitReasonSynctestWait: true, - waitReasonSynctestChanReceive: true, - waitReasonSynctestChanSend: true, - waitReasonSynctestSelect: true, + waitReasonChanReceiveNilChan: true, + waitReasonChanSendNilChan: true, + waitReasonSelectNoCases: true, + waitReasonSleep: true, + waitReasonSyncCondWait: true, + waitReasonSynctestWaitGroupWait: true, + waitReasonCoroutine: true, + waitReasonSynctestRun: true, + waitReasonSynctestWait: true, + waitReasonSynctestChanReceive: true, + waitReasonSynctestChanSend: true, + waitReasonSynctestSelect: true, } var ( diff --git a/src/runtime/sema.go b/src/runtime/sema.go index 4890df3464..7d6fc6d57d 100644 --- a/src/runtime/sema.go +++ b/src/runtime/sema.go @@ -106,8 +106,12 @@ func sync_runtime_SemacquireRWMutex(addr *uint32, lifo bool, skipframes int) { } //go:linkname sync_runtime_SemacquireWaitGroup sync.runtime_SemacquireWaitGroup -func sync_runtime_SemacquireWaitGroup(addr *uint32) { - semacquire1(addr, false, semaBlockProfile, 0, waitReasonSyncWaitGroupWait) +func sync_runtime_SemacquireWaitGroup(addr *uint32, synctestDurable bool) { + reason := waitReasonSyncWaitGroupWait + if synctestDurable { + reason = waitReasonSynctestWaitGroupWait + } + semacquire1(addr, false, semaBlockProfile, 0, reason) } //go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease diff --git a/src/runtime/synctest.go b/src/runtime/synctest.go index ff1979a8d8..f676afa20d 100644 --- a/src/runtime/synctest.go +++ b/src/runtime/synctest.go @@ -5,6 +5,7 @@ package runtime import ( + "internal/runtime/atomic" "internal/runtime/sys" "unsafe" ) @@ -13,12 +14,13 @@ import ( type synctestBubble struct { mu mutex timers timers - now int64 // current fake time - root *g // caller of synctest.Run - waiter *g // caller of synctest.Wait - main *g // goroutine started by synctest.Run - waiting bool // true if a goroutine is calling synctest.Wait - done bool // true if main has exited + id uint64 // unique id + now int64 // current fake time + root *g // caller of synctest.Run + waiter *g // caller of synctest.Wait + main *g // goroutine started by synctest.Run + waiting bool // true if a goroutine is calling synctest.Wait + done bool // true if main has exited // The bubble is active (not blocked) so long as running > 0 || active > 0. // @@ -163,6 +165,8 @@ func (bubble *synctestBubble) raceaddr() unsafe.Pointer { return unsafe.Pointer(bubble) } +var bubbleGen atomic.Uint64 // bubble ID counter + //go:linkname synctestRun internal/synctest.Run func synctestRun(f func()) { if debug.asynctimerchan.Load() != 0 { @@ -174,6 +178,7 @@ func synctestRun(f func()) { panic("synctest.Run called from within a synctest bubble") } bubble := &synctestBubble{ + id: bubbleGen.Add(1), total: 1, running: 1, root: gp, @@ -313,6 +318,11 @@ func synctestwait_c(gp *g, _ unsafe.Pointer) bool { return true } +//go:linkname synctest_isInBubble internal/synctest.IsInBubble +func synctest_isInBubble() bool { + return getg().bubble != nil +} + //go:linkname synctest_acquire internal/synctest.acquire func synctest_acquire() any { if bubble := getg().bubble; bubble != nil { @@ -339,3 +349,85 @@ func synctest_inBubble(bubble any, f func()) { }() f() } + +// specialBubble is a special used to associate objects with bubbles. +type specialBubble struct { + _ sys.NotInHeap + special special + bubbleid uint64 +} + +// getOrSetBubbleSpecial checks the special record for p's bubble membership. +// +// If add is true and p is not associated with any bubble, +// it adds a special record for p associating it with bubbleid. +// +// It returns ok==true if p is associated with bubbleid +// (including if a new association was added), +// and ok==false if not. +func getOrSetBubbleSpecial(p unsafe.Pointer, bubbleid uint64, add bool) (ok bool) { + span := spanOfHeap(uintptr(p)) + if span == nil { + throw("getOrSetBubbleSpecial on invalid pointer") + } + + // Ensure that the span is swept. + // Sweeping accesses the specials list w/o locks, so we have + // to synchronize with it. And it's just much safer. + mp := acquirem() + span.ensureSwept() + + offset := uintptr(p) - span.base() + + lock(&span.speciallock) + + // Find splice point, check for existing record. + iter, exists := span.specialFindSplicePoint(offset, _KindSpecialBubble) + if exists { + // p is already associated with a bubble. + // Return true iff it's the same bubble. + s := (*specialBubble)((unsafe.Pointer)(*iter)) + ok = s.bubbleid == bubbleid + } else if add { + // p is not associated with a bubble, + // and we've been asked to add an association. + s := (*specialBubble)(mheap_.specialBubbleAlloc.alloc()) + s.bubbleid = bubbleid + s.special.kind = _KindSpecialBubble + s.special.offset = offset + s.special.next = *iter + *iter = (*special)(unsafe.Pointer(s)) + spanHasSpecials(span) + ok = true + } else { + // p is not associated with a bubble. + ok = false + } + + unlock(&span.speciallock) + releasem(mp) + + return ok +} + +// synctest_associate associates p with the current bubble. +// It returns false if p is already associated with a different bubble. +// +//go:linkname synctest_associate internal/synctest.associate +func synctest_associate(p unsafe.Pointer) (ok bool) { + return getOrSetBubbleSpecial(p, getg().bubble.id, true) +} + +// synctest_disassociate disassociates p from its bubble. +// +//go:linkname synctest_disassociate internal/synctest.disassociate +func synctest_disassociate(p unsafe.Pointer) { + removespecial(p, _KindSpecialBubble) +} + +// synctest_isAssociated reports whether p is associated with the current bubble. +// +//go:linkname synctest_isAssociated internal/synctest.isAssociated +func synctest_isAssociated(p unsafe.Pointer) bool { + return getOrSetBubbleSpecial(p, getg().bubble.id, false) +} diff --git a/src/sync/runtime.go b/src/sync/runtime.go index 99e5bccbee..ae3368e58d 100644 --- a/src/sync/runtime.go +++ b/src/sync/runtime.go @@ -14,7 +14,7 @@ import "unsafe" func runtime_Semacquire(s *uint32) // SemacquireWaitGroup is like Semacquire, but for WaitGroup.Wait. -func runtime_SemacquireWaitGroup(s *uint32) +func runtime_SemacquireWaitGroup(s *uint32, synctestDurable bool) // Semacquire(RW)Mutex(R) is like Semacquire, but for profiling contended // Mutexes and RWMutexes. diff --git a/src/sync/waitgroup.go b/src/sync/waitgroup.go index c850f58ed1..efc63be099 100644 --- a/src/sync/waitgroup.go +++ b/src/sync/waitgroup.go @@ -6,6 +6,7 @@ package sync import ( "internal/race" + "internal/synctest" "sync/atomic" "unsafe" ) @@ -47,10 +48,17 @@ import ( type WaitGroup struct { noCopy noCopy - state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count. + // Bits (high to low): + // bits[0:32] counter + // bits[32] flag: synctest bubble membership + // bits[33:64] wait count + state atomic.Uint64 sema uint32 } +// waitGroupBubbleFlag indicates that a WaitGroup is associated with a synctest bubble. +const waitGroupBubbleFlag = 0x8000_0000 + // Add adds delta, which may be negative, to the [WaitGroup] task counter. // If the counter becomes zero, all goroutines blocked on [WaitGroup.Wait] are released. // If the counter goes negative, Add panics. @@ -75,9 +83,27 @@ func (wg *WaitGroup) Add(delta int) { race.Disable() defer race.Enable() } + if synctest.IsInBubble() { + // If Add is called from within a bubble, then all Add calls must be made + // from the same bubble. + if !synctest.Associate(wg) { + // wg is already associated with a different bubble. + fatal("sync: WaitGroup.Add called from multiple synctest bubbles") + } else { + state := wg.state.Or(waitGroupBubbleFlag) + if state != 0 && state&waitGroupBubbleFlag == 0 { + // Add has been called from outside this bubble. + fatal("sync: WaitGroup.Add called from inside and outside synctest bubble") + } + } + } state := wg.state.Add(uint64(delta) << 32) + if state&waitGroupBubbleFlag != 0 && !synctest.IsInBubble() { + // Add has been called from within a synctest bubble (and we aren't in one). + fatal("sync: WaitGroup.Add called from inside and outside synctest bubble") + } v := int32(state >> 32) - w := uint32(state) + w := uint32(state & 0x7fffffff) if race.Enabled && delta > 0 && v == int32(delta) { // The first increment must be synchronized with Wait. // Need to model this as a read, because there can be @@ -90,6 +116,13 @@ func (wg *WaitGroup) Add(delta int) { if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } + if v == 0 && state&waitGroupBubbleFlag != 0 { + // Disassociate the WaitGroup from its bubble. + synctest.Disassociate(wg) + if w == 0 { + wg.state.Store(0) + } + } if v > 0 || w == 0 { return } @@ -147,7 +180,21 @@ func (wg *WaitGroup) Wait() { // otherwise concurrent Waits will race with each other. race.Write(unsafe.Pointer(&wg.sema)) } - runtime_SemacquireWaitGroup(&wg.sema) + synctestDurable := false + if state&waitGroupBubbleFlag != 0 && synctest.IsInBubble() { + if race.Enabled { + race.Enable() + } + if synctest.IsAssociated(wg) { + // Add was called within the current bubble, + // so this Wait is durably blocking. + synctestDurable = true + } + if race.Enabled { + race.Disable() + } + } + runtime_SemacquireWaitGroup(&wg.sema, synctestDurable) if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } diff --git a/src/testing/synctest/synctest.go b/src/testing/synctest/synctest.go index c7e93b2201..1664cb8484 100644 --- a/src/testing/synctest/synctest.go +++ b/src/testing/synctest/synctest.go @@ -72,10 +72,17 @@ // - a blocking select statement where every case is a channel created // within the bubble // - [sync.Cond.Wait] -// - [sync.WaitGroup.Wait] +// - [sync.WaitGroup.Wait], when [sync.WaitGroup.Add] was called within the bubble // - [time.Sleep] // -// Locking a [sync.Mutex] or [sync.RWMutex] is not durably blocking. +// Operations not in the above list are not durably blocking. +// In particular, the following operations may block a goroutine, +// but are not durably blocking because the goroutine can be unblocked +// by an event occurring outside its bubble: +// +// - locking a [sync.Mutex] or [sync.RWMutex] +// - blocking on I/O, such as reading from a network socket +// - system calls // // # Isolation // @@ -83,6 +90,10 @@ // is associated with it. Operating on a bubbled channel, timer, or // ticker from outside the bubble panics. // +// A [sync.WaitGroup] becomes associated with a bubble on the first +// call to Add or Go. Once a WaitGroup is associated with a bubble, +// calling Add or Go from outside that bubble panics. +// // Cleanup functions and finalizers registered with // [runtime.AddCleanup] and [runtime.SetFinalizer] // run outside of any bubble.