Skip to content

Commit 298ae26

Browse files
committed
Expand comments
Signed-off-by: Joni Collinge <jonathancollinge@live.com>
1 parent 0df89bf commit 298ae26

File tree

2 files changed

+102
-13
lines changed

2 files changed

+102
-13
lines changed

common/component/azure/servicebus/subscription.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,10 +275,14 @@ func (s *Subscription) ReceiveBlocking(parentCtx context.Context, handler Handle
275275

276276
// If we require sessions then we must process the message
277277
// synchronously to ensure the FIFO order is maintained.
278+
// This is considered safe as even when using bulk receives,
279+
// the messages are merged into a single request to the app
280+
// containing multiple messages and thus it becomes an app
281+
// concern to process them in order.
278282
if s.requireSessions {
279-
s.handleAsync(ctx, msgs, handler, receiver)
283+
s.handleMessages(ctx, msgs, handler, receiver)
280284
} else {
281-
go s.handleAsync(ctx, msgs, handler, receiver)
285+
go s.handleMessages(ctx, msgs, handler, receiver)
282286
}
283287
}
284288
}
@@ -398,8 +402,8 @@ func (s *Subscription) doRenewLocksSession(ctx context.Context, sessionReceiver
398402
}
399403
}
400404

401-
// handleAsync handles messages from azure service bus and is meant to be called in a goroutine (go s.handleAsync).
402-
func (s *Subscription) handleAsync(ctx context.Context, msgs []*azservicebus.ReceivedMessage, handler HandlerFn, receiver Receiver) {
405+
// handleMessages handles messages from azure service bus and can be called synchronously or asynchronously depending on order requirements.
406+
func (s *Subscription) handleMessages(ctx context.Context, msgs []*azservicebus.ReceivedMessage, handler HandlerFn, receiver Receiver) {
403407
var (
404408
consumeToken bool
405409
takenConcurrentHandler bool

tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go

Lines changed: 94 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"regexp"
2020
"strconv"
2121
"sync"
22+
"sync/atomic"
2223
"testing"
2324
"time"
2425

@@ -1085,10 +1086,16 @@ func TestServicebusWithSessionsFIFO(t *testing.T) {
10851086

10861087
sessionWatcher := watcher.NewOrdered()
10871088

1089+
// Track active messages per session to ensure no parallel processing within a session
1090+
var (
1091+
fifoMu sync.Mutex
1092+
fifoActivePerSess = make(map[string]int)
1093+
fifoParallelIssues atomic.Int32
1094+
)
1095+
10881096
// subscriber of the given topic
10891097
subscriberApplicationWithSessions := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
10901098
return func(ctx flow.Context, s common.Service) error {
1091-
// Setup the /orders event handler.
10921099
return multierr.Combine(
10931100
s.AddTopicEventHandler(&common.Subscription{
10941101
PubsubName: pubsubName,
@@ -1099,9 +1106,31 @@ func TestServicebusWithSessionsFIFO(t *testing.T) {
10991106
"maxConcurrentSessions": "1",
11001107
},
11011108
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
1102-
// Track/Observe the data of the event.
1109+
// Extract session ID (if present) to track concurrency
1110+
var sessionID string
1111+
if m := sessionIDRegex.FindStringSubmatch(string(e.Data)); len(m) > 1 {
1112+
sessionID = m[1]
1113+
}
1114+
1115+
fifoMu.Lock()
1116+
active := fifoActivePerSess[sessionID]
1117+
if active > 0 {
1118+
fifoParallelIssues.Add(1)
1119+
ctx.Logf("Session %s already has %d active messages", sessionID, active)
1120+
}
1121+
fifoActivePerSess[sessionID] = active + 1
1122+
fifoMu.Unlock()
1123+
1124+
// Simulate handler work to widen potential overlap window
1125+
time.Sleep(20 * time.Millisecond)
1126+
11031127
messagesWatcher.Observe(e.Data)
11041128
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
1129+
1130+
fifoMu.Lock()
1131+
fifoActivePerSess[sessionID]--
1132+
fifoMu.Unlock()
1133+
11051134
return false, nil
11061135
}),
11071136
)
@@ -1222,6 +1251,9 @@ func TestServicebusWithSessionsFIFO(t *testing.T) {
12221251
if !assert.Equal(t, ordered, observed) {
12231252
t.Errorf("expected: %v, observed: %v", ordered, observed)
12241253
}
1254+
1255+
// Assert no parallel violations within the single session
1256+
assert.Equal(t, int32(0), fifoParallelIssues.Load(), "no parallel processing within a session expected")
12251257
}
12261258

12271259
return nil
@@ -1266,8 +1298,10 @@ func TestServicebusWithConcurrentSessionsFIFO(t *testing.T) {
12661298
sessionWatcher := watcher.NewUnordered()
12671299

12681300
var (
1269-
mu sync.Mutex
1270-
globalOrder []string // tracks session IDs in the order messages were received
1301+
mu sync.Mutex
1302+
globalOrder []string // tracks session IDs in the order messages were received
1303+
activePerSession = make(map[string]int)
1304+
parallelIssues atomic.Int32
12711305
)
12721306

12731307
subscriberApplicationWithSessions := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
@@ -1284,17 +1318,34 @@ func TestServicebusWithConcurrentSessionsFIFO(t *testing.T) {
12841318
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
12851319
messagesWatcher.Observe(e.Data)
12861320

1287-
// Track session ID in global order
1321+
// Track session ID and enforce single in-flight per session
12881322
match := sessionIDRegex.FindStringSubmatch(string(e.Data))
1289-
if len(match) > 0 {
1290-
sessionID := match[1]
1323+
var sessionID string
1324+
if len(match) > 1 {
1325+
sessionID = match[1]
12911326
mu.Lock()
1327+
inflight := activePerSession[sessionID]
1328+
if inflight > 0 {
1329+
parallelIssues.Add(1)
1330+
ctx.Logf("Session %s already has %d active messages", sessionID, inflight)
1331+
}
1332+
activePerSession[sessionID] = inflight + 1
12921333
globalOrder = append(globalOrder, sessionID)
12931334
mu.Unlock()
12941335
}
12951336

1337+
// Simulate work
1338+
time.Sleep(30 * time.Millisecond)
1339+
12961340
ctx.Logf("Message Received appID: %s, pubsub: %s, topic: %s, id: %s, data: %s",
12971341
appID, e.PubsubName, e.Topic, e.ID, e.Data)
1342+
1343+
if sessionID != "" {
1344+
mu.Lock()
1345+
activePerSession[sessionID]--
1346+
mu.Unlock()
1347+
}
1348+
12981349
return false, nil
12991350
}),
13001351
)
@@ -1415,6 +1466,9 @@ func TestServicebusWithConcurrentSessionsFIFO(t *testing.T) {
14151466

14161467
ctx.Logf("Successfully processed %d sessions concurrently with FIFO ordering maintained",
14171468
len(sessionMessages))
1469+
1470+
// Assert no parallel violations within a single session
1471+
assert.Equal(t, int32(0), parallelIssues.Load(), "no parallel processing within a session expected")
14181472
}
14191473
}
14201474
return nil
@@ -1454,10 +1508,16 @@ func TestServicebusWithSessionsRoundRobin(t *testing.T) {
14541508

14551509
sessionWatcher := watcher.NewUnordered()
14561510

1511+
// Concurrency tracking for round-robin scenario
1512+
var (
1513+
rrMu sync.Mutex
1514+
rrActivePerSess = make(map[string]int)
1515+
rrParallelIssues atomic.Int32
1516+
)
1517+
14571518
// subscriber of the given topic
14581519
subscriberApplicationWithSessions := func(appID string, topicName string, messageWatcher *watcher.Watcher) app.SetupFn {
14591520
return func(ctx flow.Context, s common.Service) error {
1460-
// Setup the /orders event handler.
14611521
return multierr.Combine(
14621522
s.AddTopicEventHandler(&common.Subscription{
14631523
PubsubName: pubsubName,
@@ -1469,9 +1529,31 @@ func TestServicebusWithSessionsRoundRobin(t *testing.T) {
14691529
"sessionIdleTimeoutInSec": "2", // timeout and try another session
14701530
},
14711531
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
1472-
// Track/Observe the data of the event.
1532+
// Extract session ID
1533+
var sessionID string
1534+
if m := sessionIDRegex.FindStringSubmatch(string(e.Data)); len(m) > 1 {
1535+
sessionID = m[1]
1536+
}
1537+
1538+
rrMu.Lock()
1539+
active := rrActivePerSess[sessionID]
1540+
if active > 0 {
1541+
rrParallelIssues.Add(1)
1542+
ctx.Logf("Session %s already has %d active messages", sessionID, active)
1543+
}
1544+
rrActivePerSess[sessionID] = active + 1
1545+
rrMu.Unlock()
1546+
1547+
// Simulate work
1548+
time.Sleep(15 * time.Millisecond)
1549+
14731550
messageWatcher.Observe(e.Data)
14741551
ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
1552+
1553+
rrMu.Lock()
1554+
rrActivePerSess[sessionID]--
1555+
rrMu.Unlock()
1556+
14751557
return false, nil
14761558
}),
14771559
)
@@ -1529,6 +1611,9 @@ func TestServicebusWithSessionsRoundRobin(t *testing.T) {
15291611
m.Assert(ctx, 25*timeout)
15301612
}
15311613

1614+
// Assert no parallel violations
1615+
assert.Equal(t, int32(0), rrParallelIssues.Load(), "no parallel processing within a session expected")
1616+
15321617
return nil
15331618
}
15341619
}

0 commit comments

Comments
 (0)