From 6f98580d27bc677dd779ae6269d9dcb50c65ef55 Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Thu, 13 Nov 2025 16:07:07 +0000 Subject: [PATCH 1/8] Ensure sessions are processed with fifo even if concurrent Signed-off-by: Joni Collinge --- .../azure/servicebus/subscription.go | 9 +- .../servicebus/topics/servicebus_test.go | 301 ++++++++++++++++++ .../servicebus/topics/servicebus_test.go | 188 +++++++++++ 3 files changed, 496 insertions(+), 2 deletions(-) create mode 100644 pubsub/azure/servicebus/topics/servicebus_test.go diff --git a/common/component/azure/servicebus/subscription.go b/common/component/azure/servicebus/subscription.go index ecdf7c00bb..c8a1b4aa88 100644 --- a/common/component/azure/servicebus/subscription.go +++ b/common/component/azure/servicebus/subscription.go @@ -273,8 +273,13 @@ func (s *Subscription) ReceiveBlocking(parentCtx context.Context, handler Handle continue } - // Handle the messages in background - go s.handleAsync(ctx, msgs, handler, receiver) + // If we require sessions then we must process the message + // synchronously to ensure the FIFO order is maintained. + if s.requireSessions { + s.handleAsync(ctx, msgs, handler, receiver) + } else { + go s.handleAsync(ctx, msgs, handler, receiver) + } } } diff --git a/pubsub/azure/servicebus/topics/servicebus_test.go b/pubsub/azure/servicebus/topics/servicebus_test.go new file mode 100644 index 0000000000..971c2f397e --- /dev/null +++ b/pubsub/azure/servicebus/topics/servicebus_test.go @@ -0,0 +1,301 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topics + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + impl "github.com/dapr/components-contrib/common/component/azure/servicebus" + "github.com/dapr/kit/logger" + "github.com/dapr/kit/ptr" +) + +type mockReceiver struct { + messages []*azservicebus.ReceivedMessage + messageIndex int + sessionID string + mu sync.Mutex + closed bool +} + +func newMockReceiver(sessionID string, messages []*azservicebus.ReceivedMessage) *mockReceiver { + return &mockReceiver{ + sessionID: sessionID, + messages: messages, + } +} + +func (m *mockReceiver) ReceiveMessages(ctx context.Context, count int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.closed { + return nil, errors.New("receiver closed") + } + + if ctx.Err() != nil { + return nil, ctx.Err() + } + + if m.messageIndex >= len(m.messages) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(100 * time.Millisecond): + return nil, errors.New("no more messages") + } + } + + end := m.messageIndex + count + if end > len(m.messages) { + end = len(m.messages) + } + + result := m.messages[m.messageIndex:end] + m.messageIndex = end + return result, nil +} + +func (m *mockReceiver) CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error { + return nil +} + +func (m *mockReceiver) AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error { + return nil +} + +func (m *mockReceiver) Close(ctx context.Context) error { + m.mu.Lock() + defer m.mu.Unlock() + m.closed = true + return nil +} + +func TestSessionOrderingWithSingleHandler(t *testing.T) { + const numMessages = 10 + sessionID := "test-session-1" + + messages := make([]*azservicebus.ReceivedMessage, numMessages) + for i := 0; i < numMessages; i++ { + seqNum := int64(i + 1) + messages[i] = &azservicebus.ReceivedMessage{ + MessageID: fmt.Sprintf("msg-%d", i), + SessionID: &sessionID, + SequenceNumber: &seqNum, + Body: []byte(fmt.Sprintf("message-%d", i)), + } + } + + sub := impl.NewSubscription( + impl.SubscriptionOptions{ + MaxActiveMessages: 100, + TimeoutInSec: 5, + MaxBulkSubCount: ptr.Of(1), + MaxConcurrentHandlers: 1, + Entity: "test-topic", + LockRenewalInSec: 30, + RequireSessions: true, + SessionIdleTimeout: time.Second * 5, + }, + logger.NewLogger("test"), + ) + + var ( + processedOrder []int + orderMu sync.Mutex + ) + + handlerFunc := func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]impl.HandlerResponseItem, error) { + var msgIndex int + _, err := fmt.Sscanf(string(msgs[0].Body), "message-%d", &msgIndex) + require.NoError(t, err) + + time.Sleep(10 * time.Millisecond) + + orderMu.Lock() + processedOrder = append(processedOrder, msgIndex) + orderMu.Unlock() + + return nil, nil + } + + receiver := newMockReceiver(sessionID, messages) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + done := make(chan struct{}) + go func() { + defer close(done) + _ = sub.ReceiveBlocking(ctx, handlerFunc, receiver, func() {}, "test-session") + }() + + <-done + + expectedOrder := make([]int, numMessages) + for i := range expectedOrder { + expectedOrder[i] = i + } + + assert.Equal(t, expectedOrder, processedOrder, "messages must be processed in order") +} + +func TestMultipleSessionsConcurrentHandler(t *testing.T) { + const ( + numSessions = 5 + messagesPerSession = 10 + maxConcurrentLimit = 3 + ) + + sessionIDs := make([]string, numSessions) + for i := 0; i < numSessions; i++ { + sessionIDs[i] = fmt.Sprintf("session-%d", i) + } + + allMessages := make(map[string][]*azservicebus.ReceivedMessage) + for _, sessionID := range sessionIDs { + messages := make([]*azservicebus.ReceivedMessage, messagesPerSession) + for i := 0; i < messagesPerSession; i++ { + seqNum := int64(i + 1) + sessID := sessionID + messages[i] = &azservicebus.ReceivedMessage{ + MessageID: fmt.Sprintf("%s-msg-%d", sessionID, i), + SessionID: &sessID, + SequenceNumber: &seqNum, + Body: []byte(fmt.Sprintf("%s:%d", sessionID, i)), + } + } + allMessages[sessionID] = messages + } + + sub := impl.NewSubscription( + impl.SubscriptionOptions{ + MaxActiveMessages: 100, + TimeoutInSec: 5, + MaxBulkSubCount: ptr.Of(1), + MaxConcurrentHandlers: maxConcurrentLimit, + Entity: "test-topic", + LockRenewalInSec: 30, + RequireSessions: true, + SessionIdleTimeout: time.Second * 5, + }, + logger.NewLogger("test"), + ) + + var ( + mu sync.Mutex + globalOrder []string // tracks session IDs in the order messages were received + sessionOrders = make(map[string][]int) + concurrentHandlers atomic.Int32 + maxConcurrentHandlers atomic.Int32 + ) + + handlerFunc := func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]impl.HandlerResponseItem, error) { + msg := msgs[0] + sessionID := *msg.SessionID + + current := concurrentHandlers.Add(1) + defer concurrentHandlers.Add(-1) + + for { + max := maxConcurrentHandlers.Load() + if current <= max || maxConcurrentHandlers.CompareAndSwap(max, current) { + break + } + } + + var msgIndex int + parts := strings.Split(string(msg.Body), ":") + require.Len(t, parts, 2) + _, err := fmt.Sscanf(parts[1], "%d", &msgIndex) + require.NoError(t, err) + + time.Sleep(50 * time.Millisecond) + + mu.Lock() + globalOrder = append(globalOrder, sessionID) + sessionOrders[sessionID] = append(sessionOrders[sessionID], msgIndex) + mu.Unlock() + + return nil, nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + var wg sync.WaitGroup + for _, sessionID := range sessionIDs { + wg.Add(1) + go func() { + defer wg.Done() + receiver := newMockReceiver(sessionID, allMessages[sessionID]) + done := make(chan struct{}) + go func() { + defer close(done) + _ = sub.ReceiveBlocking(ctx, handlerFunc, receiver, func() {}, fmt.Sprintf("session-%s", sessionID)) + }() + <-done + }() + } + + wg.Wait() + + // Verify FIFO ordering per session + for _, sessionID := range sessionIDs { + order := sessionOrders[sessionID] + require.Len(t, order, messagesPerSession, "session %s should process all messages", sessionID) + + for i := 0; i < messagesPerSession; i++ { + assert.Equal(t, i, order[i], "session %s message %d out of order", sessionID, i) + } + } + + // Verify concurrent handler limits + assert.LessOrEqual(t, maxConcurrentHandlers.Load(), int32(maxConcurrentLimit), + "concurrent handlers should not exceed configured maximum") + + assert.Greater(t, maxConcurrentHandlers.Load(), int32(1), + "multiple handlers should run concurrently across sessions") + + // Check global order to prove concurrent processing + // If processed sequentially, all messages from one session would come before the next + // If processed concurrently, session IDs will be interleaved + hasInterleaving := false + seenSessions := make(map[string]bool) + lastSession := "" + + for _, sessionID := range globalOrder { + if sessionID != lastSession && seenSessions[sessionID] { + // We've seen this session before but with a different session in between + hasInterleaving = true + break + } + seenSessions[sessionID] = true + lastSession = sessionID + } + + assert.True(t, hasInterleaving, + "global order must show session interleaving, proving concurrent processing") +} diff --git a/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go b/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go index de47f202f1..3c93802095 100644 --- a/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go +++ b/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go @@ -18,6 +18,7 @@ import ( "fmt" "regexp" "strconv" + "sync" "testing" "time" @@ -1256,6 +1257,193 @@ func TestServicebusWithSessionsFIFO(t *testing.T) { Run() } +// TestServicebusWithConcurrentSessionsFIFO validates that multiple sessions can be +// processed concurrently while each session maintains strict FIFO ordering. +func TestServicebusWithConcurrentSessionsFIFO(t *testing.T) { + topic := "sessions-concurrent-fifo" + numSessions := 5 + + sessionWatcher := watcher.NewUnordered() + + var ( + mu sync.Mutex + globalOrder []string // tracks session IDs in the order messages were received + ) + + subscriberApplicationWithSessions := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn { + return func(ctx flow.Context, s common.Service) error { + return multierr.Combine( + s.AddTopicEventHandler(&common.Subscription{ + PubsubName: pubsubName, + Topic: topicName, + Route: "/orders", + Metadata: map[string]string{ + "requireSessions": "true", + "maxConcurrentSessions": "5", + }, + }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { + messagesWatcher.Observe(e.Data) + + // Track session ID in global order + match := sessionIDRegex.FindStringSubmatch(string(e.Data)) + if len(match) > 0 { + sessionID := match[1] + mu.Lock() + globalOrder = append(globalOrder, sessionID) + mu.Unlock() + } + + ctx.Logf("Message Received appID: %s, pubsub: %s, topic: %s, id: %s, data: %s", + appID, e.PubsubName, e.Topic, e.ID, e.Data) + return false, nil + }), + ) + } + } + + publishMessages := func(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable { + return func(ctx flow.Context) error { + messages := make([]string, numMessages) + for i := range messages { + var msgSuffix string + if metadata["SessionId"] != "" { + msgSuffix = fmt.Sprintf(", sessionId: %s", metadata["SessionId"]) + } + messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s%s", + metadata[messageKey], topicName, i, uuid.New().String(), msgSuffix) + } + + for _, messageWatcher := range messageWatchers { + messageWatcher.ExpectStrings(messages...) + } + + client := sidecar.GetClient(ctx, sidecarName) + ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName) + + var publishOptions dapr.PublishEventOption + if metadata != nil { + publishOptions = dapr.PublishEventWithMetadata(metadata) + } + + for _, message := range messages { + ctx.Logf("Publishing: %q", message) + var err error + if publishOptions != nil { + err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions) + } else { + err = client.PublishEvent(ctx, pubsubName, topicName, message) + } + require.NoError(ctx, err, "error publishing message") + } + return nil + } + } + + assertMessages := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable { + return func(ctx flow.Context) error { + for _, m := range messageWatchers { + _, exp, obs := m.Partial(ctx, timeout) + + var observed []string + if obs != nil { + for _, v := range obs.([]interface{}) { + observed = append(observed, v.(string)) + } + } + var expected []string + if exp != nil { + for _, v := range exp.([]interface{}) { + expected = append(expected, v.(string)) + } + } + + // Group messages by session + sessionMessages := make(map[string][]string) + for _, msg := range observed { + match := sessionIDRegex.FindStringSubmatch(msg) + if len(match) > 0 { + sessionID := match[1] + sessionMessages[sessionID] = append(sessionMessages[sessionID], msg) + } else { + t.Error("session id not found in message") + } + } + + require.Greater(t, len(sessionMessages), 1, + "should receive messages from multiple sessions concurrently") + + // Verify FIFO ordering per session + for sessionID, msgs := range sessionMessages { + var expectedForSession []string + for _, msg := range expected { + match := sessionIDRegex.FindStringSubmatch(msg) + if len(match) > 0 && match[1] == sessionID { + expectedForSession = append(expectedForSession, msg) + } + } + + require.Equal(t, expectedForSession, msgs, + "session %s messages must be in FIFO order", sessionID) + } + + // Check global order to prove concurrent processing + // If processed sequentially, all messages from one session would come before the next + // If processed concurrently, session IDs will be interleaved + mu.Lock() + orderCopy := make([]string, len(globalOrder)) + copy(orderCopy, globalOrder) + mu.Unlock() + + if len(orderCopy) > 1 { + // Check if we have session interleaving + hasInterleaving := false + seenSessions := make(map[string]bool) + lastSession := "" + + for _, sessionID := range orderCopy { + if sessionID != lastSession && seenSessions[sessionID] { + // We've seen this session before but with a different session in between + hasInterleaving = true + break + } + seenSessions[sessionID] = true + lastSession = sessionID + } + + require.True(t, hasInterleaving, + "global order must show session interleaving, proving concurrent processing") + + ctx.Logf("Successfully processed %d sessions concurrently with FIFO ordering maintained", + len(sessionMessages)) + } + } + return nil + } + } + + f := flow.New(t, "servicebus certification concurrent sessions FIFO test"). + Step(app.Run(appID1, fmt.Sprintf(":%d", appPort), + subscriberApplicationWithSessions(appID1, topic, sessionWatcher))). + Step(sidecar.Run(sidecarName1, + append(componentRuntimeOptions(), + embedded.WithComponentsPath("./components/consumer_one"), + embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)), + embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)), + embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)), + )..., + )) + + for i := 0; i < numSessions; i++ { + sessionID := fmt.Sprintf("session-%d", i) + f = f.Step(fmt.Sprintf("publish messages to %s", sessionID), + publishMessages(map[string]string{"SessionId": sessionID}, sidecarName1, topic, sessionWatcher)) + } + + f.Step("verify all sessions processed with FIFO ordering and concurrency", assertMessages(30*time.Second, sessionWatcher)). + Step("reset", flow.Reset(sessionWatcher)). + Run() +} + // TestServicebusWithSessionsRoundRobin tests that if we publish messages to the same // topic but with 2 different session ids (session1 and session2), then eventually // the receiver will receive messages from both the sessions. From 94fca15813d1c8235352982d02ae402f6349441a Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Thu, 13 Nov 2025 16:48:09 +0000 Subject: [PATCH 2/8] lint Signed-off-by: Joni Collinge --- pubsub/azure/servicebus/topics/servicebus_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pubsub/azure/servicebus/topics/servicebus_test.go b/pubsub/azure/servicebus/topics/servicebus_test.go index 971c2f397e..e8a5d6a1b8 100644 --- a/pubsub/azure/servicebus/topics/servicebus_test.go +++ b/pubsub/azure/servicebus/topics/servicebus_test.go @@ -98,7 +98,7 @@ func TestSessionOrderingWithSingleHandler(t *testing.T) { sessionID := "test-session-1" messages := make([]*azservicebus.ReceivedMessage, numMessages) - for i := 0; i < numMessages; i++ { + for i := range numMessages { seqNum := int64(i + 1) messages[i] = &azservicebus.ReceivedMessage{ MessageID: fmt.Sprintf("msg-%d", i), @@ -143,7 +143,7 @@ func TestSessionOrderingWithSingleHandler(t *testing.T) { receiver := newMockReceiver(sessionID, messages) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) defer cancel() done := make(chan struct{}) @@ -170,14 +170,14 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) { ) sessionIDs := make([]string, numSessions) - for i := 0; i < numSessions; i++ { + for i := range numSessions { sessionIDs[i] = fmt.Sprintf("session-%d", i) } allMessages := make(map[string][]*azservicebus.ReceivedMessage) for _, sessionID := range sessionIDs { messages := make([]*azservicebus.ReceivedMessage, messagesPerSession) - for i := 0; i < messagesPerSession; i++ { + for i := range messagesPerSession { seqNum := int64(i + 1) sessID := sessionID messages[i] = &azservicebus.ReceivedMessage{ @@ -242,7 +242,7 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) { return nil, nil } - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) defer cancel() var wg sync.WaitGroup @@ -254,7 +254,7 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) { done := make(chan struct{}) go func() { defer close(done) - _ = sub.ReceiveBlocking(ctx, handlerFunc, receiver, func() {}, fmt.Sprintf("session-%s", sessionID)) + _ = sub.ReceiveBlocking(ctx, handlerFunc, receiver, func() {}, "session-"+sessionID) }() <-done }() From 8333864c672163c2a494b5ffe625f568b8905e1e Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Fri, 14 Nov 2025 17:11:43 +0000 Subject: [PATCH 3/8] lint Signed-off-by: Joni Collinge --- pubsub/azure/servicebus/topics/servicebus_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/azure/servicebus/topics/servicebus_test.go b/pubsub/azure/servicebus/topics/servicebus_test.go index e8a5d6a1b8..d437b91302 100644 --- a/pubsub/azure/servicebus/topics/servicebus_test.go +++ b/pubsub/azure/servicebus/topics/servicebus_test.go @@ -267,7 +267,7 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) { order := sessionOrders[sessionID] require.Len(t, order, messagesPerSession, "session %s should process all messages", sessionID) - for i := 0; i < messagesPerSession; i++ { + for i := range messagesPerSession { assert.Equal(t, i, order[i], "session %s message %d out of order", sessionID, i) } } From 0df89bfbb120984217d6ba7bfada10e393dec4ab Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Sat, 15 Nov 2025 07:51:24 +0000 Subject: [PATCH 4/8] Extend test Signed-off-by: Joni Collinge --- .../servicebus/topics/servicebus_test.go | 81 ++++++++++++++++++- 1 file changed, 78 insertions(+), 3 deletions(-) diff --git a/pubsub/azure/servicebus/topics/servicebus_test.go b/pubsub/azure/servicebus/topics/servicebus_test.go index d437b91302..c59b115ce7 100644 --- a/pubsub/azure/servicebus/topics/servicebus_test.go +++ b/pubsub/azure/servicebus/topics/servicebus_test.go @@ -204,20 +204,51 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) { logger.NewLogger("test"), ) + // Track processing times and active messages per session + type messageProcessing struct { + sessionID string + messageID string + seqNum int64 + startTime time.Time + endTime time.Time + msgIndex int + } + var ( mu sync.Mutex - globalOrder []string // tracks session IDs in the order messages were received + globalOrder []string sessionOrders = make(map[string][]int) concurrentHandlers atomic.Int32 maxConcurrentHandlers atomic.Int32 + processingLog []messageProcessing + activePerSession = make(map[string]int32) + parallelViolations atomic.Int32 ) handlerFunc := func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]impl.HandlerResponseItem, error) { msg := msgs[0] sessionID := *msg.SessionID + startTime := time.Now() + + // Track concurrent processing within the same session + mu.Lock() + activeCount := activePerSession[sessionID] + if activeCount > 0 { + // Another message from this session is already being processed + parallelViolations.Add(1) + t.Errorf("Session %s has %d messages processing in parallel at %v", + sessionID, activeCount+1, startTime) + } + activePerSession[sessionID]++ + mu.Unlock() current := concurrentHandlers.Add(1) - defer concurrentHandlers.Add(-1) + defer func() { + concurrentHandlers.Add(-1) + mu.Lock() + activePerSession[sessionID]-- + mu.Unlock() + }() for { max := maxConcurrentHandlers.Load() @@ -234,9 +265,19 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) { time.Sleep(50 * time.Millisecond) + endTime := time.Now() + mu.Lock() globalOrder = append(globalOrder, sessionID) sessionOrders[sessionID] = append(sessionOrders[sessionID], msgIndex) + processingLog = append(processingLog, messageProcessing{ + sessionID: sessionID, + messageID: msg.MessageID, + seqNum: *msg.SequenceNumber, + startTime: startTime, + endTime: endTime, + msgIndex: msgIndex, + }) mu.Unlock() return nil, nil @@ -262,6 +303,10 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) { wg.Wait() + // Verify no parallel session processing was detected + assert.Equal(t, int32(0), parallelViolations.Load(), + "N messages from the same session should process in parallel") + // Verify FIFO ordering per session for _, sessionID := range sessionIDs { order := sessionOrders[sessionID] @@ -272,6 +317,36 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) { } } + // Verify strict ordering, message N+1 must start after message N ends + sessionProcessingTimes := make(map[string][]messageProcessing) + for _, proc := range processingLog { + sessionProcessingTimes[proc.sessionID] = append(sessionProcessingTimes[proc.sessionID], proc) + } + + for sessionID, procs := range sessionProcessingTimes { + require.Len(t, procs, messagesPerSession, "session %s should have all processing records", sessionID) + + // Sort by message index to ensure we check in FIFO order + sortedProcs := make([]messageProcessing, len(procs)) + for _, proc := range procs { + sortedProcs[proc.msgIndex] = proc + } + + // Verify each message starts after the previous one completes + for i := 1; i < len(sortedProcs); i++ { + prev := sortedProcs[i-1] + curr := sortedProcs[i] + + assert.False(t, curr.startTime.Before(prev.endTime), + "Session %s: message %d started at %v before message %d ended at %v (overlap detected)", + sessionID, curr.msgIndex, curr.startTime, prev.msgIndex, prev.endTime) + + // Also verify sequence numbers are strictly increasing + assert.Equal(t, prev.seqNum+1, curr.seqNum, + "Session %s: sequence numbers should be consecutive", sessionID) + } + } + // Verify concurrent handler limits assert.LessOrEqual(t, maxConcurrentHandlers.Load(), int32(maxConcurrentLimit), "concurrent handlers should not exceed configured maximum") @@ -297,5 +372,5 @@ func TestMultipleSessionsConcurrentHandler(t *testing.T) { } assert.True(t, hasInterleaving, - "global order must show session interleaving, proving concurrent processing") + "global order must show session interleaving, proving concurrent processing across sessions") } From 298ae264eb256e3dda2375fd61f91fd7fbed596a Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Mon, 17 Nov 2025 11:18:14 +0000 Subject: [PATCH 5/8] Expand comments Signed-off-by: Joni Collinge --- .../azure/servicebus/subscription.go | 12 +- .../servicebus/topics/servicebus_test.go | 103 ++++++++++++++++-- 2 files changed, 102 insertions(+), 13 deletions(-) diff --git a/common/component/azure/servicebus/subscription.go b/common/component/azure/servicebus/subscription.go index c8a1b4aa88..a08d4b0530 100644 --- a/common/component/azure/servicebus/subscription.go +++ b/common/component/azure/servicebus/subscription.go @@ -275,10 +275,14 @@ func (s *Subscription) ReceiveBlocking(parentCtx context.Context, handler Handle // If we require sessions then we must process the message // synchronously to ensure the FIFO order is maintained. + // This is considered safe as even when using bulk receives, + // the messages are merged into a single request to the app + // containing multiple messages and thus it becomes an app + // concern to process them in order. if s.requireSessions { - s.handleAsync(ctx, msgs, handler, receiver) + s.handleMessages(ctx, msgs, handler, receiver) } else { - go s.handleAsync(ctx, msgs, handler, receiver) + go s.handleMessages(ctx, msgs, handler, receiver) } } } @@ -398,8 +402,8 @@ func (s *Subscription) doRenewLocksSession(ctx context.Context, sessionReceiver } } -// handleAsync handles messages from azure service bus and is meant to be called in a goroutine (go s.handleAsync). -func (s *Subscription) handleAsync(ctx context.Context, msgs []*azservicebus.ReceivedMessage, handler HandlerFn, receiver Receiver) { +// handleMessages handles messages from azure service bus and can be called synchronously or asynchronously depending on order requirements. +func (s *Subscription) handleMessages(ctx context.Context, msgs []*azservicebus.ReceivedMessage, handler HandlerFn, receiver Receiver) { var ( consumeToken bool takenConcurrentHandler bool diff --git a/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go b/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go index 3c93802095..e9703b5a97 100644 --- a/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go +++ b/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go @@ -19,6 +19,7 @@ import ( "regexp" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -1085,10 +1086,16 @@ func TestServicebusWithSessionsFIFO(t *testing.T) { sessionWatcher := watcher.NewOrdered() + // Track active messages per session to ensure no parallel processing within a session + var ( + fifoMu sync.Mutex + fifoActivePerSess = make(map[string]int) + fifoParallelIssues atomic.Int32 + ) + // subscriber of the given topic subscriberApplicationWithSessions := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn { return func(ctx flow.Context, s common.Service) error { - // Setup the /orders event handler. return multierr.Combine( s.AddTopicEventHandler(&common.Subscription{ PubsubName: pubsubName, @@ -1099,9 +1106,31 @@ func TestServicebusWithSessionsFIFO(t *testing.T) { "maxConcurrentSessions": "1", }, }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { - // Track/Observe the data of the event. + // Extract session ID (if present) to track concurrency + var sessionID string + if m := sessionIDRegex.FindStringSubmatch(string(e.Data)); len(m) > 1 { + sessionID = m[1] + } + + fifoMu.Lock() + active := fifoActivePerSess[sessionID] + if active > 0 { + fifoParallelIssues.Add(1) + ctx.Logf("Session %s already has %d active messages", sessionID, active) + } + fifoActivePerSess[sessionID] = active + 1 + fifoMu.Unlock() + + // Simulate handler work to widen potential overlap window + time.Sleep(20 * time.Millisecond) + messagesWatcher.Observe(e.Data) ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data) + + fifoMu.Lock() + fifoActivePerSess[sessionID]-- + fifoMu.Unlock() + return false, nil }), ) @@ -1222,6 +1251,9 @@ func TestServicebusWithSessionsFIFO(t *testing.T) { if !assert.Equal(t, ordered, observed) { t.Errorf("expected: %v, observed: %v", ordered, observed) } + + // Assert no parallel violations within the single session + assert.Equal(t, int32(0), fifoParallelIssues.Load(), "no parallel processing within a session expected") } return nil @@ -1266,8 +1298,10 @@ func TestServicebusWithConcurrentSessionsFIFO(t *testing.T) { sessionWatcher := watcher.NewUnordered() var ( - mu sync.Mutex - globalOrder []string // tracks session IDs in the order messages were received + mu sync.Mutex + globalOrder []string // tracks session IDs in the order messages were received + activePerSession = make(map[string]int) + parallelIssues atomic.Int32 ) subscriberApplicationWithSessions := func(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn { @@ -1284,17 +1318,34 @@ func TestServicebusWithConcurrentSessionsFIFO(t *testing.T) { }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { messagesWatcher.Observe(e.Data) - // Track session ID in global order + // Track session ID and enforce single in-flight per session match := sessionIDRegex.FindStringSubmatch(string(e.Data)) - if len(match) > 0 { - sessionID := match[1] + var sessionID string + if len(match) > 1 { + sessionID = match[1] mu.Lock() + inflight := activePerSession[sessionID] + if inflight > 0 { + parallelIssues.Add(1) + ctx.Logf("Session %s already has %d active messages", sessionID, inflight) + } + activePerSession[sessionID] = inflight + 1 globalOrder = append(globalOrder, sessionID) mu.Unlock() } + // Simulate work + time.Sleep(30 * time.Millisecond) + ctx.Logf("Message Received appID: %s, pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data) + + if sessionID != "" { + mu.Lock() + activePerSession[sessionID]-- + mu.Unlock() + } + return false, nil }), ) @@ -1415,6 +1466,9 @@ func TestServicebusWithConcurrentSessionsFIFO(t *testing.T) { ctx.Logf("Successfully processed %d sessions concurrently with FIFO ordering maintained", len(sessionMessages)) + + // Assert no parallel violations within a single session + assert.Equal(t, int32(0), parallelIssues.Load(), "no parallel processing within a session expected") } } return nil @@ -1454,10 +1508,16 @@ func TestServicebusWithSessionsRoundRobin(t *testing.T) { sessionWatcher := watcher.NewUnordered() + // Concurrency tracking for round-robin scenario + var ( + rrMu sync.Mutex + rrActivePerSess = make(map[string]int) + rrParallelIssues atomic.Int32 + ) + // subscriber of the given topic subscriberApplicationWithSessions := func(appID string, topicName string, messageWatcher *watcher.Watcher) app.SetupFn { return func(ctx flow.Context, s common.Service) error { - // Setup the /orders event handler. return multierr.Combine( s.AddTopicEventHandler(&common.Subscription{ PubsubName: pubsubName, @@ -1469,9 +1529,31 @@ func TestServicebusWithSessionsRoundRobin(t *testing.T) { "sessionIdleTimeoutInSec": "2", // timeout and try another session }, }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { - // Track/Observe the data of the event. + // Extract session ID + var sessionID string + if m := sessionIDRegex.FindStringSubmatch(string(e.Data)); len(m) > 1 { + sessionID = m[1] + } + + rrMu.Lock() + active := rrActivePerSess[sessionID] + if active > 0 { + rrParallelIssues.Add(1) + ctx.Logf("Session %s already has %d active messages", sessionID, active) + } + rrActivePerSess[sessionID] = active + 1 + rrMu.Unlock() + + // Simulate work + time.Sleep(15 * time.Millisecond) + messageWatcher.Observe(e.Data) ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data) + + rrMu.Lock() + rrActivePerSess[sessionID]-- + rrMu.Unlock() + return false, nil }), ) @@ -1529,6 +1611,9 @@ func TestServicebusWithSessionsRoundRobin(t *testing.T) { m.Assert(ctx, 25*timeout) } + // Assert no parallel violations + assert.Equal(t, int32(0), rrParallelIssues.Load(), "no parallel processing within a session expected") + return nil } } From 07a0fd168bb90e9d0e4eebe2c843ace8ed2e6d5f Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Mon, 17 Nov 2025 12:16:48 +0000 Subject: [PATCH 6/8] Fix string conversion Signed-off-by: Joni Collinge --- .../pubsub/azure/servicebus/topics/servicebus_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go b/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go index e9703b5a97..2fbdca1a7b 100644 --- a/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go +++ b/tests/certification/pubsub/azure/servicebus/topics/servicebus_test.go @@ -1108,7 +1108,7 @@ func TestServicebusWithSessionsFIFO(t *testing.T) { }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { // Extract session ID (if present) to track concurrency var sessionID string - if m := sessionIDRegex.FindStringSubmatch(string(e.Data)); len(m) > 1 { + if m := sessionIDRegex.FindStringSubmatch(fmt.Sprintf("%s", e.Data)); len(m) > 1 { sessionID = m[1] } @@ -1319,7 +1319,7 @@ func TestServicebusWithConcurrentSessionsFIFO(t *testing.T) { messagesWatcher.Observe(e.Data) // Track session ID and enforce single in-flight per session - match := sessionIDRegex.FindStringSubmatch(string(e.Data)) + match := sessionIDRegex.FindStringSubmatch(fmt.Sprintf("%s", e.Data)) var sessionID string if len(match) > 1 { sessionID = match[1] @@ -1531,7 +1531,7 @@ func TestServicebusWithSessionsRoundRobin(t *testing.T) { }, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) { // Extract session ID var sessionID string - if m := sessionIDRegex.FindStringSubmatch(string(e.Data)); len(m) > 1 { + if m := sessionIDRegex.FindStringSubmatch(fmt.Sprintf("%s", e.Data)); len(m) > 1 { sessionID = m[1] } From de6d78aa5c669ab6e7f1bcb957cda2e3a8422ab8 Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Mon, 17 Nov 2025 12:35:48 +0000 Subject: [PATCH 7/8] Skip other tests Signed-off-by: Joni Collinge --- .github/scripts/test-info.mjs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/scripts/test-info.mjs b/.github/scripts/test-info.mjs index 18eac1c187..3e683cded2 100644 --- a/.github/scripts/test-info.mjs +++ b/.github/scripts/test-info.mjs @@ -885,6 +885,10 @@ function GenerateMatrix(testKind, enableCloudTests) { continue } + if (name !== 'pubsub.azure.servicebus.topics') { + continue + } + // Skip cloud-only tests if enableCloudTests is false if (!enableCloudTests) { if ( From ac25ba6f35a206e59e78024bd3a85cbe4904279d Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Mon, 17 Nov 2025 12:57:08 +0000 Subject: [PATCH 8/8] Remove skip other tests Signed-off-by: Joni Collinge --- .github/scripts/test-info.mjs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.github/scripts/test-info.mjs b/.github/scripts/test-info.mjs index 3e683cded2..18eac1c187 100644 --- a/.github/scripts/test-info.mjs +++ b/.github/scripts/test-info.mjs @@ -885,10 +885,6 @@ function GenerateMatrix(testKind, enableCloudTests) { continue } - if (name !== 'pubsub.azure.servicebus.topics') { - continue - } - // Skip cloud-only tests if enableCloudTests is false if (!enableCloudTests) { if (