From bebd5eef2092b9c380a79994c80b485ab5514a5e Mon Sep 17 00:00:00 2001 From: Hennadii Chernyshchyk Date: Fri, 8 Nov 2024 20:07:06 +0200 Subject: [PATCH] Refactor `ReplicationMessages` Instead of consuming `ReplicatedClients` and return it back after send, just pass it along. It makes much easier to reason about the code. This way I discovered an extra mut on one of the public methods. --- src/core/replicated_clients.rs | 2 +- src/server.rs | 54 ++++++++----- src/server/replication_messages.rs | 75 ++++--------------- .../replication_messages/init_message.rs | 2 +- .../replication_messages/update_message.rs | 4 +- 5 files changed, 56 insertions(+), 81 deletions(-) diff --git a/src/core/replicated_clients.rs b/src/core/replicated_clients.rs index f9addca8..4b3ad9d0 100644 --- a/src/core/replicated_clients.rs +++ b/src/core/replicated_clients.rs @@ -289,7 +289,7 @@ impl ReplicatedClient { } /// Gets the change tick for an entity that is replicated to this client. - pub fn get_change_tick(&mut self, entity: Entity) -> Option { + pub fn get_change_tick(&self, entity: Entity) -> Option { self.change_ticks.get(&entity).copied() } diff --git a/src/server.rs b/src/server.rs index 04d4c2fe..1126d443 100644 --- a/src/server.rs +++ b/src/server.rs @@ -270,17 +270,18 @@ impl ServerPlugin { replicated_archetypes.update(set.p0(), &rules); // Take ownership to avoid borrowing issues. - let replicated_clients = mem::take(&mut *set.p1()); + let mut replicated_clients = mem::take(&mut *set.p1()); let mut removal_buffer = mem::take(&mut *set.p2()); let mut client_buffers = mem::take(&mut *set.p3()); - messages.prepare(replicated_clients); + messages.reset(replicated_clients.len()); - collect_mappings(&mut messages, &mut set.p4())?; - collect_despawns(&mut messages, &mut set.p5())?; + collect_mappings(&mut messages, &replicated_clients, &mut set.p4())?; + collect_despawns(&mut messages, &mut replicated_clients, &mut set.p5())?; collect_removals(&mut messages, &removal_buffer)?; collect_changes( &mut messages, + &mut replicated_clients, &replicated_archetypes, ®istry, &removal_buffer, @@ -290,13 +291,23 @@ impl ServerPlugin { )?; removal_buffer.clear(); - let replicated_clients = messages.send( - &mut set.p6(), - &mut client_buffers, - **server_tick, - change_tick.this_run(), - time.elapsed(), - )?; + for ((init_message, update_message), client) in + messages.iter_mut().zip(replicated_clients.iter_mut()) + { + let server = &mut set.p6(); + + init_message.send(server, client, **server_tick)?; + update_message.send( + server, + client, + &mut client_buffers, + **server_tick, + change_tick.this_run(), + time.elapsed(), + )?; + + client.visibility_mut().update(); + } // Return borrowed data back. *set.p1() = replicated_clients; @@ -325,9 +336,10 @@ impl ServerPlugin { /// On deserialization mappings should be processed first, so all referenced entities after it will behave correctly. fn collect_mappings( messages: &mut ReplicationMessages, + replicated_clients: &ReplicatedClients, entity_map: &mut ClientEntityMap, ) -> bincode::Result<()> { - for (message, _, client) in messages.iter_mut_with_clients() { + for ((message, _), client) in messages.iter_mut().zip(replicated_clients.iter()) { message.start_array(); if let Some(mappings) = entity_map.0.get_mut(&client.id()) { @@ -344,6 +356,7 @@ fn collect_mappings( /// Collect entity despawns from this tick into init messages. fn collect_despawns( messages: &mut ReplicationMessages, + replicated_clients: &mut ReplicatedClients, despawn_buffer: &mut DespawnBuffer, ) -> bincode::Result<()> { for (message, _) in messages.iter_mut() { @@ -352,13 +365,13 @@ fn collect_despawns( for entity in despawn_buffer.drain(..) { let mut shared_bytes = None; - for (message, _, client) in messages.iter_mut_with_clients() { + for ((message, _), client) in messages.iter_mut().zip(replicated_clients.iter_mut()) { client.remove_despawned(entity); message.write_entity(&mut shared_bytes, entity)?; } } - for (message, _, client) in messages.iter_mut_with_clients() { + for ((message, _), client) in messages.iter_mut().zip(replicated_clients.iter_mut()) { for entity in client.drain_lost_visibility() { message.write_entity(&mut None, entity)?; } @@ -399,6 +412,7 @@ fn collect_removals( /// since the last entity tick. fn collect_changes( messages: &mut ReplicationMessages, + replicated_clients: &mut ReplicatedClients, replicated_archetypes: &ReplicatedArchetypes, registry: &ReplicationRegistry, removal_buffer: &RemovalBuffer, @@ -428,7 +442,9 @@ fn collect_changes( }; for entity in archetype.entities() { - for (init_message, update_message, client) in messages.iter_mut_with_clients() { + for ((init_message, update_message), client) in + messages.iter_mut().zip(replicated_clients.iter_mut()) + { init_message.start_entity_data(entity.id()); update_message.start_entity_data(entity.id()); client.visibility_mut().cache_visibility(entity.id()); @@ -470,7 +486,9 @@ fn collect_changes( component_id, }; let mut shared_bytes = None; - for (init_message, update_message, client) in messages.iter_mut_with_clients() { + for ((init_message, update_message), client) in + messages.iter_mut().zip(replicated_clients.iter_mut()) + { let visibility = client.visibility().cached_visibility(); if visibility == Visibility::Hidden { continue; @@ -505,7 +523,9 @@ fn collect_changes( } } - for (init_message, update_message, client) in messages.iter_mut_with_clients() { + for ((init_message, update_message), client) in + messages.iter_mut().zip(replicated_clients.iter_mut()) + { let visibility = client.visibility().cached_visibility(); if visibility == Visibility::Hidden { continue; diff --git a/src/server/replication_messages.rs b/src/server/replication_messages.rs index 468ecadf..f5629b01 100644 --- a/src/server/replication_messages.rs +++ b/src/server/replication_messages.rs @@ -1,24 +1,15 @@ pub(super) mod init_message; pub(super) mod update_message; -use std::{ - io::{Cursor, Write}, - mem, - time::Duration, -}; +use std::io::{Cursor, Write}; -use bevy::{ecs::component::Tick, prelude::*}; +use bevy::prelude::*; use varint_rs::VarintWriter; -use crate::core::{ - replicated_clients::{ClientBuffers, ReplicatedClient, ReplicatedClients}, - replicon_server::RepliconServer, - replicon_tick::RepliconTick, -}; use init_message::InitMessage; use update_message::UpdateMessage; -/// Accumulates replication messages and sends them to clients. +/// Accumulates replication messages. /// /// Messages are serialized and deserialized manually because using an intermediate structure /// leads to allocations and according to our benchmarks it's much slower. @@ -26,8 +17,8 @@ use update_message::UpdateMessage; /// Reuses allocated memory from older messages. #[derive(Default)] pub(crate) struct ReplicationMessages { - replicated_clients: ReplicatedClients, - data: Vec<(InitMessage, UpdateMessage)>, + messages: Vec<(InitMessage, UpdateMessage)>, + len: usize, } impl ReplicationMessages { @@ -36,62 +27,26 @@ impl ReplicationMessages { /// Reuses already allocated messages. /// Creates new messages if the number of clients is bigger then the number of allocated messages. /// If there are more messages than the number of clients, then the extra messages remain untouched - /// and iteration methods will not include them. - pub(super) fn prepare(&mut self, replicated_clients: ReplicatedClients) { - self.data - .reserve(replicated_clients.len().saturating_sub(self.data.len())); + /// and [`Self::iter_mut`] will not include them. + pub(super) fn reset(&mut self, clients_count: usize) { + self.len = clients_count; - for index in 0..replicated_clients.len() { - if let Some((init_message, update_message)) = self.data.get_mut(index) { + let additional = clients_count.saturating_sub(self.messages.len()); + self.messages.reserve(additional); + + for index in 0..clients_count { + if let Some((init_message, update_message)) = self.messages.get_mut(index) { init_message.reset(); update_message.reset(); } else { - self.data.push(Default::default()); + self.messages.push(Default::default()); } } - - self.replicated_clients = replicated_clients; } /// Returns iterator over messages for each client. pub(super) fn iter_mut(&mut self) -> impl Iterator { - self.data.iter_mut().take(self.replicated_clients.len()) - } - - /// Same as [`Self::iter_mut`], but also includes [`ReplicatedClient`]. - pub(super) fn iter_mut_with_clients( - &mut self, - ) -> impl Iterator { - self.data - .iter_mut() - .zip(self.replicated_clients.iter_mut()) - .map(|((init_message, update_message), client)| (init_message, update_message, client)) - } - - /// Sends cached messages to clients specified in the last [`Self::prepare`] call. - /// - /// The change tick of each client with an init message is updated to equal the latest replicon tick. - /// messages were sent to clients. If only update messages were sent (or no messages at all) then - /// it will equal the input `last_change_tick`. - pub(super) fn send( - &mut self, - server: &mut RepliconServer, - client_buffers: &mut ClientBuffers, - server_tick: RepliconTick, - tick: Tick, - timestamp: Duration, - ) -> bincode::Result { - for ((init_message, update_message), client) in - self.data.iter_mut().zip(self.replicated_clients.iter_mut()) - { - init_message.send(server, client, server_tick)?; - update_message.send(server, client_buffers, client, server_tick, tick, timestamp)?; - client.visibility_mut().update(); - } - - let replicated_clients = mem::take(&mut self.replicated_clients); - - Ok(replicated_clients) + self.messages.iter_mut().take(self.len) } } diff --git a/src/server/replication_messages/init_message.rs b/src/server/replication_messages/init_message.rs index 6ff8eccf..50908372 100644 --- a/src/server/replication_messages/init_message.rs +++ b/src/server/replication_messages/init_message.rs @@ -294,7 +294,7 @@ impl InitMessage { /// /// Updates change tick for the client if there are data to send. /// Does nothing if there is no data to send. - pub(super) fn send( + pub(crate) fn send( &self, server: &mut RepliconServer, client: &mut ReplicatedClient, diff --git a/src/server/replication_messages/update_message.rs b/src/server/replication_messages/update_message.rs index 0fc2bfd6..7fcc368f 100644 --- a/src/server/replication_messages/update_message.rs +++ b/src/server/replication_messages/update_message.rs @@ -145,11 +145,11 @@ impl UpdateMessage { /// Splits message according to entities inside it and sends it to the specified client. /// /// Does nothing if there is no data to send. - pub(super) fn send( + pub(crate) fn send( &mut self, server: &mut RepliconServer, - client_buffers: &mut ClientBuffers, client: &mut ReplicatedClient, + client_buffers: &mut ClientBuffers, server_tick: RepliconTick, tick: Tick, timestamp: Duration,