Skip to content

Commit

Permalink
Refactor ReplicationMessages
Browse files Browse the repository at this point in the history
Instead of consuming `ReplicatedClients` and return it back after
send, just pass it along. It makes much easier to reason about
the code.

This way I discovered an extra mut on one of the public methods.
  • Loading branch information
Shatur committed Nov 8, 2024
1 parent a00436b commit bebd5ee
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 81 deletions.
2 changes: 1 addition & 1 deletion src/core/replicated_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl ReplicatedClient {
}

/// Gets the change tick for an entity that is replicated to this client.
pub fn get_change_tick(&mut self, entity: Entity) -> Option<Tick> {
pub fn get_change_tick(&self, entity: Entity) -> Option<Tick> {
self.change_ticks.get(&entity).copied()
}

Expand Down
54 changes: 37 additions & 17 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,18 @@ impl ServerPlugin {
replicated_archetypes.update(set.p0(), &rules);

// Take ownership to avoid borrowing issues.
let replicated_clients = mem::take(&mut *set.p1());
let mut replicated_clients = mem::take(&mut *set.p1());
let mut removal_buffer = mem::take(&mut *set.p2());
let mut client_buffers = mem::take(&mut *set.p3());

messages.prepare(replicated_clients);
messages.reset(replicated_clients.len());

collect_mappings(&mut messages, &mut set.p4())?;
collect_despawns(&mut messages, &mut set.p5())?;
collect_mappings(&mut messages, &replicated_clients, &mut set.p4())?;
collect_despawns(&mut messages, &mut replicated_clients, &mut set.p5())?;
collect_removals(&mut messages, &removal_buffer)?;
collect_changes(
&mut messages,
&mut replicated_clients,
&replicated_archetypes,
&registry,
&removal_buffer,
Expand All @@ -290,13 +291,23 @@ impl ServerPlugin {
)?;
removal_buffer.clear();

let replicated_clients = messages.send(
&mut set.p6(),
&mut client_buffers,
**server_tick,
change_tick.this_run(),
time.elapsed(),
)?;
for ((init_message, update_message), client) in
messages.iter_mut().zip(replicated_clients.iter_mut())
{
let server = &mut set.p6();

init_message.send(server, client, **server_tick)?;
update_message.send(
server,
client,
&mut client_buffers,
**server_tick,
change_tick.this_run(),
time.elapsed(),
)?;

client.visibility_mut().update();
}

// Return borrowed data back.
*set.p1() = replicated_clients;
Expand Down Expand Up @@ -325,9 +336,10 @@ impl ServerPlugin {
/// On deserialization mappings should be processed first, so all referenced entities after it will behave correctly.
fn collect_mappings(
messages: &mut ReplicationMessages,
replicated_clients: &ReplicatedClients,
entity_map: &mut ClientEntityMap,
) -> bincode::Result<()> {
for (message, _, client) in messages.iter_mut_with_clients() {
for ((message, _), client) in messages.iter_mut().zip(replicated_clients.iter()) {
message.start_array();

if let Some(mappings) = entity_map.0.get_mut(&client.id()) {
Expand All @@ -344,6 +356,7 @@ fn collect_mappings(
/// Collect entity despawns from this tick into init messages.
fn collect_despawns(
messages: &mut ReplicationMessages,
replicated_clients: &mut ReplicatedClients,
despawn_buffer: &mut DespawnBuffer,
) -> bincode::Result<()> {
for (message, _) in messages.iter_mut() {
Expand All @@ -352,13 +365,13 @@ fn collect_despawns(

for entity in despawn_buffer.drain(..) {
let mut shared_bytes = None;
for (message, _, client) in messages.iter_mut_with_clients() {
for ((message, _), client) in messages.iter_mut().zip(replicated_clients.iter_mut()) {
client.remove_despawned(entity);
message.write_entity(&mut shared_bytes, entity)?;
}
}

for (message, _, client) in messages.iter_mut_with_clients() {
for ((message, _), client) in messages.iter_mut().zip(replicated_clients.iter_mut()) {
for entity in client.drain_lost_visibility() {
message.write_entity(&mut None, entity)?;
}
Expand Down Expand Up @@ -399,6 +412,7 @@ fn collect_removals(
/// since the last entity tick.
fn collect_changes(
messages: &mut ReplicationMessages,
replicated_clients: &mut ReplicatedClients,
replicated_archetypes: &ReplicatedArchetypes,
registry: &ReplicationRegistry,
removal_buffer: &RemovalBuffer,
Expand Down Expand Up @@ -428,7 +442,9 @@ fn collect_changes(
};

for entity in archetype.entities() {
for (init_message, update_message, client) in messages.iter_mut_with_clients() {
for ((init_message, update_message), client) in
messages.iter_mut().zip(replicated_clients.iter_mut())
{
init_message.start_entity_data(entity.id());
update_message.start_entity_data(entity.id());
client.visibility_mut().cache_visibility(entity.id());
Expand Down Expand Up @@ -470,7 +486,9 @@ fn collect_changes(
component_id,
};
let mut shared_bytes = None;
for (init_message, update_message, client) in messages.iter_mut_with_clients() {
for ((init_message, update_message), client) in
messages.iter_mut().zip(replicated_clients.iter_mut())
{
let visibility = client.visibility().cached_visibility();
if visibility == Visibility::Hidden {
continue;
Expand Down Expand Up @@ -505,7 +523,9 @@ fn collect_changes(
}
}

for (init_message, update_message, client) in messages.iter_mut_with_clients() {
for ((init_message, update_message), client) in
messages.iter_mut().zip(replicated_clients.iter_mut())
{
let visibility = client.visibility().cached_visibility();
if visibility == Visibility::Hidden {
continue;
Expand Down
75 changes: 15 additions & 60 deletions src/server/replication_messages.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,24 @@
pub(super) mod init_message;
pub(super) mod update_message;

use std::{
io::{Cursor, Write},
mem,
time::Duration,
};
use std::io::{Cursor, Write};

use bevy::{ecs::component::Tick, prelude::*};
use bevy::prelude::*;
use varint_rs::VarintWriter;

use crate::core::{
replicated_clients::{ClientBuffers, ReplicatedClient, ReplicatedClients},
replicon_server::RepliconServer,
replicon_tick::RepliconTick,
};
use init_message::InitMessage;
use update_message::UpdateMessage;

/// Accumulates replication messages and sends them to clients.
/// Accumulates replication messages.
///
/// Messages are serialized and deserialized manually because using an intermediate structure
/// leads to allocations and according to our benchmarks it's much slower.
///
/// Reuses allocated memory from older messages.
#[derive(Default)]
pub(crate) struct ReplicationMessages {
replicated_clients: ReplicatedClients,
data: Vec<(InitMessage, UpdateMessage)>,
messages: Vec<(InitMessage, UpdateMessage)>,
len: usize,
}

impl ReplicationMessages {
Expand All @@ -36,62 +27,26 @@ impl ReplicationMessages {
/// Reuses already allocated messages.
/// Creates new messages if the number of clients is bigger then the number of allocated messages.
/// If there are more messages than the number of clients, then the extra messages remain untouched
/// and iteration methods will not include them.
pub(super) fn prepare(&mut self, replicated_clients: ReplicatedClients) {
self.data
.reserve(replicated_clients.len().saturating_sub(self.data.len()));
/// and [`Self::iter_mut`] will not include them.
pub(super) fn reset(&mut self, clients_count: usize) {
self.len = clients_count;

for index in 0..replicated_clients.len() {
if let Some((init_message, update_message)) = self.data.get_mut(index) {
let additional = clients_count.saturating_sub(self.messages.len());
self.messages.reserve(additional);

for index in 0..clients_count {
if let Some((init_message, update_message)) = self.messages.get_mut(index) {
init_message.reset();
update_message.reset();
} else {
self.data.push(Default::default());
self.messages.push(Default::default());
}
}

self.replicated_clients = replicated_clients;
}

/// Returns iterator over messages for each client.
pub(super) fn iter_mut(&mut self) -> impl Iterator<Item = &mut (InitMessage, UpdateMessage)> {
self.data.iter_mut().take(self.replicated_clients.len())
}

/// Same as [`Self::iter_mut`], but also includes [`ReplicatedClient`].
pub(super) fn iter_mut_with_clients(
&mut self,
) -> impl Iterator<Item = (&mut InitMessage, &mut UpdateMessage, &mut ReplicatedClient)> {
self.data
.iter_mut()
.zip(self.replicated_clients.iter_mut())
.map(|((init_message, update_message), client)| (init_message, update_message, client))
}

/// Sends cached messages to clients specified in the last [`Self::prepare`] call.
///
/// The change tick of each client with an init message is updated to equal the latest replicon tick.
/// messages were sent to clients. If only update messages were sent (or no messages at all) then
/// it will equal the input `last_change_tick`.
pub(super) fn send(
&mut self,
server: &mut RepliconServer,
client_buffers: &mut ClientBuffers,
server_tick: RepliconTick,
tick: Tick,
timestamp: Duration,
) -> bincode::Result<ReplicatedClients> {
for ((init_message, update_message), client) in
self.data.iter_mut().zip(self.replicated_clients.iter_mut())
{
init_message.send(server, client, server_tick)?;
update_message.send(server, client_buffers, client, server_tick, tick, timestamp)?;
client.visibility_mut().update();
}

let replicated_clients = mem::take(&mut self.replicated_clients);

Ok(replicated_clients)
self.messages.iter_mut().take(self.len)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/replication_messages/init_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ impl InitMessage {
///
/// Updates change tick for the client if there are data to send.
/// Does nothing if there is no data to send.
pub(super) fn send(
pub(crate) fn send(
&self,
server: &mut RepliconServer,
client: &mut ReplicatedClient,
Expand Down
4 changes: 2 additions & 2 deletions src/server/replication_messages/update_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ impl UpdateMessage {
/// Splits message according to entities inside it and sends it to the specified client.
///
/// Does nothing if there is no data to send.
pub(super) fn send(
pub(crate) fn send(
&mut self,
server: &mut RepliconServer,
client_buffers: &mut ClientBuffers,
client: &mut ReplicatedClient,
client_buffers: &mut ClientBuffers,
server_tick: RepliconTick,
tick: Tick,
timestamp: Duration,
Expand Down

0 comments on commit bebd5ee

Please sign in to comment.