Skip to content

Commit

Permalink
concurrent queue & remove websocket enable async & close processing
Browse files Browse the repository at this point in the history
  • Loading branch information
psygames committed Aug 23, 2024
1 parent e44a2ae commit 96703ee
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading.Tasks;
using System.Net.WebSockets;
using System.IO;
using System.Collections.Concurrent;

namespace UnityWebSocket
{
Expand Down Expand Up @@ -44,6 +45,9 @@ public WebSocketState ReadyState

private ClientWebSocket socket;
private bool isOpening => socket != null && socket.State == System.Net.WebSockets.WebSocketState.Open;
private ConcurrentQueue<SendBuffer> sendQueue = new ConcurrentQueue<SendBuffer>();
private readonly ConcurrentQueue<EventArgs> receiveQueue = new ConcurrentQueue<EventArgs>();
private bool closeProcessing;

#region APIs
public WebSocket(string address)
Expand All @@ -65,15 +69,17 @@ public WebSocket(string address, string[] subProtocols)

public void ConnectAsync()
{
#if !UNITY_WEB_SOCKET_ENABLE_ASYNC
WebSocketManager.Instance.Add(this);
#endif
if (socket != null)
{
HandleError(new Exception("Socket is busy."));
return;
}

WebSocketManager.Instance.Add(this);

socket = new ClientWebSocket();

// support sub protocols
if (this.SubProtocols != null)
{
foreach (var protocol in this.SubProtocols)
Expand All @@ -83,31 +89,49 @@ public void ConnectAsync()
socket.Options.AddSubProtocol(protocol);
}
}

Task.Run(ConnectTask);
}

public void CloseAsync()
{
if (!isOpening) return;
SendBufferAsync(new SendBuffer(null, WebSocketMessageType.Close));
closeProcessing = true;
}

public void SendAsync(byte[] data)
{
if (!isOpening) return;
var buffer = new SendBuffer(data, WebSocketMessageType.Binary);
SendBufferAsync(buffer);
sendQueue.Enqueue(buffer);
}

public void SendAsync(string text)
{
if (!isOpening) return;
var data = Encoding.UTF8.GetBytes(text);
var buffer = new SendBuffer(data, WebSocketMessageType.Text);
SendBufferAsync(buffer);
sendQueue.Enqueue(buffer);
}
#endregion

class SendBuffer
{
public byte[] data;
public WebSocketMessageType type;
public SendBuffer(byte[] data, WebSocketMessageType type)
{
this.data = data;
this.type = type;
}
}

private void CleanSendQueue()
{
Log($"Clean Send Queue Begin ...");
while (sendQueue.TryDequeue(out var _)) ;
Log($"Clean Send Queue End !");
}

private async Task ConnectTask()
{
Expand All @@ -128,71 +152,34 @@ private async Task ConnectTask()

HandleOpen();

Log("Connect Task End !");
Log("Connect Task Success !");

await ReceiveTask();
}

class SendBuffer
{
public byte[] data;
public WebSocketMessageType type;
public SendBuffer(byte[] data, WebSocketMessageType type)
{
this.data = data;
this.type = type;
}
StartReceiveTask();
StartSendTask();
}

private object sendQueueLock = new object();
private Queue<SendBuffer> sendQueue = new Queue<SendBuffer>();
private bool isSendTaskRunning;

private void SendBufferAsync(SendBuffer buffer)
{
if (isSendTaskRunning)
{
lock (sendQueueLock)
{
if (buffer.type == WebSocketMessageType.Close)
{
sendQueue.Clear();
}
sendQueue.Enqueue(buffer);
}
}
else
{
isSendTaskRunning = true;
sendQueue.Enqueue(buffer);
Task.Run(SendTask);
}
}

private async Task SendTask()
private async void StartSendTask()
{
Log("Send Task Begin ...");

try
{
SendBuffer buffer = null;
while (sendQueue.Count > 0 && isOpening)
while (!closeProcessing)
{
lock (sendQueueLock)
{
buffer = sendQueue.Dequeue();
}
if (buffer.type == WebSocketMessageType.Close)
{
Log($"Close Send Begin ...");
await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Normal Closure", CancellationToken.None);
Log($"Close Send End !");
}
else
while (!closeProcessing && sendQueue.TryDequeue(out var buffer))
{
Log($"Send, type: {buffer.type}, size: {buffer.data.Length}, queue left: {sendQueue.Count}");
await socket.SendAsync(new ArraySegment<byte>(buffer.data), buffer.type, true, CancellationToken.None);
}
await Task.Yield();
}

if (closeProcessing)
{
CleanSendQueue();
Log($"Close Send Begin ...");
await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Normal Closure", CancellationToken.None);
Log($"Close Send Success !");
}
}
catch (Exception e)
Expand All @@ -201,13 +188,13 @@ private async Task SendTask()
}
finally
{
isSendTaskRunning = false;
closeProcessing = false;
}

Log("Send Task End !");
}

private async Task ReceiveTask()
private async void StartReceiveTask()
{
Log("Receive Task Begin ...");

Expand Down Expand Up @@ -261,72 +248,39 @@ private async Task ReceiveTask()

private void SocketDispose()
{
sendQueue.Clear();
CleanSendQueue();
socket.Dispose();
socket = null;
}

private void HandleOpen()
{
Log("OnOpen");
#if !UNITY_WEB_SOCKET_ENABLE_ASYNC
HandleEventSync(new OpenEventArgs());
#else
OnOpen?.Invoke(this, new OpenEventArgs());
#endif
receiveQueue.Enqueue(new OpenEventArgs());
}

private void HandleMessage(Opcode opcode, byte[] rawData)
{
Log($"OnMessage, type: {opcode}, size: {rawData.Length}\n{BitConverter.ToString(rawData)}");
#if !UNITY_WEB_SOCKET_ENABLE_ASYNC
HandleEventSync(new MessageEventArgs(opcode, rawData));
#else
OnMessage?.Invoke(this, new MessageEventArgs(opcode, rawData));
#endif
Log($"OnMessage, type: {opcode}, size: {rawData.Length}");
receiveQueue.Enqueue(new MessageEventArgs(opcode, rawData));
}

private void HandleClose(ushort code, string reason)
{
Log($"OnClose, code: {code}, reason: {reason}");
#if !UNITY_WEB_SOCKET_ENABLE_ASYNC
HandleEventSync(new CloseEventArgs(code, reason));
#else
OnClose?.Invoke(this, new CloseEventArgs(code, reason));
#endif
receiveQueue.Enqueue(new CloseEventArgs(code, reason));
}

private void HandleError(Exception exception)
{
Log("OnError, error: " + exception.Message);
#if !UNITY_WEB_SOCKET_ENABLE_ASYNC
HandleEventSync(new ErrorEventArgs(exception.Message));
#else
OnError?.Invoke(this, new ErrorEventArgs(exception.Message));
#endif
}

#if !UNITY_WEB_SOCKET_ENABLE_ASYNC
private readonly Queue<EventArgs> eventQueue = new Queue<EventArgs>();
private readonly object eventQueueLock = new object();
private void HandleEventSync(EventArgs eventArgs)
{
lock (eventQueueLock)
{
eventQueue.Enqueue(eventArgs);
}
receiveQueue.Enqueue(new ErrorEventArgs(exception.Message));
}

internal void Update()
{
EventArgs e;
while (eventQueue.Count > 0)
while (receiveQueue.TryDequeue(out var e))
{
lock (eventQueueLock)
{
e = eventQueue.Dequeue();
}

if (e is CloseEventArgs)
{
OnClose?.Invoke(this, e as CloseEventArgs);
Expand All @@ -346,7 +300,6 @@ internal void Update()
}
}
}
#endif

[System.Diagnostics.Conditional("UNITY_WEB_SOCKET_LOG")]
static void Log(string msg)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#if !NET_LEGACY && (UNITY_EDITOR || !UNITY_WEBGL) && !UNITY_WEB_SOCKET_ENABLE_ASYNC
#if !NET_LEGACY && (UNITY_EDITOR || !UNITY_WEBGL)
using System.Collections.Generic;
using UnityEngine;

Expand Down
3 changes: 2 additions & 1 deletion ProjectSettings/ProjectSettings.asset
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ PlayerSettings:
androidBlitType: 0
defaultIsNativeResolution: 1
macRetinaSupport: 1
runInBackground: 0
runInBackground: 1
captureSingleScreen: 0
muteOtherAudioSources: 0
Prepare IOS For Recording: 0
Expand Down Expand Up @@ -516,6 +516,7 @@ PlayerSettings:
webGLThreadsSupport: 0
scriptingDefineSymbols:
1:
13: UNITY_WEB_SOCKET_LOG
platformArchitecture: {}
scriptingBackend:
Standalone: 0
Expand Down

0 comments on commit 96703ee

Please sign in to comment.