Skip to content

Commit 1f28877

Browse files
authored
feat: support multiple connected proxies (#940)
Implements #916
1 parent c691c9e commit 1f28877

File tree

7 files changed

+476
-116
lines changed

7 files changed

+476
-116
lines changed

crates/hyperion/src/egress/channel.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
use bevy::{ecs::world::OnDespawn, prelude::*};
2-
use hyperion_proto::{ServerToProxyMessage, UpdateChannelPosition, UpdateChannelPositions};
2+
use hyperion_proto::UpdateChannelPosition;
33
use hyperion_utils::EntityExt;
44
use tracing::error;
55
use valence_bytes::CowBytes;
66
use valence_protocol::{ByteAngle, RawBytes, VarInt, packets::play};
77

88
use crate::{
99
egress::metadata::show_all,
10-
net::{Channel, ChannelId, Compose, ConnectionId},
10+
net::{
11+
Channel, ChannelId, Compose, ConnectionId,
12+
intermediate::{IntermediateServerToProxyMessage, UpdateChannelPositions},
13+
},
1114
simulation::{
1215
Pitch, Position, RequestSubscribeChannelPackets, Uuid, Velocity, Yaw,
1316
entity_kind::EntityKind,
@@ -47,7 +50,7 @@ fn update_channel_positions(
4750

4851
compose
4952
.io_buf()
50-
.add_proxy_message(&ServerToProxyMessage::UpdateChannelPositions(
53+
.add_proxy_message(&IntermediateServerToProxyMessage::UpdateChannelPositions(
5154
UpdateChannelPositions { updates: &updates },
5255
));
5356
}

crates/hyperion/src/egress/mod.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
use bevy::prelude::*;
2-
use hyperion_proto::{ServerToProxyMessage, UpdatePlayerPositions};
32
use tracing::error;
43
use valence_protocol::{VarInt, packets::play::PlayerActionResponseS2c};
54

65
use crate::{
76
Blocks,
8-
net::{Compose, ConnectionId},
7+
net::{
8+
Compose, ConnectionId,
9+
intermediate::{IntermediateServerToProxyMessage, UpdatePlayerPositions},
10+
},
911
simulation::Position,
1012
};
1113
mod channel;
@@ -29,14 +31,14 @@ fn send_chunk_positions(
2931
let mut stream = Vec::with_capacity(count);
3032
let mut positions = Vec::with_capacity(count);
3133

32-
for (io, pos) in query.iter() {
33-
stream.push(io.inner());
34+
for (&io, pos) in query.iter() {
35+
stream.push(io);
3436
positions.push(hyperion_proto::ChunkPosition::from(pos.to_chunk()));
3537
}
3638

3739
let packet = UpdatePlayerPositions { stream, positions };
3840

39-
let chunk_positions = ServerToProxyMessage::UpdatePlayerPositions(packet);
41+
let chunk_positions = IntermediateServerToProxyMessage::UpdatePlayerPositions(packet);
4042

4143
compose.io_buf().add_proxy_message(&chunk_positions);
4244
}

crates/hyperion/src/lib.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -247,21 +247,21 @@ impl Plugin for HyperionCore {
247247

248248
let global = Global::new(shared.clone());
249249

250-
let mut compose = Compose::new(shared.compression_level, global, IoBuf::default());
251-
252250
app.add_plugins(CommandChannelPlugin);
253251

254252
if let Some(address) = app.world().get_resource::<Endpoint>() {
255253
let crypto = app.world().resource::<Crypto>();
256254
let command_channel = app.world().resource::<CommandChannel>();
257-
let egress_comm =
258-
init_proxy_comms(&runtime, command_channel.clone(), address.0, crypto.clone());
259-
compose.io_buf_mut().add_egress_comm(egress_comm);
255+
init_proxy_comms(&runtime, command_channel.clone(), address.0, crypto.clone());
260256
} else {
261257
warn!("Endpoint was not set while loading HyperionCore");
262258
}
263259

264-
app.insert_resource(compose);
260+
app.insert_resource(Compose::new(
261+
shared.compression_level,
262+
global,
263+
IoBuf::default(),
264+
));
265265
app.insert_resource(runtime);
266266
app.insert_resource(CraftingRegistry::default());
267267
app.insert_resource(StreamLookup::default());
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
use hyperion_proto::{ChunkPosition, ServerToProxyMessage, UpdateChannelPosition};
2+
3+
use crate::net::{ConnectionId, ProxyId};
4+
5+
#[derive(Clone, PartialEq)]
6+
pub struct UpdatePlayerPositions {
7+
pub stream: Vec<ConnectionId>,
8+
pub positions: Vec<ChunkPosition>,
9+
}
10+
11+
#[derive(Clone, Copy, PartialEq, Eq)]
12+
pub struct AddChannel<'a> {
13+
pub channel_id: u32,
14+
15+
pub unsubscribe_packets: &'a [u8],
16+
}
17+
18+
#[derive(Clone, PartialEq)]
19+
pub struct UpdateChannelPositions<'a> {
20+
pub updates: &'a [UpdateChannelPosition],
21+
}
22+
23+
#[derive(Clone, Copy, PartialEq, Eq)]
24+
pub struct RemoveChannel {
25+
pub channel_id: u32,
26+
}
27+
28+
#[derive(Clone, Copy, PartialEq, Eq)]
29+
pub struct SubscribeChannelPackets<'a> {
30+
pub channel_id: u32,
31+
pub exclude: Option<ConnectionId>,
32+
33+
pub data: &'a [u8],
34+
}
35+
36+
#[derive(Clone, Copy, PartialEq, Eq)]
37+
pub struct SetReceiveBroadcasts {
38+
pub stream: ConnectionId,
39+
}
40+
41+
#[derive(Clone, PartialEq, Eq)]
42+
pub struct BroadcastGlobal<'a> {
43+
pub exclude: Option<ConnectionId>,
44+
45+
pub data: &'a [u8],
46+
}
47+
48+
#[derive(Clone, PartialEq)]
49+
pub struct BroadcastLocal<'a> {
50+
pub center: ChunkPosition,
51+
pub exclude: Option<ConnectionId>,
52+
53+
pub data: &'a [u8],
54+
}
55+
56+
#[derive(Clone, PartialEq, Eq)]
57+
pub struct BroadcastChannel<'a> {
58+
pub channel_id: u32,
59+
pub exclude: Option<ConnectionId>,
60+
61+
pub data: &'a [u8],
62+
}
63+
64+
#[derive(Clone, PartialEq, Eq)]
65+
pub struct Unicast<'a> {
66+
pub stream: ConnectionId,
67+
68+
pub data: &'a [u8],
69+
}
70+
71+
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
72+
pub struct Shutdown {
73+
pub stream: ConnectionId,
74+
}
75+
76+
#[derive(Clone, PartialEq)]
77+
pub enum IntermediateServerToProxyMessage<'a> {
78+
UpdatePlayerPositions(UpdatePlayerPositions),
79+
AddChannel(AddChannel<'a>),
80+
UpdateChannelPositions(UpdateChannelPositions<'a>),
81+
RemoveChannel(RemoveChannel),
82+
SubscribeChannelPackets(SubscribeChannelPackets<'a>),
83+
BroadcastGlobal(BroadcastGlobal<'a>),
84+
BroadcastLocal(BroadcastLocal<'a>),
85+
BroadcastChannel(BroadcastChannel<'a>),
86+
Unicast(Unicast<'a>),
87+
SetReceiveBroadcasts(SetReceiveBroadcasts),
88+
Shutdown(Shutdown),
89+
}
90+
91+
impl IntermediateServerToProxyMessage<'_> {
92+
/// Whether the result of [`IntermediateServerToProxyMessage::transform_for_proxy`] will be
93+
/// affected by the proxy id provided
94+
#[must_use]
95+
pub const fn affected_by_proxy(&self) -> bool {
96+
match self {
97+
Self::UpdatePlayerPositions(_)
98+
| Self::SubscribeChannelPackets(_)
99+
| Self::BroadcastGlobal(_)
100+
| Self::BroadcastLocal(_)
101+
| Self::BroadcastChannel(_)
102+
| Self::Unicast(_)
103+
| Self::SetReceiveBroadcasts(_)
104+
| Self::Shutdown(_) => true,
105+
Self::AddChannel(_) | Self::UpdateChannelPositions(_) | Self::RemoveChannel(_) => false,
106+
}
107+
}
108+
109+
/// Transforms an intermediate message to a message suitable for sending to a particular proxy.
110+
/// Returns `None` if this message should not be sent to the proxy.
111+
#[must_use]
112+
pub fn transform_for_proxy(&self, proxy_id: ProxyId) -> Option<ServerToProxyMessage<'_>> {
113+
let filter_map_connection_id =
114+
|id: ConnectionId| (id.proxy_id() == proxy_id).then(|| id.inner());
115+
match self {
116+
Self::UpdatePlayerPositions(message) => {
117+
Some(ServerToProxyMessage::UpdatePlayerPositions(
118+
hyperion_proto::UpdatePlayerPositions {
119+
stream: message
120+
.stream
121+
.iter()
122+
.copied()
123+
.filter_map(filter_map_connection_id)
124+
.collect::<Vec<_>>(),
125+
positions: message.positions.clone(),
126+
},
127+
))
128+
}
129+
Self::AddChannel(message) => Some(ServerToProxyMessage::AddChannel(
130+
hyperion_proto::AddChannel {
131+
channel_id: message.channel_id,
132+
unsubscribe_packets: message.unsubscribe_packets,
133+
},
134+
)),
135+
Self::UpdateChannelPositions(message) => {
136+
Some(ServerToProxyMessage::UpdateChannelPositions(
137+
hyperion_proto::UpdateChannelPositions {
138+
updates: message.updates,
139+
},
140+
))
141+
}
142+
Self::RemoveChannel(message) => Some(ServerToProxyMessage::RemoveChannel(
143+
hyperion_proto::RemoveChannel {
144+
channel_id: message.channel_id,
145+
},
146+
)),
147+
Self::SubscribeChannelPackets(message) => {
148+
Some(ServerToProxyMessage::SubscribeChannelPackets(
149+
hyperion_proto::SubscribeChannelPackets {
150+
channel_id: message.channel_id,
151+
exclude: message
152+
.exclude
153+
.and_then(filter_map_connection_id)
154+
.unwrap_or_default(),
155+
data: message.data,
156+
},
157+
))
158+
}
159+
Self::BroadcastGlobal(message) => Some(ServerToProxyMessage::BroadcastGlobal(
160+
hyperion_proto::BroadcastGlobal {
161+
exclude: message
162+
.exclude
163+
.and_then(filter_map_connection_id)
164+
.unwrap_or_default(),
165+
data: message.data,
166+
},
167+
)),
168+
Self::BroadcastLocal(message) => Some(ServerToProxyMessage::BroadcastLocal(
169+
hyperion_proto::BroadcastLocal {
170+
center: message.center,
171+
exclude: message
172+
.exclude
173+
.and_then(filter_map_connection_id)
174+
.unwrap_or_default(),
175+
data: message.data,
176+
},
177+
)),
178+
Self::BroadcastChannel(message) => Some(ServerToProxyMessage::BroadcastChannel(
179+
hyperion_proto::BroadcastChannel {
180+
channel_id: message.channel_id,
181+
exclude: message
182+
.exclude
183+
.and_then(filter_map_connection_id)
184+
.unwrap_or_default(),
185+
data: message.data,
186+
},
187+
)),
188+
Self::Unicast(message) => {
189+
Some(ServerToProxyMessage::Unicast(hyperion_proto::Unicast {
190+
stream: filter_map_connection_id(message.stream)?,
191+
data: message.data,
192+
}))
193+
}
194+
Self::SetReceiveBroadcasts(message) => Some(
195+
ServerToProxyMessage::SetReceiveBroadcasts(hyperion_proto::SetReceiveBroadcasts {
196+
stream: filter_map_connection_id(message.stream)?,
197+
}),
198+
),
199+
Self::Shutdown(message) => Some(ServerToProxyMessage::SetReceiveBroadcasts(
200+
hyperion_proto::SetReceiveBroadcasts {
201+
stream: filter_map_connection_id(message.stream)?,
202+
},
203+
)),
204+
}
205+
}
206+
}

0 commit comments

Comments
 (0)