Skip to content

Commit

Permalink
Merge pull request #147 from adjust/reject-faulty-delivery
Browse files Browse the repository at this point in the history
reject a faulty delivery logic
  • Loading branch information
mariaefi29 authored Jun 23, 2023
2 parents 7bdfe78 + c97e3e0 commit 52c05b0
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 39 deletions.
28 changes: 0 additions & 28 deletions delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,34 +36,6 @@ func (delivery *redisDelivery) Header() http.Header {
return delivery.header
}

func newDelivery(
ctx context.Context,
payload string,
unackedKey string,
rejectedKey string,
pushKey string,
redisClient RedisClient,
errChan chan<- error,
) (*redisDelivery, error) {
rd := redisDelivery{
ctx: ctx,
payload: payload,
unackedKey: unackedKey,
rejectedKey: rejectedKey,
pushKey: pushKey,
redisClient: redisClient,
errChan: errChan,
}

var err error

if rd.header, rd.clearPayload, err = ExtractHeaderAndPayload(payload); err != nil {
return nil, err
}

return &rd, nil
}

func (delivery *redisDelivery) String() string {
return fmt.Sprintf("[%s %s]", delivery.clearPayload, delivery.unackedKey)
}
Expand Down
35 changes: 24 additions & 11 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,13 @@ func (queue *redisQueue) consumeBatch() error {
if err == ErrorNotFound {
return nil
}

if err != nil {
return err
}

d, err := queue.newDelivery(payload)
if err != nil {
return err
return fmt.Errorf("create new delivery: %w", err)
}

queue.deliveryChan <- d
Expand All @@ -229,15 +228,29 @@ func (queue *redisQueue) consumeBatch() error {
}

func (queue *redisQueue) newDelivery(payload string) (Delivery, error) {
return newDelivery(
queue.ackCtx,
payload,
queue.unackedKey,
queue.rejectedKey,
queue.pushKey,
queue.redisClient,
queue.errChan,
)
rd := &redisDelivery{
ctx: queue.ackCtx,
payload: payload,
unackedKey: queue.unackedKey,
rejectedKey: queue.rejectedKey,
pushKey: queue.pushKey,
redisClient: queue.redisClient,
errChan: queue.errChan,
}

var err error
rd.header, rd.clearPayload, err = ExtractHeaderAndPayload(payload)
if err == nil {
return rd, nil
}

// we need to reject a delivery here to move the delivery from the unacked to the rejected list.
rejectErr := rd.Reject()
if rejectErr != nil {
return nil, fmt.Errorf("%s, reject faulty delivery: %w", err, rejectErr)
}

return nil, err
}

// StopConsuming can be used to stop all consumers on this queue. It returns a
Expand Down
40 changes: 40 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,46 @@ func TestReturnRejected(t *testing.T) {
eventuallyRejected(t, queue, 0)
}

func TestRejectFaultyMessages(t *testing.T) {
redisAddr, closer := testRedis(t)
defer closer()

connection, err := OpenConnection("faulty-conn", "tcp", redisAddr, 1, nil)
require.NoError(t, err)
queue, err := connection.OpenQueue("faulty-q")
require.NoError(t, err)
_, err = queue.PurgeReady()
require.NoError(t, err)

for i := 0; i < 6; i++ {
// if there is no line separator after the header in the message,
// it will lead to an error and the message will be rejected
err := queue.Publish(fmt.Sprintf("%sreturn-d%d", jsonHeaderSignature, i))
require.NoError(t, err)
}

eventuallyReady(t, queue, 6)
eventuallyUnacked(t, queue, 0)
eventuallyRejected(t, queue, 0)

require.NoError(t, queue.StartConsuming(10, time.Millisecond))
eventuallyReady(t, queue, 0)
eventuallyUnacked(t, queue, 0)
eventuallyRejected(t, queue, 6)

consumer := NewTestConsumer("faulty-cons")
consumer.AutoAck = false
_, err = queue.AddConsumer("cons", consumer)
require.NoError(t, err)
eventuallyReady(t, queue, 0)
eventuallyUnacked(t, queue, 0)
eventuallyRejected(t, queue, 6)

require.Len(t, consumer.Deliveries(), 0)

<-queue.StopConsuming()
}

func TestPushQueue(t *testing.T) {
redisAddr, closer := testRedis(t)
defer closer()
Expand Down

0 comments on commit 52c05b0

Please sign in to comment.