Skip to content

Commit

Permalink
Feature/support for multiple hub types (OrleansContrib#9)
Browse files Browse the repository at this point in the history
* refactor(OrleansHubLifetimeManager): refactor `if statement` in `ProcessAllMessage`

* feat(user): implement user group

* user:  update `UserGroupPrefix`

* refactor(User): extracted `GroupGrain` logic to  `ConnectionGroupGrain`. `UserGrain` and `GroupGrain`  inherits from `ConnectionGroupGrain`.

* user: solutionfile

* all: switch grain states as internal

* fix(ConnectionGroup): `RemoveMember` wont throw an exception if the connection doesnt exists

* test(InvokeConnectionAsync): added test to ensure an exception thrown if OnConnect is not called

* test(InvokeConnectionAsync): fix failing test

* removed unused code

* feat(all): add support for multiple hubtypes + add `HubContext`

* connectiongroup: fix subscription leak

* refactor(all): `GetGrain` usages with extensions

* chore(all): rename properties for readability purposes

* docs(readme): update `Hub Context` section

* readme: fixed paket command

* fix(stream providers): fix stream issues between multiple servers

* HubContext: renamed property

* tests(OrleansHubLifetimeManager): added test for specific hub type messages + update test method names convention

* chore(logging): added basic logging

* feat(disconnected users): stream per connectionId

* refactor(ConnectionGrain): rename `ConnectionGroupGrain` to `ConnectionGrain`

* push minor change

* removed serialization attribute

* chore(serialization): removed all serialization
  • Loading branch information
claylaut authored and galvesribeiro committed Nov 21, 2017
1 parent 8baec1f commit 1306b2a
Show file tree
Hide file tree
Showing 19 changed files with 301 additions and 177 deletions.
50 changes: 37 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
**SignalR.Orleans** is a package that allow us to enhance the _real-time_ capabilities of SignalR by leveraging Orleans distributed cloud platform capabilities.


Installation
============
# Installation

Installation is performed via [NuGet](https://www.nuget.org/packages/SignalR.Orleans/)

Expand All @@ -27,29 +26,33 @@ From Package Manager:

> \# dotnet add package SignalR.Orleans --version 1.0.0-preview-1
Packet:
Paket:

> \# paket add SignalR.Orleans --version 1.0.0-preview-1
Code Examples
=============
# Configuration

## Silo
First we need to have an Orleans cluster up and running.

```c#
var siloConfg = ClusterConfiguration.LocalhostPrimarySilo().AddSignalR();
```cs
var siloConfig = ClusterConfiguration.LocalhostPrimarySilo()
.AddSignalR();

var silo = new SiloHostBuilder()
.UseConfiguration(siloConfg)
.UseConfiguration(siloConfig)
.UseSignalR()
.Build();
await silo.StartAsync();
```

## Client
Now your SignalR aplication needs to connect to the Orleans Cluster by using an Orleans Client.

```c#
```cs
var clientConfig = ClientConfiguration.LocalhostSilo()
.AddSignalR();
.AddSignalR();

var client = new ClientBuilder()
.UseConfiguration(clientConfig)
.UseSignalR()
Expand All @@ -59,7 +62,7 @@ await client.Connect();

Somewhere in your `Startup.cs`:

```c#
```cs
public void ConfigureServices(IServiceCollection services)
{
...
Expand All @@ -69,7 +72,28 @@ public void ConfigureServices(IServiceCollection services)
...
}
```

Great! Now you have an Orleans backplane built in Orleans!

PRs and feedback is **very** welcome!
# Features
## Hub Context
`HubContext` gives you the ability to communicate with the client from orleans grains (outside the hub).

Sample usage: Receiving server push notifications from message brokers, web hooks, etc. Ideally first update your grain state and then push signalr message to the client.

### Example:
```cs
public class UserNotificationGrain : Grain<UserNotificationState>, IUserNotificationGrain
{
private HubContext<IUserNotificationHub> _hubContext;

public override async Task OnActivateAsync()
{
_hubContext = GrainFactory.GetHub<IUserNotificationHub>();
// some code...
await _hubContext.User(this.GetPrimaryKeyString()).SendSignalRMessage("Broadcast", State.UserNotification);
}
}
```

# Contributions
PRs and feedback are **very** welcome!
7 changes: 6 additions & 1 deletion SignalR.Orleans.sln
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27004.2008
VisualStudioVersion = 15.0.27004.2009
MinimumVisualStudioVersion = 15.0.26124.0
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{20D142F6-AE1A-4AF1-A314-13E6AAF040ED}"
EndProject
Expand All @@ -10,6 +10,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{DB316F2F-3
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SignalR.Orleans.Tests", "test\SignalR.Orleans.Tests\SignalR.Orleans.Tests.csproj", "{C047C695-9F3C-4DDE-9D0C-06608E191B67}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{6813D377-6B57-40E3-9FDD-8D20F7CADEDF}"
ProjectSection(SolutionItems) = preProject
README.md = README.md
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down
21 changes: 15 additions & 6 deletions src/SignalR.Orleans/Clients/ClientGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,44 @@ internal class ClientGrain : Grain<ClientState>, IClientGrain
public override Task OnActivateAsync()
{
this._streamProvider = this.GetStreamProvider(Constants.STREAM_PROVIDER);
this._clientDisconnectStream = this._streamProvider.GetStream<string>(Constants.CLIENT_DISCONNECT_STREAM_ID, this.GetPrimaryKeyString());
if (this.State.ServerId != Guid.Empty)
this._serverStream = this._streamProvider.GetStream<ClientMessage>(this.State.ServerId, Constants.SERVERS_STREAM);
if (this.State.ServerId == Guid.Empty)
return Task.CompletedTask;

this._clientDisconnectStream = this._streamProvider.GetStream<string>(Constants.CLIENT_DISCONNECT_STREAM_ID, this.State.ConnectionId);
this._serverStream = this._streamProvider.GetStream<ClientMessage>(this.State.ServerId, Constants.SERVERS_STREAM);
return Task.CompletedTask;
}

public Task SendMessage(object message)
{
if (this.State.ServerId == Guid.Empty) throw new InvalidOperationException("Client not connected.");
return this._serverStream.OnNextAsync(new ClientMessage { ConnectionId = this.GetPrimaryKeyString(), Payload = message });
if (string.IsNullOrWhiteSpace(this.State.HubName)) throw new InvalidOperationException("Client hubname not set.");
if (string.IsNullOrWhiteSpace(this.State.ConnectionId)) throw new InvalidOperationException("Client ConnectionId not set.");
return this._serverStream.OnNextAsync(new ClientMessage { ConnectionId = State.ConnectionId, Payload = message, HubName = State.HubName });
}

public Task OnConnect(Guid serverId)
public Task OnConnect(Guid serverId, string hubName, string connectionId)
{
this.State.ServerId = serverId;
this.State.HubName = hubName;
this.State.ConnectionId = connectionId;
this._serverStream = this._streamProvider.GetStream<ClientMessage>(this.State.ServerId, Constants.SERVERS_STREAM);
this._clientDisconnectStream = this._streamProvider.GetStream<string>(Constants.CLIENT_DISCONNECT_STREAM_ID, this.State.ConnectionId);
return this.WriteStateAsync();
}

public async Task OnDisconnect()
{
await this._clientDisconnectStream.OnNextAsync(this.State.ConnectionId);
await this.ClearStateAsync();
await this._clientDisconnectStream.OnNextAsync(this.GetPrimaryKeyString());
this.DeactivateOnIdle();
}
}

internal class ClientState
{
public Guid ServerId { get; set; }
public string ConnectionId { get; set; }
public string HubName { get; set; }
}
}
1 change: 1 addition & 0 deletions src/SignalR.Orleans/Clients/ClientMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace SignalR.Orleans.Clients
{
public class ClientMessage
{
public string HubName { get; set; }
public string ConnectionId { get; set; }
public object Payload { get; set; }
}
Expand Down
4 changes: 2 additions & 2 deletions src/SignalR.Orleans/Clients/IClientGrain.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
using Orleans;
using System;
using System.Threading.Tasks;
using Orleans;

namespace SignalR.Orleans.Clients
{
public interface IClientGrain : IGrainWithStringKey
{
Task SendMessage(object message);
Task OnConnect(Guid serverId);
Task OnConnect(Guid serverId, string hubName, string connectionId);
Task OnDisconnect();
}
}
2 changes: 0 additions & 2 deletions src/SignalR.Orleans/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ public static class Constants
public const string STORAGE_PROVIDER = "ORLEANS_SIGNALR_STORAGE_PROVIDER";
public const string SERVERS_STREAM = "SERVERS_STREAM";
public const string STREAM_PROVIDER = "ORLEANS_SIGNALR_STREAM_PROVIDER";
public const string CLIENT_DISCONNECT_STREAM = "CLIENT_DISCONNECT_STREAM";
public static readonly Guid CLIENT_DISCONNECT_STREAM_ID = Guid.Parse("bdcff7e7-3734-48ab-8599-17d915011b85");
public static readonly Guid ALL_STREAM_ID = Guid.Parse("fbe53ecd-d896-4916-8281-5571d6733566");

}
}
19 changes: 13 additions & 6 deletions src/SignalR.Orleans/Core/ConnectionGrain.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Orleans;
using Orleans.Streams;
using SignalR.Orleans.Clients;
using System.Collections.Generic;
using System.Threading.Tasks;

Expand All @@ -13,26 +12,28 @@ namespace SignalR.Orleans.Core
public override async Task OnActivateAsync()
{
this._streamProvider = this.GetStreamProvider(Constants.STREAM_PROVIDER);

var subscriptionTasks = new List<Task>();
foreach (var connection in this.State.Connections)
{
var clientDisconnectStream = this._streamProvider.GetStream<string>(Constants.CLIENT_DISCONNECT_STREAM_ID, connection.Key);
var subscriptions = await clientDisconnectStream.GetAllSubscriptionHandles();
foreach (var subscription in subscriptions)
{
subscriptionTasks.Add(subscription.ResumeAsync(async (item, token) => await this.Remove(item)));
subscriptionTasks.Add(subscription.ResumeAsync(async (connectionId, token) => await this.Remove(connectionId)));
}
}
await Task.WhenAll(subscriptionTasks);
}

public virtual async Task Add(string connectionId)
public virtual async Task Add(string hubName, string connectionId)
{
if (!this.State.Connections.ContainsKey(connectionId))
{
if (string.IsNullOrWhiteSpace(State.HubName))
State.HubName = hubName;

var clientDisconnectStream = this._streamProvider.GetStream<string>(Constants.CLIENT_DISCONNECT_STREAM_ID, connectionId);
var subscription = await clientDisconnectStream.SubscribeAsync(async (item, token) => await this.Remove(item));
var subscription = await clientDisconnectStream.SubscribeAsync(async (connId, token) => await this.Remove(connId));
this.State.Connections.Add(connectionId, subscription);
await this.WriteStateAsync();
}
Expand Down Expand Up @@ -61,16 +62,22 @@ public virtual Task SendMessage(object message)
var tasks = new List<Task>();
foreach (var connection in this.State.Connections)
{
var client = GrainFactory.GetGrain<IClientGrain>(connection.Key);
var client = GrainFactory.GetClientGrain(State.HubName, connection.Key);
tasks.Add(client.SendMessage(message));
}

return Task.WhenAll(tasks);
}

public Task<int> Count()
{
return Task.FromResult(State.Connections.Count);
}
}

internal abstract class ConnectionState
{
public Dictionary<string, StreamSubscriptionHandle<string>> Connections { get; set; } = new Dictionary<string, StreamSubscriptionHandle<string>>();
public string HubName { get; set; }
}
}
37 changes: 37 additions & 0 deletions src/SignalR.Orleans/Core/GrainExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using SignalR.Orleans.Clients;
using SignalR.Orleans.Core;
using SignalR.Orleans.Groups;
using SignalR.Orleans.Users;
using System;
using System.Threading.Tasks;

// ReSharper disable once CheckNamespace
namespace Orleans
{
public static class GrainSignalRExtensions
{
public static async Task SendSignalRMessage(this IConnectionGrain grain, string methodName, params object[] message)
{
var invocationMessage = new InvocationMessage(Guid.NewGuid().ToString(), nonBlocking: true, target: methodName, arguments: message);
await grain.SendMessage(invocationMessage);
}
}

public static class GrainFactoryExtensions
{
public static HubContext<THub> GetHub<THub>(this IGrainFactory grainFactory)
{
return new HubContext<THub>(grainFactory);
}

internal static IClientGrain GetClientGrain(this IGrainFactory factory, string hubName, string connectionId)
=> factory.GetGrain<IClientGrain>(Utils.BuildGrainId(hubName, connectionId));

internal static IGroupGrain GetGroupGrain(this IGrainFactory factory, string hubName, string groupName)
=> factory.GetGrain<IGroupGrain>(Utils.BuildGrainId(hubName, groupName));

internal static IUserGrain GetUserGrain(this IGrainFactory factory, string hubName, string userId)
=> factory.GetGrain<IUserGrain>(Utils.BuildGrainId(hubName, userId));
}
}
27 changes: 27 additions & 0 deletions src/SignalR.Orleans/Core/HubContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Orleans;
using SignalR.Orleans.Clients;
using SignalR.Orleans.Groups;
using SignalR.Orleans.Users;

namespace SignalR.Orleans.Core
{
public class HubContext<THub>
{
private readonly IGrainFactory _grainFactory;
private readonly string _hubName;

public HubContext(IGrainFactory grainFactory)
{
_grainFactory = grainFactory;
var hubType = typeof(THub);
_hubName = hubType.IsInterface && hubType.Name.StartsWith("I")
? hubType.Name.Substring(1)
: hubType.Name;
}

public IClientGrain Client(string connectionId) => _grainFactory.GetClientGrain(_hubName, connectionId);
public IGroupGrain Group(string groupName) => _grainFactory.GetGroupGrain(_hubName, groupName);
public IUserGrain User(string userId) => _grainFactory.GetUserGrain(_hubName, userId);

}
}
3 changes: 2 additions & 1 deletion src/SignalR.Orleans/Core/IConnectionGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ namespace SignalR.Orleans.Core
{
public interface IConnectionGrain : IGrainWithStringKey
{
Task Add(string connectionId);
Task Add(string hubName, string connectionId);
Task Remove(string connectionId);
Task SendMessage(object message);
Task<int> Count();
}
}
10 changes: 10 additions & 0 deletions src/SignalR.Orleans/Core/Utils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace SignalR.Orleans.Core
{
internal static class Utils
{
internal static string BuildGrainId(string hubName, string key) => $"{hubName}:{key}".ToLower();

internal static string BuildStreamHubName(string hubName) => $"registered-hub::{hubName}".ToLower();

}
}
6 changes: 0 additions & 6 deletions src/SignalR.Orleans/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@
using Orleans;
using Orleans.Hosting;
using Orleans.Runtime.Configuration;
using Orleans.Serialization;
using SignalR.Orleans;
using SignalR.Orleans.Clients;
using System.Reflection;

namespace Microsoft.Extensions.DependencyInjection
{
public static class OrleansDependencyInjectionExtensions
{
public static ClusterConfiguration AddSignalR(this ClusterConfiguration config)
{
config.Globals.SerializationProviders.Add(typeof(HubMessageSerializer).GetTypeInfo());
config.Globals.FallbackSerializationProvider = typeof(ILBasedSerializer).GetTypeInfo();
config.AddSimpleMessageStreamProvider(Constants.STREAM_PROVIDER);
try
{
Expand All @@ -30,8 +26,6 @@ public static ClusterConfiguration AddSignalR(this ClusterConfiguration config)

public static ClientConfiguration AddSignalR(this ClientConfiguration config)
{
config.FallbackSerializationProvider = typeof(ILBasedSerializer).GetTypeInfo();
config.SerializationProviders.Add(typeof(HubMessageSerializer).GetTypeInfo());
config.AddSimpleMessageStreamProvider(Constants.STREAM_PROVIDER);
return config;
}
Expand Down
Loading

0 comments on commit 1306b2a

Please sign in to comment.