Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Return BatchDisposition Errors with LockTokenIDs #129

Merged
merged 5 commits into from
May 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions batch_disposition.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ type (
Status MessageStatus
cursor int
}
// BatchDispositionError is an error which returns a collection of DispositionError.
BatchDispositionError struct {
Errors []DispositionError
}
// DispositionError is an error associated with a LockTokenID.
DispositionError struct {
LockTokenID *uuid.UUID
err error
}
)

const (
Expand All @@ -25,6 +34,23 @@ const (
Abort MessageStatus = MessageStatus(abandonedDisposition)
)

func (bde BatchDispositionError) Error() string {
msg := ""
if len(bde.Errors) != 0 {
msg = fmt.Sprintf("Operation failed, %d error(s) reported.", len(bde.Errors))
}
return msg
}

func (de DispositionError) Error() string {
return de.err.Error()
}

// UnWrap will return the private error.
func (de DispositionError) UnWrap() error {
return de.err
}

// Done communicates whether there are more messages remaining to be iterated over.
func (bdi *BatchDispositionIterator) Done() bool {
return len(bdi.LockTokenIDs) == bdi.cursor
Expand All @@ -39,7 +65,8 @@ func (bdi *BatchDispositionIterator) Next() (uuid *uuid.UUID) {
return uuid
}

func (bdi *BatchDispositionIterator) doUpdate(ctx context.Context, ec entityConnector) error {
func (bdi *BatchDispositionIterator) doUpdate(ctx context.Context, ec entityConnector) BatchDispositionError {
batchError := BatchDispositionError{}
for !bdi.Done() {
if uuid := bdi.Next(); uuid != nil {
m := &Message{
Expand All @@ -48,21 +75,24 @@ func (bdi *BatchDispositionIterator) doUpdate(ctx context.Context, ec entityConn
m.ec = ec
err := m.sendDisposition(ctx, bdi.Status)
if err != nil {
return err
batchError.Errors = append(batchError.Errors, DispositionError{
LockTokenID: uuid,
err: err,
})
}
}
}
return nil
return batchError
}

// SendBatchDisposition updates the LockToken id to the desired status.
// SendBatchDisposition updates the LockTokenIDs to the disposition status.
func (q *Queue) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error {
span, ctx := q.startSpanFromContext(ctx, "sb.Queue.SendBatchDisposition")
defer span.Finish()
return iterator.doUpdate(ctx, q)
}

// SendBatchDisposition updates the LockToken id to the desired status.
// SendBatchDisposition updates the LockTokenIDs to the desired disposition status.
func (s *Subscription) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error {
span, ctx := s.startSpanFromContext(ctx, "sb.Subscription.SendBatchDisposition")
defer span.Finish()
Expand Down
12 changes: 10 additions & 2 deletions batch_disposition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package servicebus

import (
"context"
"fmt"
"testing"

"github.com/Azure/azure-amqp-common-go/uuid"
Expand Down Expand Up @@ -36,12 +37,19 @@ func TestBatchDispositionUnsupportedStatus(t *testing.T) {
id := uuid.UUID{}
bdi := BatchDispositionIterator{
LockTokenIDs: []*uuid.UUID{
&id,
&id, &id, &id,
},
Status: status,
}

subscription := Subscription{}
err := subscription.SendBatchDisposition(context.Background(), bdi)
assert.EqualErrorf(t, err, "unsupported bulk disposition status \"suspended\"", err.Error())
be := err.(BatchDispositionError)
assert.NotNil(t, be, fmt.Sprintf("Wrong error type %T", err))
assert.EqualErrorf(t, err, fmt.Sprintf("Operation failed, %d error(s) reported.", len(be.Errors)), err.Error())

for _, innerErr := range be.Errors {
assert.NotNil(t, innerErr.UnWrap(), "Unwrapped error is nil")
assert.EqualErrorf(t, innerErr, "unsupported bulk disposition status \"suspended\"", innerErr.Error())
}
}