-
Notifications
You must be signed in to change notification settings - Fork 450
feat: Message Ordering #948
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0a4f1e6
6edda60
1905156
0c34ce4
8af4b87
2f66fba
c24834e
722b5ff
956d996
d61a06e
c015bac
6cc5f66
87fef25
9c279a0
b26b8b5
f2fbd0a
6be7950
c2eb4ef
f62b3e0
b71926b
3c2176f
c362242
adb380f
1a58496
a8e834b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,19 @@ internal enum __RpcExecStage | |
Client = 2 | ||
} | ||
|
||
private static void SetUpdateStage<T>(ref T param) where T: IHasUpdateStage | ||
ShadauxCat marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
if (param.UpdateStage == NetworkUpdateStage.Unset) | ||
{ | ||
param.UpdateStage = NetworkUpdateLoop.UpdateStage; | ||
|
||
if (param.UpdateStage == NetworkUpdateStage.Initialization) | ||
{ | ||
param.UpdateStage = NetworkUpdateStage.EarlyUpdate; | ||
} | ||
} | ||
} | ||
|
||
#pragma warning disable 414 // disable assigned but its value is never used | ||
#pragma warning disable IDE1006 // disable naming rule violation check | ||
[NonSerialized] | ||
|
@@ -47,34 +60,35 @@ internal NetworkSerializer __beginSendServerRpc(uint rpcMethodId, ServerRpcParam | |
{ | ||
PooledNetworkWriter writer; | ||
|
||
var rpcQueueContainer = NetworkManager.RpcQueueContainer; | ||
var isUsingBatching = rpcQueueContainer.IsUsingBatching(); | ||
SetUpdateStage(ref serverRpcParams.Send); | ||
|
||
if (serverRpcParams.Send.UpdateStage == NetworkUpdateStage.Initialization) | ||
{ | ||
throw new NotSupportedException( | ||
$"{nameof(NetworkUpdateStage.Initialization)} cannot be used as a target for processing RPCs."); | ||
mattwalsh-unity marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
ShadauxCat marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
var messageQueueContainer = NetworkManager.MessageQueueContainer; | ||
var transportChannel = rpcDelivery == RpcDelivery.Reliable ? NetworkChannel.ReliableRpc : NetworkChannel.UnreliableRpc; | ||
|
||
if (IsHost) | ||
{ | ||
writer = rpcQueueContainer.BeginAddQueueItemToFrame(RpcQueueContainer.QueueItemType.ServerRpc, Time.realtimeSinceStartup, transportChannel, | ||
ShadauxCat marked this conversation as resolved.
Show resolved
Hide resolved
|
||
NetworkManager.ServerClientId, null, RpcQueueHistoryFrame.QueueFrameType.Inbound, serverRpcParams.Send.UpdateStage); | ||
|
||
if (!isUsingBatching) | ||
{ | ||
writer.WriteByte(NetworkConstants.SERVER_RPC); // MessageType | ||
} | ||
writer = messageQueueContainer.BeginAddQueueItemToFrame(MessageQueueContainer.MessageType.ServerRpc, Time.realtimeSinceStartup, transportChannel, | ||
NetworkManager.ServerClientId, null, MessageQueueHistoryFrame.QueueFrameType.Inbound, serverRpcParams.Send.UpdateStage); | ||
} | ||
else | ||
{ | ||
writer = rpcQueueContainer.BeginAddQueueItemToFrame(RpcQueueContainer.QueueItemType.ServerRpc, Time.realtimeSinceStartup, transportChannel, | ||
NetworkManager.ServerClientId, null, RpcQueueHistoryFrame.QueueFrameType.Outbound, NetworkUpdateStage.PostLateUpdate); | ||
if (!isUsingBatching) | ||
{ | ||
writer.WriteByte(NetworkConstants.SERVER_RPC); // MessageType | ||
} | ||
writer = messageQueueContainer.BeginAddQueueItemToFrame(MessageQueueContainer.MessageType.ServerRpc, Time.realtimeSinceStartup, transportChannel, | ||
NetworkManager.ServerClientId, null, MessageQueueHistoryFrame.QueueFrameType.Outbound, NetworkUpdateStage.PostLateUpdate); | ||
|
||
writer.WriteByte((byte)MessageQueueContainer.MessageType.ServerRpc); | ||
writer.WriteByte((byte)serverRpcParams.Send.UpdateStage); // NetworkUpdateStage | ||
} | ||
|
||
writer.WriteUInt64Packed(NetworkObjectId); // NetworkObjectId | ||
writer.WriteUInt16Packed(NetworkBehaviourId); // NetworkBehaviourId | ||
writer.WriteUInt32Packed(rpcMethodId); // NetworkRpcMethodId | ||
writer.WriteByte((byte)serverRpcParams.Send.UpdateStage); // NetworkUpdateStage | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, so in the old code we wrote out the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that is the "host loopback" and so it doesn't need to write it out because it is being directly inserted into the next frame's inbound local MessageQueue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. UpdateStage is part of the unified header now. It's no longer part of the RPC message. The loopback code doesn't read the header for some reason... the fact that the behavior when pushing to the inbound queue is different than when pushing to the outbound is one of the things I'm addressing in my serialization RFC. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To elaborate a bit further... The message header (including UpdateStage) is only used on the receiving end during HandleIncomingData() in order to properly add the item to the correct queue. When sending to loopback, the header isn't written because HandleIncomingData() is bypassed, so we skip it when the only destination is the inbound queue because skipping it here emulates the behavior of HandleIncomingData in reading it (and thus pushing the read head past it). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, brilliant @ShadauxCat , can we hoist this last comment into a comment in the code? This is something subtle and worth placing here for future devs / users. |
||
|
||
return writer.Serializer; | ||
} | ||
|
@@ -89,14 +103,16 @@ internal void __endSendServerRpc(NetworkSerializer serializer, uint rpcMethodId, | |
return; | ||
} | ||
|
||
var rpcQueueContainer = NetworkManager.RpcQueueContainer; | ||
SetUpdateStage(ref serverRpcParams.Send); | ||
|
||
var messageQueueContainer = NetworkManager.MessageQueueContainer; | ||
if (IsHost) | ||
{ | ||
rpcQueueContainer.EndAddQueueItemToFrame(serializer.Writer, RpcQueueHistoryFrame.QueueFrameType.Inbound, serverRpcParams.Send.UpdateStage); | ||
messageQueueContainer.EndAddQueueItemToFrame(serializer.Writer, MessageQueueHistoryFrame.QueueFrameType.Inbound, serverRpcParams.Send.UpdateStage); | ||
ShadauxCat marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
else | ||
{ | ||
rpcQueueContainer.EndAddQueueItemToFrame(serializer.Writer, RpcQueueHistoryFrame.QueueFrameType.Outbound, NetworkUpdateStage.PostLateUpdate); | ||
messageQueueContainer.EndAddQueueItemToFrame(serializer.Writer, MessageQueueHistoryFrame.QueueFrameType.Outbound, NetworkUpdateStage.PostLateUpdate); | ||
} | ||
} | ||
|
||
|
@@ -107,66 +123,66 @@ internal NetworkSerializer __beginSendClientRpc(uint rpcMethodId, ClientRpcParam | |
{ | ||
PooledNetworkWriter writer; | ||
|
||
SetUpdateStage(ref clientRpcParams.Send); | ||
|
||
if (clientRpcParams.Send.UpdateStage == NetworkUpdateStage.Initialization) | ||
{ | ||
throw new NotSupportedException( | ||
$"{nameof(NetworkUpdateStage.Initialization)} cannot be used as a target for processing RPCs."); | ||
} | ||
ShadauxCat marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// This will start a new queue item entry and will then return the writer to the current frame's stream | ||
var rpcQueueContainer = NetworkManager.RpcQueueContainer; | ||
var isUsingBatching = rpcQueueContainer.IsUsingBatching(); | ||
var transportChannel = rpcDelivery == RpcDelivery.Reliable ? NetworkChannel.ReliableRpc : NetworkChannel.UnreliableRpc; | ||
|
||
ulong[] clientIds = clientRpcParams.Send.TargetClientIds ?? NetworkManager.ConnectedClientsList.Select(c => c.ClientId).ToArray(); | ||
ulong[] clientIds = clientRpcParams.Send.TargetClientIds ?? NetworkManager.ConnectedClientsIds; | ||
ShadauxCat marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (clientRpcParams.Send.TargetClientIds != null && clientRpcParams.Send.TargetClientIds.Length == 0) | ||
{ | ||
clientIds = NetworkManager.ConnectedClientsList.Select(c => c.ClientId).ToArray(); | ||
clientIds = NetworkManager.ConnectedClientsIds; | ||
} | ||
|
||
//NOTES ON BELOW CHANGES: | ||
//The following checks for IsHost and whether the host client id is part of the clients to recieve the RPC | ||
//Is part of a patch-fix to handle looping back RPCs into the next frame's inbound queue. | ||
//!!! This code is temporary and will change (soon) when NetworkSerializer can be configured for mutliple NetworkWriters!!! | ||
var containsServerClientId = clientIds.Contains(NetworkManager.ServerClientId); | ||
bool addHeader = true; | ||
var messageQueueContainer = NetworkManager.MessageQueueContainer; | ||
if (IsHost && containsServerClientId) | ||
{ | ||
//Always write to the next frame's inbound queue | ||
writer = rpcQueueContainer.BeginAddQueueItemToFrame(RpcQueueContainer.QueueItemType.ClientRpc, Time.realtimeSinceStartup, transportChannel, | ||
NetworkManager.ServerClientId, null, RpcQueueHistoryFrame.QueueFrameType.Inbound, clientRpcParams.Send.UpdateStage); | ||
writer = messageQueueContainer.BeginAddQueueItemToFrame(MessageQueueContainer.MessageType.ClientRpc, Time.realtimeSinceStartup, transportChannel, | ||
NetworkManager.ServerClientId, null, MessageQueueHistoryFrame.QueueFrameType.Inbound, clientRpcParams.Send.UpdateStage); | ||
|
||
//Handle sending to the other clients, if so the above notes explain why this code is here (a temporary patch-fix) | ||
if (clientIds.Length > 1) | ||
{ | ||
//Set the loopback frame | ||
rpcQueueContainer.SetLoopBackFrameItem(clientRpcParams.Send.UpdateStage); | ||
messageQueueContainer.SetLoopBackFrameItem(clientRpcParams.Send.UpdateStage); | ||
|
||
//Switch to the outbound queue | ||
writer = rpcQueueContainer.BeginAddQueueItemToFrame(RpcQueueContainer.QueueItemType.ClientRpc, Time.realtimeSinceStartup, transportChannel, NetworkObjectId, | ||
clientIds, RpcQueueHistoryFrame.QueueFrameType.Outbound, NetworkUpdateStage.PostLateUpdate); | ||
|
||
if (!isUsingBatching) | ||
{ | ||
writer.WriteByte(NetworkConstants.CLIENT_RPC); // MessageType | ||
} | ||
writer = messageQueueContainer.BeginAddQueueItemToFrame(MessageQueueContainer.MessageType.ClientRpc, Time.realtimeSinceStartup, transportChannel, NetworkObjectId, | ||
clientIds, MessageQueueHistoryFrame.QueueFrameType.Outbound, NetworkUpdateStage.PostLateUpdate); | ||
} | ||
else | ||
{ | ||
if (!isUsingBatching) | ||
{ | ||
writer.WriteByte(NetworkConstants.CLIENT_RPC); // MessageType | ||
} | ||
addHeader = false; | ||
ShadauxCat marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
else | ||
{ | ||
writer = rpcQueueContainer.BeginAddQueueItemToFrame(RpcQueueContainer.QueueItemType.ClientRpc, Time.realtimeSinceStartup, transportChannel, NetworkObjectId, | ||
clientIds, RpcQueueHistoryFrame.QueueFrameType.Outbound, NetworkUpdateStage.PostLateUpdate); | ||
|
||
if (!isUsingBatching) | ||
{ | ||
writer.WriteByte(NetworkConstants.CLIENT_RPC); // MessageType | ||
} | ||
writer = messageQueueContainer.BeginAddQueueItemToFrame(MessageQueueContainer.MessageType.ClientRpc, Time.realtimeSinceStartup, transportChannel, NetworkObjectId, | ||
clientIds, MessageQueueHistoryFrame.QueueFrameType.Outbound, NetworkUpdateStage.PostLateUpdate); | ||
} | ||
|
||
if (addHeader) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ultra nit, I would probably rename this / invert to 'skipHeader' and (even though it's pretty obvious when you look at the code) have a comment: "don't send a header for 1 client" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I generally dislike using negative terms for variables and then inverting them. I find "if(do thing)" to be more intuitive and less prone to being misread than "if(not don't do thing)". But I won't die on that hill if you disagree. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I can see that...but let's add the |
||
{ | ||
writer.WriteByte((byte)MessageQueueContainer.MessageType.ClientRpc); | ||
writer.WriteByte((byte)clientRpcParams.Send.UpdateStage); // NetworkUpdateStage | ||
} | ||
writer.WriteUInt64Packed(NetworkObjectId); // NetworkObjectId | ||
writer.WriteUInt16Packed(NetworkBehaviourId); // NetworkBehaviourId | ||
writer.WriteUInt32Packed(rpcMethodId); // NetworkRpcMethodId | ||
writer.WriteByte((byte)clientRpcParams.Send.UpdateStage); // NetworkUpdateStage | ||
|
||
|
||
return writer.Serializer; | ||
} | ||
|
@@ -181,25 +197,27 @@ internal void __endSendClientRpc(NetworkSerializer serializer, uint rpcMethodId, | |
return; | ||
} | ||
|
||
var rpcQueueContainer = NetworkManager.RpcQueueContainer; | ||
SetUpdateStage(ref clientRpcParams.Send); | ||
|
||
var messageQueueContainer = NetworkManager.MessageQueueContainer; | ||
|
||
if (IsHost) | ||
{ | ||
ulong[] clientIds = clientRpcParams.Send.TargetClientIds ?? NetworkManager.ConnectedClientsList.Select(c => c.ClientId).ToArray(); | ||
ulong[] clientIds = clientRpcParams.Send.TargetClientIds ?? NetworkManager.ConnectedClientsIds; | ||
if (clientRpcParams.Send.TargetClientIds != null && clientRpcParams.Send.TargetClientIds.Length == 0) | ||
{ | ||
clientIds = NetworkManager.ConnectedClientsList.Select(c => c.ClientId).ToArray(); | ||
clientIds = NetworkManager.ConnectedClientsIds; | ||
} | ||
|
||
var containsServerClientId = clientIds.Contains(NetworkManager.ServerClientId); | ||
if (containsServerClientId && clientIds.Length == 1) | ||
{ | ||
rpcQueueContainer.EndAddQueueItemToFrame(serializer.Writer, RpcQueueHistoryFrame.QueueFrameType.Inbound, clientRpcParams.Send.UpdateStage); | ||
messageQueueContainer.EndAddQueueItemToFrame(serializer.Writer, MessageQueueHistoryFrame.QueueFrameType.Inbound, clientRpcParams.Send.UpdateStage); | ||
return; | ||
} | ||
} | ||
|
||
rpcQueueContainer.EndAddQueueItemToFrame(serializer.Writer, RpcQueueHistoryFrame.QueueFrameType.Outbound, NetworkUpdateStage.PostLateUpdate); | ||
messageQueueContainer.EndAddQueueItemToFrame(serializer.Writer, MessageQueueHistoryFrame.QueueFrameType.Outbound, NetworkUpdateStage.PostLateUpdate); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -564,8 +582,16 @@ private void NetworkVariableUpdate(ulong clientId, int behaviourIndex) | |
|
||
if (writtenAny) | ||
{ | ||
NetworkManager.MessageSender.Send(clientId, NetworkConstants.NETWORK_VARIABLE_DELTA, | ||
m_ChannelsForNetworkVariableGroups[j], buffer); | ||
var context = NetworkManager.MessageQueueContainer.EnterInternalCommandContext( | ||
mattwalsh-unity marked this conversation as resolved.
Show resolved
Hide resolved
|
||
MessageQueueContainer.MessageType.NetworkVariableDelta, m_ChannelsForNetworkVariableGroups[j], | ||
new[] {clientId}, NetworkUpdateLoop.UpdateStage); | ||
if (context != null) | ||
{ | ||
using (var nonNullContext = (InternalCommandContext)context) | ||
{ | ||
nonNullContext.NetworkWriter.WriteBytes(buffer.GetBuffer(), buffer.Length); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
Uh oh!
There was an error while loading. Please reload this page.