From eb19c199b7a0086c185939b79fc0dd9a60aad52a Mon Sep 17 00:00:00 2001 From: Zhenghui Yan Date: Mon, 31 Mar 2025 14:16:51 +0800 Subject: [PATCH 1/3] Support distributed tracing in server sdk. --- .../HubHost/ServiceLifetimeManager.cs | 53 ++++++-- .../HubHost/ServiceLifetimeManagerBase.cs | 126 +++++++++++++++--- .../Internals/SignalRServerActivitySource.cs | 13 ++ .../ServiceLifetimeManagerFacts.cs | 88 ++++++++++++ 4 files changed, 249 insertions(+), 31 deletions(-) create mode 100644 src/Microsoft.Azure.SignalR/Internals/SignalRServerActivitySource.cs diff --git a/src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManager.cs b/src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManager.cs index 442836021..2ccd4cdf5 100644 --- a/src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManager.cs +++ b/src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManager.cs @@ -1,8 +1,9 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -88,12 +89,15 @@ public override async Task SendConnectionAsync(string connectionId, string metho if (_clientConnectionManager.TryGetClientConnection(connectionId, out var clientConnection)) { - var message = CreateMessage(connectionId, methodName, args, clientConnection); + var activity = CreateActivity(methodName); + activity?.Start(); + var message = CreateMessage(connectionId, methodName, args, clientConnection, activity); var messageWithTracingId = (IMessageWithTracingId)message; try { // Write directly to this connection await clientConnection.ServiceConnection.WriteAsync(message); + activity?.Stop(); if (messageWithTracingId.TracingId != null) { @@ -102,11 +106,26 @@ public override async Task SendConnectionAsync(string connectionId, string metho } catch (ServiceConnectionNotActiveException) { - // Fallback to send message through other server connections - // Although in current design the server connection drop leads to routed client connection drops - // The message thrown here is misleading to the customer - // Also sending the message back provides the support when later we support client connection migration - await WriteAsync(message); + try + { + // Fallback to send message through other server connections + // Although in current design the server connection drop leads to routed client connection drops + // The message thrown here is misleading to the customer + // Also sending the message back provides the support when later we support client connection migration + await WriteAsync(message); + activity?.Stop(); + } + catch (Exception ex) + { + if (activity is not null) + { + activity.SetStatus(ActivityStatusCode.Error); + activity.SetTag("error.type", ex.GetType().FullName); + activity.Stop(); + } + + throw; + } } } else @@ -129,18 +148,28 @@ public override async Task InvokeConnectionAsync(string connectionId, stri } var invocationId = _clientInvocationManager.Caller.GenerateInvocationId(connectionId); - var message = AppendMessageTracingId(new ClientInvocationMessage(invocationId, connectionId, _callerId, SerializeAllProtocols(methodName, args, invocationId))); + var activity = CreateActivity(methodName); + var message = AppendMessageTracingId(new ClientInvocationMessage(invocationId, connectionId, _callerId, SerializeAllProtocols(methodName, args, activity, invocationId))); await WriteAsync(message); var task = _clientInvocationManager.Caller.AddInvocation(_hub, connectionId, invocationId, cancellationToken); // Exception handling follows https://source.dot.net/#Microsoft.AspNetCore.SignalR.Core/DefaultHubLifetimeManager.cs,349 try { - return await task; + activity?.Start(); + var result = await task; + activity?.Stop(); + return result; } - catch + catch (Exception ex) { _clientInvocationManager.Caller.RemoveInvocation(invocationId); + if (activity is not null) + { + activity.SetStatus(ActivityStatusCode.Error); + activity.SetTag("error.type", ex.GetType().FullName); + activity.Stop(); + } throw; } } @@ -186,7 +215,7 @@ public override bool TryGetReturnType(string invocationId, [System.Diagnostics.C } #endif - private MultiConnectionDataMessage CreateMessage(string connectionId, string methodName, object[] args, IClientConnection clientConnection) + private MultiConnectionDataMessage CreateMessage(string connectionId, string methodName, object[] args, IClientConnection clientConnection, Activity activity) { IDictionary> payloads; if (clientConnection.HubProtocol != null) @@ -198,7 +227,7 @@ private MultiConnectionDataMessage CreateMessage(string connectionId, string met } else { - payloads = SerializeAllProtocols(methodName, args); + payloads = SerializeAllProtocols(methodName, args, activity); } // don't use ConnectionDataMessage here, since handshake message is also wrapped into ConnectionDataMessage. diff --git a/src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManagerBase.cs b/src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManagerBase.cs index 0255c9bd2..ae06c6146 100644 --- a/src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManagerBase.cs +++ b/src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManagerBase.cs @@ -3,11 +3,13 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR.Protocol; +using Microsoft.Azure.SignalR.Internals; using Microsoft.Azure.SignalR.Protocol; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -16,18 +18,25 @@ namespace Microsoft.Azure.SignalR; internal abstract class ServiceLifetimeManagerBase : HubLifetimeManager where THub : Hub { + internal const string ActivityName = "Microsoft.Azure.SignalR.Server.InvocationOut"; + protected const string NullOrEmptyStringErrorMessage = "Argument cannot be null or empty."; protected const string TtlOutOfRangeErrorMessage = "Ttl cannot be less than 0."; protected readonly IServiceConnectionManager ServiceConnectionContainer; + protected ILogger Logger { get; set; } private readonly DefaultHubMessageSerializer _messageSerializer; + private readonly ActivitySource _activitySource; + private readonly string _serviceName; public ServiceLifetimeManagerBase(IServiceConnectionManager serviceConnectionManager, IHubProtocolResolver protocolResolver, IOptions globalHubOptions, IOptions> hubOptions, ILogger logger) { Logger = logger ?? throw new ArgumentNullException(nameof(logger)); ServiceConnectionContainer = serviceConnectionManager; _messageSerializer = new DefaultHubMessageSerializer(protocolResolver, globalHubOptions.Value.SupportedProtocols, hubOptions.Value.SupportedProtocols); + _activitySource = SignalRServerActivitySource.Instance.ActivitySource; + _serviceName = typeof(THub).Name; } public override Task OnConnectedAsync(HubConnectionContext connection) @@ -47,12 +56,14 @@ public override Task SendAllAsync(string methodName, object[] args, Cancellation throw new ArgumentException(NullOrEmptyStringErrorMessage, nameof(methodName)); } - var message = AppendMessageTracingId(new BroadcastDataMessage(null, SerializeAllProtocols(methodName, args))); + var activity = CreateActivity(methodName); + activity?.Start(); + var message = AppendMessageTracingId(new BroadcastDataMessage(null, SerializeAllProtocols(methodName, args, activity))); if (message.TracingId != null) { MessageLog.StartToBroadcastMessage(Logger, message); } - return WriteAsync(message); + return WriteAsync(message, activity); } public override Task SendAllExceptAsync(string methodName, object[] args, IReadOnlyList excludedIds, CancellationToken cancellationToken = default) @@ -62,12 +73,14 @@ public override Task SendAllExceptAsync(string methodName, object[] args, IReadO throw new ArgumentException(NullOrEmptyStringErrorMessage, nameof(methodName)); } - var message = AppendMessageTracingId(new BroadcastDataMessage(excludedIds, SerializeAllProtocols(methodName, args))); + var activity = CreateActivity(methodName); + activity?.Start(); + var message = AppendMessageTracingId(new BroadcastDataMessage(excludedIds, SerializeAllProtocols(methodName, args, activity))); if (message.TracingId != null) { MessageLog.StartToBroadcastMessage(Logger, message); } - return WriteAsync(message); + return WriteAsync(message, activity); } public override Task SendConnectionAsync(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default) @@ -82,12 +95,14 @@ public override Task SendConnectionAsync(string connectionId, string methodName, throw new ArgumentException(NullOrEmptyStringErrorMessage, nameof(methodName)); } - var message = AppendMessageTracingId(new MultiConnectionDataMessage(new[] { connectionId }, SerializeAllProtocols(methodName, args))); + var activity = CreateActivity(methodName); + activity?.Start(); + var message = AppendMessageTracingId(new MultiConnectionDataMessage(new[] { connectionId }, SerializeAllProtocols(methodName, args, activity))); if (message.TracingId != null) { MessageLog.StartToSendMessageToConnections(Logger, message); } - return WriteAsync(message); + return WriteAsync(message, activity); } public override Task SendConnectionsAsync(IReadOnlyList connectionIds, string methodName, object[] args, CancellationToken cancellationToken = default) @@ -107,12 +122,14 @@ public override Task SendConnectionsAsync(IReadOnlyList connectionIds, s return Task.CompletedTask; } - var message = AppendMessageTracingId(new MultiConnectionDataMessage(connectionIds, SerializeAllProtocols(methodName, args))); + var activity = CreateActivity(methodName); + activity?.Start(); + var message = AppendMessageTracingId(new MultiConnectionDataMessage(connectionIds, SerializeAllProtocols(methodName, args, activity))); if (message.TracingId != null) { MessageLog.StartToSendMessageToConnections(Logger, message); } - return WriteAsync(message); + return WriteAsync(message, activity); } public override Task SendGroupAsync(string groupName, string methodName, object[] args, CancellationToken cancellationToken = default) @@ -127,12 +144,14 @@ public override Task SendGroupAsync(string groupName, string methodName, object[ throw new ArgumentException(NullOrEmptyStringErrorMessage, nameof(methodName)); } - var message = AppendMessageTracingId(new GroupBroadcastDataMessage(groupName, null, SerializeAllProtocols(methodName, args))); + var activity = CreateActivity(methodName); + activity?.Start(); + var message = AppendMessageTracingId(new GroupBroadcastDataMessage(groupName, null, SerializeAllProtocols(methodName, args, activity))); if (message.TracingId != null) { MessageLog.StartToBroadcastMessageToGroup(Logger, message); } - return WriteAsync(message); + return WriteAsync(message, activity); } public override Task SendGroupsAsync(IReadOnlyList groupNames, string methodName, object[] args, CancellationToken cancellationToken = default) @@ -152,14 +171,16 @@ public override Task SendGroupsAsync(IReadOnlyList groupNames, string me return Task.CompletedTask; } - var message = AppendMessageTracingId(new MultiGroupBroadcastDataMessage(groupNames, SerializeAllProtocols(methodName, args))); + var activity = CreateActivity(methodName); + activity?.Start(); + var message = AppendMessageTracingId(new MultiGroupBroadcastDataMessage(groupNames, SerializeAllProtocols(methodName, args, activity))); if (message.TracingId != null) { MessageLog.StartToBroadcastMessageToGroups(Logger, message); } // Send this message from a random service connection because this message involves of multiple groups. // Unless we send message for each group one by one, we can not guarantee the message order for all groups. - return WriteAsync(message); + return WriteAsync(message, activity); } public override Task SendGroupExceptAsync(string groupName, string methodName, object[] args, IReadOnlyList excludedIds, CancellationToken cancellationToken = default) @@ -174,12 +195,14 @@ public override Task SendGroupExceptAsync(string groupName, string methodName, o throw new ArgumentException(NullOrEmptyStringErrorMessage, nameof(methodName)); } - var message = AppendMessageTracingId(new GroupBroadcastDataMessage(groupName, excludedIds, SerializeAllProtocols(methodName, args))); + var activity = CreateActivity(methodName); + activity?.Start(); + var message = AppendMessageTracingId(new GroupBroadcastDataMessage(groupName, excludedIds, SerializeAllProtocols(methodName, args, activity))); if (message.TracingId != null) { MessageLog.StartToBroadcastMessageToGroup(Logger, message); } - return WriteAsync(message); + return WriteAsync(message, activity); } public override Task SendUserAsync(string userId, string methodName, object[] args, CancellationToken cancellationToken = default) @@ -194,12 +217,14 @@ public override Task SendUserAsync(string userId, string methodName, object[] ar throw new ArgumentException(NullOrEmptyStringErrorMessage, nameof(methodName)); } - var message = AppendMessageTracingId(new UserDataMessage(userId, SerializeAllProtocols(methodName, args))); + var activity = CreateActivity(methodName); + activity?.Start(); + var message = AppendMessageTracingId(new UserDataMessage(userId, SerializeAllProtocols(methodName, args, activity))); if (message.TracingId != null) { MessageLog.StartToSendMessageToUser(Logger, message); } - return WriteAsync(message); + return WriteAsync(message, activity); } public override Task SendUsersAsync(IReadOnlyList userIds, string methodName, object[] args, @@ -220,12 +245,14 @@ public override Task SendUsersAsync(IReadOnlyList userIds, string method return Task.CompletedTask; } - var message = AppendMessageTracingId(new MultiUserDataMessage(userIds, SerializeAllProtocols(methodName, args))); + var activity = CreateActivity(methodName); + activity?.Start(); + var message = AppendMessageTracingId(new MultiUserDataMessage(userIds, SerializeAllProtocols(methodName, args, activity))); if (message.TracingId != null) { MessageLog.StartToSendMessageToUsers(Logger, message); } - return WriteAsync(message); + return WriteAsync(message, activity); } public override Task AddToGroupAsync(string connectionId, string groupName, CancellationToken cancellationToken = default) @@ -268,6 +295,26 @@ public override Task RemoveFromGroupAsync(string connectionId, string groupName, return WriteAckableMessageAsync(message, cancellationToken); } + protected async Task WriteAsync(T message, Activity activity) where T : ServiceMessage, IMessageWithTracingId + { + if (activity is not null) + { + try + { + await WriteAsync(message); + activity.Stop(); + } + catch (Exception ex) + { + activity.SetStatus(ActivityStatusCode.Error); + activity.SetTag("error.type", ex.GetType().FullName); + activity.Stop(); + + throw; + } + } + } + protected Task WriteAsync(T message) where T : ServiceMessage, IMessageWithTracingId => WriteCoreAsync(message, m => ServiceConnectionContainer.WriteAsync(message)); @@ -284,7 +331,7 @@ protected static bool IsInvalidArgument(IReadOnlyList list) return list == null; } - protected IDictionary> SerializeAllProtocols(string method, object[] args, string invocationId = null) + protected IDictionary> SerializeAllProtocols(string method, object[] args, Activity activity, string invocationId = null) { InvocationMessage message; if (invocationId == null) @@ -295,6 +342,10 @@ protected IDictionary> SerializeAllProtocols(string { message = new InvocationMessage(invocationId, method, args); } + if (activity is not null) + { + InjectHeaders(activity, message); + } var serializedHubMessages = _messageSerializer.SerializeMessage(message); var payloads = new ArrayDictionary>(serializedHubMessages.Count); foreach (var serializedMessage in serializedHubMessages) @@ -304,6 +355,43 @@ protected IDictionary> SerializeAllProtocols(string return payloads; } + protected Activity CreateActivity(string methodName) + { + var activity = _activitySource.CreateActivity(ActivityName, ActivityKind.Client); + if (activity is null && Activity.Current is not null && Logger.IsEnabled(LogLevel.Critical)) + { + activity = new Activity(ActivityName); + } + + if (activity is not null) + { + if (!string.IsNullOrEmpty(_serviceName)) + { + activity.DisplayName = $"{_serviceName}/{methodName}"; + activity.SetTag("rpc.service", _serviceName); + } + else + { + activity.DisplayName = methodName; + } + + activity.SetTag("rpc.system", "signalr"); + activity.SetTag("rpc.method", methodName); + } + + return activity; + } + + private static void InjectHeaders(Activity activity, HubInvocationMessage invocationMessage) + { + DistributedContextPropagator.Current.Inject(activity, invocationMessage, static (carrier, key, value) => + { + var invocationMessage = (HubInvocationMessage)carrier; + invocationMessage.Headers ??= new Dictionary(); + invocationMessage.Headers[key] = value; + }); + } + protected IDictionary> SerializeAllProtocols(HubMessage message) { var serializedHubMessages = _messageSerializer.SerializeMessage(message); diff --git a/src/Microsoft.Azure.SignalR/Internals/SignalRServerActivitySource.cs b/src/Microsoft.Azure.SignalR/Internals/SignalRServerActivitySource.cs new file mode 100644 index 000000000..511c646c1 --- /dev/null +++ b/src/Microsoft.Azure.SignalR/Internals/SignalRServerActivitySource.cs @@ -0,0 +1,13 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Diagnostics; + +namespace Microsoft.Azure.SignalR.Internals; + +internal sealed class SignalRServerActivitySource +{ + public static readonly SignalRServerActivitySource Instance = new(); + + public ActivitySource ActivitySource { get; } = new ActivitySource("Microsoft.Azure.SignalR.Server"); +} diff --git a/test/Microsoft.Azure.SignalR.Tests/ServiceLifetimeManagerFacts.cs b/test/Microsoft.Azure.SignalR.Tests/ServiceLifetimeManagerFacts.cs index b5216242c..a9c81b4a5 100644 --- a/test/Microsoft.Azure.SignalR.Tests/ServiceLifetimeManagerFacts.cs +++ b/test/Microsoft.Azure.SignalR.Tests/ServiceLifetimeManagerFacts.cs @@ -4,12 +4,14 @@ using System; using System.Buffers; using System.Collections.Generic; +using System.Diagnostics; using System.Security.Claims; using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR.Internal; +using Microsoft.Azure.SignalR.Internals; using Microsoft.Azure.SignalR.Protocol; using Microsoft.Azure.SignalR.Tests.Common; using Microsoft.Extensions.Logging; @@ -263,6 +265,83 @@ public async Task DoNotSetUserIdWithoutFeatureTest() Assert.Null(hubConnectionContext.Features.Get()); } + [Theory] + [InlineData("SendAllAsync", typeof(BroadcastDataMessage))] + [InlineData("SendAllExceptAsync", typeof(BroadcastDataMessage))] + [InlineData("SendConnectionAsync", typeof(MultiConnectionDataMessage))] + [InlineData("SendConnectionsAsync", typeof(MultiConnectionDataMessage))] + [InlineData("SendGroupAsync", typeof(GroupBroadcastDataMessage))] + [InlineData("SendGroupsAsync", typeof(MultiGroupBroadcastDataMessage))] + [InlineData("SendGroupExceptAsync", typeof(GroupBroadcastDataMessage))] + [InlineData("SendUserAsync", typeof(UserDataMessage))] + [InlineData("SendUsersAsync", typeof(MultiUserDataMessage))] + public async Task ServiceLifetimeManagerTraceTest(string methodName, Type messageType) + { + var proxy = new ServiceConnectionProxy(); + var blazorDetector = new DefaultBlazorDetector(); + + var serviceConnectionManager = new ServiceConnectionManager(); + serviceConnectionManager.SetServiceConnection(proxy.ServiceConnectionContainer); + var clientInvocationManager = new DefaultClientInvocationManager(); + + var serverSource = SignalRServerActivitySource.Instance.ActivitySource; + + using var listener = new ActivityListener + { + ShouldListenTo = activitySource => ReferenceEquals(activitySource, serverSource), + Sample = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllData, + ActivityStarted = _ => { } + }; + ActivitySource.AddActivityListener(listener); + + var serviceLifetimeManager = new ServiceLifetimeManager(serviceConnectionManager, + proxy.ClientConnectionManager, HubProtocolResolver, Logger, Marker, GlobalHubOptions, LocalHubOptions, blazorDetector, new DefaultServerNameProvider(), clientInvocationManager); + + var serverTask = proxy.WaitForServerConnectionAsync(1); + _ = proxy.StartAsync(); + await proxy.WaitForServerConnectionsInited().OrTimeout(); + await serverTask.OrTimeout(); + + var task = proxy.WaitForApplicationMessageAsync(messageType); + + var invokeTask = InvokeMethod(serviceLifetimeManager, methodName); + + var message = await task.OrTimeout(); + + if (typeof(IAckableMessage).IsAssignableFrom(messageType)) + { + var ackId = (message as IAckableMessage).AckId; + Assert.NotEqual(0, ackId); + await proxy.WriteMessageAsync(new AckMessage(ackId, (int)AckStatus.Ok)); + } + + // Need to return in time, or it indicate a timeout when sending ack-able messages. + await invokeTask.OrTimeout(); + + VerifyServiceMessage(methodName, message); + + var multicastDataMessage = Assert.IsAssignableFrom(message); + Assert.Equal(2, multicastDataMessage.Payloads.Count); + + Assert.True(multicastDataMessage.Payloads.ContainsKey("json")); + ReadOnlySequence jsonPayload = new(multicastDataMessage.Payloads["json"]); + new SignalRProtocol.JsonHubProtocol().TryParseMessage(ref jsonPayload, new TestHubInvocationBinder(), out var jsonMessage); + var jsonInvocationMessage = Assert.IsAssignableFrom(jsonMessage); + Assert.NotEmpty(jsonInvocationMessage.Headers); + Assert.Contains(jsonInvocationMessage.Headers, h => h.Key == "traceparent"); + Assert.NotEmpty(jsonInvocationMessage.Headers["traceparent"]); + + Assert.True(multicastDataMessage.Payloads.ContainsKey("messagepack")); + ReadOnlySequence messagePackPayload = new(multicastDataMessage.Payloads["messagepack"]); + new SignalRProtocol.MessagePackHubProtocol().TryParseMessage(ref messagePackPayload, new TestHubInvocationBinder(), out var messagePackMessage); + var messagePackInvocationMessage = Assert.IsAssignableFrom(messagePackMessage); + Assert.NotEmpty(messagePackInvocationMessage.Headers); + Assert.Contains(messagePackInvocationMessage.Headers, h => h.Key == "traceparent"); + Assert.NotEmpty(messagePackInvocationMessage.Headers["traceparent"]); + + Assert.Equal(jsonInvocationMessage.Headers["traceparent"], messagePackInvocationMessage.Headers["traceparent"]); + } + private static async Task InvokeMethod(HubLifetimeManager serviceLifetimeManager, string methodName) { switch (methodName) @@ -422,4 +501,13 @@ public void WriteMessage(SignalRProtocol.HubMessage message, IBufferWriter throw new NotImplementedException(); } } + + private sealed class TestHubInvocationBinder : IInvocationBinder + { + public IReadOnlyList GetParameterTypes(string methodName) => new[] { typeof(string) }; + + public Type GetReturnType(string invocationId) => typeof(void); + + public Type GetStreamItemType(string streamId) => typeof(int); + } } From e65897ab004711e949e2609538a8004366897484 Mon Sep 17 00:00:00 2001 From: Zhenghui Yan Date: Mon, 7 Apr 2025 11:06:46 +0800 Subject: [PATCH 2/3] fix build --- .../WebsocketsHubLifetimeManager.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.SignalR.Management/WebsocketsHubLifetimeManager.cs b/src/Microsoft.Azure.SignalR.Management/WebsocketsHubLifetimeManager.cs index 8fa74bde2..f006ac6ef 100644 --- a/src/Microsoft.Azure.SignalR.Management/WebsocketsHubLifetimeManager.cs +++ b/src/Microsoft.Azure.SignalR.Management/WebsocketsHubLifetimeManager.cs @@ -274,7 +274,7 @@ public override async Task InvokeConnectionAsync(string connectionId, stri var cancellationTokenInUse = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, cancellationToken).Token; var invocationId = _clientInvocationManager.Caller.GenerateInvocationId(connectionId); - var message = AppendMessageTracingId(new ClientInvocationMessage(invocationId, connectionId, _callerId, SerializeAllProtocols(methodName, args, invocationId))); + var message = AppendMessageTracingId(new ClientInvocationMessage(invocationId, connectionId, _callerId, SerializeAllProtocols(methodName, args, null, invocationId))); await WriteAsync(message); var task = _clientInvocationManager.Caller.AddInvocation(_hub, connectionId, invocationId, cancellationTokenInUse); From be052c9b449de21722b0ea8676bf2d7a84bd3831 Mon Sep 17 00:00:00 2001 From: Zhenghui Yan Date: Mon, 7 Apr 2025 11:24:41 +0800 Subject: [PATCH 3/3] fix bug. --- .../HubHost/ServiceLifetimeManagerBase.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManagerBase.cs b/src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManagerBase.cs index ae06c6146..6ca08c5c7 100644 --- a/src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManagerBase.cs +++ b/src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManagerBase.cs @@ -313,6 +313,10 @@ protected async Task WriteAsync(T message, Activity activity) where T : Servi throw; } } + else + { + await WriteAsync(message); + } } protected Task WriteAsync(T message) where T : ServiceMessage, IMessageWithTracingId =>