Skip to content

Commit 0df89bf

Browse files
committed
Extend test
Signed-off-by: Joni Collinge <jonathancollinge@live.com>
1 parent 8333864 commit 0df89bf

File tree

1 file changed

+78
-3
lines changed

1 file changed

+78
-3
lines changed

pubsub/azure/servicebus/topics/servicebus_test.go

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,20 +204,51 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) {
204204
logger.NewLogger("test"),
205205
)
206206

207+
// Track processing times and active messages per session
208+
type messageProcessing struct {
209+
sessionID string
210+
messageID string
211+
seqNum int64
212+
startTime time.Time
213+
endTime time.Time
214+
msgIndex int
215+
}
216+
207217
var (
208218
mu sync.Mutex
209-
globalOrder []string // tracks session IDs in the order messages were received
219+
globalOrder []string
210220
sessionOrders = make(map[string][]int)
211221
concurrentHandlers atomic.Int32
212222
maxConcurrentHandlers atomic.Int32
223+
processingLog []messageProcessing
224+
activePerSession = make(map[string]int32)
225+
parallelViolations atomic.Int32
213226
)
214227

215228
handlerFunc := func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]impl.HandlerResponseItem, error) {
216229
msg := msgs[0]
217230
sessionID := *msg.SessionID
231+
startTime := time.Now()
232+
233+
// Track concurrent processing within the same session
234+
mu.Lock()
235+
activeCount := activePerSession[sessionID]
236+
if activeCount > 0 {
237+
// Another message from this session is already being processed
238+
parallelViolations.Add(1)
239+
t.Errorf("Session %s has %d messages processing in parallel at %v",
240+
sessionID, activeCount+1, startTime)
241+
}
242+
activePerSession[sessionID]++
243+
mu.Unlock()
218244

219245
current := concurrentHandlers.Add(1)
220-
defer concurrentHandlers.Add(-1)
246+
defer func() {
247+
concurrentHandlers.Add(-1)
248+
mu.Lock()
249+
activePerSession[sessionID]--
250+
mu.Unlock()
251+
}()
221252

222253
for {
223254
max := maxConcurrentHandlers.Load()
@@ -234,9 +265,19 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) {
234265

235266
time.Sleep(50 * time.Millisecond)
236267

268+
endTime := time.Now()
269+
237270
mu.Lock()
238271
globalOrder = append(globalOrder, sessionID)
239272
sessionOrders[sessionID] = append(sessionOrders[sessionID], msgIndex)
273+
processingLog = append(processingLog, messageProcessing{
274+
sessionID: sessionID,
275+
messageID: msg.MessageID,
276+
seqNum: *msg.SequenceNumber,
277+
startTime: startTime,
278+
endTime: endTime,
279+
msgIndex: msgIndex,
280+
})
240281
mu.Unlock()
241282

242283
return nil, nil
@@ -262,6 +303,10 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) {
262303

263304
wg.Wait()
264305

306+
// Verify no parallel session processing was detected
307+
assert.Equal(t, int32(0), parallelViolations.Load(),
308+
"N messages from the same session should process in parallel")
309+
265310
// Verify FIFO ordering per session
266311
for _, sessionID := range sessionIDs {
267312
order := sessionOrders[sessionID]
@@ -272,6 +317,36 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) {
272317
}
273318
}
274319

320+
// Verify strict ordering, message N+1 must start after message N ends
321+
sessionProcessingTimes := make(map[string][]messageProcessing)
322+
for _, proc := range processingLog {
323+
sessionProcessingTimes[proc.sessionID] = append(sessionProcessingTimes[proc.sessionID], proc)
324+
}
325+
326+
for sessionID, procs := range sessionProcessingTimes {
327+
require.Len(t, procs, messagesPerSession, "session %s should have all processing records", sessionID)
328+
329+
// Sort by message index to ensure we check in FIFO order
330+
sortedProcs := make([]messageProcessing, len(procs))
331+
for _, proc := range procs {
332+
sortedProcs[proc.msgIndex] = proc
333+
}
334+
335+
// Verify each message starts after the previous one completes
336+
for i := 1; i < len(sortedProcs); i++ {
337+
prev := sortedProcs[i-1]
338+
curr := sortedProcs[i]
339+
340+
assert.False(t, curr.startTime.Before(prev.endTime),
341+
"Session %s: message %d started at %v before message %d ended at %v (overlap detected)",
342+
sessionID, curr.msgIndex, curr.startTime, prev.msgIndex, prev.endTime)
343+
344+
// Also verify sequence numbers are strictly increasing
345+
assert.Equal(t, prev.seqNum+1, curr.seqNum,
346+
"Session %s: sequence numbers should be consecutive", sessionID)
347+
}
348+
}
349+
275350
// Verify concurrent handler limits
276351
assert.LessOrEqual(t, maxConcurrentHandlers.Load(), int32(maxConcurrentLimit),
277352
"concurrent handlers should not exceed configured maximum")
@@ -297,5 +372,5 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) {
297372
}
298373

299374
assert.True(t, hasInterleaving,
300-
"global order must show session interleaving, proving concurrent processing")
375+
"global order must show session interleaving, proving concurrent processing across sessions")
301376
}

0 commit comments

Comments
 (0)