
Commit 58becb5

feat: Dynamically size the UnityTransport send queues [MTT-2816] (#2212)
* Make BatchedSendQueue dynamically-sized
* Remove MaxSendQueueSize as a serialized field
* Add CHANGELOG entries
* Add PR number to CHANGELOG entries
* Add extra tests for BatchedSendQueue
* Address some PR comments
* Keep the standards checks happy
* Address a review comment + some cleanup
1 parent 9185677 commit 58becb5

File tree: 6 files changed, +216 -63 lines


com.unity.netcode.gameobjects/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -15,6 +15,8 @@ Additional documentation and release notes are available at [Multiplayer Documen
 
 ### Changed
 
+- The send queues of `UnityTransport` are now dynamically-sized. This means that there shouldn't be any need anymore to tweak the 'Max Send Queue Size' value. In fact, this field is now removed from the inspector and will not be serialized anymore. It is still possible to set it manually using the `MaxSendQueueSize` property, but it is not recommended to do so aside from some specific needs (e.g. limiting the amount of memory used by the send queues in very constrained environments). (#2212)
+- As a consequence of the above change, the `UnityTransport.InitialMaxSendQueueSize` field is now deprecated. There is no default value anymore since send queues are dynamically-sized. (#2212)
 - The debug simulator in `UnityTransport` is now non-deterministic. Its random number generator used to be seeded with a constant value, leading to the same pattern of packet drops, delays, and jitter in every run. (#2196)
 
 ### Fixed
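For the constrained-memory case mentioned in the entry above, the cap can still be applied from code through the `MaxSendQueueSize` property. The sketch below is illustrative only: the `SendQueueCapConfigurator` class, its placement, and the 2 MB figure are assumptions, not part of this commit.

using Unity.Netcode.Transports.UTP;
using UnityEngine;

// Hypothetical helper: caps each send queue's memory usage on constrained platforms.
public class SendQueueCapConfigurator : MonoBehaviour
{
    // Illustrative cap of 2 MB per send queue (one queue exists per client/pipeline pair).
    private const int k_SendQueueCapBytes = 2 * 1024 * 1024;

    private void Awake()
    {
        // Assumes the UnityTransport component sits on this same GameObject
        // (typically the one holding the NetworkManager).
        var transport = GetComponent<UnityTransport>();

        // Overrides the dynamic sizing; a queue will never grow past this many bytes,
        // so a reliable payload that can't fit will close the connection instead.
        transport.MaxSendQueueSize = k_SendQueueCapBytes;
    }
}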

com.unity.netcode.gameobjects/Runtime/Transports/UTP/BatchedSendQueue.cs

Lines changed: 84 additions & 31 deletions
@@ -8,22 +8,30 @@ namespace Unity.Netcode.Transports.UTP
     /// <summary>Queue for batched messages meant to be sent through UTP.</summary>
     /// <remarks>
     /// Messages should be pushed on the queue with <see cref="PushMessage"/>. To send batched
-    /// messages, call <see cref="FillWriter"> with the <see cref="DataStreamWriter"/> obtained from
-    /// <see cref="NetworkDriver.BeginSend"/>. This will fill the writer with as many messages as
-    /// possible. If the send is successful, call <see cref="Consume"/> to remove the data from the
-    /// queue.
+    /// messages, call <see cref="FillWriterWithMessages"/> or <see cref="FillWriterWithBytes"/>
+    /// with the <see cref="DataStreamWriter"/> obtained from <see cref="NetworkDriver.BeginSend"/>.
+    /// This will fill the writer with as many messages/bytes as possible. If the send is
+    /// successful, call <see cref="Consume"/> to remove the data from the queue.
     ///
     /// This is meant as a companion to <see cref="BatchedReceiveQueue"/>, which should be used to
     /// read messages sent with this queue.
     /// </remarks>
     internal struct BatchedSendQueue : IDisposable
     {
-        private NativeArray<byte> m_Data;
+        // Note that we're using NativeList basically like a growable NativeArray, where the length
+        // of the list is the capacity of our array. (We can't use the capacity of the list as our
+        // queue capacity because NativeList may elect to set it higher than what we'd set it to
+        // with SetCapacity, which breaks the logic of our code.)
+        private NativeList<byte> m_Data;
         private NativeArray<int> m_HeadTailIndices;
+        private int m_MaximumCapacity;
+        private int m_MinimumCapacity;
 
         /// <summary>Overhead that is added to each message in the queue.</summary>
         public const int PerMessageOverhead = sizeof(int);
 
+        internal const int MinimumMinimumCapacity = 4096;
+
         // Indices into m_HeadTailIndicies.
         private const int k_HeadInternalIndex = 0;
         private const int k_TailInternalIndex = 1;
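The updated remarks describe the intended flow: push messages, fill a writer obtained from `NetworkDriver.BeginSend`, and consume on success. The sketch below illustrates that flow under a few assumptions: it would have to live in the same assembly as the package (the type is internal), and the `BeginSend`/`EndSend`/`AbortSend` calls are the standard Unity Transport driver APIs, not anything introduced by this commit.

using Unity.Collections;
using Unity.Networking.Transport;

internal static class BatchedSendExample
{
    // Illustrative sketch of the documented flow (not code from this commit).
    public static void FlushQueue(NetworkDriver driver, NetworkConnection connection,
        NetworkPipeline pipeline, ref BatchedSendQueue queue)
    {
        // Messages were previously added with queue.PushMessage(payload).
        while (!queue.IsEmpty)
        {
            if (driver.BeginSend(pipeline, connection, out DataStreamWriter writer) != 0)
            {
                return; // The driver can't accept a new send right now; retry on a later update.
            }

            // Copy as many whole messages as fit in one packet's worth of writer capacity.
            var written = queue.FillWriterWithMessages(ref writer);
            if (written == 0)
            {
                driver.AbortSend(writer);
                return; // Not even one message fits in a single packet.
            }

            if (driver.EndSend(writer) < 0)
            {
                return; // Send failed; keep the data queued.
            }

            // The send succeeded, so the copied bytes can be removed from the queue.
            queue.Consume(written);
        }
    }
}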
@@ -43,18 +51,33 @@ private int TailIndex
         }
 
         public int Length => TailIndex - HeadIndex;
-
+        public int Capacity => m_Data.Length;
         public bool IsEmpty => HeadIndex == TailIndex;
-
         public bool IsCreated => m_Data.IsCreated;
 
         /// <summary>Construct a new empty send queue.</summary>
         /// <param name="capacity">Maximum capacity of the send queue.</param>
         public BatchedSendQueue(int capacity)
         {
-            m_Data = new NativeArray<byte>(capacity, Allocator.Persistent);
+            // Make sure the maximum capacity will be even.
+            m_MaximumCapacity = capacity + (capacity & 1);
+
+            // We pick the minimum capacity such that if we keep doubling it, we'll eventually hit
+            // the maximum capacity exactly. The alternative would be to use capacities that are
+            // powers of 2, but this can lead to over-allocating quite a bit of memory (especially
+            // since we expect maximum capacities to be in the megabytes range). The approach taken
+            // here avoids this issue, at the cost of not having allocations of nice round sizes.
+            m_MinimumCapacity = m_MaximumCapacity;
+            while (m_MinimumCapacity / 2 >= MinimumMinimumCapacity)
+            {
+                m_MinimumCapacity /= 2;
+            }
+
+            m_Data = new NativeList<byte>(m_MinimumCapacity, Allocator.Persistent);
             m_HeadTailIndices = new NativeArray<int>(2, Allocator.Persistent);
 
+            m_Data.ResizeUninitialized(m_MinimumCapacity);
+
             HeadIndex = 0;
             TailIndex = 0;
         }
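To make the halving loop above concrete, here is a standalone sketch of the same derivation; the 5 MB input and the class/method names are illustrative, not code from the commit.

using System;

internal static class MinimumCapacityDemo
{
    // Mirrors BatchedSendQueue.MinimumMinimumCapacity (4096 bytes).
    private const int k_MinimumMinimumCapacity = 4096;

    // Same derivation as the constructor: halve until halving again would drop below the floor,
    // so that repeated doubling of the result lands exactly on the (even) maximum capacity.
    private static int DeriveMinimumCapacity(int maximumCapacity)
    {
        var minimumCapacity = maximumCapacity;
        while (minimumCapacity / 2 >= k_MinimumMinimumCapacity)
        {
            minimumCapacity /= 2;
        }
        return minimumCapacity;
    }

    private static void Main()
    {
        // A 5 MB maximum (5,242,880 bytes) halves down to 5120 bytes,
        // and 5120 doubled ten times is exactly 5,242,880 again.
        Console.WriteLine(DeriveMinimumCapacity(5 * 1024 * 1024)); // 5120
    }
}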
@@ -68,22 +91,28 @@ public void Dispose()
             }
         }
 
+        /// <summary>Write a raw buffer to a DataStreamWriter.</summary>
+        private unsafe void WriteBytes(ref DataStreamWriter writer, byte* data, int length)
+        {
+#if UTP_TRANSPORT_2_0_ABOVE
+            writer.WriteBytesUnsafe(data, length);
+#else
+            writer.WriteBytes(data, length);
+#endif
+        }
+
         /// <summary>Append data at the tail of the queue. No safety checks.</summary>
         private void AppendDataAtTail(ArraySegment<byte> data)
         {
             unsafe
             {
-                var writer = new DataStreamWriter((byte*)m_Data.GetUnsafePtr() + TailIndex, m_Data.Length - TailIndex);
+                var writer = new DataStreamWriter((byte*)m_Data.GetUnsafePtr() + TailIndex, Capacity - TailIndex);
 
                 writer.WriteInt(data.Count);
 
                 fixed (byte* dataPtr = data.Array)
                 {
-#if UTP_TRANSPORT_2_0_ABOVE
-                    writer.WriteBytesUnsafe(dataPtr + data.Offset, data.Count);
-#else
-                    writer.WriteBytes(dataPtr + data.Offset, data.Count);
-#endif
+                    WriteBytes(ref writer, dataPtr + data.Offset, data.Count);
                 }
             }
 
@@ -104,29 +133,55 @@ public bool PushMessage(ArraySegment<byte> message)
             }
 
             // Check if there's enough room after the current tail index.
-            if (m_Data.Length - TailIndex >= sizeof(int) + message.Count)
+            if (Capacity - TailIndex >= sizeof(int) + message.Count)
             {
                 AppendDataAtTail(message);
                 return true;
             }
 
-            // Check if there would be enough room if we moved data at the beginning of m_Data.
-            if (m_Data.Length - TailIndex + HeadIndex >= sizeof(int) + message.Count)
+            // Move the data at the beginning of of m_Data. Either it will leave enough space for
+            // the message, or we'll grow m_Data and will want the data at the beginning anyway.
+            if (HeadIndex > 0 && Length > 0)
             {
-                // Move the data back at the beginning of m_Data.
                 unsafe
                 {
                     UnsafeUtility.MemMove(m_Data.GetUnsafePtr(), (byte*)m_Data.GetUnsafePtr() + HeadIndex, Length);
                 }
 
                 TailIndex = Length;
                 HeadIndex = 0;
+            }
 
+            // If there's enough space left at the end for the message, now is a good time to trim
+            // the capacity of m_Data if it got very large. We define "very large" here as having
+            // more than 75% of m_Data unused after adding the new message.
+            if (Capacity - TailIndex >= sizeof(int) + message.Count)
+            {
                 AppendDataAtTail(message);
+
+                while (TailIndex < Capacity / 4 && Capacity > m_MinimumCapacity)
+                {
+                    m_Data.ResizeUninitialized(Capacity / 2);
+                }
+
                 return true;
             }
 
-            return false;
+            // If we get here we need to grow m_Data until the data fits (or it's too large).
+            while (Capacity - TailIndex < sizeof(int) + message.Count)
+            {
+                // Can't grow m_Data anymore. Message simply won't fit.
+                if (Capacity * 2 > m_MaximumCapacity)
+                {
+                    return false;
+                }
+
+                m_Data.ResizeUninitialized(Capacity * 2);
+            }
+
+            // If we get here we know there's now enough room for the message.
+            AppendDataAtTail(message);
+            return true;
         }
 
         /// <summary>
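The new grow/trim policy can be summarized as: double the backing list until the message fits (up to the maximum capacity), and halve it back down whenever more than 75% of it sits unused. The NUnit-style sketch below illustrates the expected behaviour for one concrete set of numbers; it is not one of the tests added by this PR, it would need to live in the package's test assembly (the type is internal), and the exact capacities follow from the constructor's derivation shown above (a 64 KiB maximum yields a 4096-byte minimum).

using System;
using NUnit.Framework;
using Unity.Netcode.Transports.UTP;

internal class BatchedSendQueueGrowthSketch
{
    [Test]
    public void QueueGrowsOnPushAndShrinksWhenEmptied()
    {
        // A 64 KiB maximum capacity derives a 4096-byte minimum capacity.
        var queue = new BatchedSendQueue(64 * 1024);
        Assert.AreEqual(4096, queue.Capacity);

        // 6000 bytes plus the 4-byte length header don't fit in 4096, so capacity doubles to 8192.
        var payload = new ArraySegment<byte>(new byte[6000]);
        Assert.IsTrue(queue.PushMessage(payload));
        Assert.AreEqual(8192, queue.Capacity);

        // Consuming everything resets the indices and trims back to the minimum capacity.
        queue.Consume(queue.Length);
        Assert.AreEqual(4096, queue.Capacity);

        queue.Dispose();
    }
}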
@@ -153,11 +208,13 @@ public int FillWriterWithMessages(ref DataStreamWriter writer)
 
             unsafe
             {
+                var dataPtr = (byte*)m_Data.GetUnsafePtr() + HeadIndex;
+
 #if UTP_TRANSPORT_2_0_ABOVE
-                var slice = m_Data.GetSubArray(HeadIndex, Length);
+                var slice = NativeArray.ConvertExistingDataToNativeArray<byte>(dataPtr, Length, Allocator.None);
                 var reader = new DataStreamReader(slice);
 #else
-                var reader = new DataStreamReader((byte*)m_Data.GetUnsafePtr() + HeadIndex, Length);
+                var reader = new DataStreamReader(dataPtr, Length);
 #endif
 
                 var writerAvailable = writer.Capacity;
@@ -177,11 +234,7 @@ public int FillWriterWithMessages(ref DataStreamWriter writer)
                     writer.WriteInt(messageLength);
 
                     var messageOffset = HeadIndex + reader.GetBytesRead();
-#if UTP_TRANSPORT_2_0_ABOVE
-                    writer.WriteBytesUnsafe((byte*)m_Data.GetUnsafePtr() + messageOffset, messageLength);
-#else
-                    writer.WriteBytes((byte*)m_Data.GetUnsafePtr() + messageOffset, messageLength);
-#endif
+                    WriteBytes(ref writer, (byte*)m_Data.GetUnsafePtr() + messageOffset, messageLength);
 
                     writerAvailable -= sizeof(int) + messageLength;
                     readerOffset += sizeof(int) + messageLength;
@@ -218,11 +271,7 @@ public int FillWriterWithBytes(ref DataStreamWriter writer)
 
             unsafe
             {
-#if UTP_TRANSPORT_2_0_ABOVE
-                writer.WriteBytesUnsafe((byte*)m_Data.GetUnsafePtr() + HeadIndex, copyLength);
-#else
-                writer.WriteBytes((byte*)m_Data.GetUnsafePtr() + HeadIndex, copyLength);
-#endif
+                WriteBytes(ref writer, (byte*)m_Data.GetUnsafePtr() + HeadIndex, copyLength);
             }
 
             return copyLength;
@@ -236,10 +285,14 @@ public int FillWriterWithBytes(ref DataStreamWriter writer)
         /// <param name="size">Number of bytes to consume from the queue.</param>
         public void Consume(int size)
        {
+            // Adjust the head/tail indices such that we consume the given size.
             if (size >= Length)
             {
                 HeadIndex = 0;
                 TailIndex = 0;
+
+                // This is a no-op if m_Data is already at minimum capacity.
+                m_Data.ResizeUninitialized(m_MinimumCapacity);
             }
             else
             {

com.unity.netcode.gameobjects/Runtime/Transports/UTP/UnityTransport.cs

Lines changed: 30 additions & 13 deletions
@@ -137,8 +137,13 @@ private enum State
         /// <summary>
         /// The default maximum send queue size
         /// </summary>
+        [Obsolete("MaxSendQueueSize is now determined dynamically (can still be set programmatically using the MaxSendQueueSize property). This initial value is not used anymore.", false)]
         public const int InitialMaxSendQueueSize = 16 * InitialMaxPayloadSize;
 
+        // Maximum reliable throughput, assuming the full reliable window can be sent on every
+        // frame at 60 FPS. This will be a large over-estimation in any realistic scenario.
+        private const int k_MaxReliableThroughput = (NetworkParameterConstants.MTU * 32 * 60) / 1000; // bytes per millisecond
+
         private static ConnectionAddressData s_DefaultConnectionAddressData = new ConnectionAddressData { Address = "127.0.0.1", Port = 7777, ServerListenAddress = string.Empty };
 
 #pragma warning disable IDE1006 // Naming Styles
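For reference, `NetworkParameterConstants.MTU` is 1400 bytes in Unity Transport, so the constant works out to (1400 × 32 × 60) / 1000 = 2688 bytes per millisecond, which is the "around 2.688 MB/s" figure quoted in the `Send` method further down.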
@@ -193,15 +198,17 @@ public int MaxPayloadSize
             set => m_MaxPayloadSize = value;
         }
 
-        [Tooltip("The maximum size in bytes of the transport send queue. The send queue accumulates messages for batching and stores messages when other internal send queues are full. If you routinely observe an error about too many in-flight packets, try increasing this.")]
-        [SerializeField]
-        private int m_MaxSendQueueSize = InitialMaxSendQueueSize;
+        private int m_MaxSendQueueSize = 0;
 
         /// <summary>The maximum size in bytes of the transport send queue.</summary>
         /// <remarks>
         /// The send queue accumulates messages for batching and stores messages when other internal
-        /// send queues are full. If you routinely observe an error about too many in-flight packets,
-        /// try increasing this.
+        /// send queues are full. Note that there should not be any need to set this value manually
+        /// since the send queue size is dynamically sized based on need.
+        ///
+        /// This value should only be set if you have particular requirements (e.g. if you want to
+        /// limit the memory usage of the send queues). Note however that setting this value too low
+        /// can easily lead to disconnections under heavy traffic.
         /// </remarks>
         public int MaxSendQueueSize
         {
@@ -551,11 +558,6 @@ private static RelayConnectionData ConvertConnectionData(byte[] connectionData)
             }
         }
 
-        internal void SetMaxPayloadSize(int maxPayloadSize)
-        {
-            m_MaxPayloadSize = maxPayloadSize;
-        }
-
         private void SetProtocol(ProtocolType inProtocol)
         {
             m_ProtocolType = inProtocol;
@@ -1211,7 +1213,23 @@ public override void Send(ulong clientId, ArraySegment<byte> payload, NetworkDel
             var sendTarget = new SendTarget(clientId, pipeline);
             if (!m_SendQueue.TryGetValue(sendTarget, out var queue))
             {
-                queue = new BatchedSendQueue(Math.Max(m_MaxSendQueueSize, m_MaxPayloadSize));
+                // The maximum size of a send queue is determined according to the disconnection
+                // timeout. The idea being that if the send queue contains enough reliable data that
+                // sending it all out would take longer than the disconnection timeout, then there's
+                // no point storing even more in the queue (it would be like having a ping higher
+                // than the disconnection timeout, which is far into the realm of unplayability).
+                //
+                // The throughput used to determine what consists the maximum send queue size is
+                // the maximum theoritical throughput of the reliable pipeline assuming we only send
+                // on each update at 60 FPS, which turns out to be around 2.688 MB/s.
+                //
+                // Note that we only care about reliable throughput for send queues because that's
+                // the only case where a full send queue causes a connection loss. Full unreliable
+                // send queues are dealt with by flushing it out to the network or simply dropping
+                // new messages if that fails.
+                var maxCapacity = m_MaxSendQueueSize > 0 ? m_MaxSendQueueSize : m_DisconnectTimeoutMS * k_MaxReliableThroughput;
+
+                queue = new BatchedSendQueue(Math.Max(maxCapacity, m_MaxPayloadSize));
                 m_SendQueue.Add(sendTarget, queue);
             }
 
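As a worked example of the capacity ceiling computed above: assuming the 2688 bytes-per-millisecond figure and a 30-second disconnection timeout (an illustrative value, not one taken from this diff), the budget comes out to roughly 80 MB per send queue. A minimal sketch:

using System;

internal static class SendQueueBudgetExample
{
    private static void Main()
    {
        // Mirrors k_MaxReliableThroughput: 1400-byte MTU * 32-packet reliable window * 60 FPS, per millisecond.
        const int maxReliableThroughputBytesPerMs = (1400 * 32 * 60) / 1000; // 2688

        // Illustrative disconnection timeout; the real value comes from m_DisconnectTimeoutMS.
        const int disconnectTimeoutMs = 30_000;

        // 30,000 ms * 2688 B/ms = 80,640,000 bytes, i.e. roughly an 80 MB ceiling per send queue.
        Console.WriteLine(disconnectTimeoutMs * maxReliableThroughputBytesPerMs);
    }
}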
@@ -1225,8 +1243,7 @@ public override void Send(ulong clientId, ArraySegment<byte> payload, NetworkDel
 
             var ngoClientId = NetworkManager?.TransportIdToClientId(clientId) ?? clientId;
             Debug.LogError($"Couldn't add payload of size {payload.Count} to reliable send queue. " +
-                $"Closing connection {ngoClientId} as reliability guarantees can't be maintained. " +
-                $"Perhaps 'Max Send Queue Size' ({m_MaxSendQueueSize}) is too small for workload.");
+                $"Closing connection {ngoClientId} as reliability guarantees can't be maintained.");
 
             if (clientId == m_ServerClientId)
             {
