Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Further Review - Simplification of some code #6

Merged
merged 8 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ lru = {version = "0.7.1", default-features = false }
hashlink = "0.7.0"
delay_map = "0.3.0"
more-asserts = "0.2.2"
thiserror = "1.0.40"
derive_more = { version = "0.99.17", default-features = false, features = ["from", "display", "deref", "deref_mut"] }
async-trait = "0.1.74"

[dev-dependencies]
rand_07 = { package = "rand", version = "0.7" }
Expand Down
11 changes: 11 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ pub enum Discv5Error {
Io(std::io::Error),
}

/// An error occurred whilst attempting to hole punch NAT.
#[derive(Debug)]
pub enum NatError {
/// Initiator error.
Initiator(Discv5Error),
/// Relayer error.
Relay(Discv5Error),
/// Target error.
Target(Discv5Error),
}

macro_rules! impl_from_variant {
($(<$($generic: ident,)+>)*, $from_type: ty, $to_type: ty, $variant: path) => {
impl$(<$($generic,)+>)* From<$from_type> for $to_type {
Expand Down
168 changes: 96 additions & 72 deletions src/handler/mod.rs

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions src/handler/nat_hole_punch/utils.rs → src/handler/nat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub const PORT_BIND_TRIES: usize = 4;
pub const USER_AND_DYNAMIC_PORTS: RangeInclusive<u16> = 1025..=u16::MAX;

/// Aggregates types necessary to implement nat hole punching for [`crate::handler::Handler`].
pub struct NatUtils {
pub struct Nat {
/// Ip mode as set in config.
pub ip_mode: IpMode,
/// This node has been observed to be behind a NAT.
Expand All @@ -42,7 +42,7 @@ pub struct NatUtils {
pub unreachable_enr_limit: Option<usize>,
}

impl NatUtils {
impl Nat {
pub fn new(
listen_sockets: &[SocketAddr],
local_enr: &Enr,
Expand All @@ -52,7 +52,7 @@ impl NatUtils {
session_cache_capacity: usize,
unreachable_enr_limit: Option<usize>,
) -> Self {
let mut nat_hole_puncher = NatUtils {
let mut nat = Nat {
ip_mode,
is_behind_nat: None,
new_peer_latest_relay_cache: LruCache::new(session_cache_capacity),
Expand All @@ -70,17 +70,17 @@ impl NatUtils {
local_enr.udp6(),
) {
(Some(ip), port, _, _) => {
nat_hole_puncher.set_is_behind_nat(listen_sockets, Some(ip.into()), port);
nat.set_is_behind_nat(listen_sockets, Some(ip.into()), port);
}
(_, _, Some(ip6), port) => {
nat_hole_puncher.set_is_behind_nat(listen_sockets, Some(ip6.into()), port);
nat.set_is_behind_nat(listen_sockets, Some(ip6.into()), port);
}
(None, Some(port), _, _) | (_, _, None, Some(port)) => {
nat_hole_puncher.set_is_behind_nat(listen_sockets, None, Some(port));
nat.set_is_behind_nat(listen_sockets, None, Some(port));
}
(None, None, None, None) => {}
}
nat_hole_puncher
nat
}

pub fn track(&mut self, peer_socket: SocketAddr) {
Expand Down
14 changes: 0 additions & 14 deletions src/handler/nat_hole_punch/error.rs

This file was deleted.

50 changes: 0 additions & 50 deletions src/handler/nat_hole_punch/mod.rs

This file was deleted.

28 changes: 18 additions & 10 deletions src/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn build_handler_with_listen_config<P: ProtocolIdentity>(
let (service_send, handler_recv) = mpsc::channel(50);
let (exit_tx, exit) = oneshot::channel();

let nat_utils = NatUtils::new(
let nat = Nat::new(
&listen_sockets,
&enr,
config.listen_config.ip_mode(),
Expand Down Expand Up @@ -107,7 +107,7 @@ async fn build_handler_with_listen_config<P: ProtocolIdentity>(
service_send,
listen_sockets,
socket,
nat_utils,
nat,
exit,
},
MockService {
Expand Down Expand Up @@ -191,9 +191,11 @@ async fn simple_session_message() {
loop {
if let Some(message) = receiver_recv.recv().await {
match message {
HandlerOut::WhoAreYou(wru_ref) => {
let _ =
recv_send.send(HandlerIn::WhoAreYou(wru_ref, Some(sender_enr.clone())));
HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(wru_ref)) => {
let _ = recv_send.send(HandlerIn::EnrResponse(
Some(sender_enr.clone()),
EnrRequestData::WhoAreYou(wru_ref),
));
}
HandlerOut::Request(_, request) => {
assert_eq!(request, send_message);
Expand Down Expand Up @@ -307,8 +309,11 @@ async fn multiple_messages() {
let receiver = async move {
loop {
match receiver_handler.recv().await {
Some(HandlerOut::WhoAreYou(wru_ref)) => {
let _ = recv_send.send(HandlerIn::WhoAreYou(wru_ref, Some(sender_enr.clone())));
Some(HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(wru_ref))) => {
let _ = recv_send.send(HandlerIn::EnrResponse(
Some(sender_enr.clone()),
EnrRequestData::WhoAreYou(wru_ref),
));
}
Some(HandlerOut::Request(addr, request)) => {
assert_eq!(request, recv_send_message);
Expand Down Expand Up @@ -551,8 +556,11 @@ async fn nat_hole_punch_relay() {
let mock_service_handle = tokio::spawn(async move {
let service_msg = rx.recv().await.expect("should receive service message");
match service_msg {
HandlerOut::FindHolePunchEnr(relay_init) => tx
.send(HandlerIn::HolePunchEnr(tgt_enr_clone, relay_init))
HandlerOut::RequestEnr(EnrRequestData::Nat(relay_init)) => tx
.send(HandlerIn::EnrResponse(
Some(tgt_enr_clone),
EnrRequestData::Nat(relay_init),
))
.expect("should send message to handler"),
_ => panic!("service message should be 'find hole punch enr'"),
}
Expand Down Expand Up @@ -641,7 +649,7 @@ async fn nat_hole_punch_target() {
build_handler_with_listen_config::<DefaultProtocolId>(listen_config).await;
let tgt_addr = handler.enr.read().udp4_socket().unwrap().into();
let tgt_node_id = handler.enr.read().node_id();
handler.nat_utils.is_behind_nat = Some(true);
handler.nat.is_behind_nat = Some(true);

// Relay
let relay_enr = {
Expand Down
63 changes: 30 additions & 33 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use self::{
};
use crate::{
error::{RequestError, ResponseError},
handler::{Handler, HandlerIn, HandlerOut},
handler::{EnrRequestData, Handler, HandlerIn, HandlerOut},
kbucket::{
self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable,
NodeStatus, UpdateResult, MAX_NODES_PER_BUCKET,
Expand Down Expand Up @@ -387,71 +387,68 @@ impl Service {
HandlerOut::Response(node_address, response) => {
self.handle_rpc_response(node_address, *response);
}
HandlerOut::WhoAreYou(whoareyou_ref) => {
HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(whoareyou_ref)) => {
// check what our latest known ENR is for this node.
if let Some(known_enr) = self.find_enr(&whoareyou_ref.0.node_id) {
if let Err(e) = self.handler_send.send(HandlerIn::WhoAreYou(whoareyou_ref, Some(known_enr))) {
if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(Some(known_enr), EnrRequestData::WhoAreYou(whoareyou_ref))) {
warn!("Failed to send whoareyou {}", e);
};
} else {
// do not know of this peer
debug!("NodeId unknown, requesting ENR. {}", whoareyou_ref.0);
if let Err(e) = self.handler_send.send(HandlerIn::WhoAreYou(whoareyou_ref, None)) {
if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(None, EnrRequestData::WhoAreYou(whoareyou_ref))) {
warn!("Failed to send who are you to unknown enr peer {}", e);
}
}
}
HandlerOut::RequestFailed(request_id, error) => {
if let RequestError::Timeout = error {
debug!("RPC Request timed out. id: {}", request_id);
} else {
warn!("RPC Request failed: id: {}, error {:?}", request_id, error);
}
self.rpc_failure(request_id, error);
}
HandlerOut::FindHolePunchEnr(relay_init) => {
// update initiator's enr if it's in kbuckets
let inr_enr = relay_init.initiator_enr();
let inr_key = kbucket::Key::from(inr_enr.node_id());
match self.kbuckets.write().entry(&inr_key) {
HandlerOut::RequestEnr(EnrRequestData::Nat(relay_initiator)) => {
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
// Update initiator's Enr if it's in kbuckets
let initiator_enr = relay_initiator.initiator_enr();
let initiator_key = kbucket::Key::from(initiator_enr.node_id());
match self.kbuckets.write().entry(&initiator_key) {
kbucket::Entry::Present(ref mut entry, _) => {
let enr = entry.value_mut();
if enr.seq() < inr_enr.seq() {
*enr = inr_enr.clone();
if enr.seq() < initiator_enr.seq() {
*enr = initiator_enr.clone();
}
}
kbucket::Entry::Pending(ref mut entry, _) => {
let enr = entry.value_mut();
if enr.seq() < inr_enr.seq() {
*enr = inr_enr.clone();
if enr.seq() < initiator_enr.seq() {
*enr = initiator_enr.clone();
}
}
_ => ()
}
// check if we know the target node id in our routing table, otherwise
// drop relay attempt.
let tgt_node_id = relay_init.target_node_id();
let tgt_key = kbucket::Key::from(tgt_node_id);
if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&tgt_key) {
let tgt_enr = entry.value().clone();
if let Err(e) = self.handler_send.send(HandlerIn::HolePunchEnr(tgt_enr, relay_init)) {
let target_node_id = relay_initiator.target_node_id();
let target_key = kbucket::Key::from(target_node_id);
if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&target_key) {
let target_enr = entry.value().clone();
if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(Some(target_enr), EnrRequestData::Nat(relay_initiator))) {
warn!(
"Failed to send target enr to relay process, error: {e}"
);
}
} else {
// todo(emhane): ban peers that ask us to relay to a peer we very
// unlikely could have sent to them in a NODES response.
let inr_node_id = relay_init.initiator_enr().node_id();

let initiator_node_id = relay_initiator.initiator_enr().node_id();
warn!(
inr_node_id=%inr_node_id,
tgt_node_id=%tgt_node_id,
initiator_node_id=%initiator_node_id,
target_node_id=%target_node_id,
"Peer requested relaying to a peer not in k-buckets"
);
}
},
HandlerOut::PingAllPeers => self.ping_connected_peers(),
HandlerOut::RequestFailed(request_id, error) => {
if let RequestError::Timeout = error {
debug!("RPC Request timed out. id: {}", request_id);
} else {
warn!("RPC Request failed: id: {}, error {:?}", request_id, error);
}
self.rpc_failure(request_id, error);
}
HandlerOut::PingAllPeers => self.ping_connected_peers()
}
}
event = Service::bucket_maintenance_poll(&self.kbuckets) => {
Expand Down