Skip to content

Commit

Permalink
perf: Build batches directly instead of queuing serializations (MirrorNetworking#3130)
Browse files Browse the repository at this point in the history

* rename

* so far

* better

* fixx

* old

* error

* typo

* comment
  • Loading branch information
vis2k authored Apr 1, 2022
1 parent 1631402 commit 6c2559a
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 78 deletions.
104 changes: 67 additions & 37 deletions Assets/Mirror/Runtime/Batching/Batcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,16 @@ public class Batcher
// TimeStamp header size for those who need it
public const int HeaderSize = sizeof(double);

// batched messages
// IMPORTANT: we queue the serialized messages!
// queueing NetworkMessage would box and allocate!
Queue<NetworkWriterPooled> messages = new Queue<NetworkWriterPooled>();
// full batches ready to be sent.
// DO NOT queue NetworkMessage, it would box.
// DO NOT queue each serialization separately.
// it would allocate too many writers.
// https://github.com/vis2k/Mirror/pull/3127
// => best to build batches on the fly.
Queue<NetworkWriterPooled> batches = new Queue<NetworkWriterPooled>();

// current batch in progress
NetworkWriterPooled batch;

public Batcher(int threshold)
{
Expand All @@ -45,53 +51,77 @@ public Batcher(int threshold)
// add a message for batching
// we allow any sized messages.
// caller needs to make sure they are within max packet size.
public void AddMessage(ArraySegment<byte> message)
public void AddMessage(ArraySegment<byte> message, double timeStamp)
{
// put into a (pooled) writer
// when appending to a batch in progress, check final size.
// if it expands beyond threshold, then we should finalize it first.
// => less than or exactly threshold is fine.
// GetBatch() will finalize it.
// => see unit tests.
if (batch != null &&
batch.Position + message.Count > threshold)
{
batches.Enqueue(batch);
batch = null;
}

// initialize a new batch if necessary
if (batch == null)
{
// borrow from pool. we return it in GetBatch.
batch = NetworkWriterPool.Get();

// write timestamp first.
// -> double precision for accuracy over long periods of time
// -> batches are per-frame, it doesn't matter which message's
// timestamp we use.
batch.WriteDouble(timeStamp);
}

// add serialization to current batch. even if > threshold.
// -> we do allow > threshold sized messages as single batch
// -> WriteBytes instead of WriteSegment because the latter
// would add a size header. we want to write directly.
// -> will be returned to pool when making the batch!
// IMPORTANT: NOT adding a size header / msg saves LOTS of bandwidth
NetworkWriterPooled writer = NetworkWriterPool.Get();
writer.WriteBytes(message.Array, message.Offset, message.Count);
messages.Enqueue(writer);
batch.WriteBytes(message.Array, message.Offset, message.Count);
}

// batch as many messages as possible into writer
// returns true if any batch was made.
public bool MakeNextBatch(NetworkWriter writer, double timeStamp)
// helper function to copy a batch to writer and return it to pool
static void CopyAndReturn(NetworkWriterPooled batch, NetworkWriter writer)
{
// if we have no messages then there's nothing to do
if (messages.Count == 0)
return false;

// make sure the writer is fresh to avoid uncertain situations
if (writer.Position != 0)
throw new ArgumentException($"MakeNextBatch needs a fresh writer!");
throw new ArgumentException($"GetBatch needs a fresh writer!");

// write timestamp first
// -> double precision for accuracy over long periods of time
writer.WriteDouble(timeStamp);
// copy to the target writer
ArraySegment<byte> segment = batch.ToArraySegment();
writer.WriteBytes(segment.Array, segment.Offset, segment.Count);

// return batch to pool for reuse
NetworkWriterPool.Return(batch);
}

// do start no matter what
do
// get the next batch which is available for sending (if any).
// TODO safely get & return a batch instead of copying to writer?
// TODO could return pooled writer & use GetBatch in a 'using' statement!
public bool GetBatch(NetworkWriter writer)
{
// get first batch from queue (if any)
if (batches.TryDequeue(out NetworkWriterPooled first))
{
// add next message no matter what. even if > threshold.
// (we do allow > threshold sized messages as single batch)
NetworkWriterPooled message = messages.Dequeue();
ArraySegment<byte> segment = message.ToArraySegment();
writer.WriteBytes(segment.Array, segment.Offset, segment.Count);
CopyAndReturn(first, writer);
return true;
}

// return the writer to pool
NetworkWriterPool.Return(message);
// if queue was empty, we can send the batch in progress.
if (batch != null)
{
CopyAndReturn(batch, writer);
batch = null;
return true;
}
// keep going as long as we have more messages,
// AND the next one would fit into threshold.
while (messages.Count > 0 &&
writer.Position + messages.Peek().Position <= threshold);

// we had messages, so a batch was made
return true;
// nothing was written
return false;
}
}
}
8 changes: 4 additions & 4 deletions Assets/Mirror/Runtime/LocalConnectionToServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ internal override void Send(ArraySegment<byte> segment, int channelId = Channels
// OnTransportData assumes batching.
// so let's make a batch with proper timestamp prefix.
Batcher batcher = GetBatchForChannelId(channelId);
batcher.AddMessage(segment);
batcher.AddMessage(segment, NetworkTime.localTime);

// flush it to the server's OnTransportData immediately.
// local connection to server always invokes immediately.
using (NetworkWriterPooled writer = NetworkWriterPool.Get())
{
// make a batch with our local time (double precision)
if (batcher.MakeNextBatch(writer, NetworkTime.localTime))
if (batcher.GetBatch(writer))
{
NetworkServer.OnTransportData(connectionId, writer.ToArraySegment(), channelId);
}
Expand Down Expand Up @@ -69,12 +69,12 @@ internal override void Update()
// OnTransportData assumes a proper batch with timestamp etc.
// let's make a proper batch and pass it to OnTransportData.
Batcher batcher = GetBatchForChannelId(Channels.Reliable);
batcher.AddMessage(message);
batcher.AddMessage(message, NetworkTime.localTime);

using (NetworkWriterPooled batchWriter = NetworkWriterPool.Get())
{
// make a batch with our local time (double precision)
if (batcher.MakeNextBatch(batchWriter, NetworkTime.localTime))
if (batcher.GetBatch(batchWriter))
{
NetworkClient.OnTransportData(batchWriter.ToArraySegment(), Channels.Reliable);
}
Expand Down
4 changes: 2 additions & 2 deletions Assets/Mirror/Runtime/NetworkConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ internal virtual void Send(ArraySegment<byte> segment, int channelId = Channels.
//
// NOTE: we do NOT ValidatePacketSize here yet. the final packet
// will be the full batch, including timestamp.
GetBatchForChannelId(channelId).AddMessage(segment);
GetBatchForChannelId(channelId).AddMessage(segment, NetworkTime.localTime);
}

// Send stage three: hand off to transport
Expand All @@ -183,7 +183,7 @@ internal virtual void Update()
using (NetworkWriterPooled writer = NetworkWriterPool.Get())
{
// make a batch with our local time (double precision)
while (batcher.MakeNextBatch(writer, NetworkTime.localTime))
while (batcher.GetBatch(writer))
{
// validate packet before handing the batch to the
// transport. this guarantees that we always stay
Expand Down
70 changes: 35 additions & 35 deletions Assets/Mirror/Tests/Editor/Batching/BatcherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,25 @@ public static byte[] ConcatTimestamp(double tickTimeStamp, byte[] data)
public void AddMessage()
{
byte[] message = {0x01, 0x02};
batcher.AddMessage(new ArraySegment<byte>(message));
batcher.AddMessage(new ArraySegment<byte>(message), TimeStamp);
}

[Test]
public void MakeNextBatch_OnlyAcceptsFreshWriter()
{
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01}), TimeStamp);

writer.WriteByte(0);
Assert.Throws<ArgumentException>(() => {
batcher.MakeNextBatch(writer, TimeStamp);
batcher.GetBatch(writer);
});
}

[Test]
public void MakeNextBatch_NoMessage()
{
// make batch with no message
bool result = batcher.MakeNextBatch(writer, TimeStamp);
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(false));
}

Expand All @@ -60,10 +60,10 @@ public void MakeNextBatch_OneMessage()
{
// add message
byte[] message = {0x01, 0x02};
batcher.AddMessage(new ArraySegment<byte>(message));
batcher.AddMessage(new ArraySegment<byte>(message), TimeStamp);

// make batch
bool result = batcher.MakeNextBatch(writer, TimeStamp);
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));

// check result: <<tickTimeStamp:8, message>>
Expand All @@ -73,46 +73,46 @@ public void MakeNextBatch_OneMessage()
[Test]
public void MakeNextBatch_MultipleMessages_AlmostFullBatch()
{
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01, 0x02}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01, 0x02}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03}), TimeStamp);

// make batch
bool result = batcher.MakeNextBatch(writer, TimeStamp);
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));

// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, new byte[]{0x01, 0x02, 0x03})));

// there should be no more batches to make
Assert.That(batcher.MakeNextBatch(writer, TimeStamp), Is.False);
Assert.That(batcher.GetBatch(writer), Is.False);
}

[Test]
public void MakeNextBatch_MultipleMessages_ExactlyFullBatch()
{
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01, 0x02}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03, 0x04}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01, 0x02}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03, 0x04}), TimeStamp);

// make batch
bool result = batcher.MakeNextBatch(writer, TimeStamp);
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));

// check result: <<tickTimeStamp:8, message>>
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, new byte[]{0x01, 0x02, 0x03, 0x04})));

// there should be no more batches to make
Assert.That(batcher.MakeNextBatch(writer, TimeStamp), Is.False);
Assert.That(batcher.GetBatch(writer), Is.False);
}

[Test]
public void MakeNextBatch_MultipleMessages_MoreThanOneBatch()
{
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01, 0x02}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03, 0x04}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x05}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01, 0x02}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03, 0x04}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x05}), TimeStamp);

// first batch
bool result = batcher.MakeNextBatch(writer, TimeStamp);
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));

// check result: <<tickTimeStamp:8, message>>
Expand All @@ -122,7 +122,7 @@ public void MakeNextBatch_MultipleMessages_MoreThanOneBatch()
writer.Position = 0;

// second batch
result = batcher.MakeNextBatch(writer, TimeStamp);
result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));

// check result: <<tickTimeStamp:8, message>>
Expand All @@ -133,12 +133,12 @@ public void MakeNextBatch_MultipleMessages_MoreThanOneBatch()
public void MakeNextBatch_MultipleMessages_Small_Giant_Small()
{
// small, too big to include in batch, small
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x02, 0x03, 0x04, 0x05}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x06, 0x07}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x02, 0x03, 0x04, 0x05}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x06, 0x07}), TimeStamp);

// first batch
bool result = batcher.MakeNextBatch(writer, TimeStamp);
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));

// check result: <<tickTimeStamp:8, message>>
Expand All @@ -148,7 +148,7 @@ public void MakeNextBatch_MultipleMessages_Small_Giant_Small()
writer.Position = 0;

// second batch
result = batcher.MakeNextBatch(writer, TimeStamp);
result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));

// check result: <<tickTimeStamp:8, message>>
Expand All @@ -158,7 +158,7 @@ public void MakeNextBatch_MultipleMessages_Small_Giant_Small()
writer.Position = 0;

// third batch
result = batcher.MakeNextBatch(writer, TimeStamp);
result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));

// check result: <<tickTimeStamp:8, message>>
Expand All @@ -176,10 +176,10 @@ public void MakeNextBatch_LargerThanThreshold()
byte[] large = new byte[Threshold + 1];
for (int i = 0; i < Threshold + 1; ++i)
large[i] = (byte)i;
batcher.AddMessage(new ArraySegment<byte>(large));
batcher.AddMessage(new ArraySegment<byte>(large), TimeStamp);

// result should be only the large message
bool result = batcher.MakeNextBatch(writer, TimeStamp);
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, large)));
}
Expand All @@ -199,30 +199,30 @@ public void MakeNextBatch_LargerThanThreshold_BetweenSmallerMessages()
// add two small, one large, two small messages.
// to make sure everything around it is still batched,
// and the large one is a separate batch.
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x02}));
batcher.AddMessage(new ArraySegment<byte>(large));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x04}));
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x01}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x02}), TimeStamp);
batcher.AddMessage(new ArraySegment<byte>(large), TimeStamp + 1);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x03}), TimeStamp + 2);
batcher.AddMessage(new ArraySegment<byte>(new byte[]{0x04}), TimeStamp + 2);

// first batch should be the two small messages
bool result = batcher.MakeNextBatch(writer, TimeStamp);
bool result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp, new byte[]{0x01, 0x02})));

// reset writer
writer.Position = 0;

// second batch should be only the large message
result = batcher.MakeNextBatch(writer, TimeStamp + 1);
result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp + 1, large)));

// reset writer
writer.Position = 0;

// third batch should be the two small messages
result = batcher.MakeNextBatch(writer, TimeStamp + 2);
result = batcher.GetBatch(writer);
Assert.That(result, Is.EqualTo(true));
Assert.That(writer.ToArray().SequenceEqual(ConcatTimestamp(TimeStamp + 2, new byte[]{0x03, 0x04})));
}
Expand Down

0 comments on commit 6c2559a

Please sign in to comment.