Skip to content

fix: Lift ~44KB limit for reliable payloads in the adapter #1596

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Jan 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions com.unity.netcode.adapter.utp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ All notable changes to this package will be documented in this file. The format

### Fixed

- Lifted the limit of ~44KB for reliable payloads. Before the fix, attempting to send a payload larger than that with reliable delivery would silently fail. Note that it is still not recommended to send such large reliable payloads, since their delivery could take a few network round-trips. (#1596)

## [1.0.0-pre.4] - 2022-01-04

### Added
Expand Down
66 changes: 53 additions & 13 deletions com.unity.netcode.adapter.utp/Runtime/BatchedReceiveQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ namespace Unity.Netcode.UTP.Utilities
internal class BatchedReceiveQueue
{
private byte[] m_Data;
private DataStreamReader m_Reader;
private int m_ReaderOffset;
private int m_Offset;
private int m_Length;

public bool IsEmpty => m_ReaderOffset >= m_Reader.Length;
public bool IsEmpty => m_Length <= 0;

/// <summary>
/// Construct a new receive queue from a <see cref="DataStreamReader"/> returned by
Expand All @@ -29,26 +29,66 @@ public BatchedReceiveQueue(DataStreamReader reader)
}
}

m_Reader = reader;
m_ReaderOffset = 0;
m_Offset = 0;
m_Length = reader.Length;
}

/// <summary>Pop the next message in the queue.</summary>
/// <returns>The message, or the default value if no more messages.</returns>
/// <summary>
/// Push the entire data from a <see cref="DataStreamReader"/> (as returned by popping an
/// event from a <see cref="NetworkDriver">) to the queue.
/// </summary>
/// <param name="reader">The <see cref="DataStreamReader"/> to push the data of.</param>
public void PushReader(DataStreamReader reader)
{
// Resize the array and copy the existing data to the beginning if there's not enough
// room to copy the reader's data at the end of the existing data.
var available = m_Data.Length - (m_Offset + m_Length);
if (available < reader.Length)
{
if (m_Length > 0)
{
Array.Copy(m_Data, m_Offset, m_Data, 0, m_Length);
}

m_Offset = 0;

while (m_Data.Length - m_Length < reader.Length)
{
Array.Resize(ref m_Data, m_Data.Length * 2);
}
}

unsafe
{
fixed (byte* dataPtr = m_Data)
{
reader.ReadBytes(dataPtr + m_Offset + m_Length, reader.Length);
}
}

m_Length += reader.Length;
}

/// <summary>Pop the next full message in the queue.</summary>
/// <returns>The message, or the default value if no more full messages.</returns>
public ArraySegment<byte> PopMessage()
{
if (IsEmpty)
if (m_Length < sizeof(int))
{
return default;
}

m_Reader.SeekSet(m_ReaderOffset);
var messageLength = BitConverter.ToInt32(m_Data, m_Offset);

if (m_Length - sizeof(int) < messageLength)
{
return default;
}

var messageLength = m_Reader.ReadInt();
m_ReaderOffset += sizeof(int);
var data = new ArraySegment<byte>(m_Data, m_Offset + sizeof(int), messageLength);

var data = new ArraySegment<byte>(m_Data, m_ReaderOffset, messageLength);
m_ReaderOffset += messageLength;
m_Offset += sizeof(int) + messageLength;
m_Length -= sizeof(int) + messageLength;

return data;
}
Expand Down
37 changes: 36 additions & 1 deletion com.unity.netcode.adapter.utp/Runtime/BatchedSendQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,13 @@ public bool PushMessage(ArraySegment<byte> message)
/// does not reduce the length of the queue. Callers are expected to call
/// <see cref="Consume"/> with the value returned by this method afterwards if the data can
/// be safely removed from the queue (e.g. if it was sent successfully).
///
/// This method should not be used together with <see cref="FillWriterWithBytes"> since this
/// could lead to a corrupted queue.
/// </remarks>
/// <param name="writer">The <see cref="DataStreamWriter"/> to write to.</param>
/// <returns>How many bytes were written to the writer.</returns>
public int FillWriter(ref DataStreamWriter writer)
public int FillWriterWithMessages(ref DataStreamWriter writer)
{
if (!IsCreated || Length == 0)
{
Expand Down Expand Up @@ -176,6 +179,38 @@ public int FillWriter(ref DataStreamWriter writer)
}
}

/// <summary>
/// Fill the given <see cref="DataStreamWriter"/> with as many bytes from the queue as
/// possible, disregarding message boundaries.
/// </summary>
///<remarks>
/// This does NOT actually consume anything from the queue. That is, calling this method
/// does not reduce the length of the queue. Callers are expected to call
/// <see cref="Consume"/> with the value returned by this method afterwards if the data can
/// be safely removed from the queue (e.g. if it was sent successfully).
///
/// This method should not be used together with <see cref="FillWriterWithMessages"/> since
/// this could lead to reading messages from a corrupted queue.
/// </remarks>
/// <param name="writer">The <see cref="DataStreamWriter"/> to write to.</param>
/// <returns>How many bytes were written to the writer.</returns>
public int FillWriterWithBytes(ref DataStreamWriter writer)
{
if (!IsCreated || Length == 0)
{
return 0;
}

var copyLength = Math.Min(writer.Capacity, Length);

unsafe
{
writer.WriteBytes((byte*)m_Data.GetUnsafePtr() + HeadIndex, copyLength);
}

return copyLength;
}

/// <summary>Consume a number of bytes from the head of the queue.</summary>
/// <remarks>
/// This should only be called with a size that matches the last value returned by
Expand Down
86 changes: 63 additions & 23 deletions com.unity.netcode.adapter.utp/Runtime/UnityTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ void CreateDriver(
out NetworkDriver driver,
out NetworkPipeline unreliableFragmentedPipeline,
out NetworkPipeline unreliableSequencedFragmentedPipeline,
out NetworkPipeline reliableSequencedFragmentedPipeline);
out NetworkPipeline reliableSequencedPipeline);
}

public static class ErrorUtilities
Expand Down Expand Up @@ -160,7 +160,7 @@ public static implicit operator ConnectionAddressData(NetworkEndPoint d) =>

private NetworkPipeline m_UnreliableFragmentedPipeline;
private NetworkPipeline m_UnreliableSequencedFragmentedPipeline;
private NetworkPipeline m_ReliableSequencedFragmentedPipeline;
private NetworkPipeline m_ReliableSequencedPipeline;

public override ulong ServerClientId => m_ServerClientId;

Expand Down Expand Up @@ -210,14 +210,19 @@ public SimulatorUtility.Parameters ClientSimulatorParameters
/// </summary>
private readonly Dictionary<SendTarget, BatchedSendQueue> m_SendQueue = new Dictionary<SendTarget, BatchedSendQueue>();

// Since reliable messages may be spread out over multiple transport payloads, it's possible
// to receive only parts of a message in an update. We thus keep the reliable receive queues
// around to avoid losing partial messages.
private readonly Dictionary<ulong, BatchedReceiveQueue> m_ReliableReceiveQueues = new Dictionary<ulong, BatchedReceiveQueue>();

private void InitDriver()
{
DriverConstructor.CreateDriver(
this,
out m_Driver,
out m_UnreliableFragmentedPipeline,
out m_UnreliableSequencedFragmentedPipeline,
out m_ReliableSequencedFragmentedPipeline);
out m_ReliableSequencedPipeline);
}

private void DisposeDriver()
Expand All @@ -241,7 +246,7 @@ private NetworkPipeline SelectSendPipeline(NetworkDelivery delivery)
case NetworkDelivery.Reliable:
case NetworkDelivery.ReliableSequenced:
case NetworkDelivery.ReliableFragmentedSequenced:
return m_ReliableSequencedFragmentedPipeline;
return m_ReliableSequencedPipeline;

default:
Debug.LogError($"Unknown {nameof(NetworkDelivery)} value: {delivery}");
Expand Down Expand Up @@ -340,6 +345,11 @@ private static RelayConnectionData ConvertConnectionData(byte[] connectionData)
}
}

internal void SetMaxPayloadSize(int maxPayloadSize)
{
m_MaxPayloadSize = maxPayloadSize;
}

private void SetProtocol(ProtocolType inProtocol)
{
m_ProtocolType = inProtocol;
Expand Down Expand Up @@ -439,7 +449,14 @@ private void SendBatchedMessages(SendTarget sendTarget, BatchedSendQueue queue)
return;
}

var written = queue.FillWriter(ref writer);
// We don't attempt to send entire payloads over the reliable pipeline. Instead we
// fragment it manually. This is safe and easy to do since the reliable pipeline
// basically implements a stream, so as long as we separate the different messages
// in the stream (the send queue does that automatically) we are sure they'll be
// reassembled properly at the other end. This allows us to lift the limit of ~44KB
// on reliable payloads (because of the reliable window size).
var written = pipeline == m_ReliableSequencedPipeline
? queue.FillWriterWithBytes(ref writer) : queue.FillWriterWithMessages(ref writer);

result = m_Driver.EndSend(writer);
if (result == written)
Expand Down Expand Up @@ -481,9 +498,42 @@ private bool AcceptConnection()

}

private void ReceiveMessages(ulong clientId, NetworkPipeline pipeline, DataStreamReader dataReader)
{
BatchedReceiveQueue queue;
if (pipeline == m_ReliableSequencedPipeline)
{
if (m_ReliableReceiveQueues.TryGetValue(clientId, out queue))
{
queue.PushReader(dataReader);
}
else
{
queue = new BatchedReceiveQueue(dataReader);
m_ReliableReceiveQueues[clientId] = queue;
}
}
else
{
queue = new BatchedReceiveQueue(dataReader);
}

while (!queue.IsEmpty)
{
var message = queue.PopMessage();
if (message == default)
{
// Only happens if there's only a partial message in the queue (rare).
break;
}

InvokeOnTransportEvent(NetcodeNetworkEvent.Data, clientId, message, Time.realtimeSinceStartup);
}
}

private bool ProcessEvent()
{
var eventType = m_Driver.PopEvent(out var networkConnection, out var reader);
var eventType = m_Driver.PopEvent(out var networkConnection, out var reader, out var pipeline);

switch (eventType)
{
Expand All @@ -510,6 +560,8 @@ private bool ProcessEvent()
}
}

m_ReliableReceiveQueues.Remove(ParseClientId(networkConnection));

InvokeOnTransportEvent(NetcodeNetworkEvent.Disconnect,
ParseClientId(networkConnection),
default(ArraySegment<byte>),
Expand All @@ -520,17 +572,7 @@ private bool ProcessEvent()
}
case TransportNetworkEvent.Type.Data:
{
var queue = new BatchedReceiveQueue(reader);

while (!queue.IsEmpty)
{
InvokeOnTransportEvent(NetcodeNetworkEvent.Data,
ParseClientId(networkConnection),
queue.PopMessage(),
Time.realtimeSinceStartup
);
}

ReceiveMessages(ParseClientId(networkConnection), pipeline, reader);
return true;
}
}
Expand Down Expand Up @@ -596,7 +638,7 @@ private void ExtractNetworkMetricsForClient(ulong transportClientId)
var networkConnection = ParseClientId(transportClientId);
ExtractNetworkMetricsFromPipeline(m_UnreliableFragmentedPipeline, networkConnection);
ExtractNetworkMetricsFromPipeline(m_UnreliableSequencedFragmentedPipeline, networkConnection);
ExtractNetworkMetricsFromPipeline(m_ReliableSequencedFragmentedPipeline, networkConnection);
ExtractNetworkMetricsFromPipeline(m_ReliableSequencedPipeline, networkConnection);
}

private void ExtractNetworkMetricsFromPipeline(NetworkPipeline pipeline, NetworkConnection networkConnection)
Expand Down Expand Up @@ -800,7 +842,7 @@ public override void Shutdown()
public void CreateDriver(UnityTransport transport, out NetworkDriver driver,
out NetworkPipeline unreliableFragmentedPipeline,
out NetworkPipeline unreliableSequencedFragmentedPipeline,
out NetworkPipeline reliableSequencedFragmentedPipeline)
out NetworkPipeline reliableSequencedPipeline)
{
#if MULTIPLAYER_TOOLS
NetworkPipelineStageCollection.RegisterPipelineStage(new NetworkMetricsPipelineStage());
Expand Down Expand Up @@ -843,8 +885,7 @@ public void CreateDriver(UnityTransport transport, out NetworkDriver driver,
,typeof(NetworkMetricsPipelineStage)
#endif
);
reliableSequencedFragmentedPipeline = driver.CreatePipeline(
typeof(FragmentationPipelineStage),
reliableSequencedPipeline = driver.CreatePipeline(
typeof(ReliableSequencedPipelineStage),
typeof(SimulatorPipelineStage),
typeof(SimulatorPipelineStageInSend)
Expand All @@ -870,8 +911,7 @@ public void CreateDriver(UnityTransport transport, out NetworkDriver driver,
,typeof(NetworkMetricsPipelineStage)
#endif
);
reliableSequencedFragmentedPipeline = driver.CreatePipeline(
typeof(FragmentationPipelineStage),
reliableSequencedPipeline = driver.CreatePipeline(
typeof(ReliableSequencedPipelineStage)
#if MULTIPLAYER_TOOLS
,typeof(NetworkMetricsPipelineStage)
Expand Down
Loading