diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index f499b913c9..6d58cd4091 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1774,22 +1774,30 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { break } receivedConsumer1++ - if cnt, has := consumer1Keys[cm.Key()]; !has { - consumer1Keys[cm.Key()] = 1 - } else { - consumer1Keys[cm.Key()] = cnt + 1 + cnt := 0 + if _, has := consumer1Keys[cm.Key()]; has { + cnt = consumer1Keys[cm.Key()] } + assert.Equal( + t, fmt.Sprintf("value-%d", cnt), + string(cm.Payload()), + ) + consumer1Keys[cm.Key()] = cnt + 1 consumer1.Ack(cm.Message) case cm, ok := <-consumer2.Chan(): if !ok { break } receivedConsumer2++ - if cnt, has := consumer2Keys[cm.Key()]; !has { - consumer2Keys[cm.Key()] = 1 - } else { - consumer2Keys[cm.Key()] = cnt + 1 + cnt := 0 + if _, has := consumer2Keys[cm.Key()]; has { + cnt = consumer2Keys[cm.Key()] } + assert.Equal( + t, fmt.Sprintf("value-%d", cnt), + string(cm.Payload()), + ) + consumer2Keys[cm.Key()] = cnt + 1 consumer2.Ack(cm.Message) } } @@ -1869,5 +1877,5 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { receivedMessageIndex++ } - // TODO: add OrderingKey support + // TODO: add OrderingKey support, see GH issue #401 }