Skip to content

Commit

Permalink
Merge pull request Azure#13 from Azure/disposition
Browse files Browse the repository at this point in the history
change handler to return disposition action rather than error
  • Loading branch information
devigned authored Jun 2, 2018
2 parents 089e807 + faddc66 commit f2bff6a
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 40 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ handleErr(err)

// Receive message from queue
listenHandle, err := q.Receive(context.Background(),
func(ctx context.Context, event *servicebus.Event) error {
fmt.Println(string(event.Data))
return nil
func(ctx context.Context, msg *servicebus.Message) servicebus.DispositionAction {
fmt.Println(string(msg.Data))
return msg.Accept()
})
handleErr(err)
defer listenHandle.Close(context.Background)
Expand Down
4 changes: 2 additions & 2 deletions _examples/helloworld/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

listenHandle, err := q.Receive(ctx, func(ctx context.Context, event *servicebus.Message) error {
listenHandle, err := q.Receive(ctx, func(ctx context.Context, event *servicebus.Message) servicebus.DispositionAction {
text := string(event.Data)
if text == "exit\n" {
fmt.Println("Oh snap!! Someone told me to exit!")
exit <- *new(struct{})
} else {
fmt.Println(string(event.Data))
}
return nil
return event.Accept()
})
defer listenHandle.Close(context.Background())

Expand Down
132 changes: 117 additions & 15 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ package servicebus
// SOFTWARE

import (
"context"

"pack.ag/amqp"
)

Expand All @@ -36,6 +38,28 @@ type (
GroupSequence *uint32
message *amqp.Message
}

// DispositionAction represents the action to notify Azure Service Bus of the Message's disposition
DispositionAction func(ctx context.Context)

// MessageErrorCondition represents a well-known collection of AMQP errors
MessageErrorCondition string
)

// Error Conditions
const (
ErrorInternalError MessageErrorCondition = "amqp:internal-error"
ErrorNotFound MessageErrorCondition = "amqp:not-found"
ErrorUnauthorizedAccess MessageErrorCondition = "amqp:unauthorized-access"
ErrorDecodeError MessageErrorCondition = "amqp:decode-error"
ErrorResourceLimitExceeded MessageErrorCondition = "amqp:resource-limit-exceeded"
ErrorNotAllowed MessageErrorCondition = "amqp:not-allowed"
ErrorInvalidField MessageErrorCondition = "amqp:invalid-field"
ErrorNotImplemented MessageErrorCondition = "amqp:not-implemented"
ErrorResourceLocked MessageErrorCondition = "amqp:resource-locked"
ErrorPreconditionFailed MessageErrorCondition = "amqp:precondition-failed"
ErrorResourceDeleted MessageErrorCondition = "amqp:resource-deleted"
ErrorIllegalState MessageErrorCondition = "amqp:illegal-state"
)

// NewMessageFromString builds an Message from a string message
Expand All @@ -50,17 +74,95 @@ func NewMessage(data []byte) *Message {
}
}

// Accept will notify Azure Service Bus that the message was successfully handled and should be deleted from the queue
func (m *Message) Accept() DispositionAction {
return func(ctx context.Context) {
span, _ := m.startSpanFromContext(ctx, "sb.Message.Accept")
defer span.Finish()

m.message.Accept()
}
}

// FailButRetry will notify Azure Service Bus the message failed but should be re-queued for delivery.
func (m *Message) FailButRetry() DispositionAction {
return func(ctx context.Context) {
span, _ := m.startSpanFromContext(ctx, "sb.Message.FailButRetry")
defer span.Finish()

m.message.Modify(true, false, nil)
}
}

// FailButRetryElsewhere will notify Azure Service Bus the message failed but should be re-queued for deliver to any
// other link but this one.
func (m *Message) FailButRetryElsewhere() DispositionAction {
return func(ctx context.Context) {
span, _ := m.startSpanFromContext(ctx, "sb.Message.FailButRetryElsewhere")
defer span.Finish()

m.message.Modify(true, true, nil)
}
}

// Release will notify Azure Service Bus the message should be re-queued without failure.
func (m *Message) Release() DispositionAction {
return func(ctx context.Context) {
span, _ := m.startSpanFromContext(ctx, "sb.Message.Release")
defer span.Finish()

m.message.Release()
}
}

// Reject will notify Azure Service Bus the message failed and should not re-queued
func (m *Message) Reject(err error) DispositionAction {
return func(ctx context.Context) {
span, _ := m.startSpanFromContext(ctx, "sb.Message.Reject")
defer span.Finish()

amqpErr := amqp.Error{
Condition: amqp.ErrorCondition(ErrorInternalError),
Description: err.Error(),
}
m.message.Reject(&amqpErr)
}
}

// RejectWithInfo will notify Azure Service Bus the message failed and should not be re-queued with additional context
func (m *Message) RejectWithInfo(err error, condition MessageErrorCondition, additionalData map[string]string) DispositionAction {
var info map[string]interface{}
if additionalData != nil {
info = make(map[string]interface{}, len(additionalData))
for key, val := range additionalData {
info[key] = val
}
}

return func(ctx context.Context) {
span, _ := m.startSpanFromContext(ctx, "sb.Message.RejectWithInfo")
defer span.Finish()

amqpErr := amqp.Error{
Condition: amqp.ErrorCondition(condition),
Description: err.Error(),
Info: info,
}
m.message.Reject(&amqpErr)
}
}

// Set implements opentracing.TextMapWriter and sets properties on the event to be propagated to the message broker
func (e *Message) Set(key, value string) {
if e.Properties == nil {
e.Properties = make(map[string]interface{})
func (m *Message) Set(key, value string) {
if m.Properties == nil {
m.Properties = make(map[string]interface{})
}
e.Properties[key] = value
m.Properties[key] = value
}

// ForeachKey implements the opentracing.TextMapReader and gets properties on the event to be propagated from the message broker
func (e *Message) ForeachKey(handler func(key, val string) error) error {
for key, value := range e.Properties {
func (m *Message) ForeachKey(handler func(key, val string) error) error {
for key, value := range m.Properties {
err := handler(key, value.(string))
if err != nil {
return err
Expand All @@ -69,24 +171,24 @@ func (e *Message) ForeachKey(handler func(key, val string) error) error {
return nil
}

func (e *Message) toMsg() *amqp.Message {
msg := e.message
func (m *Message) toMsg() *amqp.Message {
msg := m.message
if msg == nil {
msg = amqp.NewMessage(e.Data)
msg = amqp.NewMessage(m.Data)
}

msg.Properties = &amqp.MessageProperties{
MessageID: e.ID,
MessageID: m.ID,
}

if e.GroupID != nil && e.GroupSequence != nil {
msg.Properties.GroupID = *e.GroupID
msg.Properties.GroupSequence = *e.GroupSequence
if m.GroupID != nil && m.GroupSequence != nil {
msg.Properties.GroupID = *m.GroupID
msg.Properties.GroupSequence = *m.GroupSequence
}

if len(e.Properties) > 0 {
if len(m.Properties) > 0 {
msg.ApplicationProperties = make(map[string]interface{})
for key, value := range e.Properties {
for key, value := range m.Properties {
msg.ApplicationProperties[key] = value
}
}
Expand Down
2 changes: 1 addition & 1 deletion namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type (
}

// Handler is the function signature for any receiver of AMQP messages
Handler func(context.Context, *Message) error
Handler func(context.Context, *Message) DispositionAction

// NamespaceOption provides structure for configuring a new Service Bus namespace
NamespaceOption func(h *Namespace) error
Expand Down
2 changes: 1 addition & 1 deletion queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"context"
"encoding/xml"
"errors"
"io/ioutil"
"io/ioutil"
"net/http"
"sync"
"time"
Expand Down
10 changes: 5 additions & 5 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,11 @@ func testQueueSendAndReceiveInOrder(ctx context.Context, t *testing.T, queue *Qu
wg.Add(numMessages)
// ensure in-order processing of messages from the queue
count := 0
queue.Receive(ctx, func(ctx context.Context, event *Message) error {
queue.Receive(ctx, func(ctx context.Context, event *Message) DispositionAction {
assert.Equal(t, messages[count], string(event.Data))
count++
wg.Done()
return nil
return event.Accept()
})
end, _ := ctx.Deadline()
waitUntil(t, &wg, time.Until(end))
Expand Down Expand Up @@ -436,7 +436,7 @@ func testDuplicateDetection(ctx context.Context, t *testing.T, queue *Queue) {
var wg sync.WaitGroup
wg.Add(2)
received := make(map[interface{}]string)
queue.Receive(ctx, func(ctx context.Context, event *Message) error {
queue.Receive(ctx, func(ctx context.Context, event *Message) DispositionAction {
// we should get 2 messages discarding the duplicate ID
received[event.ID] = string(event.Data)
wg.Done()
Expand Down Expand Up @@ -498,13 +498,13 @@ func testQueueWithRequiredSessionSendAndReceive(ctx context.Context, t *testing.
wg.Add(numMessages)
// ensure in-order processing of messages from the queue
count := 0
handler := func(ctx context.Context, event *Message) error {
handler := func(ctx context.Context, event *Message) DispositionAction {
if !assert.Equal(t, messages[count], string(event.Data)) {
assert.FailNow(t, fmt.Sprintf("message %d %q didn't match %q", count, messages[count], string(event.Data)))
}
count++
wg.Done()
return nil
return event.Accept()
}
listenHandle, err := queue.Receive(ctx, handler, ReceiverWithSession(sessionID))
if err != nil {
Expand Down
15 changes: 8 additions & 7 deletions receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ func (r *receiver) handleMessage(ctx context.Context, msg *amqp.Message, handler
id := messageID(msg)
span.SetTag("amqp.message-id", id)

err = handler(ctx, event)
if err != nil {
msg.Modify(true, false, nil)
log.For(ctx).Error(fmt.Errorf("message modify(true, false, nil): id: %v", id))
return
dispositionAction := handler(ctx, event)
if dispositionAction != nil {
dispositionAction(ctx)
} else {
log.For(ctx).Info(fmt.Sprintf("disposition action not provided auto accepted message id %q", id))
event.Accept()
}
msg.Accept()
}

func extractWireContext(reader opentracing.TextMapReader) (opentracing.SpanContext, error) {
Expand Down Expand Up @@ -236,7 +236,8 @@ func (r *receiver) newSessionAndLink(ctx context.Context) error {

opts := []amqp.LinkOption{
amqp.LinkSourceAddress(r.entityPath),
amqp.LinkCredit(100),
amqp.LinkCredit(10),
amqp.LinkReceiverSettle(amqp.ModeSecond),
}

if r.requiredSessionID != nil {
Expand Down
2 changes: 1 addition & 1 deletion sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (s *sender) newSessionAndLink(ctx context.Context) error {

amqpSender, err := amqpSession.NewSender(
amqp.LinkTargetAddress(s.getAddress()),
amqp.LinkReceiverSettle(amqp.ModeSecond))
amqp.LinkSenderSettle(amqp.ModeUnsettled))
if err != nil {
log.For(ctx).Error(err)
return err
Expand Down
6 changes: 3 additions & 3 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"context"
"encoding/xml"
"errors"
"io/ioutil"
"io/ioutil"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -141,7 +141,7 @@ func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...Sub
AtomSchema: atomSchema,
},
Content: &subscriptionContent{
Type: applicationXML,
Type: applicationXML,
SubscriptionDescription: *sd,
},
}
Expand Down Expand Up @@ -231,7 +231,7 @@ func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*Subscript
func subscriptionEntryToEntity(entry *subscriptionEntry) *SubscriptionEntity {
return &SubscriptionEntity{
SubscriptionDescription: &entry.Content.SubscriptionDescription,
Name: entry.Title,
Name: entry.Title,
}
}

Expand Down
4 changes: 2 additions & 2 deletions subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,9 @@ func testSubscriptionReceive(ctx context.Context, t *testing.T, topic *Topic, su

var wg sync.WaitGroup
wg.Add(1)
_, err = sub.Receive(ctx, func(eventCtx context.Context, evt *Message) error {
_, err = sub.Receive(ctx, func(eventCtx context.Context, evt *Message) DispositionAction {
wg.Done()
return nil
return evt.Accept()
})
if err != nil {
t.Fatal(err)
Expand Down
13 changes: 13 additions & 0 deletions tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ func (ns *Namespace) startSpanFromContext(ctx context.Context, operationName str
return span, ctx
}

func (m *Message) startSpanFromContext(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, operationName, opts...)
ApplyComponentInfo(span)
span.SetTag("amqp.message-id", m.ID)
if m.GroupID != nil {
span.SetTag("amqp.message-group-id", *m.GroupID)
}
if m.GroupSequence != nil {
span.SetTag("amqp.message-group-sequence", *m.GroupSequence)
}
return span, ctx
}

func (em *EntityManager) startSpanFromContext(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, operationName, opts...)
ApplyComponentInfo(span)
Expand Down

0 comments on commit f2bff6a

Please sign in to comment.