Skip to content

Commit c393463

Browse files
committed
deduplicate group messages across connections
1 parent d12915f commit c393463

File tree

5 files changed

+96
-7
lines changed

5 files changed

+96
-7
lines changed

src/SignalR/server/Core/src/DefaultHubLifetimeManager.cs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string me
226226
{
227227
// Each task represents the list of tasks for each of the writes within a group
228228
List<Task>? tasks = null;
229-
SerializedHubMessage? message = null;
229+
HashSet<string>? connections = null;
230230

231231
foreach (var groupName in groupNames)
232232
{
@@ -238,7 +238,26 @@ public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string me
238238
var group = _groups[groupName];
239239
if (group != null)
240240
{
241-
DefaultHubLifetimeManager<THub>.SendToGroupConnections(methodName, args, group, null, null, ref tasks, ref message, cancellationToken);
241+
foreach (var connection in group)
242+
{
243+
if (connections == null)
244+
{
245+
connections = new HashSet<string>();
246+
}
247+
connections.Add(connection.Key);
248+
}
249+
}
250+
}
251+
252+
if (connections != null)
253+
{
254+
foreach (var connectionId in connections)
255+
{
256+
if (tasks == null)
257+
{
258+
tasks = new List<Task>();
259+
}
260+
tasks.Add(SendConnectionAsync(connectionId, methodName, args, cancellationToken));
242261
}
243262
}
244263

src/SignalR/server/SignalR/test/Microsoft.AspNetCore.SignalR.Tests/DefaultHubLifetimeManagerTests.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,4 +255,23 @@ public async Task SendUsersAsyncWillCancelWithToken()
255255
Assert.False(connection1.ConnectionAborted.IsCancellationRequested);
256256
}
257257
}
258+
259+
[Fact]
260+
public async Task ConnectionInMultipleGroups_ReceivesMessageOnlyOnce()
261+
{
262+
using (var client1 = new TestClient())
263+
{
264+
var manager = CreateNewHubLifetimeManager();
265+
var connection1 = HubConnectionContextUtils.Create(client1.Connection);
266+
await manager.OnConnectedAsync(connection1).DefaultTimeout();
267+
await manager.AddToGroupAsync(connection1.ConnectionId, "group1").DefaultTimeout();
268+
await manager.AddToGroupAsync(connection1.ConnectionId, "group2").DefaultTimeout();
269+
await manager.SendGroupsAsync(new List<string> { "group1", "group2" }, "Hello", new object[] { "World" }).DefaultTimeout();
270+
var message = Assert.IsType<InvocationMessage>(client1.TryRead());
271+
Assert.Equal("Hello", message.Target);
272+
Assert.Single(message.Arguments);
273+
Assert.Equal("World", (string)message.Arguments[0]);
274+
Assert.Null(client1.TryRead());
275+
}
276+
}
258277
}

src/SignalR/server/StackExchangeRedis/src/Internal/RedisSubscriptionManager.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,14 @@ public async Task RemoveSubscriptionAsync(string id, HubConnectionContext connec
6363
_lock.Release();
6464
}
6565
}
66+
67+
public HubConnectionStore? GetStore(string id)
68+
{
69+
if (_subscriptions.TryGetValue(id, out var store))
70+
{
71+
return store;
72+
}
73+
74+
return null;
75+
}
6676
}

src/SignalR/server/StackExchangeRedis/src/RedisHubLifetimeManager.cs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -254,21 +254,38 @@ public override Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, s
254254
}
255255

256256
/// <inheritdoc />
257-
public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object?[] args, CancellationToken cancellationToken = default)
257+
public override async Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object?[] args, CancellationToken cancellationToken = default)
258258
{
259259
ArgumentNullException.ThrowIfNull(groupNames);
260-
var publishTasks = new List<Task>(groupNames.Count);
261-
var payload = _protocol.WriteInvocation(methodName, args);
260+
HashSet<string>? connections = null;
262261

263262
foreach (var groupName in groupNames)
264263
{
265264
if (!string.IsNullOrEmpty(groupName))
266265
{
267-
publishTasks.Add(PublishAsync(_channels.Group(groupName), payload));
266+
var groupChannel = _channels.Group(groupName);
267+
if (groupChannel != null)
268+
{
269+
var connectionStore = _groups.GetStore(groupChannel);
270+
if (connectionStore != null)
271+
{
272+
foreach (var connection in connectionStore)
273+
{
274+
if (connections == null)
275+
{
276+
connections = new HashSet<string>();
277+
}
278+
connections.Add(connection.ConnectionId);
279+
}
280+
}
281+
}
268282
}
269283
}
270284

271-
return Task.WhenAll(publishTasks);
285+
if (connections != null)
286+
{
287+
await SendConnectionsAsync(connections.ToList(), methodName, args, cancellationToken);
288+
}
272289
}
273290

274291
/// <inheritdoc />

src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,30 @@ public async Task PatternGroupAndUser()
145145
}
146146
}
147147

148+
[Fact]
149+
public async Task ConnectionInMultipleGroups_ReceivesMessageOnlyOnce()
150+
{
151+
var server = new TestRedisServer();
152+
153+
using (var client = new TestClient())
154+
{
155+
var manager = CreateLifetimeManager(server);
156+
var connection = HubConnectionContextUtils.Create(client.Connection);
157+
158+
await manager.OnConnectedAsync(connection).DefaultTimeout();
159+
await manager.AddToGroupAsync(connection.ConnectionId, "group1").DefaultTimeout();
160+
await manager.AddToGroupAsync(connection.ConnectionId, "group2").DefaultTimeout();
161+
162+
await manager.SendGroupsAsync(new[] { "group1", "group2" }, "Hello", new object[] { "World" }).DefaultTimeout();
163+
164+
var message = Assert.IsType<InvocationMessage>(await client.ReadAsync().DefaultTimeout());
165+
Assert.Equal("Hello", message.Target);
166+
Assert.Single(message.Arguments);
167+
Assert.Equal("World", (string)message.Arguments[0]);
168+
Assert.Null(client.TryRead());
169+
}
170+
}
171+
148172
public override TestRedisServer CreateBackplane()
149173
{
150174
return new TestRedisServer();

0 commit comments

Comments
 (0)