Describe the bug
The EnqueueM method in mpmc/rb2.go ignores the return value of atomic.CompareAndSwapUint32 when advancing the tail pointer, causing data loss when multiple goroutines write concurrently.
mpmc/rb2.go, line 38:
holder = &rb.data[tail]
atomic.CompareAndSwapUint32(&rb.tail, tail, nt) // result ignored!
Root Cause
When two goroutines race to enqueue:
- Goroutine A reads tail=5, calculates nt=6
- Goroutine B reads tail=5, calculates nt=6
- Goroutine A's CAS succeeds and advances tail from 5→6
- Goroutine B's CAS fails (tail is now 6, not 5), but the code continues as if it had succeeded
- Both goroutines write to rb.data[5] — one overwrites the other
The same issue exists in EnqueueMRich (line 92) and Enqueue (line 146).
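The interleaving above can be replayed on a single goroutine to make the failure deterministic. This is only a sketch on a toy slice, not the library's code: the function name `replay`, the 8-slot slice, and the values 100 and 200 are made up for illustration.

```go
package main

import "sync/atomic"

// replay walks through the two-writer interleaving from the list above on a
// toy slice (hypothetical, not rb.data). It returns both CAS results, the
// final content of slot 5, and the final tail.
func replay() (casA, casB bool, slot5 int, finalTail uint32) {
	var tail uint32 = 5
	data := make([]int, 8)

	// Both writers read the same tail before either one advances it.
	tailA := atomic.LoadUint32(&tail)
	tailB := atomic.LoadUint32(&tail)

	// Writer A: CAS 5 -> 6 succeeds, then it writes its value to slot 5.
	casA = atomic.CompareAndSwapUint32(&tail, tailA, tailA+1)
	data[tailA] = 100

	// Writer B: CAS fails because tail is already 6. With the result
	// ignored, B still writes to slot 5 and overwrites A's value.
	casB = atomic.CompareAndSwapUint32(&tail, tailB, tailB+1)
	data[tailB] = 200

	return casA, casB, data[5], atomic.LoadUint32(&tail)
}
```

Calling `replay()` shows two writes but a tail that advanced only once, so one record is gone.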
To Reproduce
package main

import (
	"fmt"
	"sync"

	"github.com/hedzr/go-ringbuf/v2/mpmc"
)

func main() {
	buf := mpmc.NewOverlappedRingBuffer[int](1000)
	var wg sync.WaitGroup
	writesPerGoroutine := 100
	numGoroutines := 2
	// Each goroutine enqueues its own range of distinct values.
	for g := 0; g < numGoroutines; g++ {
		wg.Add(1)
		go func(base int) {
			defer wg.Done()
			for i := 0; i < writesPerGoroutine; i++ {
				buf.EnqueueM(base + i)
			}
		}(g * writesPerGoroutine)
	}
	wg.Wait()
	// Drain the buffer and count the records that survived.
	count := 0
	for !buf.IsEmpty() {
		_, err := buf.Dequeue()
		if err == nil {
			count++
		}
	}
	expected := numGoroutines * writesPerGoroutine
	fmt.Printf("Expected: %d, Got: %d\n", expected, count)
	if count != expected {
		fmt.Println("BUG: Data loss detected!")
	}
}
Expected behavior
All 200 records should be in the buffer.
Run with GOMAXPROCS > 1 to trigger the race. The test passes with GOMAXPROCS=1 (no true parallelism).
Actual Behavior
Records are lost at random: runs return e.g. 175, 185, 193, or 198 records instead of 200.
Additional context
Suggested Fix
Check the CAS result and retry on failure:
holder = &rb.data[tail]
if !atomic.CompareAndSwapUint32(&rb.tail, tail, nt) {
	continue // retry with fresh tail value
}
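To show the retry pattern in isolation, here is a minimal sketch on a toy slot array. Everything in it (`toySlots`, `put`, `fillConcurrently`) is hypothetical and not part of go-ringbuf. It never wraps around and has no consumer side, so it only demonstrates that checking the CAS result gives each writer its own slot.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// toySlots is a hypothetical fixed-size, write-only slot array. It is not
// the library's ring buffer: it never wraps and has no dequeue side.
type toySlots struct {
	tail uint32
	data []int
}

// put claims the next free slot with a CAS retry loop and stores v in it.
// It returns false once every slot has been claimed.
func (s *toySlots) put(v int) bool {
	for {
		tail := atomic.LoadUint32(&s.tail)
		if int(tail) >= len(s.data) {
			return false // full
		}
		if !atomic.CompareAndSwapUint32(&s.tail, tail, tail+1) {
			continue // another writer took this slot; retry with a fresh tail
		}
		s.data[tail] = v // only the CAS winner reaches this line for this slot
		return true
	}
}

// fillConcurrently starts `writers` goroutines that each store `perWriter`
// distinct non-zero values, then returns the number of non-zero slots.
func fillConcurrently(writers, perWriter int) int {
	s := &toySlots{data: make([]int, writers*perWriter)}
	var wg sync.WaitGroup
	for g := 0; g < writers; g++ {
		wg.Add(1)
		go func(base int) {
			defer wg.Done()
			for i := 0; i < perWriter; i++ {
				s.put(base + i + 1) // +1 keeps every value non-zero
			}
		}(g * perWriter)
	}
	wg.Wait()

	filled := 0
	for _, v := range s.data {
		if v != 0 {
			filled++
		}
	}
	return filled
}
```

With the CAS result checked, `fillConcurrently(2, 100)` fills all 200 slots regardless of scheduling. A real MPMC queue additionally has to tell readers when a claimed slot has actually been written, which this sketch does not address.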