Skip to content

fix: player join #903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/hyperion-proxy/src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ pub fn initiate_player_connection(
packet_writer.enqueue_packet(outgoing_packet);
}
}

// Ensure that the client receives any final packets, especially a disconnect message
// packet if present, when the connection is shut down. Send errors are ignored.
if let Err(e) = packet_writer.flush_pending_packets().await {
warn!("Error flushing packets to player: {e:?}");
}
});

tokio::task::spawn(async move {
Expand Down
8 changes: 0 additions & 8 deletions crates/hyperion/src/common/util/sendable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,3 @@ pub struct SendableRef<'a>(pub WorldRef<'a>);

unsafe impl Send for SendableRef<'_> {}
unsafe impl Sync for SendableRef<'_> {}

pub struct SendableQuery<T>(pub Query<T>)
where
T: QueryTuple;

#[expect(clippy::non_send_fields_in_send_ty)]
unsafe impl<T: QueryTuple + Send> Send for SendableQuery<T> {}
unsafe impl<T: QueryTuple> Sync for SendableQuery<T> {}
205 changes: 134 additions & 71 deletions crates/hyperion/src/egress/player_join/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::{borrow::Cow, collections::BTreeSet, ops::Index};

use anyhow::Context;
use anyhow::{Context, bail};
use flecs_ecs::prelude::*;
use glam::DVec3;
use hyperion_crafting::{Action, CraftingRegistry, RecipeBookState};
use hyperion_utils::EntityExt;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use tracing::{info, instrument, warn};
use valence_protocol::{
ByteAngle, GameMode, Ident, PacketEncoder, RawBytes, VarInt, Velocity,
Expand All @@ -21,7 +20,7 @@ use valence_registry::{BiomeRegistry, RegistryCodec};
use valence_server::entity::EntityKind;
use valence_text::IntoText;

use crate::simulation::{MovementTracking, PacketState, Pitch};
use crate::simulation::{IgnMap, MovementTracking, PacketState, Pitch};

mod list;
pub use list::*;
Expand All @@ -37,19 +36,19 @@ use crate::{
skin::PlayerSkin,
util::registry_codec_raw,
},
util::{SendableQuery, SendableRef},
util::SendableRef,
};

#[expect(
clippy::too_many_arguments,
reason = "todo: we should refactor at some point"
)]
#[instrument(skip_all, fields(name = name))]
#[instrument(skip_all, fields(name = &***name))]
pub fn player_join_world(
entity: &EntityView<'_>,
compose: &Compose,
uuid: uuid::Uuid,
name: &str,
name: &Name,
io: ConnectionId,
position: &Position,
yaw: &Yaw,
Expand All @@ -69,6 +68,7 @@ pub fn player_join_world(
)>,
crafting_registry: &CraftingRegistry,
config: &Config,
ign_map: &IgnMap,
) -> anyhow::Result<()> {
static CACHED_DATA: once_cell::sync::OnceCell<bytes::Bytes> = once_cell::sync::OnceCell::new();

Expand Down Expand Up @@ -322,7 +322,7 @@ pub fn player_join_world(
.add_packet(&pkt)
.context("failed to send player list packet")?;

let player_name = vec![name];
let player_name = vec![&***name];

compose
.broadcast(
Expand Down Expand Up @@ -374,6 +374,64 @@ pub fn player_join_world(

bundle.unicast(io)?;

// The player must be added to the ign map after all of its components have been set and ready
// to receive play packets because other threads may attempt to process the player once it is
// added to the ign map.
if let Some(previous_player) = ign_map.insert((**name).clone(), entity.id()) {
// Disconnect either this player or the previous player with the same username.
// There are some Minecraft accounts with the same username, but this is an extremely
// rare edge case which is not worth handling.
let previous_player = previous_player.entity_view(world);

let pkt = play::DisconnectS2c {
reason: "A different player with the same username as your account has joined on a \
different device"
.into_cow_text(),
};

match previous_player.get_name() {
None => {
// previous_player must be getting processed in another thread in player_join_world
// because it is in ign_map but does not have a name yet. To avoid having two
// entities with the same name, which would cause flecs to abort, this code
// disconnects the current player. In the worst-case scenario, both players may get
// disconnected, which is okay because the players can reconnect.

warn!(
"two players are simultanenously connecting with the same username '{name}'. \
one player will be disconnected."
);

compose.unicast(&pkt, io, system)?;
compose.io_buf().shutdown(io, world);
bail!("another player with the same username is joining");
}
Some(previous_player_name) => {
// Kick the previous player with the same name. One player should only be able to connect
// to the server one time simultaneously, so if the same player connects to this server
// multiple times, the other connection should be disconnected. In general, this wants to
// disconnect the older player connection because the alternative solution of repeatedly kicking
// new player join attempts if an old player connection is somehow still alive would lead to bad
// user experience.
assert_eq!(previous_player_name, &***name);

warn!(
"player {name} has joined with the same username of an already-connected \
player. the previous player with the username will be disconnected."
);

previous_player.remove_name();

let previous_stream_id = previous_player.get::<&ConnectionId>(|id| *id);

compose.unicast(&pkt, previous_stream_id, system)?;
compose.io_buf().shutdown(previous_stream_id, world);
}
}
}

entity.set_name(name);

info!("{name} joined the world");

Ok(())
Expand Down Expand Up @@ -505,8 +563,6 @@ impl Module for PlayerJoinModule {
&EntityFlags,
)>();

let query = SendableQuery(query);

let rayon_threads = rayon::current_num_threads();

#[expect(
Expand Down Expand Up @@ -536,78 +592,85 @@ impl Module for PlayerJoinModule {
let root_command = root_command.id();

system!(
"player_joins",
"update_skins",
world,
&Comms($),
)
.kind(id::<flecs::pipeline::PreUpdate>())
.each_iter(move |it, _, comms| {
let world = it.world();
while let Ok(Some((entity, skin))) = comms.skins_rx.try_recv() {
let entity = world.entity_from_id(entity);
entity.set(skin);
}
});

system!(
"player_join_world",
world,
&Compose($),
&CraftingRegistry($),
&Config($),
&RayonWorldStages($),
&IgnMap($),
&Uuid,
&Name,
&Position,
&Yaw,
&Pitch,
&ConnectionId,
&PlayerSkin,
)
.kind(id::<flecs::pipeline::PreUpdate>())
.with_enum(PacketState::PendingPlay)
.kind(id::<flecs::pipeline::OnUpdate>())
.multi_threaded()
.each_iter(
move |it, _, (comms, compose, crafting_registry, config, stages)| {
let span = tracing::info_span!("joins");
move |it,
row,
(
compose,
crafting_registry,
config,
ign_map,
uuid,
name,
position,
yaw,
pitch,
stream_id,
skin,
)| {
let span = tracing::info_span!("player_join_world");
let _enter = span.enter();

let system = it.system().id();

let mut skins = Vec::new();

while let Ok(Some((entity, skin))) = comms.skins_rx.try_recv() {
skins.push((entity, skin.clone()));
}

// todo: par_iter but bugs...
// for (entity, skin) in skins {
skins.into_par_iter().for_each(|(entity, skin)| {
// if we are not in rayon context that means we are in a single-threaded context and 0 will work
let idx = rayon::current_thread_index().unwrap_or(0);
let world = &stages[idx];

let system = system.entity_view(world);

if !world.is_alive(entity) {
return;
}

let entity = world.entity_from_id(entity);

entity.get::<(&Uuid, &Name, &Position, &Yaw, &Pitch, &ConnectionId)>(
|(uuid, name, position, yaw, pitch, &stream_id)| {
let query = &query;
let query = &query.0;
entity.set_name(name);

// if we get an error joining, we should kick the player
if let Err(e) = player_join_world(
&entity,
compose,
uuid.0,
name,
stream_id,
position,
yaw,
pitch,
world,
&skin,
system,
root_command,
query,
crafting_registry,
config,
) {
warn!("player_join_world error: {e:?}");
compose.io_buf().shutdown(stream_id, world);
}
},
);

let entity = world.entity_from_id(entity);
entity.set(skin);

let system = it.system();
let world = it.world();
let entity = it.entity(row).expect("row must be in bounds");

// if we get an error joining, we should kick the player
if let Err(e) = player_join_world(
&entity,
compose,
uuid.0,
name,
*stream_id,
position,
yaw,
pitch,
&world,
skin,
system,
root_command,
&query,
crafting_registry,
config,
ign_map,
) {
warn!("player_join_world error: {e:?}");
compose.io_buf().shutdown(*stream_id, &world);
entity.add_enum(PacketState::Terminate);
} else {
entity.add_enum(PacketState::Play);
});
}
},
);
}
Expand Down
Loading
Loading