Skip to content

Commit

Permalink
Rename InitMessage into ChangeMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
Shatur committed Nov 16, 2024
1 parent a148ab1 commit e443c8b
Show file tree
Hide file tree
Showing 20 changed files with 287 additions and 280 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Move `Replicated` to the `replication` module.
- Split the `ctx` module and move event-related contexts under `core::events_registry::ctx` and replication-related contexts under `core::replication_registry::ctx`.
- Rename `ServerPlugin::change_timeout` into `ServerPlugin::mutate_timeout`.
- Rename `ServerInitTick` into `ServerChangeTick`.
- Rename `ReplicatedClient::init_tick` into `ReplicatedClient::change_tick`.
- Rename `ReplicatedClient::get_change_tick` into `ReplicatedClient::mutation_tick`.
- Rename `ReplicationChannel::Init` into `ReplicationChannel::Changes`.
- Rename `ReplicationChannel::Update` into `ReplicationChannel::Mutations`.

### Removed
Expand Down
77 changes: 40 additions & 37 deletions benches/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn replication<C: Component + Default + Serialize + DeserializeOwned + Clone>(c:
name = &name[MODULE_PREFIX_LEN..];

for clients in [1, 20] {
c.bench_function(&format!("{name}, init send, {clients} client(s)"), |b| {
c.bench_function(&format!("{name}, changes send, {clients} client(s)"), |b| {
b.iter_custom(|iter| {
let mut elapsed = Duration::ZERO;
for _ in 0..iter {
Expand Down Expand Up @@ -82,53 +82,56 @@ fn replication<C: Component + Default + Serialize + DeserializeOwned + Clone>(c:
})
});

c.bench_function(&format!("{name}, mutate send, {clients} client(s)"), |b| {
b.iter_custom(|iter| {
let mut server_app = create_app::<C>();
let mut client_apps = Vec::new();
for _ in 0..clients {
client_apps.push(create_app::<C>());
}

for client_app in &mut client_apps {
server_app.connect_client(client_app);
}

server_app
.world_mut()
.spawn_batch(vec![(Replicated, C::default()); ENTITIES as usize]);
let mut query = server_app.world_mut().query::<&mut C>();

server_app.update();
for client_app in &mut client_apps {
server_app.exchange_with_client(client_app);
client_app.update();
assert_eq!(client_app.world().entities().len(), ENTITIES);
}
c.bench_function(
&format!("{name}, mutations send, {clients} client(s)"),
|b| {
b.iter_custom(|iter| {
let mut server_app = create_app::<C>();
let mut client_apps = Vec::new();
for _ in 0..clients {
client_apps.push(create_app::<C>());
}

let mut elapsed = Duration::ZERO;
for _ in 0..iter {
for mut component in query.iter_mut(server_app.world_mut()) {
component.set_changed();
for client_app in &mut client_apps {
server_app.connect_client(client_app);
}

let instant = Instant::now();
server_app.update();
elapsed += instant.elapsed();
server_app
.world_mut()
.spawn_batch(vec![(Replicated, C::default()); ENTITIES as usize]);
let mut query = server_app.world_mut().query::<&mut C>();

server_app.update();
for client_app in &mut client_apps {
server_app.exchange_with_client(client_app);
client_app.update();
assert_eq!(client_app.world().entities().len(), ENTITIES);
}
}

elapsed
})
});
let mut elapsed = Duration::ZERO;
for _ in 0..iter {
for mut component in query.iter_mut(server_app.world_mut()) {
component.set_changed();
}

let instant = Instant::now();
server_app.update();
elapsed += instant.elapsed();

for client_app in &mut client_apps {
server_app.exchange_with_client(client_app);
client_app.update();
assert_eq!(client_app.world().entities().len(), ENTITIES);
}
}

elapsed
})
},
);
}

c.bench_function(&format!("{name}, init receive"), |b| {
c.bench_function(&format!("{name}, changes receive"), |b| {
b.iter_custom(|iter| {
let mut elapsed = Duration::ZERO;
for _ in 0..iter {
Expand All @@ -154,7 +157,7 @@ fn replication<C: Component + Default + Serialize + DeserializeOwned + Clone>(c:
})
});

c.bench_function(&format!("{name}, mutate receive"), |b| {
c.bench_function(&format!("{name}, mutations receive"), |b| {
b.iter_custom(|iter| {
let mut server_app = create_app::<C>();
let mut client_app = create_app::<C>();
Expand Down
83 changes: 41 additions & 42 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ use crate::core::{
channels::{ReplicationChannel, RepliconChannels},
common_conditions::{client_connected, client_just_connected, client_just_disconnected},
replication::{
change_message_arrays::ChangeMessageArrays,
command_markers::{CommandMarkers, EntityMarkers},
deferred_entity::DeferredEntity,
init_message_arrays::InitMessageArrays,
replication_registry::{
ctx::{DespawnCtx, RemoveCtx, WriteCtx},
ReplicationRegistry,
Expand All @@ -38,7 +38,7 @@ impl Plugin for ClientPlugin {
fn build(&self, app: &mut App) {
app.init_resource::<RepliconClient>()
.init_resource::<ServerEntityMap>()
.init_resource::<ServerInitTick>()
.init_resource::<ServerChangeTick>()
.init_resource::<BufferedMutations>()
.configure_sets(
PreUpdate,
Expand Down Expand Up @@ -76,12 +76,12 @@ impl ClientPlugin {

/// Receives and applies replication messages from the server.
///
/// Tick init messages are sent over the [`ReplicationChannel::Init`] and are applied first to ensure valid state
/// Change messages are sent over the [`ReplicationChannel::Changes`] and are applied first to ensure valid state
/// for component mutations.
///
/// Mutate messages are sent over [`ReplicationChannel::Mutations`], which means they may appear
/// ahead-of or behind init messages from the same server tick. A mutation will only be applied if its
/// change tick has already appeared in an init message, otherwise it will be buffered while waiting.
/// ahead-of or behind change messages from the same server tick. A mutation will only be applied if its
/// change tick has already appeared in an change message, otherwise it will be buffered while waiting.
/// Since component mutations can arrive in any order, they will only be applied if correspond to a more
/// recent server tick than the last acked server tick for each entity.
///
Expand Down Expand Up @@ -130,11 +130,11 @@ impl ClientPlugin {
}

fn reset(
mut init_tick: ResMut<ServerInitTick>,
mut change_tick: ResMut<ServerChangeTick>,
mut entity_map: ResMut<ServerEntityMap>,
mut buffered_mutations: ResMut<BufferedMutations>,
) {
*init_tick = Default::default();
*change_tick = Default::default();
entity_map.clear();
buffered_mutations.clear();
}
Expand All @@ -149,33 +149,33 @@ fn apply_replication(
client: &mut RepliconClient,
buffered_mutations: &mut BufferedMutations,
) -> bincode::Result<()> {
for message in client.receive(ReplicationChannel::Init) {
apply_init_message(world, params, &message)?;
for message in client.receive(ReplicationChannel::Changes) {
apply_change_message(world, params, &message)?;
}

// Unlike init messages, we read all mutate messages first, sort them by tick
// Unlike change messages, we read all mutate messages first, sort them by tick
// in descending order to ensure that the last mutation will be applied first.
// Since mutate messages manually split by packet size, we apply all messages,
// but skip outdated data per-entity by checking last received tick for it
// (unless user requested history via marker).
let init_tick = *world.resource::<ServerInitTick>();
let change_tick = *world.resource::<ServerChangeTick>();
let acks_size = mem::size_of::<u16>() * client.received_count(ReplicationChannel::Mutations);
if acks_size != 0 {
let mut acks = Vec::with_capacity(acks_size);
for message in client.receive(ReplicationChannel::Mutations) {
let mutate_index = buffer_mutate_message(params, buffered_mutations, message)?;
acks.write_varint(mutate_index)?;
}
client.send(ReplicationChannel::Init, acks);
client.send(ReplicationChannel::Changes, acks);
}

apply_mutate_messages(world, params, buffered_mutations, init_tick)
apply_mutate_messages(world, params, buffered_mutations, change_tick)
}

/// Reads and applies init message.
/// Reads and applies change message.
///
/// For details see [`replication_messages`](crate::server::replication_messages).
fn apply_init_message(
fn apply_change_message(
world: &mut World,
params: &mut ReceiveParams,
message: &[u8],
Expand All @@ -187,41 +187,41 @@ fn apply_init_message(
stats.bytes += end_pos;
}

let arrays = InitMessageArrays::from_bits_retain(cursor.read_fixedint()?);
let arrays = ChangeMessageArrays::from_bits_retain(cursor.read_fixedint()?);
let message_tick = DefaultOptions::new().deserialize_from(&mut cursor)?;
trace!("applying init message for {message_tick:?}");
world.resource_mut::<ServerInitTick>().0 = message_tick;
trace!("applying change message for {message_tick:?}");
world.resource_mut::<ServerChangeTick>().0 = message_tick;

let last_array = arrays.last();
for (_, array) in arrays.iter_names() {
match array {
InitMessageArrays::MAPPINGS => {
ChangeMessageArrays::MAPPINGS => {
let len = apply_array(&mut cursor, array != last_array, |cursor| {
apply_entity_mapping(world, params, cursor)
})?;
if let Some(stats) = &mut params.stats {
stats.mappings += len as u32;
}
}
InitMessageArrays::DESPAWNS => {
ChangeMessageArrays::DESPAWNS => {
let len = apply_array(&mut cursor, array != last_array, |cursor| {
apply_despawn(world, params, cursor, message_tick)
})?;
if let Some(stats) = &mut params.stats {
stats.despawns += len as u32;
}
}
InitMessageArrays::REMOVALS => {
ChangeMessageArrays::REMOVALS => {
let len = apply_array(&mut cursor, array != last_array, |cursor| {
apply_removals(world, params, cursor, message_tick)
})?;
if let Some(stats) = &mut params.stats {
stats.entities_changed += len as u32;
}
}
InitMessageArrays::CHANGES => {
ChangeMessageArrays::CHANGES => {
let len = apply_array(&mut cursor, false, |cursor| {
apply_init_changes(world, params, cursor, message_tick)
apply_changes(world, params, cursor, message_tick)
})?;
if let Some(stats) = &mut params.stats {
stats.entities_changed += len as u32;
Expand Down Expand Up @@ -251,12 +251,12 @@ fn buffer_mutate_message(
stats.bytes += end_pos;
}

let init_tick = DefaultOptions::new().deserialize_from(&mut cursor)?;
let change_tick = DefaultOptions::new().deserialize_from(&mut cursor)?;
let message_tick = DefaultOptions::new().deserialize_from(&mut cursor)?;
let mutate_index = cursor.read_varint()?;
trace!("received mutate message for {message_tick:?}");
buffered_mutations.insert(BufferedMutate {
init_tick,
change_tick,
message_tick,
message: message.slice(cursor.position() as usize..),
});
Expand All @@ -266,17 +266,17 @@ fn buffer_mutate_message(

/// Applies mutations from [`BufferedMutations`].
///
/// If the mutate message can't be applied yet (because the init message with the
/// If the mutate message can't be applied yet (because the change message with the
/// corresponding tick hasn't arrived), it will be kept in the buffer.
fn apply_mutate_messages(
world: &mut World,
params: &mut ReceiveParams,
buffered_mutations: &mut BufferedMutations,
init_tick: ServerInitTick,
change_tick: ServerChangeTick,
) -> bincode::Result<()> {
let mut result = Ok(());
buffered_mutations.0.retain(|mutate| {
if mutate.init_tick > *init_tick {
if mutate.change_tick > *change_tick {
return true;
}

Expand Down Expand Up @@ -305,7 +305,7 @@ fn apply_mutate_messages(
/// If the array is serialized with the length, calls `f` the specified number of times.
/// Otherwise, calls `f` until the end of the cursor.
///
/// See [`InitMessageArrays`] for details.
/// See [`ChangeMessageArrays`] for details.
fn apply_array(
cursor: &mut Cursor<&[u8]>,
with_len: bool,
Expand All @@ -330,7 +330,7 @@ fn apply_array(
}
}

/// Deserializes and applies server mapping from client's pre-spawned entities from init message.
/// Deserializes and applies server mapping from client's pre-spawned entities.
fn apply_entity_mapping(
world: &mut World,
params: &mut ReceiveParams,
Expand All @@ -351,7 +351,7 @@ fn apply_entity_mapping(
Ok(())
}

/// Deserializes and applies entity despawn from init message.
/// Deserializes and applies entity despawn from change message.
fn apply_despawn(
world: &mut World,
params: &mut ReceiveParams,
Expand All @@ -374,7 +374,7 @@ fn apply_despawn(
Ok(())
}

/// Deserializes and applies component removals for an entity from init message.
/// Deserializes and applies component removals for an entity.
fn apply_removals(
world: &mut World,
params: &mut ReceiveParams,
Expand Down Expand Up @@ -425,8 +425,8 @@ fn apply_removals(
Ok(())
}

/// Deserializes and applies component insertions or mutations for an entity from init message.
fn apply_init_changes(
/// Deserializes and applies component insertions or mutations for an entity.
fn apply_changes(
world: &mut World,
params: &mut ReceiveParams,
cursor: &mut Cursor<&[u8]>,
Expand Down Expand Up @@ -481,7 +481,7 @@ fn apply_init_changes(
Ok(())
}

/// Deserializes and applies component mutations for all entities from mutate message.
/// Deserializes and applies component mutations for all entities.
///
/// Consumes all remaining bytes in the cursor.
fn apply_mutations(
Expand All @@ -494,7 +494,7 @@ fn apply_mutations(
let data_size: usize = cursor.read_varint()?;

let Some(client_entity) = params.entity_map.get_by_server(server_entity) else {
// Mutation could arrive after a despawn from init message.
// Mutation could arrive after a despawn from change message.
debug!("ignoring mutations received for unknown server's {server_entity:?}");
cursor.set_position(cursor.position() + data_size as u64);
return Ok(());
Expand Down Expand Up @@ -674,15 +674,14 @@ pub enum ClientSet {
Reset,
}

/// Last received tick for init message from server.
/// Last received tick for change message from server.
///
/// In other words, last [`RepliconTick`] with a removal, insertion, spawn or despawn.
/// When a component mutates, this value is not updated.
#[derive(Clone, Copy, Debug, Default, Deref, Resource)]
pub struct ServerInitTick(RepliconTick);
pub struct ServerChangeTick(RepliconTick);

/// Cached buffered mutate messages, used by the replicon client to align them with initialization
/// messages.
/// Cached buffered mutate messages, used by the replicon client to align them with change messages.
///
/// If [`ClientSet::Reset`] is disabled, then this needs to be cleaned up manually with [`Self::clear`].
#[derive(Default, Resource)]
Expand All @@ -702,12 +701,12 @@ impl BufferedMutations {
}
}

/// Partially-deserialized mutate message that is waiting for its tick to appear in an init message.
/// Partially-deserialized mutate message that is waiting for its tick to appear in an change message.
///
/// See also [`crate::server::replication_messages`].
pub(super) struct BufferedMutate {
/// Required tick to wait for.
init_tick: RepliconTick,
change_tick: RepliconTick,

/// The tick this mutations corresponds to.
message_tick: RepliconTick,
Expand Down
Loading

0 comments on commit e443c8b

Please sign in to comment.