Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Memory queue: cancel in-progress writes on queue closed, not producer closed #38094

Merged
merged 8 commits into from
Mar 4, 2024
Prev Previous commit
Next Next commit
candidate test for wait group panic
  • Loading branch information
faec committed Feb 29, 2024
commit 461f8e12ea637a609e76143cd9324c026a43027e
101 changes: 91 additions & 10 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import (
"testing"
"time"

"gotest.tools/assert"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/publisher/queue"
Expand Down Expand Up @@ -149,6 +148,88 @@ func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) {
"test not flagged as successful, p.Publish likely blocked indefinitely")
}

// TestProducerClosePreservesEventCount checks for
// https://github.com/elastic/beats/issues/37702, a problem where canceling a
// producer while it was waiting on a response to an insert request could
// lead to inaccurate event totals.
//
// The test keeps its own count of "active" events: incremented before each
// Publish attempt, decremented when Publish reports failure, and decremented
// by the producer's ACK callback. The invariant under test is that this
// count never ends up negative.
func TestProducerClosePreservesEventCount(t *testing.T) {
	var activeEvents atomic.Int64

	q := NewQueue(nil, nil,
		Settings{
			Events:        4, // Queue size
			MaxGetRequest: 2,
			FlushTimeout:  10 * time.Millisecond,
		}, 1)

	p := q.Producer(queue.ProducerConfig{
		ACK: func(count int) {
			activeEvents.Add(-int64(count))
		},
		OnDrop: func(e interface{}) {
			//activeEvents.Add(-1)
		},
		DropOnCancel: false,
	})

	// Asynchronously, send 4 events to the queue.
	// Three will be enqueued, and one will be buffered,
	// until we start reading from the queue.
	// This needs to run in a goroutine because the buffered
	// event will block until the queue handles it.
	// NOTE(review): the queue size configured above is 4, while this
	// comment describes three enqueued events and one buffered event --
	// confirm which of the two is intended.
	var wgProducer sync.WaitGroup
	wgProducer.Add(1)
	go func() {
		// Deferred so the wait group is released on every exit path.
		defer wgProducer.Done()
		for i := 0; i < 4; i++ {
			event := i
			// For proper navigation of the race conditions inherent to this
			// test: increment active events before the publish attempt, then
			// decrement afterwards if it failed (otherwise the event count
			// could become negative even under correct queue operation).
			activeEvents.Add(1)
			_, ok := p.Publish(event)
			if !ok {
				activeEvents.Add(-1)
			}
		}
	}()

	// This sleep is regrettable, but there's no deterministic way to know when
	// the producer code has buffered an event in the queue's channel.
	// However, the test is written to produce false negatives only:
	// - If this test fails, it _always_ indicates a bug.
	// - If there is a bug, this test will _often_ fail.
	time.Sleep(10 * time.Millisecond)

	// Cancel the producer, then read and acknowledge two batches. If the
	// Publish calls and the queue code are working, activeEvents should
	// _usually_ end up as 0, but _always_ end up non-negative.
	p.Cancel()

	// The queue reads also need to be done in a goroutine, in case the
	// producer cancellation signal went through before the Publish
	// requests -- if only 2 events entered the queue, then the second
	// Get call will block until the queue itself is cancelled.
	go func() {
		for i := 0; i < 2; i++ {
			batch, err := q.Get(2)
			// Only error to worry about is queue closing, which isn't
			// a test failure.
			if err == nil {
				batch.Done()
			}
		}
	}()

	// One last sleep to let things percolate, then we close the queue
	// to unblock any helpers and verify that the final active event
	// count isn't negative.
	time.Sleep(10 * time.Millisecond)
	q.Close()

	// Wait for the producer goroutine so it cannot outlive the test and so
	// its bookkeeping is complete before the final check. If this blocks,
	// a Publish call is stuck even though the producer was cancelled and
	// the queue was closed, which is itself a bug.
	wgProducer.Wait()

	assert.GreaterOrEqual(t, activeEvents.Load(), int64(0),
		"active event count should never be negative")
}

func TestQueueMetricsDirect(t *testing.T) {
eventsToTest := 5
maxEvents := 10
Expand Down Expand Up @@ -190,7 +271,7 @@ func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, te

// Read events, don't yet ack them
batch, err := testQueue.Get(eventsToTest)
assert.NilError(t, err, "error in Get")
assert.NoError(t, err, "error in Get")
t.Logf("Got batch of %d events", batch.Count())

queueMetricsAreValid(t, testQueue, 5, settings.Events, 5, fmt.Sprintf("%s - Producer Getting events, no ACK", testName))
Expand All @@ -206,7 +287,7 @@ func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occup
// wait briefly to avoid races across all the queue channels
time.Sleep(time.Millisecond * 100)
testMetrics, err := q.Metrics()
assert.NilError(t, err, "error calling metrics for test %s", test)
assert.NoError(t, err, "error calling metrics for test %s", test)
assert.Equal(t, testMetrics.EventCount.ValueOr(0), uint64(evtCount), "incorrect EventCount for %s", test)
assert.Equal(t, testMetrics.EventLimit.ValueOr(0), uint64(evtLimit), "incorrect EventLimit for %s", test)
assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test)
Expand Down Expand Up @@ -266,18 +347,18 @@ func TestEntryIDs(t *testing.T) {

for i := 0; i < entryCount; i++ {
batch, err := q.Get(1)
assert.NilError(t, err, "Queue read should succeed")
assert.NoError(t, err, "Queue read should succeed")
assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry")

metrics, err := q.Metrics()
assert.NilError(t, err, "Queue metrics call should succeed")
assert.NoError(t, err, "Queue metrics call should succeed")
assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i),
fmt.Sprintf("Oldest entry ID before ACKing event %v should be %v", i, i))

batch.Done()
waiter.waitForEvents(1)
metrics, err = q.Metrics()
assert.NilError(t, err, "Queue metrics call should succeed")
assert.NoError(t, err, "Queue metrics call should succeed")
assert.Equal(t, metrics.OldestEntryID, queue.EntryID(i+1),
fmt.Sprintf("Oldest entry ID after ACKing event %v should be %v", i, i+1))

Expand All @@ -297,7 +378,7 @@ func TestEntryIDs(t *testing.T) {

for i := 0; i < entryCount; i++ {
batch, err := q.Get(1)
assert.NilError(t, err, "Queue read should succeed")
assert.NoError(t, err, "Queue read should succeed")
assert.Equal(t, batch.Count(), 1, "Returned batch should have 1 entry")
batches = append(batches, batch)
}
Expand All @@ -318,15 +399,15 @@ func TestEntryIDs(t *testing.T) {
// the slight nondeterminism.
time.Sleep(1 * time.Millisecond)
metrics, err := q.Metrics()
assert.NilError(t, err, "Queue metrics call should succeed")
assert.NoError(t, err, "Queue metrics call should succeed")
assert.Equal(t, metrics.OldestEntryID, queue.EntryID(0),
fmt.Sprintf("Oldest entry ID after ACKing event %v should be 0", i))
}
// ACK the first batch, which should unblock all the later ones
batches[0].Done()
waiter.waitForEvents(100)
metrics, err := q.Metrics()
assert.NilError(t, err, "Queue metrics call should succeed")
assert.NoError(t, err, "Queue metrics call should succeed")
assert.Equal(t, metrics.OldestEntryID, queue.EntryID(100),
fmt.Sprintf("Oldest entry ID after ACKing event 0 should be %v", queue.EntryID(entryCount)))

Expand Down