Skip to content

Commit

Permalink
Enable race detector in tests, fix data races (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
vearutop authored Jan 27, 2023
1 parent 180d9e8 commit 445fd7e
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ jobs:
- name: Test
id: test
run: |
go test -coverprofile=unit.coverprofile -covermode=atomic ./...
go test -race -coverprofile=unit.coverprofile -covermode=atomic ./...
go tool cover -func=./unit.coverprofile > unit.txt
TOTAL=$(grep 'total:' unit.txt)
echo "${TOTAL}"
Expand Down
16 changes: 8 additions & 8 deletions cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,17 @@ func TestCleaner(t *testing.T) {
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 4)

require.NotNil(t, consumer.LastDelivery)
assert.Equal(t, "del1", consumer.LastDelivery.Payload())
assert.NoError(t, consumer.LastDelivery.Ack())
require.NotNil(t, consumer.Last())
assert.Equal(t, "del1", consumer.Last().Payload())
assert.NoError(t, consumer.Last().Ack())
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 3)

consumer.Finish()
time.Sleep(10 * time.Millisecond)
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 3)
assert.Equal(t, "del2", consumer.LastDelivery.Payload())
assert.Equal(t, "del2", consumer.Last().Payload())

queue.StopConsuming()
assert.NoError(t, conn.stopHeartbeat())
Expand Down Expand Up @@ -121,15 +121,15 @@ func TestCleaner(t *testing.T) {
time.Sleep(10 * time.Millisecond)
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 6)
assert.Equal(t, "del4", consumer.LastDelivery.Payload())
assert.Equal(t, "del4", consumer.Last().Payload())

consumer.Finish() // unacked
time.Sleep(10 * time.Millisecond)
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 6)

assert.Equal(t, "del5", consumer.LastDelivery.Payload())
assert.NoError(t, consumer.LastDelivery.Ack())
assert.Equal(t, "del5", consumer.Last().Payload())
assert.NoError(t, consumer.Last().Ack())
time.Sleep(10 * time.Millisecond)
eventuallyUnacked(t, queue, 2)
eventuallyReady(t, queue, 5)
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestCleaner(t *testing.T) {
assert.NoError(t, err)
time.Sleep(10 * time.Millisecond)
assert.Eventually(t, func() bool {
return len(consumer.LastDeliveries) == 9
return len(consumer.Deliveries()) == 9
}, 10*time.Second, 2*time.Millisecond)

queue.StopConsuming()
Expand Down
82 changes: 41 additions & 41 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,35 +204,35 @@ func TestConsumerCommon(t *testing.T) {
assert.NoError(t, queue1.StartConsuming(10, time.Millisecond))
_, err = queue1.AddConsumer("cons-cons", consumer)
assert.NoError(t, err)
assert.Nil(t, consumer.LastDelivery)
assert.Nil(t, consumer.Last())

assert.NoError(t, queue1.Publish("cons-d1"))
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 1)
require.NotNil(t, consumer.LastDelivery)
assert.Equal(t, "cons-d1", consumer.LastDelivery.Payload())
require.NotNil(t, consumer.Last())
assert.Equal(t, "cons-d1", consumer.Last().Payload())

assert.NoError(t, queue1.Publish("cons-d2"))
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 2)
assert.Equal(t, "cons-d2", consumer.LastDelivery.Payload())
assert.Equal(t, "cons-d2", consumer.Last().Payload())

assert.NoError(t, consumer.LastDeliveries[0].Ack())
assert.NoError(t, consumer.Deliveries()[0].Ack())
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 1)

assert.NoError(t, consumer.LastDeliveries[1].Ack())
assert.NoError(t, consumer.Deliveries()[1].Ack())
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 0)

assert.Equal(t, ErrorNotFound, consumer.LastDeliveries[0].Ack())
assert.Equal(t, ErrorNotFound, consumer.Deliveries()[0].Ack())

assert.NoError(t, queue1.Publish("cons-d3"))
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 1)
eventuallyRejected(t, queue1, 0)
assert.Equal(t, "cons-d3", consumer.LastDelivery.Payload())
assert.NoError(t, consumer.LastDelivery.Reject())
assert.Equal(t, "cons-d3", consumer.Last().Payload())
assert.NoError(t, consumer.Last().Reject())
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 0)
eventuallyRejected(t, queue1, 1)
Expand All @@ -241,8 +241,8 @@ func TestConsumerCommon(t *testing.T) {
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 1)
eventuallyRejected(t, queue1, 1)
assert.Equal(t, "cons-d4", consumer.LastDelivery.Payload())
assert.NoError(t, consumer.LastDelivery.Reject())
assert.Equal(t, "cons-d4", consumer.Last().Payload())
assert.NoError(t, consumer.Last().Reject())
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 0)
eventuallyRejected(t, queue1, 2)
Expand Down Expand Up @@ -323,8 +323,8 @@ func TestMulti(t *testing.T) {
eventuallyReady(t, queue, 10)
eventuallyUnacked(t, queue, 10)

require.NotNil(t, consumer.LastDelivery)
assert.NoError(t, consumer.LastDelivery.Ack())
require.NotNil(t, consumer.Last())
assert.NoError(t, consumer.Last().Ack())
// Assert that after the consumer acks a message the ready count drops to 9 and unacked remains at 10
// TODO use util funcs instead
assert.Eventually(t, func() bool {
Expand All @@ -344,7 +344,7 @@ func TestMulti(t *testing.T) {
eventuallyReady(t, queue, 9)
eventuallyUnacked(t, queue, 10)

assert.NoError(t, consumer.LastDelivery.Ack())
assert.NoError(t, consumer.Last().Ack())
// Assert that after the consumer acks a message the ready count drops to 8 and unacked remains at 10
// TODO use the util funcs instead
assert.Eventually(t, func() bool {
Expand Down Expand Up @@ -397,38 +397,38 @@ func TestBatch(t *testing.T) {
_, err = queue.AddBatchConsumer("batch-cons", 2, 50*time.Millisecond, consumer)
assert.NoError(t, err)
assert.Eventually(t, func() bool {
return len(consumer.LastBatch) == 2
return len(consumer.Last()) == 2
}, 10*time.Second, 2*time.Millisecond)
assert.Equal(t, "batch-d0", consumer.LastBatch[0].Payload())
assert.Equal(t, "batch-d1", consumer.LastBatch[1].Payload())
assert.NoError(t, consumer.LastBatch[0].Reject())
assert.NoError(t, consumer.LastBatch[1].Ack())
assert.Equal(t, "batch-d0", consumer.Last()[0].Payload())
assert.Equal(t, "batch-d1", consumer.Last()[1].Payload())
assert.NoError(t, consumer.Last()[0].Reject())
assert.NoError(t, consumer.Last()[1].Ack())
eventuallyUnacked(t, queue, 3)
eventuallyRejected(t, queue, 1)

consumer.Finish()
assert.Eventually(t, func() bool {
return len(consumer.LastBatch) == 2
return len(consumer.Last()) == 2
}, 10*time.Second, 2*time.Millisecond)
assert.Equal(t, "batch-d2", consumer.LastBatch[0].Payload())
assert.Equal(t, "batch-d3", consumer.LastBatch[1].Payload())
assert.NoError(t, consumer.LastBatch[0].Reject())
assert.NoError(t, consumer.LastBatch[1].Ack())
assert.Equal(t, "batch-d2", consumer.Last()[0].Payload())
assert.Equal(t, "batch-d3", consumer.Last()[1].Payload())
assert.NoError(t, consumer.Last()[0].Reject())
assert.NoError(t, consumer.Last()[1].Ack())
eventuallyUnacked(t, queue, 1)
eventuallyRejected(t, queue, 2)

consumer.Finish()
// Last Batch is cleared out
assert.Len(t, consumer.LastBatch, 0)
assert.Len(t, consumer.Last(), 0)
eventuallyUnacked(t, queue, 1)
eventuallyRejected(t, queue, 2)

// After a pause the batch consumer will pull down another batch
assert.Eventually(t, func() bool {
return len(consumer.LastBatch) == 1
return len(consumer.Last()) == 1
}, 10*time.Second, 2*time.Millisecond)
assert.Equal(t, "batch-d4", consumer.LastBatch[0].Payload())
assert.NoError(t, consumer.LastBatch[0].Reject())
assert.Equal(t, "batch-d4", consumer.Last()[0].Payload())
assert.NoError(t, consumer.Last()[0].Reject())
eventuallyUnacked(t, queue, 0)
eventuallyRejected(t, queue, 3)
}
Expand Down Expand Up @@ -466,13 +466,13 @@ func TestReturnRejected(t *testing.T) {
eventuallyUnacked(t, queue, 6)
eventuallyRejected(t, queue, 0)

assert.Len(t, consumer.LastDeliveries, 6)
assert.NoError(t, consumer.LastDeliveries[0].Reject())
assert.NoError(t, consumer.LastDeliveries[1].Ack())
assert.NoError(t, consumer.LastDeliveries[2].Reject())
assert.NoError(t, consumer.LastDeliveries[3].Reject())
assert.Len(t, consumer.Deliveries(), 6)
assert.NoError(t, consumer.Deliveries()[0].Reject())
assert.NoError(t, consumer.Deliveries()[1].Ack())
assert.NoError(t, consumer.Deliveries()[2].Reject())
assert.NoError(t, consumer.Deliveries()[3].Reject())
// delivery 4 still open
assert.NoError(t, consumer.LastDeliveries[5].Reject())
assert.NoError(t, consumer.Deliveries()[5].Reject())

eventuallyReady(t, queue, 0)
eventuallyUnacked(t, queue, 1) // delivery 4
Expand Down Expand Up @@ -524,14 +524,14 @@ func TestPushQueue(t *testing.T) {

assert.NoError(t, queue1.Publish("d1"))
eventuallyUnacked(t, queue1, 1)
require.Len(t, consumer1.LastDeliveries, 1)
require.Len(t, consumer1.Deliveries(), 1)

assert.NoError(t, consumer1.LastDelivery.Push())
assert.NoError(t, consumer1.Last().Push())
eventuallyUnacked(t, queue1, 0)
eventuallyUnacked(t, queue2, 1)
require.Len(t, consumer2.LastDeliveries, 1)
require.Len(t, consumer2.Deliveries(), 1)

assert.NoError(t, consumer2.LastDelivery.Push())
assert.NoError(t, consumer2.Last().Push())
eventuallyRejected(t, queue2, 1)
}

Expand Down Expand Up @@ -569,7 +569,7 @@ func TestStopConsuming_Consumer(t *testing.T) {

var consumedCount int64
for i := 0; i < 10; i++ {
consumedCount += int64(len(consumers[i].LastDeliveries))
consumedCount += int64(len(consumers[i].Deliveries()))
}

// make sure all deliveries are either ready, unacked or consumed (acked)
Expand Down Expand Up @@ -623,7 +623,7 @@ func TestStopConsuming_BatchConsumer(t *testing.T) {

var consumedCount int64
for i := 0; i < 10; i++ {
consumedCount += consumers[i].ConsumedCount
consumedCount += consumers[i].Consumed()
}

// make sure all deliveries are either ready, unacked or consumed (acked)
Expand Down Expand Up @@ -793,7 +793,7 @@ func BenchmarkQueue(b *testing.B) {

sum := 0
for _, consumer := range consumers {
sum += len(consumer.LastDeliveries)
sum += len(consumer.Deliveries())
}

assert.Equal(b, b.N, sum)
Expand Down
4 changes: 2 additions & 2 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestStats(t *testing.T) {
assert.NoError(t, q2.Publish("stats-d3"))
assert.NoError(t, q2.Publish("stats-d4"))
time.Sleep(2 * time.Millisecond)
assert.NoError(t, consumer.LastDeliveries[0].Ack())
assert.NoError(t, consumer.LastDeliveries[1].Reject())
assert.NoError(t, consumer.Deliveries()[0].Ack())
assert.NoError(t, consumer.Deliveries()[1].Reject())
_, err = q2.AddConsumer("stats-cons2", NewTestConsumer("hand-B"))
assert.NoError(t, err)

Expand Down
27 changes: 26 additions & 1 deletion test_batch_consumer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package rmq

import "sync"

type TestBatchConsumer struct {
LastBatch Deliveries
mu sync.Mutex
// Deprecated: use Last() to avoid data races.
LastBatch Deliveries
// Deprecated use Consumed() to avoid data races.
ConsumedCount int64
AutoFinish bool

Expand All @@ -14,9 +19,26 @@ func NewTestBatchConsumer() *TestBatchConsumer {
}
}

func (consumer *TestBatchConsumer) Last() Deliveries {
consumer.mu.Lock()
defer consumer.mu.Unlock()

return consumer.LastBatch
}

func (consumer *TestBatchConsumer) Consumed() int64 {
consumer.mu.Lock()
defer consumer.mu.Unlock()

return consumer.ConsumedCount
}

func (consumer *TestBatchConsumer) Consume(batch Deliveries) {
consumer.mu.Lock()
consumer.LastBatch = batch
consumer.ConsumedCount += int64(len(batch))
consumer.mu.Unlock()

if consumer.AutoFinish {
batch.Ack()
} else {
Expand All @@ -27,6 +49,9 @@ func (consumer *TestBatchConsumer) Consume(batch Deliveries) {

func (consumer *TestBatchConsumer) Finish() {
// log.Printf("TestBatchConsumer.Finish()")
consumer.mu.Lock()
consumer.LastBatch = nil
consumer.mu.Unlock()

consumer.finish <- 1
}
22 changes: 21 additions & 1 deletion test_consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rmq

import (
"sync"
"time"
)

Expand All @@ -10,7 +11,10 @@ type TestConsumer struct {
AutoFinish bool
SleepDuration time.Duration

LastDelivery Delivery
mu sync.Mutex
// Deprecated: use Last() to avoid data races.
LastDelivery Delivery
// Deprecated: use Deliveries() to avoid data races.
LastDeliveries []Delivery

finish chan int
Expand All @@ -29,9 +33,25 @@ func (consumer *TestConsumer) String() string {
return consumer.name
}

func (consumer *TestConsumer) Last() Delivery {
consumer.mu.Lock()
defer consumer.mu.Unlock()

return consumer.LastDelivery
}

func (consumer *TestConsumer) Deliveries() []Delivery {
consumer.mu.Lock()
defer consumer.mu.Unlock()

return consumer.LastDeliveries
}

func (consumer *TestConsumer) Consume(delivery Delivery) {
consumer.mu.Lock()
consumer.LastDelivery = delivery
consumer.LastDeliveries = append(consumer.LastDeliveries, delivery)
consumer.mu.Unlock()

if consumer.SleepDuration > 0 {
time.Sleep(consumer.SleepDuration)
Expand Down

0 comments on commit 445fd7e

Please sign in to comment.