Skip to content

Commit

Permalink
more reliable websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
Hecate2 committed Feb 14, 2023
1 parent b8a46fb commit e79c917
Showing 1 changed file with 34 additions and 9 deletions.
43 changes: 34 additions & 9 deletions Fairy.WebSocket.Subscribe.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
using Neo.Json;
using Neo.Ledger;
using Neo.Network.P2P.Payloads;
using System.Collections.Concurrent;
using System.Net.WebSockets;

namespace Neo.Plugins
{
public partial class Fairy : RpcServer
{
protected TaskCompletionSource<Block> committedBlock = new();
protected Block committedBlock;
protected ConcurrentDictionary<SemaphoreSlim, WebSocket> committedBlockSemaphores = new();

protected void RegisterBlockchainEvents()
{
Blockchain.Committed += delegate (NeoSystem @system, Block @block) { committedBlock.SetResult(block); };
Blockchain.Committed += delegate (NeoSystem @system, Block @block)
{
committedBlock = block;
List<SemaphoreSlim> keysToRemove = new();
foreach (var item in committedBlockSemaphores)
if (item.Value.State == WebSocketState.Open)
item.Key.Release();
else
keysToRemove.Add(item.Key);
foreach (SemaphoreSlim key in keysToRemove)
committedBlockSemaphores.TryRemove(key, out _);
};

This comment has been minimized.

Copy link
@Hecate2

Hecate2 Feb 14, 2023

Author Owner

We could have asked the delegate function to send websocket contents directly, but semaphores should be better for the websocket dispatchers to be less coupled with detailed logics of the contents sent by websockets

}

[WebsocketControlMethod]
Expand Down Expand Up @@ -47,18 +60,30 @@ protected virtual Action SubscribeCommittedBlock(WebSocket webSocket, JArray _pa
{
return async () =>
{
SemaphoreSlim semaphore = new(1);
committedBlockSemaphores[semaphore] = webSocket;
while (true)
{
Block block = await committedBlock.Task;
if (webSocket.State == WebSocketState.Open)
try
{
await webSocket.SendAsync(block.ToJson(system.Settings).ToByteArray(false), WebSocketMessageType.Text, true, CancellationToken.None);
if (cancellationToken.IsCancellationRequested)
return;
await semaphore.WaitAsync();
switch (webSocket.State)
{
case WebSocketState.Open:
await webSocket.SendAsync(committedBlock.ToJson(system.Settings).ToByteArray(false), WebSocketMessageType.Text, true, CancellationToken.None);
if (cancellationToken.IsCancellationRequested)
return;
break;
case WebSocketState.Closed:
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, webSocket.State.ToString(), CancellationToken.None);
webSocket.Dispose();
return;
default:
break;
}
}
else
catch
{
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, webSocket.State.ToString(), CancellationToken.None);
webSocket.Dispose();
return;
}
Expand Down

0 comments on commit e79c917

Please sign in to comment.