Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
galvesribeiro committed Nov 3, 2017
0 parents commit 7c04c8a
Show file tree
Hide file tree
Showing 14 changed files with 502 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
**/obj
**/bin
17 changes: 17 additions & 0 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"taskName": "build",
"command": "dotnet build",
"type": "shell",
"group": "build",
"presentation": {
"reveal": "silent"
},
"problemMatcher": "$msCompile"
}
]
}
39 changes: 39 additions & 0 deletions OrleansR.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26124.0
MinimumVisualStudioVersion = 15.0.26124.0
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{20D142F6-AE1A-4AF1-A314-13E6AAF040ED}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OrleansR", "src\OrleansR\OrleansR.csproj", "{559ED786-70A6-4BBB-8F09-C544F8E9D7E6}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Debug|x64 = Debug|x64
Debug|x86 = Debug|x86
Release|Any CPU = Release|Any CPU
Release|x64 = Release|x64
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{559ED786-70A6-4BBB-8F09-C544F8E9D7E6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{559ED786-70A6-4BBB-8F09-C544F8E9D7E6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{559ED786-70A6-4BBB-8F09-C544F8E9D7E6}.Debug|x64.ActiveCfg = Debug|x64
{559ED786-70A6-4BBB-8F09-C544F8E9D7E6}.Debug|x64.Build.0 = Debug|x64
{559ED786-70A6-4BBB-8F09-C544F8E9D7E6}.Debug|x86.ActiveCfg = Debug|x86
{559ED786-70A6-4BBB-8F09-C544F8E9D7E6}.Debug|x86.Build.0 = Debug|x86
{559ED786-70A6-4BBB-8F09-C544F8E9D7E6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{559ED786-70A6-4BBB-8F09-C544F8E9D7E6}.Release|Any CPU.Build.0 = Release|Any CPU
{559ED786-70A6-4BBB-8F09-C544F8E9D7E6}.Release|x64.ActiveCfg = Release|x64
{559ED786-70A6-4BBB-8F09-C544F8E9D7E6}.Release|x64.Build.0 = Release|x64
{559ED786-70A6-4BBB-8F09-C544F8E9D7E6}.Release|x86.ActiveCfg = Release|x86
{559ED786-70A6-4BBB-8F09-C544F8E9D7E6}.Release|x86.Build.0 = Release|x86
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{559ED786-70A6-4BBB-8F09-C544F8E9D7E6} = {20D142F6-AE1A-4AF1-A314-13E6AAF040ED}
EndGlobalSection
EndGlobal
Empty file added README.md
Empty file.
43 changes: 43 additions & 0 deletions src/OrleansR/Clients/ClientGrain.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

namespace OrleansR.Clients
{
public class ClientGrain : Grain<ClientState>, IClientGrain
{
private IStreamProvider _streamProvider;
private IAsyncStream<ClientMessage> _serverStream;
private IAsyncStream<string> _clientDisconnectStream;

public override Task OnActivateAsync()
{
this._streamProvider = this.GetStreamProvider(Constants.STREAM_PROVIDER);
this._clientDisconnectStream = this._streamProvider.GetStream<string>(Constants.CLIENT_DISCONNECT_STREAM_ID, this.GetPrimaryKeyString());
if (this.State.ServerId != Guid.Empty)
this._serverStream = this._streamProvider.GetStream<ClientMessage>(this.State.ServerId, Constants.SERVERS_STREAM);
return Task.CompletedTask;
}

public Task SendMessage(object message)
{
if (this.State.ServerId == Guid.Empty) throw new InvalidOperationException("Client not connected.");
return this._serverStream.OnNextAsync(new ClientMessage { ConnectionId = this.GetPrimaryKeyString(), Payload = message });
}

public Task OnConnect(Guid serverId)
{
this.State.ServerId = serverId;
this._serverStream = this._streamProvider.GetStream<ClientMessage>(this.State.ServerId, Constants.SERVERS_STREAM);
return this.WriteStateAsync();
}

public Task OnDisconnect() => this._clientDisconnectStream.OnNextAsync(this.GetPrimaryKeyString());
}

public class ClientState
{
public Guid ServerId { get; set; }
}
}
8 changes: 8 additions & 0 deletions src/OrleansR/Clients/ClientMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace OrleansR.Clients
{
public class ClientMessage
{
public string ConnectionId { get; set; }
public object Payload { get; set; }
}
}
13 changes: 13 additions & 0 deletions src/OrleansR/Clients/IClientGrain.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Threading.Tasks;
using Orleans;

namespace OrleansR.Clients
{
public interface IClientGrain : IGrainWithStringKey
{
Task SendMessage(object message);
Task OnConnect(Guid serverId);
Task OnDisconnect();
}
}
14 changes: 14 additions & 0 deletions src/OrleansR/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

namespace OrleansR
{
public static class Constants
{
public const string SERVERS_STREAM = "SERVERS_STREAM";
public const string STREAM_PROVIDER = "HUB_STREAM_PROVIDER";
public const string CLIENT_DISCONNECT_STREAM = "CLIENT_DISCONNECT_STREAM";
public static readonly Guid CLIENT_DISCONNECT_STREAM_ID = Guid.Parse("bdcff7e7-3734-48ab-8599-17d915011b85");
public static readonly Guid ALL_STREAM_ID = Guid.Parse("fbe53ecd-d896-4916-8281-5571d6733566");

}
}
75 changes: 75 additions & 0 deletions src/OrleansR/Groups/GroupGrain.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using OrleansR.Clients;
using Orleans;
using Orleans.Providers;
using Orleans.Streams;

namespace OrleansR.Groups
{
[StorageProvider(ProviderName = "GroupState")]
public class GroupGrain : Grain<GroupState>, IGroupGrain
{
private IStreamProvider _streamProvider;

public override async Task OnActivateAsync()
{
this._streamProvider = this.GetStreamProvider(Constants.STREAM_PROVIDER);

var subscriptionTasks = new List<Task>();
foreach (var member in this.State.Members)
{
var clientDisconnectStream = this._streamProvider.GetStream<string>(Constants.CLIENT_DISCONNECT_STREAM_ID, member.Key);
var subscriptions = await clientDisconnectStream.GetAllSubscriptionHandles();
foreach (var subscription in subscriptions)
{
subscriptionTasks.Add(subscription.ResumeAsync(async (item, token) => await this.RemoveMember(item)));
}
}
await Task.WhenAll(subscriptionTasks);
}

public async Task AddMember(string connectionId)
{
if (!this.State.Members.ContainsKey(connectionId))
{
var clientDisconnectStream = this._streamProvider.GetStream<string>(Constants.CLIENT_DISCONNECT_STREAM_ID, connectionId);
var subscription = await clientDisconnectStream.SubscribeAsync(async (item, token) => await this.RemoveMember(item));
this.State.Members.Add(connectionId, subscription);
await this.WriteStateAsync();
}
}

public async Task RemoveMember(string connectionId)
{
await this.State.Members[connectionId]?.UnsubscribeAsync();
this.State.Members.Remove(connectionId);
if (this.State.Members.Count == 0)
{
await this.ClearStateAsync();
this.DeactivateOnIdle();
}
else
{
await this.WriteStateAsync();
}
}

public Task SendMessage(object message)
{
var tasks = new List<Task>();
foreach (var member in this.State.Members)
{
var client = GrainFactory.GetGrain<IClientGrain>(member.Key);
tasks.Add(client.SendMessage(message));
}

return Task.WhenAll(tasks);
}
}

public class GroupState
{
public Dictionary<string, StreamSubscriptionHandle<string>> Members { get; set; } = new Dictionary<string, StreamSubscriptionHandle<string>>();
}
}
7 changes: 7 additions & 0 deletions src/OrleansR/Groups/GroupMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace OrleansR.Groups
{
public class GroupMessage
{
public object Payload { get; set; }
}
}
13 changes: 13 additions & 0 deletions src/OrleansR/Groups/IGroupGrain.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Threading.Tasks;
using Orleans;

namespace OrleansR.Groups
{
public interface IGroupGrain : IGrainWithStringKey
{
Task AddMember(string connectionId);
Task RemoveMember(string connectionId);
Task SendMessage(object message);
}
}
32 changes: 32 additions & 0 deletions src/OrleansR/HubMessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using OrleansR.Clients;
using OrleansR.Groups;
using Orleans.Runtime;
using Orleans.Serialization;

namespace OrleansR
{
public class HubMessageSerializer : IExternalSerializer
{
private readonly IReadOnlyList<Type> _supportedType = new List<Type> { typeof(GroupMessage), typeof(AllMessage), typeof(ClientMessage) };
private readonly ILBasedSerializer _serializer;

public HubMessageSerializer(IServiceProvider serviceProvider)
{
this._serializer = ActivatorUtilities.CreateInstance<ILBasedSerializer>(serviceProvider);
}

public object DeepCopy(object source, ICopyContext context) => this._serializer.DeepCopy(source, context);

public object Deserialize(Type expectedType, IDeserializationContext context) => this._serializer.Deserialize(expectedType, context);

public void Initialize(Logger logger) => this._serializer.Initialize(logger);

public bool IsSupportedType(Type itemType) => this._supportedType.Contains(itemType);

public void Serialize(object item, ISerializationContext context, Type expectedType) => this._serializer.Serialize(item, context, expectedType);
}
}
Loading

0 comments on commit 7c04c8a

Please sign in to comment.