diff --git a/Cargo.toml b/Cargo.toml index 4d911c3b4..351c7d62d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,9 +38,7 @@ lru = {version = "0.7.1", default-features = false } hashlink = "0.7.0" delay_map = "0.3.0" more-asserts = "0.2.2" -thiserror = "1.0.40" derive_more = { version = "0.99.17", default-features = false, features = ["from", "display", "deref", "deref_mut"] } -async-trait = "0.1.74" [dev-dependencies] rand_07 = { package = "rand", version = "0.7" } diff --git a/src/error.rs b/src/error.rs index e0c9fb4e2..7b9198d8c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -56,6 +56,17 @@ pub enum Discv5Error { Io(std::io::Error), } +/// An error occurred whilst attempting to hole punch NAT. +#[derive(Debug)] +pub enum NatError { + /// Initiator error. + Initiator(Discv5Error), + /// Relayer error. + Relay(Discv5Error), + /// Target error. + Target(Discv5Error), +} + macro_rules! impl_from_variant { ($(<$($generic: ident,)+>)*, $from_type: ty, $to_type: ty, $variant: path) => { impl$(<$($generic,)+>)* From<$from_type> for $to_type { diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 745747ade..74a54f348 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -29,7 +29,7 @@ use crate::{ config::Discv5Config, discv5::PERMIT_BAN_LIST, - error::{Discv5Error, RequestError}, + error::{Discv5Error, NatError, RequestError}, packet::{ChallengeData, IdNonce, MessageNonce, Packet, PacketKind, ProtocolIdentity}, rpc::{ Message, Payload, RelayInitNotification, RelayMsgNotification, Request, RequestBody, @@ -59,7 +59,7 @@ use tracing::{debug, error, trace, warn}; mod active_requests; mod crypto; -mod nat_hole_punch; +mod nat; mod request_call; mod session; mod tests; @@ -69,7 +69,7 @@ pub use crate::node_info::{NodeAddress, NodeContact}; use crate::{lru_time_cache::LruTimeCache, socket::ListenConfig}; use active_requests::ActiveRequests; -use nat_hole_punch::{Error as NatError, HolePunchNat, NatUtils}; +use nat::Nat; use request_call::RequestCall; use session::Session; @@ -85,7 +85,6 @@ const ONE_TIME_SESSION_CACHE_CAPACITY: usize = 100; /// Messages sent from the application layer to `Handler`. #[derive(Debug, Clone, PartialEq, Eq)] -#[allow(clippy::large_enum_variant)] pub enum HandlerIn { /// A Request to send to a `NodeContact` has been received from the application layer. A /// `NodeContact` is an abstract type that allows for either an ENR to be sent or a `Raw` type @@ -108,15 +107,9 @@ pub enum HandlerIn { /// response back to the `NodeAddress` from which the request was received. Response(NodeAddress, Box), - /// A Random packet has been received and we have requested the application layer to inform - /// us what the highest known ENR is for this node. - /// The `WhoAreYouRef` is sent out in the `HandlerOut::WhoAreYou` event and should - /// be returned here to submit the application's response. - WhoAreYou(WhoAreYouRef, Option), - - /// A response to a [`HandlerOut::FindHolePunchEnr`]. Returns the ENR and the - /// [`RelayInitNotification`] from [`HandlerOut::FindHolePunchEnr`]. - HolePunchEnr(Enr, RelayInitNotification), + /// The application layer is responding with an ENR to a `RequestEnr` request. This function + /// returns the requested data and optionally and ENR if one is found. + EnrResponse(Option, EnrRequestData), /// Observed socket has been update. The old socket and the current socket. SocketUpdate(Option, SocketAddr), @@ -138,19 +131,15 @@ pub enum HandlerOut { /// A Response has been received from a node on the network. Response(NodeAddress, Box), - /// An unknown source has requested information from us. Return the reference with the known - /// ENR of this node (if known). See the `HandlerIn::WhoAreYou` variant. - WhoAreYou(WhoAreYouRef), + /// We need to request the ENR of a specific node. This could be due to an unknown ENR or a + /// hole punch request. + RequestEnr(EnrRequestData), /// An RPC request failed. /// /// This returns the request ID and an error indicating why the request failed. RequestFailed(RequestId, RequestError), - /// Look-up an ENR in k-buckets. Passes the node id of the peer to look up and the - /// [`RelayMsgNotification`] we intend to send to it. - FindHolePunchEnr(RelayInitNotification), - /// Triggers a ping to all peers, outside of the regular ping interval. Needed to trigger /// renewed session establishment after updating the local ENR from unreachable to reachable /// and clearing all sessions. Only this way does the local node have a chance to make it into @@ -170,6 +159,19 @@ pub enum ConnectionDirection { Outgoing, } +/// The kind of request data being sent to the service. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum EnrRequestData { + /// A Random packet has been received and request the application layer to inform + /// us what the highest known ENR is for this node. + /// The `WhoAreYouRef` is sent out in the `HandlerOut::WhoAreYou` event and should + /// be returned here to submit the application's response. + WhoAreYou(WhoAreYouRef), + /// Look-up an ENR in k-buckets. Passes the node id of the peer to look up and the + /// [`RelayMsgNotification`] we intend to send to it. + Nat(RelayInitNotification), +} + /// A reference for the application layer to send back when the handler requests any known /// ENR for the NodeContact. #[derive(Debug, Clone, PartialEq, Eq)] @@ -233,8 +235,8 @@ pub struct Handler { socket: Socket, /// Exit channel to shutdown the handler. exit: oneshot::Receiver<()>, - /// Types necessary to plug in nat hole punching. - nat_utils: NatUtils, + /// Struct to handle nat hole punching logic. + nat: Nat, } type HandlerReturn = ( @@ -319,7 +321,7 @@ impl Handler { let sessions = LruTimeCache::new(session_timeout, Some(session_cache_capacity)); - let nat_utils = NatUtils::new( + let nat = Nat::new( &listen_sockets, &enr.read(), ip_mode, @@ -350,7 +352,7 @@ impl Handler { service_send, listen_sockets, socket, - nat_utils, + nat, exit, }; debug!("Handler Starting"); @@ -378,15 +380,19 @@ impl Handler { } } HandlerIn::Response(dst, response) => self.send_response::

(dst, *response).await, - HandlerIn::WhoAreYou(wru_ref, enr) => self.send_challenge::

(wru_ref, enr).await, - HandlerIn::HolePunchEnr(tgt_enr, relay_init) => { + HandlerIn::EnrResponse(enr, EnrRequestData::WhoAreYou(wru_ref)) => self.send_challenge::

(wru_ref, enr).await, + HandlerIn::EnrResponse(Some(target_enr), EnrRequestData::Nat(relay_initiation)) => { // Assemble the notification for the target - let (inr_enr, _tgt, timed_out_nonce) = relay_init.into(); - let relay_msg_notif = RelayMsgNotification::new(inr_enr, timed_out_nonce); - if let Err(e) = self.send_relay_msg_notif::

(tgt_enr, relay_msg_notif).await { - warn!("Failed to relay. Error: {}", e); + let (initiator_enr, _target, timed_out_nonce) = relay_initiation.into(); + let relay_msg_notification = RelayMsgNotification::new(initiator_enr, timed_out_nonce); + if let Err(e) = self.send_relay_msg_notification::

(target_enr, relay_msg_notification).await { + warn!("Failed to relay. Error: {:?}", e); } } + HandlerIn::EnrResponse(_,_) => {} // This handles the case that No ENR was + // found for a target relayer. This + // message never gets sent, so it is + // ignored. HandlerIn::SocketUpdate(old_socket, socket) => { let ip = socket.ip(); let port = socket.port(); @@ -408,7 +414,7 @@ impl Handler { warn!("Failed to inform that request failed {}", e); } } - self.nat_utils.set_is_behind_nat(&self.listen_sockets, Some(ip), Some(port)); + self.nat.set_is_behind_nat(&self.listen_sockets, Some(ip), Some(port)); } } } @@ -423,14 +429,14 @@ impl Handler { // challenge. We process them here self.send_next_request::

(node_address).await; } - Some(Ok(peer_socket)) = self.nat_utils.hole_punch_tracker.next() => { - if self.nat_utils.is_behind_nat == Some(false) { + Some(Ok(peer_socket)) = self.nat.hole_punch_tracker.next() => { + if self.nat.is_behind_nat == Some(false) { // Until ip voting is done and an observed public address is finalised, all nodes act as // if they are behind a NAT. return; } if let Err(e) = self.on_hole_punch_expired(peer_socket).await { - warn!("Failed to keep hole punched for peer, error: {}", e); + warn!("Failed to keep hole punched for peer, error: {:?}", e); } } _ = banned_nodes_check.tick() => self.unban_nodes_check(), // Unban nodes that are past the timeout @@ -539,7 +545,7 @@ impl Handler { if request_call.retries() >= self.request_retries { trace!("Request timed out with {}", node_address); if let Some(relay) = self - .nat_utils + .nat .new_peer_latest_relay_cache .pop(&node_address.node_id) { @@ -841,7 +847,7 @@ impl Handler { ConnectionDirection::Incoming }; - enr_not_reachable = NatUtils::is_enr_reachable(&enr); + enr_not_reachable = Nat::is_enr_reachable(&enr); // We already know the ENR. Send the handshake response packet trace!("Sending Authentication response to node: {}", node_address); @@ -933,12 +939,10 @@ impl Handler { // Keep count of the unreachable Sessions we are tracking // Peer is reachable - let enr_not_reachable = !NatUtils::is_enr_reachable(&most_recent_enr); + let enr_not_reachable = !Nat::is_enr_reachable(&most_recent_enr); // Decide whether to establish this connection based on our appetite for unreachable - if enr_not_reachable - && Some(self.sessions.tagged()) >= self.nat_utils.unreachable_enr_limit - { + if enr_not_reachable && Some(self.sessions.tagged()) >= self.nat.unreachable_enr_limit { debug!("Reached limit of unreachable ENR sessions. Avoiding a new connection. Limit: {}", self.sessions.tagged()); return; } @@ -967,7 +971,7 @@ impl Handler { ) .await; self.new_session(node_address.clone(), session, enr_not_reachable); - self.nat_utils + self.nat .new_peer_latest_relay_cache .pop(&node_address.node_id); self.handle_message::

( @@ -1142,14 +1146,17 @@ impl Handler { warn!("peer {node_address} tried to initiate hole punch attempt for another node {initiator_node_id}, banning peer {node_address}"); self.fail_session(&node_address, RequestError::MaliciousRelayInit, true) .await; - let ban_timeout = self.nat_utils.ban_duration.map(|v| Instant::now() + v); + let ban_timeout = self.nat.ban_duration.map(|v| Instant::now() + v); PERMIT_BAN_LIST.write().ban(node_address, ban_timeout); - } else if let Err(e) = self.on_relay_init(notification).await { - warn!("failed handling notification to relay for {node_address}, {e}"); + } else if let Err(e) = self.on_relay_initiation(notification).await { + warn!( + "failed handling notification to relay for {node_address}, {:?}", + e + ); } } Message::RelayMsgNotification(notification) => { - match self.nat_utils.is_behind_nat { + match self.nat.is_behind_nat { Some(false) => { // inr may not be malicious and initiated a hole punch attempt when // a request to this node timed out for another reason @@ -1157,7 +1164,10 @@ impl Handler { } _ => { if let Err(e) = self.on_relay_msg::

(notification).await { - warn!("failed handling notification relayed from {node_address}, {e}"); + warn!( + "failed handling notification relayed from {node_address}, {:?}", + e + ); } } } @@ -1210,7 +1220,9 @@ impl Handler { let whoareyou_ref = WhoAreYouRef(node_address, message_nonce); if let Err(e) = self .service_send - .send(HandlerOut::WhoAreYou(whoareyou_ref)) + .send(HandlerOut::RequestEnr(EnrRequestData::WhoAreYou( + whoareyou_ref, + ))) .await { warn!("Failed to send WhoAreYou to the service {}", e) @@ -1256,7 +1268,9 @@ impl Handler { let whoareyou_ref = WhoAreYouRef(node_address, message_nonce); if let Err(e) = self .service_send - .send(HandlerOut::WhoAreYou(whoareyou_ref)) + .send(HandlerOut::RequestEnr(EnrRequestData::WhoAreYou( + whoareyou_ref, + ))) .await { warn!( @@ -1338,14 +1352,14 @@ impl Handler { // extra responses if let ResponseBody::Nodes { total, ref nodes } = response.body { for node in nodes { - if let Some(socket_addr) = self.nat_utils.ip_mode.get_contactable_addr(node) { + if let Some(socket_addr) = self.nat.ip_mode.get_contactable_addr(node) { let node_id = node.node_id(); let new_peer_node_address = NodeAddress { socket_addr, node_id, }; if self.sessions.peek(&new_peer_node_address).is_none() { - self.nat_utils + self.nat .new_peer_latest_relay_cache .put(node_id, node_address.clone()); } @@ -1477,7 +1491,7 @@ impl Handler { } } let node_address = request_call.contact().node_address(); - self.nat_utils + self.nat .new_peer_latest_relay_cache .pop(&node_address.node_id); self.fail_session(&node_address, error, remove_session) @@ -1497,7 +1511,7 @@ impl Handler { .active_sessions .store(self.sessions.len(), Ordering::Relaxed); // stop keeping hole punched for peer - self.nat_utils.untrack(&node_address.socket_addr); + self.nat.untrack(&node_address.socket_addr); } if let Some(to_remove) = self.pending_requests.remove(node_address) { for PendingRequest { request_id, .. } in to_remove { @@ -1534,7 +1548,7 @@ impl Handler { if let Err(e) = self.socket.send.send(packet).await { warn!("Failed to send outbound packet {}", e) } - self.nat_utils.track(dst); + self.nat.track(dst); } /// Check if any banned nodes have served their time and unban them. @@ -1585,8 +1599,11 @@ fn most_recent_enr(first: Option, second: Option) -> Result { } } -#[async_trait::async_trait] -impl HolePunchNat for Handler { +// NAT-related functions +impl Handler { + /// A request times out. Should trigger the initiation of a hole punch attempt, given a + /// transitive route to the target exists. Sends a RELAYINIT notification to the given + /// relay. async fn on_request_time_out( &mut self, relay: NodeAddress, @@ -1631,11 +1648,18 @@ impl HolePunchNat for Handler { Ok(()) } - async fn on_relay_init(&mut self, relay_init: RelayInitNotification) -> Result<(), NatError> { + /// A RelayInit notification is received over discv5 indicating this node is the relay. Should + /// trigger sending a RelayMsg to the target. + async fn on_relay_initiation( + &mut self, + relay_initiation: RelayInitNotification, + ) -> Result<(), NatError> { // Check for target peer in our kbuckets otherwise drop notification. if let Err(e) = self .service_send - .send(HandlerOut::FindHolePunchEnr(relay_init)) + .send(HandlerOut::RequestEnr(EnrRequestData::Nat( + relay_initiation, + ))) .await { return Err(NatError::Relay(e.into())); @@ -1643,16 +1667,17 @@ impl HolePunchNat for Handler { Ok(()) } + /// A RelayMsg notification is received over discv5 indicating this node is the target. Should + /// trigger a WHOAREYOU to be sent to the initiator using the `nonce` in the RelayMsg. async fn on_relay_msg( &mut self, relay_msg: RelayMsgNotification, ) -> Result<(), NatError> { let (inr_enr, timed_out_msg_nonce) = relay_msg.into(); - let initiator_node_address = - match NodeContact::try_from_enr(inr_enr, self.nat_utils.ip_mode) { - Ok(contact) => contact.node_address(), - Err(e) => return Err(NatError::Target(e.into())), - }; + let initiator_node_address = match NodeContact::try_from_enr(inr_enr, self.nat.ip_mode) { + Ok(contact) => contact.node_address(), + Err(e) => return Err(NatError::Target(e.into())), + }; // A session may already have been established. if self.sessions.get(&initiator_node_address).is_some() { @@ -1677,31 +1702,32 @@ impl HolePunchNat for Handler { Ok(()) } - async fn send_relay_msg_notif( + /// Send a RELAYMSG notification. + async fn send_relay_msg_notification( &mut self, - tgt_enr: Enr, - relay_msg_notif: RelayMsgNotification, + target_enr: Enr, + relay_msg_notification: RelayMsgNotification, ) -> Result<(), NatError> { - let tgt_node_address = match NodeContact::try_from_enr(tgt_enr, self.nat_utils.ip_mode) { + let target_node_address = match NodeContact::try_from_enr(target_enr, self.nat.ip_mode) { Ok(contact) => contact.node_address(), Err(e) => return Err(NatError::Relay(e.into())), }; - if let Some(session) = self.sessions.get_mut(&tgt_node_address) { + if let Some(session) = self.sessions.get_mut(&target_node_address) { trace!( - "Sending notif to target {}. relay msg: {}", - tgt_node_address.node_id, - relay_msg_notif, + "Sending notification to target {}. relay msg: {}", + target_node_address.node_id, + relay_msg_notification, ); // Encrypt the notification and send let packet = match session - .encrypt_session_message::

(self.node_id, &relay_msg_notif.encode()) + .encrypt_session_message::

(self.node_id, &relay_msg_notification.encode()) { Ok(packet) => packet, Err(e) => { return Err(NatError::Relay(e)); } }; - self.send(tgt_node_address, packet).await; + self.send(target_node_address, packet).await; Ok(()) } else { // Either the session is being established or has expired. We simply drop the diff --git a/src/handler/nat_hole_punch/utils.rs b/src/handler/nat.rs similarity index 94% rename from src/handler/nat_hole_punch/utils.rs rename to src/handler/nat.rs index 5674fb238..b9e859f65 100644 --- a/src/handler/nat_hole_punch/utils.rs +++ b/src/handler/nat.rs @@ -19,7 +19,7 @@ pub const PORT_BIND_TRIES: usize = 4; pub const USER_AND_DYNAMIC_PORTS: RangeInclusive = 1025..=u16::MAX; /// Aggregates types necessary to implement nat hole punching for [`crate::handler::Handler`]. -pub struct NatUtils { +pub struct Nat { /// Ip mode as set in config. pub ip_mode: IpMode, /// This node has been observed to be behind a NAT. @@ -42,7 +42,7 @@ pub struct NatUtils { pub unreachable_enr_limit: Option, } -impl NatUtils { +impl Nat { pub fn new( listen_sockets: &[SocketAddr], local_enr: &Enr, @@ -52,7 +52,7 @@ impl NatUtils { session_cache_capacity: usize, unreachable_enr_limit: Option, ) -> Self { - let mut nat_hole_puncher = NatUtils { + let mut nat = Nat { ip_mode, is_behind_nat: None, new_peer_latest_relay_cache: LruCache::new(session_cache_capacity), @@ -70,17 +70,17 @@ impl NatUtils { local_enr.udp6(), ) { (Some(ip), port, _, _) => { - nat_hole_puncher.set_is_behind_nat(listen_sockets, Some(ip.into()), port); + nat.set_is_behind_nat(listen_sockets, Some(ip.into()), port); } (_, _, Some(ip6), port) => { - nat_hole_puncher.set_is_behind_nat(listen_sockets, Some(ip6.into()), port); + nat.set_is_behind_nat(listen_sockets, Some(ip6.into()), port); } (None, Some(port), _, _) | (_, _, None, Some(port)) => { - nat_hole_puncher.set_is_behind_nat(listen_sockets, None, Some(port)); + nat.set_is_behind_nat(listen_sockets, None, Some(port)); } (None, None, None, None) => {} } - nat_hole_puncher + nat } pub fn track(&mut self, peer_socket: SocketAddr) { diff --git a/src/handler/nat_hole_punch/error.rs b/src/handler/nat_hole_punch/error.rs deleted file mode 100644 index 58d48b522..000000000 --- a/src/handler/nat_hole_punch/error.rs +++ /dev/null @@ -1,14 +0,0 @@ -use thiserror::Error; - -use crate::Discv5Error; - -/// An error occurred whilst attempting to hole punch NAT. -#[derive(Debug, Error)] -pub enum Error { - #[error("NAT error, failed as initiator of a hole punch attempt, {0}")] - Initiator(Discv5Error), - #[error("NAT error, failed as relay of a hole punch attempt, {0}")] - Relay(Discv5Error), - #[error("NAT error, failed as target of a hole punch attempt, {0}")] - Target(Discv5Error), -} diff --git a/src/handler/nat_hole_punch/mod.rs b/src/handler/nat_hole_punch/mod.rs deleted file mode 100644 index e5b03a6fb..000000000 --- a/src/handler/nat_hole_punch/mod.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::net::SocketAddr; - -use crate::{ - node_info::NodeAddress, - packet::MessageNonce, - rpc::{RelayInitNotification, RelayMsgNotification}, - Enr, ProtocolIdentity, -}; - -mod error; -mod utils; - -pub use error::Error; -pub use utils::NatUtils; - -#[async_trait::async_trait] -pub trait HolePunchNat { - /// A request times out. Should trigger the initiation of a hole punch attempt, given a - /// transitive route to the target exists. Sends a RELAYINIT notification to the given - /// relay. - async fn on_request_time_out( - &mut self, - relay: NodeAddress, - local_enr: Enr, // initiator-enr - timed_out_nonce: MessageNonce, - target_node_address: NodeAddress, - ) -> Result<(), Error>; - - /// A RelayInit notification is received over discv5 indicating this node is the relay. Should - /// trigger sending a RelayMsg to the target. - async fn on_relay_init(&mut self, relay_init: RelayInitNotification) -> Result<(), Error>; - - /// A RelayMsg notification is received over discv5 indicating this node is the target. Should - /// trigger a WHOAREYOU to be sent to the initiator using the `nonce` in the RelayMsg. - async fn on_relay_msg( - &mut self, - relay_msg: RelayMsgNotification, - ) -> Result<(), Error>; - - /// Send a RELAYMSG notification. - async fn send_relay_msg_notif( - &mut self, - tgt_enr: Enr, - relay_msg_notif: RelayMsgNotification, - ) -> Result<(), Error>; - - /// A hole punched for a peer closes. Should trigger an empty packet to be sent to the - /// peer to keep it open. - async fn on_hole_punch_expired(&mut self, peer: SocketAddr) -> Result<(), Error>; -} diff --git a/src/handler/tests.rs b/src/handler/tests.rs index b82099010..c2358c394 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -75,7 +75,7 @@ async fn build_handler_with_listen_config( let (service_send, handler_recv) = mpsc::channel(50); let (exit_tx, exit) = oneshot::channel(); - let nat_utils = NatUtils::new( + let nat = Nat::new( &listen_sockets, &enr, config.listen_config.ip_mode(), @@ -107,7 +107,7 @@ async fn build_handler_with_listen_config( service_send, listen_sockets, socket, - nat_utils, + nat, exit, }, MockService { @@ -191,9 +191,11 @@ async fn simple_session_message() { loop { if let Some(message) = receiver_recv.recv().await { match message { - HandlerOut::WhoAreYou(wru_ref) => { - let _ = - recv_send.send(HandlerIn::WhoAreYou(wru_ref, Some(sender_enr.clone()))); + HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(wru_ref)) => { + let _ = recv_send.send(HandlerIn::EnrResponse( + Some(sender_enr.clone()), + EnrRequestData::WhoAreYou(wru_ref), + )); } HandlerOut::Request(_, request) => { assert_eq!(request, send_message); @@ -307,8 +309,11 @@ async fn multiple_messages() { let receiver = async move { loop { match receiver_handler.recv().await { - Some(HandlerOut::WhoAreYou(wru_ref)) => { - let _ = recv_send.send(HandlerIn::WhoAreYou(wru_ref, Some(sender_enr.clone()))); + Some(HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(wru_ref))) => { + let _ = recv_send.send(HandlerIn::EnrResponse( + Some(sender_enr.clone()), + EnrRequestData::WhoAreYou(wru_ref), + )); } Some(HandlerOut::Request(addr, request)) => { assert_eq!(request, recv_send_message); @@ -551,8 +556,11 @@ async fn nat_hole_punch_relay() { let mock_service_handle = tokio::spawn(async move { let service_msg = rx.recv().await.expect("should receive service message"); match service_msg { - HandlerOut::FindHolePunchEnr(relay_init) => tx - .send(HandlerIn::HolePunchEnr(tgt_enr_clone, relay_init)) + HandlerOut::RequestEnr(EnrRequestData::Nat(relay_init)) => tx + .send(HandlerIn::EnrResponse( + Some(tgt_enr_clone), + EnrRequestData::Nat(relay_init), + )) .expect("should send message to handler"), _ => panic!("service message should be 'find hole punch enr'"), } @@ -641,7 +649,7 @@ async fn nat_hole_punch_target() { build_handler_with_listen_config::(listen_config).await; let tgt_addr = handler.enr.read().udp4_socket().unwrap().into(); let tgt_node_id = handler.enr.read().node_id(); - handler.nat_utils.is_behind_nat = Some(true); + handler.nat.is_behind_nat = Some(true); // Relay let relay_enr = { diff --git a/src/service.rs b/src/service.rs index 783d175ca..f55089010 100644 --- a/src/service.rs +++ b/src/service.rs @@ -19,7 +19,7 @@ use self::{ }; use crate::{ error::{RequestError, ResponseError}, - handler::{Handler, HandlerIn, HandlerOut}, + handler::{EnrRequestData, Handler, HandlerIn, HandlerOut}, kbucket::{ self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable, NodeStatus, UpdateResult, MAX_NODES_PER_BUCKET, @@ -387,71 +387,68 @@ impl Service { HandlerOut::Response(node_address, response) => { self.handle_rpc_response(node_address, *response); } - HandlerOut::WhoAreYou(whoareyou_ref) => { + HandlerOut::RequestEnr(EnrRequestData::WhoAreYou(whoareyou_ref)) => { // check what our latest known ENR is for this node. if let Some(known_enr) = self.find_enr(&whoareyou_ref.0.node_id) { - if let Err(e) = self.handler_send.send(HandlerIn::WhoAreYou(whoareyou_ref, Some(known_enr))) { + if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(Some(known_enr), EnrRequestData::WhoAreYou(whoareyou_ref))) { warn!("Failed to send whoareyou {}", e); }; } else { // do not know of this peer debug!("NodeId unknown, requesting ENR. {}", whoareyou_ref.0); - if let Err(e) = self.handler_send.send(HandlerIn::WhoAreYou(whoareyou_ref, None)) { + if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(None, EnrRequestData::WhoAreYou(whoareyou_ref))) { warn!("Failed to send who are you to unknown enr peer {}", e); } } } - HandlerOut::RequestFailed(request_id, error) => { - if let RequestError::Timeout = error { - debug!("RPC Request timed out. id: {}", request_id); - } else { - warn!("RPC Request failed: id: {}, error {:?}", request_id, error); - } - self.rpc_failure(request_id, error); - } - HandlerOut::FindHolePunchEnr(relay_init) => { - // update initiator's enr if it's in kbuckets - let inr_enr = relay_init.initiator_enr(); - let inr_key = kbucket::Key::from(inr_enr.node_id()); - match self.kbuckets.write().entry(&inr_key) { + HandlerOut::RequestEnr(EnrRequestData::Nat(relay_initiation)) => { + // Update initiator's Enr if it's in kbuckets + let initiator_enr = relay_initiation.initiator_enr(); + let initiator_key = kbucket::Key::from(initiator_enr.node_id()); + match self.kbuckets.write().entry(&initiator_key) { kbucket::Entry::Present(ref mut entry, _) => { let enr = entry.value_mut(); - if enr.seq() < inr_enr.seq() { - *enr = inr_enr.clone(); + if enr.seq() < initiator_enr.seq() { + *enr = initiator_enr.clone(); } } kbucket::Entry::Pending(ref mut entry, _) => { let enr = entry.value_mut(); - if enr.seq() < inr_enr.seq() { - *enr = inr_enr.clone(); + if enr.seq() < initiator_enr.seq() { + *enr = initiator_enr.clone(); } } _ => () } // check if we know the target node id in our routing table, otherwise // drop relay attempt. - let tgt_node_id = relay_init.target_node_id(); - let tgt_key = kbucket::Key::from(tgt_node_id); - if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&tgt_key) { - let tgt_enr = entry.value().clone(); - if let Err(e) = self.handler_send.send(HandlerIn::HolePunchEnr(tgt_enr, relay_init)) { + let target_node_id = relay_initiation.target_node_id(); + let target_key = kbucket::Key::from(target_node_id); + if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&target_key) { + let target_enr = entry.value().clone(); + if let Err(e) = self.handler_send.send(HandlerIn::EnrResponse(Some(target_enr), EnrRequestData::Nat(relay_initiation))) { warn!( "Failed to send target enr to relay process, error: {e}" ); } } else { - // todo(emhane): ban peers that ask us to relay to a peer we very - // unlikely could have sent to them in a NODES response. - let inr_node_id = relay_init.initiator_enr().node_id(); - + let initiator_node_id = relay_initiation.initiator_enr().node_id(); warn!( - inr_node_id=%inr_node_id, - tgt_node_id=%tgt_node_id, + initiator_node_id=%initiator_node_id, + target_node_id=%target_node_id, "Peer requested relaying to a peer not in k-buckets" ); } + }, + HandlerOut::PingAllPeers => self.ping_connected_peers(), + HandlerOut::RequestFailed(request_id, error) => { + if let RequestError::Timeout = error { + debug!("RPC Request timed out. id: {}", request_id); + } else { + warn!("RPC Request failed: id: {}, error {:?}", request_id, error); + } + self.rpc_failure(request_id, error); } - HandlerOut::PingAllPeers => self.ping_connected_peers() } } event = Service::bucket_maintenance_poll(&self.kbuckets) => {