Skip to content

fix: increases maximum message size to int.MaxValue (#1384) #1462

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
6639d8d
fix: increases maximum message size to int.MaxValue (#1384)
ShadauxCat Nov 16, 2021
825d929
Merge branch 'release/1.0.0' into backport/1.0.0-fix-increase_max_mes…
andrews-unity Nov 24, 2021
164de87
Merge branch 'release/1.0.0' into backport/1.0.0-fix-increase_max_mes…
andrews-unity Nov 24, 2021
8724e35
Merge release/1.0.0 into backport/1.0.0-fix-increase_max_message_size
netcode-ci-service Nov 24, 2021
e229766
Merge release/1.0.0 into backport/1.0.0-fix-increase_max_message_size
netcode-ci-service Nov 24, 2021
f4e4d00
Merge release/1.0.0 into backport/1.0.0-fix-increase_max_message_size
netcode-ci-service Nov 24, 2021
10a4bb0
Merge release/1.0.0 into backport/1.0.0-fix-increase_max_message_size
netcode-ci-service Nov 24, 2021
abe0e33
Merge release/1.0.0 into backport/1.0.0-fix-increase_max_message_size
netcode-ci-service Nov 24, 2021
dabf598
Merge release/1.0.0 into backport/1.0.0-fix-increase_max_message_size
netcode-ci-service Nov 24, 2021
6180917
Merge release/1.0.0 into backport/1.0.0-fix-increase_max_message_size
netcode-ci-service Nov 24, 2021
f4314e1
Merge release/1.0.0 into backport/1.0.0-fix-increase_max_message_size
netcode-ci-service Nov 24, 2021
aa2d051
Merge release/1.0.0 into backport/1.0.0-fix-increase_max_message_size
netcode-ci-service Nov 24, 2021
6bb3420
Merge release/1.0.0 into backport/1.0.0-fix-increase_max_message_size
netcode-ci-service Nov 24, 2021
956d918
Merge release/1.0.0 into backport/1.0.0-fix-increase_max_message_size
netcode-ci-service Nov 24, 2021
81825dd
merge branch 'release/1.0.0' into 'backport/1.0.0-fix-increase_max_me…
0xFA11 Nov 25, 2021
aeb709b
Merge branch 'release/1.0.0' into backport/1.0.0-fix-increase_max_mes…
0xFA11 Nov 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ namespace Unity.Netcode
/// </summary>
internal static class NetworkConstants
{
internal const string PROTOCOL_VERSION = "14.0.0";
internal const string PROTOCOL_VERSION = "15.0.0";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ internal void __sendServerRpc(FastBufferWriter writer, uint rpcMethodId, ServerR
SystemOwner = NetworkManager,
// header information isn't valid since it's not a real message.
// Passing false to canDefer prevents it being accessed.
Header = new MessageHeader()
Header = new MessageHeader(),
SerializedHeaderSize = 0,
};
message.Handle(tempBuffer, context, NetworkManager, NetworkManager.ServerClientId, false);
rpcMessageSize = tempBuffer.Length;
Expand Down Expand Up @@ -188,7 +189,8 @@ internal unsafe void __sendClientRpc(FastBufferWriter writer, uint rpcMethodId,
SystemOwner = NetworkManager,
// header information isn't valid since it's not a real message.
// Passing false to canDefer prevents it being accessed.
Header = new MessageHeader()
Header = new MessageHeader(),
SerializedHeaderSize = 0,
};
message.Handle(tempBuffer, context, NetworkManager, NetworkManager.ServerClientId, false);
messageSize = tempBuffer.Length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal CustomMessagingManager(NetworkManager networkManager)
/// </summary>
public event UnnamedMessageDelegate OnUnnamedMessage;

internal void InvokeUnnamedMessage(ulong clientId, FastBufferReader reader)
internal void InvokeUnnamedMessage(ulong clientId, FastBufferReader reader, int serializedHeaderSize)
{
if (OnUnnamedMessage != null)
{
Expand All @@ -40,7 +40,7 @@ internal void InvokeUnnamedMessage(ulong clientId, FastBufferReader reader)
((UnnamedMessageDelegate)handler).Invoke(clientId, reader);
}
}
m_NetworkManager.NetworkMetrics.TrackUnnamedMessageReceived(clientId, reader.Length + FastBufferWriter.GetWriteSize<MessageHeader>());
m_NetworkManager.NetworkMetrics.TrackUnnamedMessageReceived(clientId, reader.Length + serializedHeaderSize);
}

/// <summary>
Expand Down Expand Up @@ -115,9 +115,9 @@ public void SendUnnamedMessage(ulong clientId, FastBufferWriter messageBuffer, N
private Dictionary<ulong, string> m_MessageHandlerNameLookup32 = new Dictionary<ulong, string>();
private Dictionary<ulong, string> m_MessageHandlerNameLookup64 = new Dictionary<ulong, string>();

internal void InvokeNamedMessage(ulong hash, ulong sender, FastBufferReader reader)
internal void InvokeNamedMessage(ulong hash, ulong sender, FastBufferReader reader, int serializedHeaderSize)
{
var bytesCount = reader.Length + FastBufferWriter.GetWriteSize<MessageHeader>();
var bytesCount = reader.Length + serializedHeaderSize;

if (m_NetworkManager == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ internal struct MessageHeader
/// unchanged - if new messages are added or messages are removed, MessageType assignments may be
/// calculated differently.
/// </summary>
public byte MessageType;
public uint MessageType;

/// <summary>
/// The total size of the message, NOT including the header.
/// Stored as a uint to avoid zig-zag encoding, but capped at int.MaxValue.
/// </summary>
public ushort MessageSize;
public uint MessageSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public static void Receive(FastBufferReader reader, in NetworkContext context)
var message = new NamedMessage();
reader.ReadValueSafe(out message.Hash);

((NetworkManager)context.SystemOwner).CustomMessagingManager.InvokeNamedMessage(message.Hash, context.SenderId, reader);
((NetworkManager)context.SystemOwner).CustomMessagingManager.InvokeNamedMessage(message.Hash, context.SenderId, reader, context.SerializedHeaderSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public unsafe void Serialize(FastBufferWriter writer)

public static void Receive(FastBufferReader reader, in NetworkContext context)
{
((NetworkManager)context.SystemOwner).CustomMessagingManager.InvokeUnnamedMessage(context.SenderId, reader);
((NetworkManager)context.SystemOwner).CustomMessagingManager.InvokeUnnamedMessage(context.SenderId, reader, context.SerializedHeaderSize);
}
}
}
136 changes: 77 additions & 59 deletions com.unity.netcode.gameobjects/Runtime/Messaging/MessagingSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ private struct ReceiveQueueItem
public MessageHeader Header;
public ulong SenderId;
public float Timestamp;
public int MessageHeaderSerializedSize;
}

private struct SendQueueItem
Expand All @@ -46,27 +47,27 @@ public SendQueueItem(NetworkDelivery delivery, int writerSize, Allocator writerA
private MessageHandler[] m_MessageHandlers = new MessageHandler[255];
private Type[] m_ReverseTypeMap = new Type[255];

private Dictionary<Type, byte> m_MessageTypes = new Dictionary<Type, byte>();
private Dictionary<Type, uint> m_MessageTypes = new Dictionary<Type, uint>();
private Dictionary<ulong, NativeList<SendQueueItem>> m_SendQueues = new Dictionary<ulong, NativeList<SendQueueItem>>();

private List<INetworkHooks> m_Hooks = new List<INetworkHooks>();

private byte m_HighMessageType;
private uint m_HighMessageType;
private object m_Owner;
private IMessageSender m_MessageSender;
private bool m_Disposed;

internal Type[] MessageTypes => m_ReverseTypeMap;
internal MessageHandler[] MessageHandlers => m_MessageHandlers;
internal int MessageHandlerCount => m_HighMessageType;
internal uint MessageHandlerCount => m_HighMessageType;

internal byte GetMessageType(Type t)
internal uint GetMessageType(Type t)
{
return m_MessageTypes[t];
}

public const int NON_FRAGMENTED_MESSAGE_MAX_SIZE = 1300;
public const int FRAGMENTED_MESSAGE_MAX_SIZE = 64000;
public const int FRAGMENTED_MESSAGE_MAX_SIZE = int.MaxValue;

internal struct MessageWithHandler
{
Expand Down Expand Up @@ -165,14 +166,23 @@ internal void HandleIncomingData(ulong clientId, ArraySegment<byte> data, float

for (var messageIdx = 0; messageIdx < batchHeader.BatchSize; ++messageIdx)
{
if (!batchReader.TryBeginRead(sizeof(MessageHeader)))

var messageHeader = new MessageHeader();
var position = batchReader.Position;
try
{
ByteUnpacker.ReadValueBitPacked(batchReader, out messageHeader.MessageType);
ByteUnpacker.ReadValueBitPacked(batchReader, out messageHeader.MessageSize);
}
catch (OverflowException)
{
NetworkLog.LogWarning("Received a batch that didn't have enough data for all of its batches, ending early!");
return;
throw;
}
batchReader.ReadValue(out MessageHeader messageHeader);

if (!batchReader.TryBeginRead(messageHeader.MessageSize))
var receivedHeaderSize = batchReader.Position - position;

if (!batchReader.TryBeginRead((int)messageHeader.MessageSize))
{
NetworkLog.LogWarning("Received a message that claimed a size larger than the packet, ending early!");
return;
Expand All @@ -185,9 +195,10 @@ internal void HandleIncomingData(ulong clientId, ArraySegment<byte> data, float
// Copy the data for this message into a new FastBufferReader that owns that memory.
// We can't guarantee the memory in the ArraySegment stays valid because we don't own it,
// so we must move it to memory we do own.
Reader = new FastBufferReader(batchReader.GetUnsafePtrAtCurrentPosition(), Allocator.TempJob, messageHeader.MessageSize)
Reader = new FastBufferReader(batchReader.GetUnsafePtrAtCurrentPosition(), Allocator.TempJob, (int)messageHeader.MessageSize),
MessageHeaderSerializedSize = receivedHeaderSize,
});
batchReader.Seek(batchReader.Position + messageHeader.MessageSize);
batchReader.Seek(batchReader.Position + (int)messageHeader.MessageSize);
}
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
{
Expand All @@ -210,7 +221,7 @@ private bool CanReceive(ulong clientId, Type messageType)
return true;
}

public void HandleMessage(in MessageHeader header, FastBufferReader reader, ulong senderId, float timestamp)
public void HandleMessage(in MessageHeader header, FastBufferReader reader, ulong senderId, float timestamp, int serializedHeaderSize)
{
if (header.MessageType >= m_HighMessageType)
{
Expand All @@ -223,8 +234,10 @@ public void HandleMessage(in MessageHeader header, FastBufferReader reader, ulon
SystemOwner = m_Owner,
SenderId = senderId,
Timestamp = timestamp,
Header = header
Header = header,
SerializedHeaderSize = serializedHeaderSize,
};

var type = m_ReverseTypeMap[header.MessageType];
if (!CanReceive(senderId, type))
{
Expand All @@ -236,6 +249,7 @@ public void HandleMessage(in MessageHeader header, FastBufferReader reader, ulon
{
m_Hooks[hookIdx].OnBeforeReceiveMessage(senderId, type, reader.Length + FastBufferWriter.GetWriteSize<MessageHeader>());
}

var handler = m_MessageHandlers[header.MessageType];
using (reader)
{
Expand Down Expand Up @@ -265,7 +279,7 @@ internal unsafe void ProcessIncomingMessageQueue()
{
// Avoid copies...
ref var item = ref m_IncomingMessageQueue.GetUnsafeList()->ElementAt(index);
HandleMessage(item.Header, item.Reader, item.SenderId, item.Timestamp);
HandleMessage(item.Header, item.Reader, item.SenderId, item.Timestamp, item.MessageHeaderSerializedSize);
if (m_Disposed)
{
return;
Expand Down Expand Up @@ -328,64 +342,68 @@ internal unsafe int SendMessage<TMessageType, TClientIdListType>(in TMessageType
}

var maxSize = delivery == NetworkDelivery.ReliableFragmentedSequenced ? FRAGMENTED_MESSAGE_MAX_SIZE : NON_FRAGMENTED_MESSAGE_MAX_SIZE;
var tmpSerializer = new FastBufferWriter(NON_FRAGMENTED_MESSAGE_MAX_SIZE - FastBufferWriter.GetWriteSize<MessageHeader>(), Allocator.Temp, maxSize - FastBufferWriter.GetWriteSize<MessageHeader>());
using (tmpSerializer)

using var tmpSerializer = new FastBufferWriter(NON_FRAGMENTED_MESSAGE_MAX_SIZE - FastBufferWriter.GetWriteSize<MessageHeader>(), Allocator.Temp, maxSize - FastBufferWriter.GetWriteSize<MessageHeader>());

message.Serialize(tmpSerializer);

using var headerSerializer = new FastBufferWriter(FastBufferWriter.GetWriteSize<MessageHeader>(), Allocator.Temp);

var header = new MessageHeader
{
message.Serialize(tmpSerializer);
MessageSize = (ushort)tmpSerializer.Length,
MessageType = m_MessageTypes[typeof(TMessageType)],
};
BytePacker.WriteValueBitPacked(headerSerializer, header.MessageType);
BytePacker.WriteValueBitPacked(headerSerializer, header.MessageSize);

for (var i = 0; i < clientIds.Count; ++i)
{
var clientId = clientIds[i];
for (var i = 0; i < clientIds.Count; ++i)
{
var clientId = clientIds[i];

if (!CanSend(clientId, typeof(TMessageType), delivery))
{
continue;
}
if (!CanSend(clientId, typeof(TMessageType), delivery))
{
continue;
}

for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
{
m_Hooks[hookIdx].OnBeforeSendMessage(clientId, typeof(TMessageType), delivery);
}
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
{
m_Hooks[hookIdx].OnBeforeSendMessage(clientId, typeof(TMessageType), delivery);
}

var sendQueueItem = m_SendQueues[clientId];
if (sendQueueItem.Length == 0)
var sendQueueItem = m_SendQueues[clientId];
if (sendQueueItem.Length == 0)
{
sendQueueItem.Add(new SendQueueItem(delivery, NON_FRAGMENTED_MESSAGE_MAX_SIZE, Allocator.TempJob,
maxSize));
sendQueueItem.GetUnsafeList()->ElementAt(0).Writer.Seek(sizeof(BatchHeader));
}
else
{
ref var lastQueueItem = ref sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1);
if (lastQueueItem.NetworkDelivery != delivery ||
lastQueueItem.Writer.MaxCapacity - lastQueueItem.Writer.Position
< tmpSerializer.Length + headerSerializer.Length)
{
sendQueueItem.Add(new SendQueueItem(delivery, NON_FRAGMENTED_MESSAGE_MAX_SIZE, Allocator.TempJob,
maxSize));
sendQueueItem.GetUnsafeList()->ElementAt(0).Writer.Seek(sizeof(BatchHeader));
}
else
{
ref var lastQueueItem = ref sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1);
if (lastQueueItem.NetworkDelivery != delivery ||
lastQueueItem.Writer.MaxCapacity - lastQueueItem.Writer.Position
< tmpSerializer.Length + FastBufferWriter.GetWriteSize<MessageHeader>())
{
sendQueueItem.Add(new SendQueueItem(delivery, NON_FRAGMENTED_MESSAGE_MAX_SIZE, Allocator.TempJob,
maxSize));
sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1).Writer.Seek(sizeof(BatchHeader));
}
sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1).Writer.Seek(sizeof(BatchHeader));
}
}

ref var writeQueueItem = ref sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1);
writeQueueItem.Writer.TryBeginWrite(tmpSerializer.Length + FastBufferWriter.GetWriteSize<MessageHeader>());
var header = new MessageHeader
{
MessageSize = (ushort)tmpSerializer.Length,
MessageType = m_MessageTypes[typeof(TMessageType)],
};
ref var writeQueueItem = ref sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1);
writeQueueItem.Writer.TryBeginWrite(tmpSerializer.Length + headerSerializer.Length);

writeQueueItem.Writer.WriteValue(header);
writeQueueItem.Writer.WriteBytes(tmpSerializer.GetUnsafePtr(), tmpSerializer.Length);
writeQueueItem.BatchHeader.BatchSize++;
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
{
m_Hooks[hookIdx].OnAfterSendMessage(clientId, typeof(TMessageType), delivery, tmpSerializer.Length + FastBufferWriter.GetWriteSize<MessageHeader>());
}
writeQueueItem.Writer.WriteBytes(headerSerializer.GetUnsafePtr(), headerSerializer.Length);
writeQueueItem.Writer.WriteBytes(tmpSerializer.GetUnsafePtr(), tmpSerializer.Length);
writeQueueItem.BatchHeader.BatchSize++;
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
{
m_Hooks[hookIdx].OnAfterSendMessage(clientId, typeof(TMessageType), delivery, tmpSerializer.Length + headerSerializer.Length);
}

return tmpSerializer.Length + FastBufferWriter.GetWriteSize<MessageHeader>();
}

return tmpSerializer.Length + headerSerializer.Length;
}

private struct PointerListWrapper<T> : IReadOnlyList<T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,10 @@ internal ref struct NetworkContext
/// The header data that was sent with the message
/// </summary>
public MessageHeader Header;

/// <summary>
/// The actual serialized size of the header when packed into the buffer
/// </summary>
public int SerializedHeaderSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ private struct TriggerData
public MessageHeader Header;
public ulong SenderId;
public float Timestamp;
public int SerializedHeaderSize;
}
private struct TriggerInfo
{
Expand Down Expand Up @@ -117,7 +118,8 @@ internal unsafe void TriggerOnSpawn(ulong networkObjectId, FastBufferReader read
Reader = new FastBufferReader(reader.GetUnsafePtr(), Allocator.Persistent, reader.Length),
Header = context.Header,
Timestamp = context.Timestamp,
SenderId = context.SenderId
SenderId = context.SenderId,
SerializedHeaderSize = context.SerializedHeaderSize
});
}

Expand Down Expand Up @@ -501,7 +503,7 @@ private void SpawnNetworkObjectLocallyCommon(NetworkObject networkObject, ulong
foreach (var trigger in triggerInfo.TriggerData)
{
// Reader will be disposed within HandleMessage
NetworkManager.MessagingSystem.HandleMessage(trigger.Header, trigger.Reader, trigger.SenderId, trigger.Timestamp);
NetworkManager.MessagingSystem.HandleMessage(trigger.Header, trigger.Reader, trigger.SenderId, trigger.Timestamp, trigger.SerializedHeaderSize);
}

triggerInfo.TriggerData.Dispose();
Expand Down
Loading