Skip to content

fix: increases maximum message size to int.MaxValue #1384

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions com.unity.netcode.gameobjects/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Additional documentation and release notes are available at [Multiplayer Documen
### Changed

- ServerRpcParams and ClientRpcParams must be the last parameter of an RPC in order to function properly. Added a compile-time check to ensure this is the case and trigger an error if they're placed elsewhere. (#1318)
- The SDK no longer limits message size to 64k. (The transport may still impose its own limits, but the SDK no longer does.) (#1384)

## [1.0.0-pre.3] - 2021-10-22

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ namespace Unity.Netcode
/// </summary>
internal static class NetworkConstants
{
internal const string PROTOCOL_VERSION = "14.0.0";
internal const string PROTOCOL_VERSION = "15.0.0";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ internal void __sendServerRpc(FastBufferWriter writer, uint rpcMethodId, ServerR
SystemOwner = NetworkManager,
// header information isn't valid since it's not a real message.
// Passing false to canDefer prevents it being accessed.
Header = new MessageHeader()
Header = new MessageHeader(),
SerializedHeaderSize = 0,
};
message.Handle(tempBuffer, context, NetworkManager, NetworkManager.ServerClientId, false);
rpcMessageSize = tempBuffer.Length;
Expand Down Expand Up @@ -188,7 +189,8 @@ internal unsafe void __sendClientRpc(FastBufferWriter writer, uint rpcMethodId,
SystemOwner = NetworkManager,
// header information isn't valid since it's not a real message.
// Passing false to canDefer prevents it being accessed.
Header = new MessageHeader()
Header = new MessageHeader(),
SerializedHeaderSize = 0,
};
message.Handle(tempBuffer, context, NetworkManager, NetworkManager.ServerClientId, false);
messageSize = tempBuffer.Length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal CustomMessagingManager(NetworkManager networkManager)
/// </summary>
public event UnnamedMessageDelegate OnUnnamedMessage;

internal void InvokeUnnamedMessage(ulong clientId, FastBufferReader reader)
internal void InvokeUnnamedMessage(ulong clientId, FastBufferReader reader, int serializedHeaderSize)
{
if (OnUnnamedMessage != null)
{
Expand All @@ -40,7 +40,7 @@ internal void InvokeUnnamedMessage(ulong clientId, FastBufferReader reader)
((UnnamedMessageDelegate)handler).Invoke(clientId, reader);
}
}
m_NetworkManager.NetworkMetrics.TrackUnnamedMessageReceived(clientId, reader.Length + FastBufferWriter.GetWriteSize<MessageHeader>());
m_NetworkManager.NetworkMetrics.TrackUnnamedMessageReceived(clientId, reader.Length + serializedHeaderSize);
}

/// <summary>
Expand Down Expand Up @@ -115,9 +115,9 @@ public void SendUnnamedMessage(ulong clientId, FastBufferWriter messageBuffer, N
private Dictionary<ulong, string> m_MessageHandlerNameLookup32 = new Dictionary<ulong, string>();
private Dictionary<ulong, string> m_MessageHandlerNameLookup64 = new Dictionary<ulong, string>();

internal void InvokeNamedMessage(ulong hash, ulong sender, FastBufferReader reader)
internal void InvokeNamedMessage(ulong hash, ulong sender, FastBufferReader reader, int serializedHeaderSize)
{
var bytesCount = reader.Length + FastBufferWriter.GetWriteSize<MessageHeader>();
var bytesCount = reader.Length + serializedHeaderSize;

if (m_NetworkManager == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ internal struct MessageHeader
/// unchanged - if new messages are added or messages are removed, MessageType assignments may be
/// calculated differently.
/// </summary>
public byte MessageType;
public uint MessageType;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a required change for this PR?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not required, no, but I thought it made sense. Making the other one larger meant it made sense to start packing the header. If the header's packed, then there's no reason to limit the number of messages to the byte range. It still ends up serialized as a byte due to packing.


/// <summary>
/// The total size of the message, NOT including the header.
/// Stored as a uint to avoid zig-zag encoding, but capped at int.MaxValue.
/// </summary>
public ushort MessageSize;
public uint MessageSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public static void Receive(FastBufferReader reader, in NetworkContext context)
var message = new NamedMessage();
reader.ReadValueSafe(out message.Hash);

((NetworkManager)context.SystemOwner).CustomMessagingManager.InvokeNamedMessage(message.Hash, context.SenderId, reader);
((NetworkManager)context.SystemOwner).CustomMessagingManager.InvokeNamedMessage(message.Hash, context.SenderId, reader, context.SerializedHeaderSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public unsafe void Serialize(FastBufferWriter writer)

public static void Receive(FastBufferReader reader, in NetworkContext context)
{
((NetworkManager)context.SystemOwner).CustomMessagingManager.InvokeUnnamedMessage(context.SenderId, reader);
((NetworkManager)context.SystemOwner).CustomMessagingManager.InvokeUnnamedMessage(context.SenderId, reader, context.SerializedHeaderSize);
}
}
}
136 changes: 77 additions & 59 deletions com.unity.netcode.gameobjects/Runtime/Messaging/MessagingSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ private struct ReceiveQueueItem
public MessageHeader Header;
public ulong SenderId;
public float Timestamp;
public int MessageHeaderSerializedSize;
}

private struct SendQueueItem
Expand All @@ -46,27 +47,27 @@ public SendQueueItem(NetworkDelivery delivery, int writerSize, Allocator writerA
private MessageHandler[] m_MessageHandlers = new MessageHandler[255];
private Type[] m_ReverseTypeMap = new Type[255];

private Dictionary<Type, byte> m_MessageTypes = new Dictionary<Type, byte>();
private Dictionary<Type, uint> m_MessageTypes = new Dictionary<Type, uint>();
private Dictionary<ulong, NativeList<SendQueueItem>> m_SendQueues = new Dictionary<ulong, NativeList<SendQueueItem>>();

private List<INetworkHooks> m_Hooks = new List<INetworkHooks>();

private byte m_HighMessageType;
private uint m_HighMessageType;
private object m_Owner;
private IMessageSender m_MessageSender;
private bool m_Disposed;

internal Type[] MessageTypes => m_ReverseTypeMap;
internal MessageHandler[] MessageHandlers => m_MessageHandlers;
internal int MessageHandlerCount => m_HighMessageType;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above not clear why these changes are needed.

internal uint MessageHandlerCount => m_HighMessageType;

internal byte GetMessageType(Type t)
internal uint GetMessageType(Type t)
{
return m_MessageTypes[t];
}

public const int NON_FRAGMENTED_MESSAGE_MAX_SIZE = 1300;
public const int FRAGMENTED_MESSAGE_MAX_SIZE = 64000;
public const int FRAGMENTED_MESSAGE_MAX_SIZE = int.MaxValue;

internal struct MessageWithHandler
{
Expand Down Expand Up @@ -165,14 +166,23 @@ internal void HandleIncomingData(ulong clientId, ArraySegment<byte> data, float

for (var messageIdx = 0; messageIdx < batchHeader.BatchSize; ++messageIdx)
{
if (!batchReader.TryBeginRead(sizeof(MessageHeader)))

var messageHeader = new MessageHeader();
var position = batchReader.Position;
try
{
ByteUnpacker.ReadValueBitPacked(batchReader, out messageHeader.MessageType);
ByteUnpacker.ReadValueBitPacked(batchReader, out messageHeader.MessageSize);
}
catch (OverflowException)
{
NetworkLog.LogWarning("Received a batch that didn't have enough data for all of its batches, ending early!");
return;
throw;
}
batchReader.ReadValue(out MessageHeader messageHeader);

if (!batchReader.TryBeginRead(messageHeader.MessageSize))
var receivedHeaderSize = batchReader.Position - position;

if (!batchReader.TryBeginRead((int)messageHeader.MessageSize))
{
NetworkLog.LogWarning("Received a message that claimed a size larger than the packet, ending early!");
return;
Expand All @@ -185,9 +195,10 @@ internal void HandleIncomingData(ulong clientId, ArraySegment<byte> data, float
// Copy the data for this message into a new FastBufferReader that owns that memory.
// We can't guarantee the memory in the ArraySegment stays valid because we don't own it,
// so we must move it to memory we do own.
Reader = new FastBufferReader(batchReader.GetUnsafePtrAtCurrentPosition(), Allocator.TempJob, messageHeader.MessageSize)
Reader = new FastBufferReader(batchReader.GetUnsafePtrAtCurrentPosition(), Allocator.TempJob, (int)messageHeader.MessageSize),
MessageHeaderSerializedSize = receivedHeaderSize,
});
batchReader.Seek(batchReader.Position + messageHeader.MessageSize);
batchReader.Seek(batchReader.Position + (int)messageHeader.MessageSize);
}
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
{
Expand All @@ -210,7 +221,7 @@ private bool CanReceive(ulong clientId, Type messageType)
return true;
}

public void HandleMessage(in MessageHeader header, FastBufferReader reader, ulong senderId, float timestamp)
public void HandleMessage(in MessageHeader header, FastBufferReader reader, ulong senderId, float timestamp, int serializedHeaderSize)
{
if (header.MessageType >= m_HighMessageType)
{
Expand All @@ -223,8 +234,10 @@ public void HandleMessage(in MessageHeader header, FastBufferReader reader, ulon
SystemOwner = m_Owner,
SenderId = senderId,
Timestamp = timestamp,
Header = header
Header = header,
SerializedHeaderSize = serializedHeaderSize,
};

var type = m_ReverseTypeMap[header.MessageType];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the moving of this code related to the original fix?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and no... one iteration of the fix did require this code to be moved... when I backtracked on that direction I realized the code still needed to be moved because as it is, it's dispatching profiling/metrics events twice for some messages. I can move it into a separate PR...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes lets move that into a different PR

if (!CanReceive(senderId, type))
{
Expand All @@ -236,6 +249,7 @@ public void HandleMessage(in MessageHeader header, FastBufferReader reader, ulon
{
m_Hooks[hookIdx].OnBeforeReceiveMessage(senderId, type, reader.Length + FastBufferWriter.GetWriteSize<MessageHeader>());
}

var handler = m_MessageHandlers[header.MessageType];
using (reader)
{
Expand Down Expand Up @@ -265,7 +279,7 @@ internal unsafe void ProcessIncomingMessageQueue()
{
// Avoid copies...
ref var item = ref m_IncomingMessageQueue.GetUnsafeList()->ElementAt(index);
HandleMessage(item.Header, item.Reader, item.SenderId, item.Timestamp);
HandleMessage(item.Header, item.Reader, item.SenderId, item.Timestamp, item.MessageHeaderSerializedSize);
if (m_Disposed)
{
return;
Expand Down Expand Up @@ -328,64 +342,68 @@ internal unsafe int SendMessage<TMessageType, TClientIdListType>(in TMessageType
}

var maxSize = delivery == NetworkDelivery.ReliableFragmentedSequenced ? FRAGMENTED_MESSAGE_MAX_SIZE : NON_FRAGMENTED_MESSAGE_MAX_SIZE;
var tmpSerializer = new FastBufferWriter(NON_FRAGMENTED_MESSAGE_MAX_SIZE - FastBufferWriter.GetWriteSize<MessageHeader>(), Allocator.Temp, maxSize - FastBufferWriter.GetWriteSize<MessageHeader>());
using (tmpSerializer)

using var tmpSerializer = new FastBufferWriter(NON_FRAGMENTED_MESSAGE_MAX_SIZE - FastBufferWriter.GetWriteSize<MessageHeader>(), Allocator.Temp, maxSize - FastBufferWriter.GetWriteSize<MessageHeader>());

message.Serialize(tmpSerializer);

using var headerSerializer = new FastBufferWriter(FastBufferWriter.GetWriteSize<MessageHeader>(), Allocator.Temp);

var header = new MessageHeader
{
message.Serialize(tmpSerializer);
MessageSize = (ushort)tmpSerializer.Length,
MessageType = m_MessageTypes[typeof(TMessageType)],
};
BytePacker.WriteValueBitPacked(headerSerializer, header.MessageType);
BytePacker.WriteValueBitPacked(headerSerializer, header.MessageSize);

for (var i = 0; i < clientIds.Count; ++i)
{
var clientId = clientIds[i];
for (var i = 0; i < clientIds.Count; ++i)
{
var clientId = clientIds[i];

if (!CanSend(clientId, typeof(TMessageType), delivery))
{
continue;
}
if (!CanSend(clientId, typeof(TMessageType), delivery))
{
continue;
}

for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
{
m_Hooks[hookIdx].OnBeforeSendMessage(clientId, typeof(TMessageType), delivery);
}
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
{
m_Hooks[hookIdx].OnBeforeSendMessage(clientId, typeof(TMessageType), delivery);
}

var sendQueueItem = m_SendQueues[clientId];
if (sendQueueItem.Length == 0)
var sendQueueItem = m_SendQueues[clientId];
if (sendQueueItem.Length == 0)
{
sendQueueItem.Add(new SendQueueItem(delivery, NON_FRAGMENTED_MESSAGE_MAX_SIZE, Allocator.TempJob,
maxSize));
sendQueueItem.GetUnsafeList()->ElementAt(0).Writer.Seek(sizeof(BatchHeader));
}
else
{
ref var lastQueueItem = ref sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1);
if (lastQueueItem.NetworkDelivery != delivery ||
lastQueueItem.Writer.MaxCapacity - lastQueueItem.Writer.Position
< tmpSerializer.Length + headerSerializer.Length)
{
sendQueueItem.Add(new SendQueueItem(delivery, NON_FRAGMENTED_MESSAGE_MAX_SIZE, Allocator.TempJob,
maxSize));
sendQueueItem.GetUnsafeList()->ElementAt(0).Writer.Seek(sizeof(BatchHeader));
}
else
{
ref var lastQueueItem = ref sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1);
if (lastQueueItem.NetworkDelivery != delivery ||
lastQueueItem.Writer.MaxCapacity - lastQueueItem.Writer.Position
< tmpSerializer.Length + FastBufferWriter.GetWriteSize<MessageHeader>())
{
sendQueueItem.Add(new SendQueueItem(delivery, NON_FRAGMENTED_MESSAGE_MAX_SIZE, Allocator.TempJob,
maxSize));
sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1).Writer.Seek(sizeof(BatchHeader));
}
sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1).Writer.Seek(sizeof(BatchHeader));
}
}

ref var writeQueueItem = ref sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1);
writeQueueItem.Writer.TryBeginWrite(tmpSerializer.Length + FastBufferWriter.GetWriteSize<MessageHeader>());
var header = new MessageHeader
{
MessageSize = (ushort)tmpSerializer.Length,
MessageType = m_MessageTypes[typeof(TMessageType)],
};
ref var writeQueueItem = ref sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1);
writeQueueItem.Writer.TryBeginWrite(tmpSerializer.Length + headerSerializer.Length);

writeQueueItem.Writer.WriteValue(header);
writeQueueItem.Writer.WriteBytes(tmpSerializer.GetUnsafePtr(), tmpSerializer.Length);
writeQueueItem.BatchHeader.BatchSize++;
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
{
m_Hooks[hookIdx].OnAfterSendMessage(clientId, typeof(TMessageType), delivery, tmpSerializer.Length + FastBufferWriter.GetWriteSize<MessageHeader>());
}
writeQueueItem.Writer.WriteBytes(headerSerializer.GetUnsafePtr(), headerSerializer.Length);
writeQueueItem.Writer.WriteBytes(tmpSerializer.GetUnsafePtr(), tmpSerializer.Length);
writeQueueItem.BatchHeader.BatchSize++;
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx)
{
m_Hooks[hookIdx].OnAfterSendMessage(clientId, typeof(TMessageType), delivery, tmpSerializer.Length + headerSerializer.Length);
}

return tmpSerializer.Length + FastBufferWriter.GetWriteSize<MessageHeader>();
}

return tmpSerializer.Length + headerSerializer.Length;
}

private struct PointerListWrapper<T> : IReadOnlyList<T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,10 @@ internal ref struct NetworkContext
/// The header data that was sent with the message
/// </summary>
public MessageHeader Header;

/// <summary>
/// The actual serialized size of the header when packed into the buffer
/// </summary>
public int SerializedHeaderSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ private struct TriggerData
public MessageHeader Header;
public ulong SenderId;
public float Timestamp;
public int SerializedHeaderSize;
}
private struct TriggerInfo
{
Expand Down Expand Up @@ -117,7 +118,8 @@ internal unsafe void TriggerOnSpawn(ulong networkObjectId, FastBufferReader read
Reader = new FastBufferReader(reader.GetUnsafePtr(), Allocator.Persistent, reader.Length),
Header = context.Header,
Timestamp = context.Timestamp,
SenderId = context.SenderId
SenderId = context.SenderId,
SerializedHeaderSize = context.SerializedHeaderSize
});
}

Expand Down Expand Up @@ -501,7 +503,7 @@ private void SpawnNetworkObjectLocallyCommon(NetworkObject networkObject, ulong
foreach (var trigger in triggerInfo.TriggerData)
{
// Reader will be disposed within HandleMessage
NetworkManager.MessagingSystem.HandleMessage(trigger.Header, trigger.Reader, trigger.SenderId, trigger.Timestamp);
NetworkManager.MessagingSystem.HandleMessage(trigger.Header, trigger.Reader, trigger.SenderId, trigger.Timestamp, trigger.SerializedHeaderSize);
}

triggerInfo.TriggerData.Dispose();
Expand Down
Loading