[BUG] EnqueueM ignores CAS result, causing data loss under concurrent writes #9

@srgg

Describe the bug
The EnqueueM method in mpmc/rb2.go ignores the return value of atomic.CompareAndSwapUint32 when advancing the tail pointer, causing data loss when multiple goroutines write concurrently.

mpmc/rb2.go, line 38:

  holder = &rb.data[tail]
  atomic.CompareAndSwapUint32(&rb.tail, tail, nt)  // result ignored!

Root Cause

When two goroutines race to enqueue:

  1. Goroutine A reads tail=5, calculates nt=6
  2. Goroutine B reads tail=5, calculates nt=6
  3. Goroutine A successfully CAS tail from 5→6
  4. Goroutine B's CAS fails (tail is now 6, not 5), but code continues
  5. Both goroutines write to rb.data[5] — one overwrites the other
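
The interleaving above can be replayed deterministically in a single goroutine. This is only an illustrative sketch: `simulateIgnoredCAS`, the local tail variable, and the string slice are hypothetical stand-ins, not code from mpmc/rb2.go.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// simulateIgnoredCAS replays steps 1-5 sequentially, so the outcome does not
// depend on the scheduler. All names here are hypothetical.
func simulateIgnoredCAS() (tail uint32, slot5 string, bCASOk bool) {
	var t uint32 = 5
	data := make([]string, 8)

	// Steps 1-2: both writers read the same tail and compute the same next tail.
	tailA := atomic.LoadUint32(&t)
	tailB := atomic.LoadUint32(&t)
	ntA, ntB := tailA+1, tailB+1

	// Step 3: A's CAS succeeds (5 -> 6).
	atomic.CompareAndSwapUint32(&t, tailA, ntA)

	// Step 4: B's CAS fails because tail is already 6.
	bCASOk = atomic.CompareAndSwapUint32(&t, tailB, ntB)

	// Step 5: the result is ignored, so both writers store into slot 5.
	data[tailA] = "A"
	data[tailB] = "B" // overwrites A's record

	return atomic.LoadUint32(&t), data[5], bCASOk
}

func main() {
	tail, slot, ok := simulateIgnoredCAS()
	// Prints: tail=6 slot5="B" B's CAS succeeded=false
	fmt.Printf("tail=%d slot5=%q B's CAS succeeded=%v\n", tail, slot, ok)
}
```

Two writes happened, but tail advanced only once and A's record is gone, which matches the lost records in the reproduction.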

The same issue exists in EnqueueMRich (line 92) and Enqueue (line 146).

To Reproduce

  package main

  import (
        "fmt"
        "sync"
        "github.com/hedzr/go-ringbuf/v2/mpmc"
  )

  func main() {
        buf := mpmc.NewOverlappedRingBuffer[int](1000)

        var wg sync.WaitGroup
        writesPerGoroutine := 100
        numGoroutines := 2

        for g := 0; g < numGoroutines; g++ {
                wg.Add(1)
                go func(base int) {
                        defer wg.Done()
                        for i := 0; i < writesPerGoroutine; i++ {
                                buf.EnqueueM(base + i)
                        }
                }(g * writesPerGoroutine)
        }

        wg.Wait()

        // Count records
        count := 0
        for !buf.IsEmpty() {
                _, err := buf.Dequeue()
                if err == nil {
                        count++
                }
        }

        expected := numGoroutines * writesPerGoroutine
        fmt.Printf("Expected: %d, Got: %d\n", expected, count)
        if count != expected {
                fmt.Println("BUG: Data loss detected!")
        }
  }

Expected behavior
All 200 records should be in the buffer.
To trigger the race, run with GOMAXPROCS > 1. With GOMAXPROCS=1 the test passes, since the goroutines never run in parallel.

Actual Behavior

Records are lost at random: runs report counts such as 175, 185, 193, or 198 instead of 200.

Additional context
Suggested Fix

Check the CAS result and retry on failure:

  holder = &rb.data[tail]
  if !atomic.CompareAndSwapUint32(&rb.tail, tail, nt) {
          continue // retry with fresh tail value
  }
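
For reference, here is a minimal, self-contained sketch of the retry pattern on a toy type. `toyQueue`, `claim`, and `runWriters` are hypothetical names, not the library's API. The sketch covers only slot claiming: it has no wraparound, no full-buffer check, and no consumer side, all of which the real fix in rb2.go would still need to handle.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// toyQueue is a hypothetical stand-in for the ring buffer: a tail index
// plus backing storage large enough that it never wraps in this demo.
type toyQueue struct {
	tail uint32
	data []int
}

// claim reserves exactly one slot index. A writer only proceeds once its
// own CAS has succeeded; on failure it reloads tail and tries again.
func (q *toyQueue) claim() uint32 {
	for {
		tail := atomic.LoadUint32(&q.tail)
		nt := tail + 1
		if atomic.CompareAndSwapUint32(&q.tail, tail, nt) {
			return tail
		}
		// Another writer advanced tail first: retry with a fresh value.
	}
}

// runWriters starts the given number of goroutines, each storing perWriter
// non-zero values, and returns how many slots ended up filled.
func runWriters(writers, perWriter int) int {
	q := &toyQueue{data: make([]int, writers*perWriter)}

	var wg sync.WaitGroup
	for g := 0; g < writers; g++ {
		wg.Add(1)
		go func(base int) {
			defer wg.Done()
			for i := 0; i < perWriter; i++ {
				idx := q.claim()
				q.data[idx] = base + i + 1 // +1 so that 0 means "never written"
			}
		}(g * perWriter)
	}
	wg.Wait()

	filled := 0
	for _, v := range q.data {
		if v != 0 {
			filled++
		}
	}
	return filled
}

func main() {
	fmt.Printf("filled %d of %d slots\n", runWriters(2, 100), 200)
}
```

Because each successful CAS hands out a distinct index, no two writers ever store into the same slot, so every one of the 200 slots is filled regardless of scheduling.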
