-
Notifications
You must be signed in to change notification settings - Fork 450
fix: increases maximum message size to int.MaxValue #1384
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fe00274
c6e50da
cda7ed4
a417835
824cbc2
89b15bc
b6d1f3f
0a139dc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ private struct ReceiveQueueItem | |
public MessageHeader Header; | ||
public ulong SenderId; | ||
public float Timestamp; | ||
public int MessageHeaderSerializedSize; | ||
} | ||
|
||
private struct SendQueueItem | ||
|
@@ -46,27 +47,27 @@ public SendQueueItem(NetworkDelivery delivery, int writerSize, Allocator writerA | |
private MessageHandler[] m_MessageHandlers = new MessageHandler[255]; | ||
private Type[] m_ReverseTypeMap = new Type[255]; | ||
|
||
private Dictionary<Type, byte> m_MessageTypes = new Dictionary<Type, byte>(); | ||
private Dictionary<Type, uint> m_MessageTypes = new Dictionary<Type, uint>(); | ||
private Dictionary<ulong, NativeList<SendQueueItem>> m_SendQueues = new Dictionary<ulong, NativeList<SendQueueItem>>(); | ||
|
||
private List<INetworkHooks> m_Hooks = new List<INetworkHooks>(); | ||
|
||
private byte m_HighMessageType; | ||
private uint m_HighMessageType; | ||
private object m_Owner; | ||
private IMessageSender m_MessageSender; | ||
private bool m_Disposed; | ||
|
||
internal Type[] MessageTypes => m_ReverseTypeMap; | ||
internal MessageHandler[] MessageHandlers => m_MessageHandlers; | ||
internal int MessageHandlerCount => m_HighMessageType; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above not clear why these changes are needed. |
||
internal uint MessageHandlerCount => m_HighMessageType; | ||
|
||
internal byte GetMessageType(Type t) | ||
internal uint GetMessageType(Type t) | ||
{ | ||
return m_MessageTypes[t]; | ||
} | ||
|
||
public const int NON_FRAGMENTED_MESSAGE_MAX_SIZE = 1300; | ||
public const int FRAGMENTED_MESSAGE_MAX_SIZE = 64000; | ||
public const int FRAGMENTED_MESSAGE_MAX_SIZE = int.MaxValue; | ||
|
||
internal struct MessageWithHandler | ||
{ | ||
|
@@ -165,14 +166,23 @@ internal void HandleIncomingData(ulong clientId, ArraySegment<byte> data, float | |
|
||
for (var messageIdx = 0; messageIdx < batchHeader.BatchSize; ++messageIdx) | ||
{ | ||
if (!batchReader.TryBeginRead(sizeof(MessageHeader))) | ||
|
||
var messageHeader = new MessageHeader(); | ||
var position = batchReader.Position; | ||
try | ||
{ | ||
ByteUnpacker.ReadValueBitPacked(batchReader, out messageHeader.MessageType); | ||
ByteUnpacker.ReadValueBitPacked(batchReader, out messageHeader.MessageSize); | ||
} | ||
catch (OverflowException) | ||
{ | ||
NetworkLog.LogWarning("Received a batch that didn't have enough data for all of its batches, ending early!"); | ||
return; | ||
throw; | ||
} | ||
batchReader.ReadValue(out MessageHeader messageHeader); | ||
|
||
if (!batchReader.TryBeginRead(messageHeader.MessageSize)) | ||
var receivedHeaderSize = batchReader.Position - position; | ||
|
||
if (!batchReader.TryBeginRead((int)messageHeader.MessageSize)) | ||
{ | ||
NetworkLog.LogWarning("Received a message that claimed a size larger than the packet, ending early!"); | ||
return; | ||
|
@@ -185,9 +195,10 @@ internal void HandleIncomingData(ulong clientId, ArraySegment<byte> data, float | |
// Copy the data for this message into a new FastBufferReader that owns that memory. | ||
// We can't guarantee the memory in the ArraySegment stays valid because we don't own it, | ||
// so we must move it to memory we do own. | ||
Reader = new FastBufferReader(batchReader.GetUnsafePtrAtCurrentPosition(), Allocator.TempJob, messageHeader.MessageSize) | ||
Reader = new FastBufferReader(batchReader.GetUnsafePtrAtCurrentPosition(), Allocator.TempJob, (int)messageHeader.MessageSize), | ||
MessageHeaderSerializedSize = receivedHeaderSize, | ||
}); | ||
batchReader.Seek(batchReader.Position + messageHeader.MessageSize); | ||
batchReader.Seek(batchReader.Position + (int)messageHeader.MessageSize); | ||
} | ||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx) | ||
{ | ||
|
@@ -210,7 +221,7 @@ private bool CanReceive(ulong clientId, Type messageType) | |
return true; | ||
} | ||
|
||
public void HandleMessage(in MessageHeader header, FastBufferReader reader, ulong senderId, float timestamp) | ||
public void HandleMessage(in MessageHeader header, FastBufferReader reader, ulong senderId, float timestamp, int serializedHeaderSize) | ||
{ | ||
if (header.MessageType >= m_HighMessageType) | ||
{ | ||
|
@@ -223,8 +234,10 @@ public void HandleMessage(in MessageHeader header, FastBufferReader reader, ulon | |
SystemOwner = m_Owner, | ||
SenderId = senderId, | ||
Timestamp = timestamp, | ||
Header = header | ||
Header = header, | ||
SerializedHeaderSize = serializedHeaderSize, | ||
}; | ||
|
||
var type = m_ReverseTypeMap[header.MessageType]; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the moving of this code related to the original fix? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes and no... one iteration of the fix did require this code to be moved... when I backtracked on that direction I realized the code still needed to be moved because as it is, it's dispatching profiling/metrics events twice for some messages. I can move it into a separate PR... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes lets move that into a different PR |
||
if (!CanReceive(senderId, type)) | ||
{ | ||
|
@@ -236,6 +249,7 @@ public void HandleMessage(in MessageHeader header, FastBufferReader reader, ulon | |
{ | ||
m_Hooks[hookIdx].OnBeforeReceiveMessage(senderId, type, reader.Length + FastBufferWriter.GetWriteSize<MessageHeader>()); | ||
} | ||
|
||
var handler = m_MessageHandlers[header.MessageType]; | ||
using (reader) | ||
{ | ||
|
@@ -265,7 +279,7 @@ internal unsafe void ProcessIncomingMessageQueue() | |
{ | ||
// Avoid copies... | ||
ref var item = ref m_IncomingMessageQueue.GetUnsafeList()->ElementAt(index); | ||
HandleMessage(item.Header, item.Reader, item.SenderId, item.Timestamp); | ||
HandleMessage(item.Header, item.Reader, item.SenderId, item.Timestamp, item.MessageHeaderSerializedSize); | ||
if (m_Disposed) | ||
{ | ||
return; | ||
|
@@ -328,64 +342,68 @@ internal unsafe int SendMessage<TMessageType, TClientIdListType>(in TMessageType | |
} | ||
|
||
var maxSize = delivery == NetworkDelivery.ReliableFragmentedSequenced ? FRAGMENTED_MESSAGE_MAX_SIZE : NON_FRAGMENTED_MESSAGE_MAX_SIZE; | ||
var tmpSerializer = new FastBufferWriter(NON_FRAGMENTED_MESSAGE_MAX_SIZE - FastBufferWriter.GetWriteSize<MessageHeader>(), Allocator.Temp, maxSize - FastBufferWriter.GetWriteSize<MessageHeader>()); | ||
using (tmpSerializer) | ||
|
||
using var tmpSerializer = new FastBufferWriter(NON_FRAGMENTED_MESSAGE_MAX_SIZE - FastBufferWriter.GetWriteSize<MessageHeader>(), Allocator.Temp, maxSize - FastBufferWriter.GetWriteSize<MessageHeader>()); | ||
|
||
message.Serialize(tmpSerializer); | ||
|
||
using var headerSerializer = new FastBufferWriter(FastBufferWriter.GetWriteSize<MessageHeader>(), Allocator.Temp); | ||
|
||
var header = new MessageHeader | ||
{ | ||
message.Serialize(tmpSerializer); | ||
MessageSize = (ushort)tmpSerializer.Length, | ||
MessageType = m_MessageTypes[typeof(TMessageType)], | ||
}; | ||
BytePacker.WriteValueBitPacked(headerSerializer, header.MessageType); | ||
BytePacker.WriteValueBitPacked(headerSerializer, header.MessageSize); | ||
|
||
for (var i = 0; i < clientIds.Count; ++i) | ||
{ | ||
var clientId = clientIds[i]; | ||
for (var i = 0; i < clientIds.Count; ++i) | ||
{ | ||
var clientId = clientIds[i]; | ||
|
||
if (!CanSend(clientId, typeof(TMessageType), delivery)) | ||
{ | ||
continue; | ||
} | ||
if (!CanSend(clientId, typeof(TMessageType), delivery)) | ||
{ | ||
continue; | ||
} | ||
|
||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx) | ||
{ | ||
m_Hooks[hookIdx].OnBeforeSendMessage(clientId, typeof(TMessageType), delivery); | ||
} | ||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx) | ||
{ | ||
m_Hooks[hookIdx].OnBeforeSendMessage(clientId, typeof(TMessageType), delivery); | ||
} | ||
|
||
var sendQueueItem = m_SendQueues[clientId]; | ||
if (sendQueueItem.Length == 0) | ||
var sendQueueItem = m_SendQueues[clientId]; | ||
if (sendQueueItem.Length == 0) | ||
{ | ||
sendQueueItem.Add(new SendQueueItem(delivery, NON_FRAGMENTED_MESSAGE_MAX_SIZE, Allocator.TempJob, | ||
maxSize)); | ||
sendQueueItem.GetUnsafeList()->ElementAt(0).Writer.Seek(sizeof(BatchHeader)); | ||
} | ||
else | ||
{ | ||
ref var lastQueueItem = ref sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1); | ||
if (lastQueueItem.NetworkDelivery != delivery || | ||
lastQueueItem.Writer.MaxCapacity - lastQueueItem.Writer.Position | ||
< tmpSerializer.Length + headerSerializer.Length) | ||
{ | ||
sendQueueItem.Add(new SendQueueItem(delivery, NON_FRAGMENTED_MESSAGE_MAX_SIZE, Allocator.TempJob, | ||
maxSize)); | ||
sendQueueItem.GetUnsafeList()->ElementAt(0).Writer.Seek(sizeof(BatchHeader)); | ||
} | ||
else | ||
{ | ||
ref var lastQueueItem = ref sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1); | ||
if (lastQueueItem.NetworkDelivery != delivery || | ||
lastQueueItem.Writer.MaxCapacity - lastQueueItem.Writer.Position | ||
< tmpSerializer.Length + FastBufferWriter.GetWriteSize<MessageHeader>()) | ||
{ | ||
sendQueueItem.Add(new SendQueueItem(delivery, NON_FRAGMENTED_MESSAGE_MAX_SIZE, Allocator.TempJob, | ||
maxSize)); | ||
sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1).Writer.Seek(sizeof(BatchHeader)); | ||
} | ||
sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1).Writer.Seek(sizeof(BatchHeader)); | ||
} | ||
} | ||
|
||
ref var writeQueueItem = ref sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1); | ||
writeQueueItem.Writer.TryBeginWrite(tmpSerializer.Length + FastBufferWriter.GetWriteSize<MessageHeader>()); | ||
var header = new MessageHeader | ||
{ | ||
MessageSize = (ushort)tmpSerializer.Length, | ||
MessageType = m_MessageTypes[typeof(TMessageType)], | ||
}; | ||
ref var writeQueueItem = ref sendQueueItem.GetUnsafeList()->ElementAt(sendQueueItem.Length - 1); | ||
writeQueueItem.Writer.TryBeginWrite(tmpSerializer.Length + headerSerializer.Length); | ||
|
||
writeQueueItem.Writer.WriteValue(header); | ||
writeQueueItem.Writer.WriteBytes(tmpSerializer.GetUnsafePtr(), tmpSerializer.Length); | ||
writeQueueItem.BatchHeader.BatchSize++; | ||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx) | ||
{ | ||
m_Hooks[hookIdx].OnAfterSendMessage(clientId, typeof(TMessageType), delivery, tmpSerializer.Length + FastBufferWriter.GetWriteSize<MessageHeader>()); | ||
} | ||
writeQueueItem.Writer.WriteBytes(headerSerializer.GetUnsafePtr(), headerSerializer.Length); | ||
writeQueueItem.Writer.WriteBytes(tmpSerializer.GetUnsafePtr(), tmpSerializer.Length); | ||
writeQueueItem.BatchHeader.BatchSize++; | ||
for (var hookIdx = 0; hookIdx < m_Hooks.Count; ++hookIdx) | ||
{ | ||
m_Hooks[hookIdx].OnAfterSendMessage(clientId, typeof(TMessageType), delivery, tmpSerializer.Length + headerSerializer.Length); | ||
} | ||
|
||
return tmpSerializer.Length + FastBufferWriter.GetWriteSize<MessageHeader>(); | ||
} | ||
|
||
return tmpSerializer.Length + headerSerializer.Length; | ||
} | ||
|
||
private struct PointerListWrapper<T> : IReadOnlyList<T> | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a required change for this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not required, no, but I thought it made sense. Making the other one larger meant it made sense to start packing the header. If the header's packed, then there's no reason to limit the number of messages to the byte range. It still ends up serialized as a byte due to packing.