Skip to content

Commit

Permalink
Use safe accessors in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vearutop committed Jun 30, 2023
1 parent 182dcc5 commit a20d049
Showing 1 changed file with 44 additions and 44 deletions.
88 changes: 44 additions & 44 deletions queue_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,35 +205,35 @@ func TestClusterConsumerCommon(t *testing.T) {
assert.NoError(t, queue1.StartConsuming(10, time.Millisecond))
_, err = queue1.AddConsumer("cons-cons", consumer)
assert.NoError(t, err)
assert.Nil(t, consumer.LastDelivery)
assert.Nil(t, consumer.Last())

assert.NoError(t, queue1.Publish("cons-d1"))
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 1)
require.NotNil(t, consumer.LastDelivery)
assert.Equal(t, "cons-d1", consumer.LastDelivery.Payload())
require.NotNil(t, consumer.Last())
assert.Equal(t, "cons-d1", consumer.Last().Payload())

assert.NoError(t, queue1.Publish("cons-d2"))
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 2)
assert.Equal(t, "cons-d2", consumer.LastDelivery.Payload())
assert.Equal(t, "cons-d2", consumer.Last().Payload())

assert.NoError(t, consumer.LastDeliveries[0].Ack())
assert.NoError(t, consumer.Deliveries()[0].Ack())
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 1)

assert.NoError(t, consumer.LastDeliveries[1].Ack())
assert.NoError(t, consumer.Deliveries()[1].Ack())
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 0)

assert.Equal(t, ErrorNotFound, consumer.LastDeliveries[0].Ack())
assert.Equal(t, ErrorNotFound, consumer.Deliveries()[0].Ack())

assert.NoError(t, queue1.Publish("cons-d3"))
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 1)
eventuallyRejected(t, queue1, 0)
assert.Equal(t, "cons-d3", consumer.LastDelivery.Payload())
assert.NoError(t, consumer.LastDelivery.Reject())
assert.Equal(t, "cons-d3", consumer.Last().Payload())
assert.NoError(t, consumer.Last().Reject())
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 0)
eventuallyRejected(t, queue1, 1)
Expand All @@ -242,8 +242,8 @@ func TestClusterConsumerCommon(t *testing.T) {
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 1)
eventuallyRejected(t, queue1, 1)
assert.Equal(t, "cons-d4", consumer.LastDelivery.Payload())
assert.NoError(t, consumer.LastDelivery.Reject())
assert.Equal(t, "cons-d4", consumer.Last().Payload())
assert.NoError(t, consumer.Last().Reject())
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 0)
eventuallyRejected(t, queue1, 2)
Expand Down Expand Up @@ -324,7 +324,7 @@ func TestClusterMulti(t *testing.T) {
eventuallyReady(t, queue, 10)
eventuallyUnacked(t, queue, 10)

assert.NoError(t, consumer.LastDelivery.Ack())
assert.NoError(t, consumer.Last().Ack())
// Assert that after the consumer acks a message the ready count drops to 9 and unacked remains at 10
// TODO use util funcs instead
assert.Eventually(t, func() bool {
Expand All @@ -344,7 +344,7 @@ func TestClusterMulti(t *testing.T) {
eventuallyReady(t, queue, 9)
eventuallyUnacked(t, queue, 10)

assert.NoError(t, consumer.LastDelivery.Ack())
assert.NoError(t, consumer.Last().Ack())
// Assert that after the consumer acks a message the ready count drops to 8 and unacked remains at 10
// TODO use the util funcs instead
assert.Eventually(t, func() bool {
Expand Down Expand Up @@ -397,38 +397,38 @@ func TestClusterBatch(t *testing.T) {
_, err = queue.AddBatchConsumer("batch-cons", 2, 50*time.Millisecond, consumer)
assert.NoError(t, err)
assert.Eventually(t, func() bool {
return len(consumer.LastBatch) == 2
return len(consumer.Last()) == 2
}, 10*time.Second, 2*time.Millisecond)
assert.Equal(t, "batch-d0", consumer.LastBatch[0].Payload())
assert.Equal(t, "batch-d1", consumer.LastBatch[1].Payload())
assert.NoError(t, consumer.LastBatch[0].Reject())
assert.NoError(t, consumer.LastBatch[1].Ack())
assert.Equal(t, "batch-d0", consumer.Last()[0].Payload())
assert.Equal(t, "batch-d1", consumer.Last()[1].Payload())
assert.NoError(t, consumer.Last()[0].Reject())
assert.NoError(t, consumer.Last()[1].Ack())
eventuallyUnacked(t, queue, 3)
eventuallyRejected(t, queue, 1)

consumer.Finish()
assert.Eventually(t, func() bool {
return len(consumer.LastBatch) == 2
return len(consumer.Last()) == 2
}, 10*time.Second, 2*time.Millisecond)
assert.Equal(t, "batch-d2", consumer.LastBatch[0].Payload())
assert.Equal(t, "batch-d3", consumer.LastBatch[1].Payload())
assert.NoError(t, consumer.LastBatch[0].Reject())
assert.NoError(t, consumer.LastBatch[1].Ack())
assert.Equal(t, "batch-d2", consumer.Last()[0].Payload())
assert.Equal(t, "batch-d3", consumer.Last()[1].Payload())
assert.NoError(t, consumer.Last()[0].Reject())
assert.NoError(t, consumer.Last()[1].Ack())
eventuallyUnacked(t, queue, 1)
eventuallyRejected(t, queue, 2)

consumer.Finish()
// Last Batch is cleared out
assert.Len(t, consumer.LastBatch, 0)
assert.Len(t, consumer.Last(), 0)
eventuallyUnacked(t, queue, 1)
eventuallyRejected(t, queue, 2)

// After a pause the batch consumer will pull down another batch
assert.Eventually(t, func() bool {
return len(consumer.LastBatch) == 1
return len(consumer.Last()) == 1
}, 10*time.Second, 2*time.Millisecond)
assert.Equal(t, "batch-d4", consumer.LastBatch[0].Payload())
assert.NoError(t, consumer.LastBatch[0].Reject())
assert.Equal(t, "batch-d4", consumer.Last()[0].Payload())
assert.NoError(t, consumer.Last()[0].Reject())
eventuallyUnacked(t, queue, 0)
eventuallyRejected(t, queue, 3)
}
Expand Down Expand Up @@ -466,13 +466,13 @@ func TestClusterReturnRejected(t *testing.T) {
eventuallyUnacked(t, queue, 6)
eventuallyRejected(t, queue, 0)

assert.Len(t, consumer.LastDeliveries, 6)
assert.NoError(t, consumer.LastDeliveries[0].Reject())
assert.NoError(t, consumer.LastDeliveries[1].Ack())
assert.NoError(t, consumer.LastDeliveries[2].Reject())
assert.NoError(t, consumer.LastDeliveries[3].Reject())
assert.Len(t, consumer.Deliveries(), 6)
assert.NoError(t, consumer.Deliveries()[0].Reject())
assert.NoError(t, consumer.Deliveries()[1].Ack())
assert.NoError(t, consumer.Deliveries()[2].Reject())
assert.NoError(t, consumer.Deliveries()[3].Reject())
// delivery 4 still open
assert.NoError(t, consumer.LastDeliveries[5].Reject())
assert.NoError(t, consumer.Deliveries()[5].Reject())

eventuallyReady(t, queue, 0)
eventuallyUnacked(t, queue, 1) // delivery 4
Expand Down Expand Up @@ -528,14 +528,14 @@ func TestClusterPushQueue(t *testing.T) {

assert.NoError(t, queue1.Publish("d1"))
eventuallyUnacked(t, queue1, 1)
require.Len(t, consumer1.LastDeliveries, 1)
require.Len(t, consumer1.Deliveries(), 1)

assert.NoError(t, consumer1.LastDelivery.Push())
assert.NoError(t, consumer1.Last().Push())
eventuallyUnacked(t, queue1, 0)
eventuallyUnacked(t, queue2, 1)
require.Len(t, consumer2.LastDeliveries, 1)
require.Len(t, consumer2.Deliveries(), 1)

assert.NoError(t, consumer2.LastDelivery.Push())
assert.NoError(t, consumer2.Last().Push())
eventuallyRejected(t, queue2, 1)
}

Expand Down Expand Up @@ -573,7 +573,7 @@ func TestClusterStopConsuming_Consumer(t *testing.T) {

var consumedCount int64
for i := 0; i < 10; i++ {
consumedCount += int64(len(consumers[i].LastDeliveries))
consumedCount += int64(len(consumers[i].Deliveries()))
}

// make sure all deliveries are either ready, unacked or consumed (acked)
Expand Down Expand Up @@ -627,7 +627,7 @@ func TestClusterStopConsuming_BatchConsumer(t *testing.T) {

var consumedCount int64
for i := 0; i < 10; i++ {
consumedCount += consumers[i].ConsumedCount
consumedCount += consumers[i].Consumed()
}

// make sure all deliveries are either ready, unacked or consumed (acked)
Expand Down Expand Up @@ -788,7 +788,7 @@ func BenchmarkClusterQueue(b *testing.B) {

sum := 0
for _, consumer := range consumers {
sum += len(consumer.LastDeliveries)
sum += len(consumer.Deliveries())
}
fmt.Printf("consumed %d\n", sum)

Expand Down Expand Up @@ -861,12 +861,12 @@ func TestClusterMix(t *testing.T) {
assert.NoError(t, queue1.StartConsuming(10, time.Millisecond))
_, err = queue1.AddConsumer("cons-cons", consumer1)
assert.NoError(t, err)
assert.Nil(t, consumer1.LastDelivery)
assert.Nil(t, consumer1.Last())

assert.NoError(t, queue1.Publish("cons-d1"))
eventuallyReady(t, queue1, 0)
eventuallyUnacked(t, queue1, 1)
require.NotNil(t, consumer1.LastDelivery)
require.NotNil(t, consumer1.Last())
assert.Regexp(t, // using [queue]
`\[cons-d1 rmq::connection::conn1-\w{6}::queue::\[cons-q\]::unacked\]`,
fmt.Sprintf("%s", consumer1.Last()),
Expand All @@ -884,12 +884,12 @@ func TestClusterMix(t *testing.T) {
assert.NoError(t, queue2.StartConsuming(20, time.Millisecond))
_, err = queue2.AddConsumer("cons-cons", consumer2)
assert.NoError(t, err)
assert.Nil(t, consumer2.LastDelivery)
assert.Nil(t, consumer2.Last())

assert.NoError(t, queue2.Publish("cons-d2"))
eventuallyReady(t, queue2, 0)
eventuallyUnacked(t, queue2, 1)
require.NotNil(t, consumer2.LastDelivery)
require.NotNil(t, consumer2.Last())
assert.Regexp(t, // using {queue}
`\[cons-d2 rmq::connection::conn2-\w{6}::queue::\{cons-q\}::unacked\]`,
fmt.Sprintf("%s", consumer2.Last()),
Expand Down

0 comments on commit a20d049

Please sign in to comment.