Skip to content

Commit

Permalink
Merge pull request #6 from sigp/further-review
Browse files Browse the repository at this point in the history
Further Review - Simplification of some code
  • Loading branch information
emhane authored Jan 18, 2024
2 parents e3478c2 + fb54740 commit 21ea794
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 188 deletions.
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ lru = {version = "0.7.1", default-features = false }
hashlink = "0.7.0"
delay_map = "0.3.0"
more-asserts = "0.2.2"
thiserror = "1.0.40"
derive_more = { version = "0.99.17", default-features = false, features = ["from", "display", "deref", "deref_mut"] }
async-trait = "0.1.74"

[dev-dependencies]
rand_07 = { package = "rand", version = "0.7" }
Expand Down
11 changes: 11 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ pub enum Discv5Error {
Io(std::io::Error),
}

/// An error occurred whilst attempting to hole punch NAT.
#[derive(Debug)]
pub enum NatError {
    /// Initiator error.
    Initiator(Discv5Error),
    /// Relayer error.
    Relay(Discv5Error),
    /// Target error.
    Target(Discv5Error),
}

impl std::fmt::Display for NatError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The inner `Discv5Error` necessarily implements `Debug` (the `Debug`
        // derive on `NatError` above requires it), so format it with `{:?}`
        // rather than assuming a `Display` impl exists.
        match self {
            NatError::Initiator(e) => write!(f, "NAT hole punch initiator error: {e:?}"),
            NatError::Relay(e) => write!(f, "NAT hole punch relay error: {e:?}"),
            NatError::Target(e) => write!(f, "NAT hole punch target error: {e:?}"),
        }
    }
}

// Implementing `std::error::Error` lets callers box `NatError` as
// `dyn Error` and propagate it with `?` through generic error types.
// `source()` is left as the default (`None`) because `Discv5Error` is not
// guaranteed to implement `Error` itself — TODO confirm against src/error.rs.
impl std::error::Error for NatError {}

macro_rules! impl_from_variant {
($(<$($generic: ident,)+>)*, $from_type: ty, $to_type: ty, $variant: path) => {
impl$(<$($generic,)+>)* From<$from_type> for $to_type {
Expand Down
170 changes: 98 additions & 72 deletions src/handler/mod.rs

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions src/handler/nat_hole_punch/utils.rs → src/handler/nat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub const PORT_BIND_TRIES: usize = 4;
pub const USER_AND_DYNAMIC_PORTS: RangeInclusive<u16> = 1025..=u16::MAX;

/// Aggregates types necessary to implement nat hole punching for [`crate::handler::Handler`].
pub struct NatUtils {
pub struct Nat {
/// Ip mode as set in config.
pub ip_mode: IpMode,
/// This node has been observed to be behind a NAT.
Expand All @@ -42,7 +42,7 @@ pub struct NatUtils {
pub unreachable_enr_limit: Option<usize>,
}

impl NatUtils {
impl Nat {
pub fn new(
listen_sockets: &[SocketAddr],
local_enr: &Enr,
Expand All @@ -52,7 +52,7 @@ impl NatUtils {
session_cache_capacity: usize,
unreachable_enr_limit: Option<usize>,
) -> Self {
let mut nat_hole_puncher = NatUtils {
let mut nat = Nat {
ip_mode,
is_behind_nat: None,
new_peer_latest_relay_cache: LruCache::new(session_cache_capacity),
Expand All @@ -70,17 +70,17 @@ impl NatUtils {
local_enr.udp6(),
) {
(Some(ip), port, _, _) => {
nat_hole_puncher.set_is_behind_nat(listen_sockets, Some(ip.into()), port);
nat.set_is_behind_nat(listen_sockets, Some(ip.into()), port);
}
(_, _, Some(ip6), port) => {
nat_hole_puncher.set_is_behind_nat(listen_sockets, Some(ip6.into()), port);
nat.set_is_behind_nat(listen_sockets, Some(ip6.into()), port);
}
(None, Some(port), _, _) | (_, _, None, Some(port)) => {
nat_hole_puncher.set_is_behind_nat(listen_sockets, None, Some(port));
nat.set_is_behind_nat(listen_sockets, None, Some(port));
}
(None, None, None, None) => {}
}
nat_hole_puncher
nat
}

pub fn track(&mut self, peer_socket: SocketAddr) {
Expand Down
14 changes: 0 additions & 14 deletions src/handler/nat_hole_punch/error.rs

This file was deleted.

50 changes: 0 additions & 50 deletions src/handler/nat_hole_punch/mod.rs

This file was deleted.

28 changes: 18 additions & 10 deletions src/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn build_handler_with_listen_config<P: ProtocolIdentity>(
let (service_send, handler_recv) = mpsc::channel(50);
let (exit_tx, exit) = oneshot::channel();

let nat_utils = NatUtils::new(
let nat = Nat::new(
&listen_sockets,
&enr,
config.listen_config.ip_mode(),
Expand Down Expand Up @@ -107,7 +107,7 @@ async fn build_handler_with_listen_config<P: ProtocolIdentity>(
service_send,
listen_sockets,
socket,
nat_utils,
nat,
exit,
},
MockService {
Expand Down Expand Up @@ -191,9 +191,11 @@ async fn simple_session_message() {
loop {
if let Some(message) = receiver_recv.recv().await {
match message {
HandlerOut::WhoAreYou(wru_ref) => {
let _ =
recv_send.send(HandlerIn::WhoAreYou(wru_ref, Some(sender_enr.clone())));
HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(wru_ref)) => {
let _ = recv_send.send(HandlerIn::EnrResponse(
Some(sender_enr.clone()),
EnrRequestData::WhoAreYou(wru_ref),
));
}
HandlerOut::Request(_, request) => {
assert_eq!(request, send_message);
Expand Down Expand Up @@ -307,8 +309,11 @@ async fn multiple_messages() {
let receiver = async move {
loop {
match receiver_handler.recv().await {
Some(HandlerOut::WhoAreYou(wru_ref)) => {
let _ = recv_send.send(HandlerIn::WhoAreYou(wru_ref, Some(sender_enr.clone())));
Some(HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(wru_ref))) => {
let _ = recv_send.send(HandlerIn::EnrResponse(
Some(sender_enr.clone()),
EnrRequestData::WhoAreYou(wru_ref),
));
}
Some(HandlerOut::Request(addr, request)) => {
assert_eq!(request, recv_send_message);
Expand Down Expand Up @@ -551,8 +556,11 @@ async fn nat_hole_punch_relay() {
let mock_service_handle = tokio::spawn(async move {
let service_msg = rx.recv().await.expect("should receive service message");
match service_msg {
HandlerOut::FindHolePunchEnr(relay_init) => tx
.send(HandlerIn::HolePunchEnr(tgt_enr_clone, relay_init))
HandlerOut::RequestEnr(EnrRequestData::Nat(relay_init)) => tx
.send(HandlerIn::EnrResponse(
Some(tgt_enr_clone),
EnrRequestData::Nat(relay_init),
))
.expect("should send message to handler"),
_ => panic!("service message should be 'find hole punch enr'"),
}
Expand Down Expand Up @@ -641,7 +649,7 @@ async fn nat_hole_punch_target() {
build_handler_with_listen_config::<DefaultProtocolId>(listen_config).await;
let tgt_addr = handler.enr.read().udp4_socket().unwrap().into();
let tgt_node_id = handler.enr.read().node_id();
handler.nat_utils.is_behind_nat = Some(true);
handler.nat.is_behind_nat = Some(true);

// Relay
let relay_enr = {
Expand Down
63 changes: 30 additions & 33 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use self::{
};
use crate::{
error::{RequestError, ResponseError},
handler::{Handler, HandlerIn, HandlerOut},
handler::{EnrRequestData, Handler, HandlerIn, HandlerOut},
kbucket::{
self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable,
NodeStatus, UpdateResult, MAX_NODES_PER_BUCKET,
Expand Down Expand Up @@ -387,71 +387,68 @@ impl Service {
HandlerOut::Response(node_address, response) => {
self.handle_rpc_response(node_address, *response);
}
HandlerOut::WhoAreYou(whoareyou_ref) => {
HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(whoareyou_ref)) => {
// check what our latest known ENR is for this node.
if let Some(known_enr) = self.find_enr(&whoareyou_ref.0.node_id) {
if let Err(e) = self.handler_send.send(HandlerIn::WhoAreYou(whoareyou_ref, Some(known_enr))) {
if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(Some(known_enr), EnrRequestData::WhoAreYou(whoareyou_ref))) {
warn!("Failed to send whoareyou {}", e);
};
} else {
// do not know of this peer
debug!("NodeId unknown, requesting ENR. {}", whoareyou_ref.0);
if let Err(e) = self.handler_send.send(HandlerIn::WhoAreYou(whoareyou_ref, None)) {
if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(None, EnrRequestData::WhoAreYou(whoareyou_ref))) {
warn!("Failed to send who are you to unknown enr peer {}", e);
}
}
}
HandlerOut::RequestFailed(request_id, error) => {
if let RequestError::Timeout = error {
debug!("RPC Request timed out. id: {}", request_id);
} else {
warn!("RPC Request failed: id: {}, error {:?}", request_id, error);
}
self.rpc_failure(request_id, error);
}
HandlerOut::FindHolePunchEnr(relay_init) => {
// update initiator's enr if it's in kbuckets
let inr_enr = relay_init.initiator_enr();
let inr_key = kbucket::Key::from(inr_enr.node_id());
match self.kbuckets.write().entry(&inr_key) {
HandlerOut::RequestEnr(EnrRequestData::Nat(relay_initiation)) => {
// Update initiator's Enr if it's in kbuckets
let initiator_enr = relay_initiation.initiator_enr();
let initiator_key = kbucket::Key::from(initiator_enr.node_id());
match self.kbuckets.write().entry(&initiator_key) {
kbucket::Entry::Present(ref mut entry, _) => {
let enr = entry.value_mut();
if enr.seq() < inr_enr.seq() {
*enr = inr_enr.clone();
if enr.seq() < initiator_enr.seq() {
*enr = initiator_enr.clone();
}
}
kbucket::Entry::Pending(ref mut entry, _) => {
let enr = entry.value_mut();
if enr.seq() < inr_enr.seq() {
*enr = inr_enr.clone();
if enr.seq() < initiator_enr.seq() {
*enr = initiator_enr.clone();
}
}
_ => ()
}
// check if we know the target node id in our routing table, otherwise
// drop relay attempt.
let tgt_node_id = relay_init.target_node_id();
let tgt_key = kbucket::Key::from(tgt_node_id);
if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&tgt_key) {
let tgt_enr = entry.value().clone();
if let Err(e) = self.handler_send.send(HandlerIn::HolePunchEnr(tgt_enr, relay_init)) {
let target_node_id = relay_initiation.target_node_id();
let target_key = kbucket::Key::from(target_node_id);
if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&target_key) {
let target_enr = entry.value().clone();
if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(Some(target_enr), EnrRequestData::Nat(relay_initiation))) {
warn!(
"Failed to send target enr to relay process, error: {e}"
);
}
} else {
// todo(emhane): ban peers that ask us to relay to a peer we very
// unlikely could have sent to them in a NODES response.
let inr_node_id = relay_init.initiator_enr().node_id();

let initiator_node_id = relay_initiation.initiator_enr().node_id();
warn!(
inr_node_id=%inr_node_id,
tgt_node_id=%tgt_node_id,
initiator_node_id=%initiator_node_id,
target_node_id=%target_node_id,
"Peer requested relaying to a peer not in k-buckets"
);
}
},
HandlerOut::PingAllPeers => self.ping_connected_peers(),
HandlerOut::RequestFailed(request_id, error) => {
if let RequestError::Timeout = error {
debug!("RPC Request timed out. id: {}", request_id);
} else {
warn!("RPC Request failed: id: {}, error {:?}", request_id, error);
}
self.rpc_failure(request_id, error);
}
HandlerOut::PingAllPeers => self.ping_connected_peers()
}
}
event = Service::bucket_maintenance_poll(&self.kbuckets) => {
Expand Down

0 comments on commit 21ea794

Please sign in to comment.