Skip to content

Deduplicate group messages across connections #61810

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions src/SignalR/server/Core/src/DefaultHubLifetimeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string me
{
// Each task represents the list of tasks for each of the writes within a group
List<Task>? tasks = null;
SerializedHubMessage? message = null;
HashSet<string>? connections = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depending on how much we want to put into perf / allocations a few thoughts:

  • cache the HashSet? See the previous comment above for an simple approach -- but this is also no silver bullet, as it has (in the long term) an Gen2 -> Gen0 reference
  • HashSet is O(1), but still a bit costly due to hash computation, bucket lookup, etc. So for a small amount of connections a simple array with a linear scan (O(n)) could be faster -- the logic could be encapsulated in a new type "FastHashSet" which starts with the array-approach and switches to the HashSet when more items are present

Well, if that's a goal this can be done in a separate PR for sure.


foreach (var groupName in groupNames)
{
Expand All @@ -238,7 +238,26 @@ public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string me
var group = _groups[groupName];
if (group != null)
{
DefaultHubLifetimeManager<THub>.SendToGroupConnections(methodName, args, group, null, null, ref tasks, ref message, cancellationToken);
foreach (var connection in group)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
foreach (var connection in group)
foreach (var (connectionId, _) in group)

{
if (connections == null)
{
connections = new HashSet<string>();
}
connections.Add(connection.Key);
Comment on lines +243 to +247
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI this could be written also as 1-liner.

Suggested change
if (connections == null)
{
connections = new HashSet<string>();
}
connections.Add(connection.Key);
(connections ?? []).Add(connection.Key);

For me it's readability is the same.


In L239 can you also check if the group has items?
If so, the check for connections == null can be removed altogehter, and initialize the HashSet just before the foreach-loop in L241, because if the group has items, then Add is called unconditionally.

The HashSet could then be also initialized with the proper capacity, to avoid potential resizing and copying of the internal structures of HashSet.

}
}
}

if (connections != null)
{
foreach (var connectionId in connections)
{
if (tasks == null)
{
tasks = new List<Task>();
}
Comment on lines +256 to +259
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashSet has a Count property, so initialize the tasks list to proper size before the loop, thus avoiding this if.

tasks.Add(SendConnectionAsync(connectionId, methodName, args, cancellationToken));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call to SendConnectionAsync reserializes the method invocation payload for each connection.

// We're sending to a single connection
// Write message directly to connection without caching it in memory
var message = CreateInvocationMessage(methodName, args);

Prior to this change, SendToGroupConnections would call CreateSerializedInvocationMessage once, send the same serialized invocation payload to each client. I suspect that will have a far bigger performance impact than the HashSet, although I'd also like to pool that if we move forward with this change.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,23 @@ public async Task SendUsersAsyncWillCancelWithToken()
Assert.False(connection1.ConnectionAborted.IsCancellationRequested);
}
}

[Fact]
public async Task ConnectionInMultipleGroups_ReceivesMessageOnlyOnce()
{
using (var client1 = new TestClient())
{
var manager = CreateNewHubLifetimeManager();
var connection1 = HubConnectionContextUtils.Create(client1.Connection);
await manager.OnConnectedAsync(connection1).DefaultTimeout();
await manager.AddToGroupAsync(connection1.ConnectionId, "group1").DefaultTimeout();
await manager.AddToGroupAsync(connection1.ConnectionId, "group2").DefaultTimeout();
await manager.SendGroupsAsync(new List<string> { "group1", "group2" }, "Hello", new object[] { "World" }).DefaultTimeout();
var message = Assert.IsType<InvocationMessage>(client1.TryRead());
Assert.Equal("Hello", message.Target);
Assert.Single(message.Arguments);
Assert.Equal("World", (string)message.Arguments[0]);
Assert.Null(client1.TryRead());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,14 @@ public async Task RemoveSubscriptionAsync(string id, HubConnectionContext connec
_lock.Release();
}
}

public HubConnectionStore? GetStore(string id)
{
if (_subscriptions.TryGetValue(id, out var store))
{
return store;
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,21 +254,38 @@ public override Task SendConnectionsAsync(IReadOnlyList<string> connectionIds, s
}

/// <inheritdoc />
public override Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object?[] args, CancellationToken cancellationToken = default)
public override async Task SendGroupsAsync(IReadOnlyList<string> groupNames, string methodName, object?[] args, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(groupNames);
var publishTasks = new List<Task>(groupNames.Count);
var payload = _protocol.WriteInvocation(methodName, args);
HashSet<string>? connections = null;

foreach (var groupName in groupNames)
{
if (!string.IsNullOrEmpty(groupName))
{
publishTasks.Add(PublishAsync(_channels.Group(groupName), payload));
var groupChannel = _channels.Group(groupName);
if (groupChannel != null)
{
var connectionStore = _groups.GetStore(groupChannel);
if (connectionStore != null)
{
foreach (var connection in connectionStore)
{
if (connections == null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can similar optimizations as above be applied here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And to the ToList() call below. I wonder if we shouldn't be using a pooled Span instead.

{
connections = new HashSet<string>();
}
connections.Add(connection.ConnectionId);
}
}
}
}
}

return Task.WhenAll(publishTasks);
if (connections != null)
{
await SendConnectionsAsync(connections.ToList(), methodName, args, cancellationToken);
}
}

/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,30 @@ public async Task PatternGroupAndUser()
}
}

[Fact]
public async Task ConnectionInMultipleGroups_ReceivesMessageOnlyOnce()
{
var server = new TestRedisServer();

using (var client = new TestClient())
{
var manager = CreateLifetimeManager(server);
var connection = HubConnectionContextUtils.Create(client.Connection);

await manager.OnConnectedAsync(connection).DefaultTimeout();
await manager.AddToGroupAsync(connection.ConnectionId, "group1").DefaultTimeout();
await manager.AddToGroupAsync(connection.ConnectionId, "group2").DefaultTimeout();

await manager.SendGroupsAsync(new[] { "group1", "group2" }, "Hello", new object[] { "World" }).DefaultTimeout();

var message = Assert.IsType<InvocationMessage>(await client.ReadAsync().DefaultTimeout());
Assert.Equal("Hello", message.Target);
Assert.Single(message.Arguments);
Assert.Equal("World", (string)message.Arguments[0]);
Assert.Null(client.TryRead());
}
}

public override TestRedisServer CreateBackplane()
{
return new TestRedisServer();
Expand Down
Loading