Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions common/component/azure/servicebus/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,17 @@ func (s *Subscription) ReceiveBlocking(parentCtx context.Context, handler Handle
continue
}

// Handle the messages in background
go s.handleAsync(ctx, msgs, handler, receiver)
// If we require sessions then we must process the message
// synchronously to ensure the FIFO order is maintained.
// This is considered safe as even when using bulk receives,
// the messages are merged into a single request to the app
// containing multiple messages and thus it becomes an app
// concern to process them in order.
if s.requireSessions {
s.handleMessages(ctx, msgs, handler, receiver)
} else {
go s.handleMessages(ctx, msgs, handler, receiver)
}
}
}

Expand Down Expand Up @@ -393,8 +402,8 @@ func (s *Subscription) doRenewLocksSession(ctx context.Context, sessionReceiver
}
}

// handleAsync handles messages from azure service bus and is meant to be called in a goroutine (go s.handleAsync).
func (s *Subscription) handleAsync(ctx context.Context, msgs []*azservicebus.ReceivedMessage, handler HandlerFn, receiver Receiver) {
// handleMessages handles messages from azure service bus and can be called synchronously or asynchronously depending on order requirements.
func (s *Subscription) handleMessages(ctx context.Context, msgs []*azservicebus.ReceivedMessage, handler HandlerFn, receiver Receiver) {
var (
consumeToken bool
takenConcurrentHandler bool
Expand Down
Loading
Loading