From 028d635d8b344e25d3736d43ac9af384d6138358 Mon Sep 17 00:00:00 2001 From: Viacheslav Poturaev Date: Wed, 8 Feb 2023 20:35:00 +0100 Subject: [PATCH] Fix ack failure for payload with header (#137) --- delivery.go | 23 ++++++++++++----------- queue_test.go | 12 +++++++++--- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/delivery.go b/delivery.go index 44552cd..51395e6 100644 --- a/delivery.go +++ b/delivery.go @@ -21,14 +21,15 @@ var ( ) type redisDelivery struct { - ctx context.Context - payload string - header http.Header - unackedKey string - rejectedKey string - pushKey string - redisClient RedisClient - errChan chan<- error + ctx context.Context + payload string + clearPayload string + header http.Header + unackedKey string + rejectedKey string + pushKey string + redisClient RedisClient + errChan chan<- error } func (delivery *redisDelivery) Header() http.Header { @@ -56,7 +57,7 @@ func newDelivery( var err error - if rd.header, rd.payload, err = ExtractHeaderAndPayload(payload); err != nil { + if rd.header, rd.clearPayload, err = ExtractHeaderAndPayload(payload); err != nil { return nil, err } @@ -64,11 +65,11 @@ func newDelivery( } func (delivery *redisDelivery) String() string { - return fmt.Sprintf("[%s %s]", delivery.payload, delivery.unackedKey) + return fmt.Sprintf("[%s %s]", delivery.clearPayload, delivery.unackedKey) } func (delivery *redisDelivery) Payload() string { - return delivery.payload + return delivery.clearPayload } // blocking versions of the functions below with the following behavior: diff --git a/queue_test.go b/queue_test.go index fdc1ecd..21021d9 100644 --- a/queue_test.go +++ b/queue_test.go @@ -208,16 +208,21 @@ func TestConsumerCommon(t *testing.T) { assert.NoError(t, err) assert.Nil(t, consumer.Last()) - assert.NoError(t, queue1.Publish("cons-d1")) + assert.NoError(t, queue1.Publish(PayloadWithHeader("cons-d1", http.Header{"foo": []string{"bar1"}}))) eventuallyReady(t, queue1, 0) eventuallyUnacked(t, queue1, 1) require.NotNil(t, consumer.Last()) assert.Equal(t, "cons-d1", consumer.Last().Payload()) + assert.Equal(t, http.Header{"foo": []string{"bar1"}}, consumer.Last().(WithHeader).Header()) - assert.NoError(t, queue1.Publish("cons-d2")) + assert.NoError(t, queue1.Publish(PayloadWithHeader("cons-d2", http.Header{"foo": []string{"bar2"}}))) eventuallyReady(t, queue1, 0) eventuallyUnacked(t, queue1, 2) assert.Equal(t, "cons-d2", consumer.Last().Payload()) + assert.Equal(t, http.Header{"foo": []string{"bar2"}}, consumer.Last().(WithHeader).Header()) + + assert.Regexp(t, `\[cons-d2 rmq::connection::cons-conn-\w{6}::queue::\[cons-q\]::unacked\]`, + fmt.Sprintf("%s", consumer.Last())) assert.NoError(t, consumer.Deliveries()[0].Ack()) eventuallyReady(t, queue1, 0) @@ -229,11 +234,12 @@ func TestConsumerCommon(t *testing.T) { assert.Equal(t, ErrorNotFound, consumer.Deliveries()[0].Ack()) - assert.NoError(t, queue1.Publish("cons-d3")) + assert.NoError(t, queue1.Publish(PayloadWithHeader("cons-d3", http.Header{"foo": []string{"bar3"}}))) eventuallyReady(t, queue1, 0) eventuallyUnacked(t, queue1, 1) eventuallyRejected(t, queue1, 0) assert.Equal(t, "cons-d3", consumer.Last().Payload()) + assert.Equal(t, http.Header{"foo": []string{"bar3"}}, consumer.Last().(WithHeader).Header()) assert.NoError(t, consumer.Last().Reject()) eventuallyReady(t, queue1, 0) eventuallyUnacked(t, queue1, 0)