Skip to content

Commit

Permalink
perf: add tracy tracing and optimize player join packets
Browse files Browse the repository at this point in the history
Improve performance monitoring and network efficiency:
- Integrate tracy tracing with info_span for better profiling visibility
- Batch player join packets using new DataBundle to reduce network calls
- Encapsulate PlayerHandle with proper error handling
- Add GametickSpan for accurate performance tracking
  • Loading branch information
andrewgazelka committed Nov 2, 2024
1 parent 5fdff98 commit 937fbc2
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 154 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/hyperion-proxy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[dependencies]
colored = "2.1.0"
kanal = "0.1.0-pre8"
# integer-encoding = "4.0.2"
papaya = "0.1.4"
rkyv = "0.8.8"
rustc-hash = "2.0.0"
Expand Down
39 changes: 36 additions & 3 deletions crates/hyperion-proxy/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::{atomic::AtomicBool, Arc};
use std::sync::{atomic, atomic::AtomicBool, Arc};

use anyhow::bail;
use bytes::Bytes;
use slotmap::{new_key_type, KeyData};

Expand Down Expand Up @@ -98,20 +99,52 @@ impl Ord for OrderedBytes {

#[derive(Debug)]
pub struct PlayerHandle {
pub writer: kanal::AsyncSender<OrderedBytes>,
writer: kanal::AsyncSender<OrderedBytes>,

/// Whether the player is allowed to send broadcasts.
///
/// This exists because the player is not automatically in the play state,
/// and if they are not in the play state and they receive broadcasts,
/// they will get packets that it deems are invalid because the broadcasts are using the play
/// state and play IDs.
pub can_receive_broadcasts: AtomicBool,
can_receive_broadcasts: AtomicBool,
}

impl PlayerHandle {
#[must_use]
pub const fn new(writer: kanal::AsyncSender<OrderedBytes>) -> Self {
Self {
writer,
can_receive_broadcasts: AtomicBool::new(false),
}
}

pub fn shutdown(&self) {
let _ = self.writer.try_send(OrderedBytes::SHUTDOWN);
self.writer.close();
}

pub fn enable_receive_broadcasts(&self) {
self.can_receive_broadcasts
.store(true, atomic::Ordering::Relaxed);
}

pub fn can_receive_broadcasts(&self) -> bool {
self.can_receive_broadcasts.load(atomic::Ordering::Relaxed)
}

pub fn send(&self, ordered_bytes: OrderedBytes) -> anyhow::Result<()> {
match self.writer.try_send(ordered_bytes) {
Ok(true) => Ok(()),

Ok(false) => {
self.shutdown();
bail!("failed to send packet to player, channel is full");
}
Err(e) => {
self.writer.close();
bail!("failed to send packet to player: {e}");
}
}
}
}
83 changes: 20 additions & 63 deletions crates/hyperion-proxy/src/egress.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::{atomic::Ordering, Arc};
use std::sync::Arc;

use bvh::{Aabb, Bvh};
use bytes::Bytes;
Expand Down Expand Up @@ -84,7 +84,7 @@ impl Egress {
// imo it makes sense to read once... it is a fast loop
#[allow(clippy::significant_drop_in_scrutinee)]
for (player_id, player) in &players {
if !player.can_receive_broadcasts.load(Ordering::Relaxed) {
if !player.can_receive_broadcasts() {
continue;
}

Expand All @@ -94,21 +94,10 @@ impl Egress {
exclusions.clone(),
);

match player.writer.try_send(to_send) {
Ok(true) => {} // success
Ok(false) => {
// player is disconnect
if let Some(result) = players.remove(player_id) {
result.shutdown();
}

warn!("Failed to send data to player due to channel being full");
}
Err(e) => {
warn!("Failed to send data to player: {:?}", e);
if let Some(result) = players.remove(player_id) {
result.shutdown();
}
if let Err(e) = player.send(to_send) {
warn!("Failed to send data to player: {:?}", e);
if let Some(result) = players.remove(player_id) {
result.shutdown();
}
}
}
Expand All @@ -125,20 +114,10 @@ impl Egress {
tokio::spawn(
async move {
for (id, player) in &players {
match player.writer.try_send(OrderedBytes::FLUSH) {
Ok(true) => {} // success
Ok(false) => {
// player is disconnect
if let Some(result) = players.remove(id) {
result.shutdown();
}
warn!("Failed to send data to player due to channel being full");
}
Err(e) => {
warn!("Failed to send data to player: {:?}", e);
if let Some(result) = players.remove(id) {
result.shutdown();
}
if let Err(e) = player.send(OrderedBytes::FLUSH) {
warn!("Failed to send data to player: {:?}", e);
if let Some(result) = players.remove(id) {
result.shutdown();
}
}
}
Expand Down Expand Up @@ -171,7 +150,7 @@ impl Egress {
continue;
};

if !player.can_receive_broadcasts.load(Ordering::Relaxed) {
if !player.can_receive_broadcasts() {
continue;
}

Expand All @@ -198,22 +177,10 @@ impl Egress {
exclusions: Some(exclusions.clone()),
};

match player.writer.try_send(to_send) {
Ok(true) => {} // success
Ok(false) => {
// player is disconnect
if let Some(result) = players.remove(id) {
result.shutdown();
}
warn!(
"Failed to send data to player due to channel being full"
);
}
Err(e) => {
warn!("Failed to send data to player: {:?}", e);
if let Some(result) = players.remove(id) {
result.shutdown();
}
if let Err(e) = player.send(to_send) {
warn!("Failed to send data to player: {:?}", e);
if let Some(result) = players.remove(id) {
result.shutdown();
}
}
}
Expand Down Expand Up @@ -249,20 +216,10 @@ impl Egress {
};

// todo: handle error; kick player if cannot send (buffer full)
match player.writer.try_send(ordered) {
Ok(true) => {} // success
Ok(false) => {
// player is disconnect
if let Some(result) = players.remove(&id) {
result.shutdown();
}
warn!("Failed to send data to player due to channel being full");
}
Err(e) => {
warn!("Failed to send data to player: {:?}", e);
if let Some(result) = players.remove(&id) {
result.shutdown();
}
if let Err(e) = player.send(ordered) {
warn!("Failed to send data to player: {:?}", e);
if let Some(result) = players.remove(&id) {
result.shutdown();
}
}

Expand All @@ -280,6 +237,6 @@ impl Egress {
return;
};

player.can_receive_broadcasts.store(true, Ordering::Relaxed);
player.enable_receive_broadcasts();
}
}
7 changes: 2 additions & 5 deletions crates/hyperion-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
clippy::future_not_send
)]

use std::{fmt::Debug, sync::atomic::AtomicBool};
use std::fmt::Debug;

use anyhow::Context;
use colored::Colorize;
Expand Down Expand Up @@ -195,10 +195,7 @@ async fn connect_to_server_and_run_proxy(

// todo: re-add bounding but issues if have MASSIVE number of packets
let (tx, rx) = kanal::bounded_async(MAX_PLAYER_PENDING_MESSAGES);
registry.insert(player_id_on, PlayerHandle {
writer: tx,
can_receive_broadcasts: AtomicBool::new(false),
});
registry.insert(player_id_on, PlayerHandle::new(tx));

// todo: some SlotMap like thing
debug!("got player with id {player_id_on:?}");
Expand Down
1 change: 1 addition & 0 deletions crates/hyperion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ rustc_version.workspace = true
colored = "2.1.0"
flate2 = {workspace = true, features = ["zlib-ng"]}
glam = {workspace = true, features = ["serde"]}
replace_with = "0.1.7"
rkyv = "0.8.8"
roaring = {workspace = true, features = ["simd"]}
serde = {workspace = true, features = ["derive"]}
Expand Down
22 changes: 16 additions & 6 deletions crates/hyperion/src/egress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use byteorder::WriteBytesExt;
use flecs_ecs::prelude::*;
use hyperion_proto::{Flush, ServerToProxyMessage, UpdatePlayerChunkPositions};
use rkyv::util::AlignedVec;
use tracing::{error, trace_span};
use tracing::{error, info_span};
use valence_protocol::{packets::play, VarInt};

use crate::{net::Compose, simulation::EgressComm};
Expand All @@ -19,6 +19,7 @@ use sync_chunks::SyncChunksModule;
use sync_position::SyncPositionModule;

use crate::{
ingress::GametickSpan,
net::NetworkStreamRef,
simulation::{blocks::Blocks, ChunkPosition},
system_registry::SystemId,
Expand Down Expand Up @@ -64,7 +65,7 @@ impl Module for EgressModule {
.multi_threaded()
.kind::<flecs::pipeline::OnUpdate>()
.each_iter(move |it: TableIter<'_, false>, _, (compose, mc)| {
let span = trace_span!("broadcast_chunk_deltas");
let span = info_span!("broadcast_chunk_deltas");
let _enter = span.enter();

let world = it.world();
Expand Down Expand Up @@ -104,11 +105,11 @@ impl Module for EgressModule {
)
.kind_id(pipeline)
.each(move |(compose, egress)| {
let span = trace_span!("egress");
let span = info_span!("egress");
let _enter = span.enter();

{
let span = trace_span!("chunk_positions");
let span = info_span!("chunk_positions");
let _enter = span.enter();

let mut stream = Vec::new();
Expand Down Expand Up @@ -167,15 +168,24 @@ impl Module for EgressModule {
"clear_bump",
world,
&mut Compose($),
&mut GametickSpan($)
)
.kind_id(pipeline)
.each(move |compose| {
let span = tracing::trace_span!("clear_bump");
.each(move |(compose, gametick_span)| {
let span = tracing::info_span!("clear_bump");
let _enter = span.enter();

for bump in &mut compose.bump {
bump.reset();
}

replace_with::replace_with_or_abort(gametick_span, |span| {
let GametickSpan::Entered(span) = span else {
panic!("gametick_span should be exited");
};

GametickSpan::Exited(span.exit())
});
});
}
}
Loading

0 comments on commit 937fbc2

Please sign in to comment.