Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding option to pass channel type to socket #1174

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
6 changes: 3 additions & 3 deletions Assets/Mirage/Runtime/NetworkClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,16 @@ public void Connect(string address = null, ushort? port = null)
if (logger.LogEnabled()) logger.Log($"Client connecting to endpoint: {endPoint}");

var socket = SocketFactory.CreateClientSocket();
var maxPacketSize = SocketFactory.MaxPacketSize;
var socketInfo = SocketFactory.SocketInfo;
MessageHandler = new MessageHandler(World, DisconnectOnException, RethrowException);
var dataHandler = new DataHandler(MessageHandler);
Metrics = EnablePeerMetrics ? new Metrics(MetricsSize) : null;

var config = PeerConfig ?? new Config();

NetworkWriterPool.Configure(maxPacketSize);
NetworkWriterPool.Configure(socketInfo.MaxSize);

_peer = new Peer(socket, maxPacketSize, dataHandler, config, LogFactory.GetLogger<Peer>(), Metrics);
_peer = new Peer(socket, socketInfo, dataHandler, config, LogFactory.GetLogger<Peer>(), Metrics);
_peer.OnConnected += Peer_OnConnected;
_peer.OnConnectionFailed += Peer_OnConnectionFailed;
_peer.OnDisconnected += Peer_OnDisconnected;
Expand Down
6 changes: 3 additions & 3 deletions Assets/Mirage/Runtime/NetworkServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,14 @@ public void StartServer(NetworkClient localClient = null)
// If not, that's okay. Some games use a non-listening server for their single player game mode (Battlefield, Call of Duty...)
if (Listening)
{
var maxPacketSize = SocketFactory.MaxPacketSize;
NetworkWriterPool.Configure(maxPacketSize);
var socketInfo = SocketFactory.SocketInfo;
NetworkWriterPool.Configure(socketInfo.MaxSize);

// Create a server specific socket.
var socket = SocketFactory.CreateServerSocket();

// Tell the peer to use that newly created socket.
_peer = new Peer(socket, maxPacketSize, dataHandler, config, LogFactory.GetLogger<Peer>(), Metrics);
_peer = new Peer(socket, socketInfo, dataHandler, config, LogFactory.GetLogger<Peer>(), Metrics);
_peer.OnConnected += Peer_OnConnected;
_peer.OnDisconnected += Peer_OnDisconnected;
// Bind it to the endpoint.
Expand Down
2 changes: 1 addition & 1 deletion Assets/Mirage/Runtime/SocketLayer/ByteBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace Mirage.SocketLayer
{
/// <summary>
/// Warpper around a byte[] that belongs to a <see cref="Pool{T}"/>
/// Wrapper around a byte[] that belongs to a <see cref="Pool{T}"/>
/// </summary>
public sealed class ByteBuffer : IDisposable
{
Expand Down
5 changes: 0 additions & 5 deletions Assets/Mirage/Runtime/SocketLayer/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ public class Config
/// <para>max value is 255</para>
/// </summary>
public int MaxReliableFragments = 50;

/// <summary>
/// Enable if the Socket you are using has its own Reliable layer. For example using Websocket, which is TCP.
/// </summary>
public bool DisableReliableLayer = false;
#endregion
}
}
4 changes: 2 additions & 2 deletions Assets/Mirage/Runtime/SocketLayer/Connection/AckSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private bool ShouldSendEmptyAck()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void Send(byte[] final, int length)
{
_connection.SendRaw(final, length);
_connection.SendRaw(final, length, SendMode.Unreliable);
OnSend();
}

Expand All @@ -261,7 +261,7 @@ private void SendAck()
ByteUtils.WriteUShort(final.array, ref offset, _latestAckSequence);
ByteUtils.WriteULong(final.array, ref offset, _ackMask);

_connection.SendRaw(final.array, offset);
_connection.SendRaw(final.array, offset, SendMode.Unreliable);
Send(final.array, offset);
}
}
Expand Down
41 changes: 35 additions & 6 deletions Assets/Mirage/Runtime/SocketLayer/Connection/Batch.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
using System;
using UnityEngine;

namespace Mirage.SocketLayer
{
public abstract class Batch
{
public const int MESSAGE_LENGTH_SIZE = 2;
public const int MAX_BATCH_SIZE = ushort.MaxValue;

private readonly int _maxPacketSize;

public Batch(int maxPacketSize)
{

_maxPacketSize = maxPacketSize;
}

Expand Down Expand Up @@ -39,7 +42,7 @@ public void AddMessage(byte[] message, int offset, int length)
AddToBatch(message, offset, length);
}

private void AddToBatch(byte[] message, int offset, int length)
protected virtual void AddToBatch(byte[] message, int offset, int length)
{
var batch = GetBatch();
ref var batchLength = ref GetBatchLength();
Expand All @@ -57,18 +60,21 @@ public void Flush()

public class ArrayBatch : Batch
{
private readonly Action<byte[], int> _send;
private readonly IRawConnection _connection;
private readonly PacketType _packetType;

private readonly SendMode _sendMode;
private readonly byte[] _batch;
protected readonly ILogger _logger;
private int _batchLength;

public ArrayBatch(int maxPacketSize, Action<byte[], int> send, PacketType reliable)
public ArrayBatch(int maxPacketSize, ILogger logger, IRawConnection connection, PacketType reliable, SendMode sendMode)
: base(maxPacketSize)
{
_logger = logger;
_batch = new byte[maxPacketSize];
_send = send;
_connection = connection;
_packetType = reliable;
_sendMode = sendMode;
}

protected override bool Created => _batchLength > 0;
Expand All @@ -84,9 +90,32 @@ protected override void CreateNewBatch()

protected override void SendAndReset()
{
_send.Invoke(_batch, _batchLength);
_connection.SendRaw(_batch, _batchLength, _sendMode);
_batchLength = 0;
}

protected override void AddToBatch(byte[] message, int offset, int length)
{
if (length > MAX_BATCH_SIZE)
{
var batch = GetBatch();
ref var batchLength = ref GetBatchLength();
_logger.Assert(batchLength == 1, "if length is large, then batch should be new (empty) packet");

// write zero as flag for large message,
// normal message will have atleast 1 length
ByteUtils.WriteUShort(batch, ref batchLength, 0);
Buffer.BlockCopy(message, offset, batch, batchLength, length);
batchLength += length;

// we can send right away, nothing else will fit in this message
SendAndReset();
}
else
{
base.AddToBatch(message, offset, length);
}
}
}

public class ReliableBatch : Batch, IDisposable
Expand Down
31 changes: 26 additions & 5 deletions Assets/Mirage/Runtime/SocketLayer/Connection/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

namespace Mirage.SocketLayer
{
internal abstract class Connection : IConnection
internal abstract class Connection : IConnection, IRawConnection
{
protected readonly ILogger _logger;
protected readonly int _maxPacketSize;
protected readonly SocketInfo _socketInfo;
protected readonly Peer _peer;
protected readonly IDataHandler _dataHandler;

Expand Down Expand Up @@ -55,11 +55,11 @@ public ConnectionState State

public bool Connected => State == ConnectionState.Connected;

protected Connection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Config config, int maxPacketSize, Time time, ILogger logger, Metrics metrics)
protected Connection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Config config, SocketInfo socketInfo, Time time, ILogger logger, Metrics metrics)
{
_peer = peer;
_logger = logger;
_maxPacketSize = maxPacketSize;
_socketInfo = socketInfo;

EndPoint = endPoint ?? throw new ArgumentNullException(nameof(endPoint));
_dataHandler = dataHandler ?? throw new ArgumentNullException(nameof(dataHandler));
Expand All @@ -73,6 +73,11 @@ protected Connection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Co
_metrics = metrics;
}

void IRawConnection.SendRaw(byte[] packet, int length, SendMode mode)
{
_peer.Send(this, packet, length, mode);
}

public override string ToString()
{
return $"[{EndPoint}]";
Expand Down Expand Up @@ -207,14 +212,30 @@ private void UpdateConnected()

protected void HandleReliableBatched(byte[] array, int offset, int packetLength, PacketType packetType)
{
var firstPacket = true;
while (offset < packetLength)
{
var length = ByteUtils.ReadUShort(array, ref offset);
var length = (int)ByteUtils.ReadUShort(array, ref offset);
if (length == 0)// not batched
{
if (!firstPacket)
{
// only first message can be not batched
Disconnect(DisconnectReason.InvalidPacket);
return;
}

_logger.Assert(offset == 3);
// set real length
length = packetLength - offset;
}

var message = new ArraySegment<byte>(array, offset, length);
offset += length;

_metrics?.OnReceiveMessage(packetType, length);
_dataHandler.ReceiveMessage(this, message);
firstPacket = false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public interface IRawConnection
/// <para>packet given to this function as assumed to already have a header</para>
/// </summary>
/// <param name="packet">header and messages</param>
void SendRaw(byte[] packet, int length);
void SendRaw(byte[] packet, int length, SendMode mode);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,20 @@
namespace Mirage.SocketLayer
{
/// <summary>
/// Connection that does not run its own reliablity layer, good for TCP sockets
/// Connection that does not run its own reliability layer, good for TCP sockets
/// </summary>
internal sealed class NoReliableConnection : Connection
{
private const int HEADER_SIZE = 1 + Batch.MESSAGE_LENGTH_SIZE;

private readonly Batch _nextBatchReliable;

internal NoReliableConnection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Config config, int maxPacketSize, Time time, ILogger logger, Metrics metrics)
: base(peer, endPoint, dataHandler, config, maxPacketSize, time, logger, metrics)
internal NoReliableConnection(Peer peer, IEndPoint endPoint, IDataHandler dataHandler, Config config, SocketInfo socketInfo, Time time, ILogger logger, Metrics metrics)
: base(peer, endPoint, dataHandler, config, socketInfo, time, logger, metrics)
{
_nextBatchReliable = new ArrayBatch(maxPacketSize, SendBatchInternal, PacketType.Reliable);
Debug.Assert(socketInfo.Reliability == SocketReliability.Reliable);

if (maxPacketSize > ushort.MaxValue)
{
throw new ArgumentException($"Max package size can not bigger than {ushort.MaxValue}. NoReliableConnection uses 2 bytes for message length, maxPacketSize over that value will mean that message will be incorrectly batched.");
}
}

private void SendBatchInternal(byte[] batch, int length)
{
_peer.Send(this, batch, length);
_nextBatchReliable = new ArrayBatch(socketInfo.MaxReliableSize, logger, this, PacketType.Reliable, SendMode.Reliable);
}

// just sue SendReliable for unreliable/notify
Expand All @@ -51,9 +43,9 @@ public override void SendReliable(byte[] message, int offset, int length)
{
ThrowIfNotConnectedOrConnecting();

if (length + HEADER_SIZE > _maxPacketSize)
if (length + HEADER_SIZE > _socketInfo.MaxReliableSize)
{
throw new ArgumentException($"Message is bigger than MTU, size:{length} but max message size is {_maxPacketSize - HEADER_SIZE}");
throw new ArgumentException($"Message is bigger than MTU, size:{length} but max message size is {_socketInfo.MaxReliableSize - HEADER_SIZE}");
}

_nextBatchReliable.AddMessage(message, offset, length);
Expand Down
Loading
Loading