diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index f8d6ddfc653..1b12edc1b78 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -18,9 +18,9 @@ use crate::stats::metrics::NetworkMetrics; #[cfg(feature = "test_features")] use crate::types::SetAdvOptions; use crate::types::{ - Consolidate, ConsolidateResponse, EdgeList, GetPeerId, GetPeerIdResult, NetworkInfo, + Consolidate, ConsolidateResponse, GetPeerId, GetPeerIdResult, NetworkInfo, PeerManagerMessageRequest, PeerManagerMessageResponse, PeerMessage, PeerRequest, PeerResponse, - PeersRequest, PeersResponse, SendMessage, StopMsg, SyncData, Unregister, + PeersRequest, PeersResponse, SendMessage, StopMsg, SyncData, Unregister, ValidateEdgeList, }; use crate::types::{FullPeerInfo, NetworkClientMessages, NetworkRequests, NetworkResponses}; #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] @@ -87,7 +87,7 @@ const EXPONENTIAL_BACKOFF_LIMIT: u64 = 91; /// Limit number of pending Peer actors to avoid OOM. const LIMIT_PENDING_PEERS: usize = 60; /// How ofter should we broadcast edges. -const BROADCAST_EDGES_INTERVAL: Duration = Duration::from_millis(50); +const BROADCAST_VALIDATED_EDGES_INTERVAL: Duration = Duration::from_millis(50); /// Maximum amount of time spend processing edges. const BROAD_CAST_EDGES_MAX_WORK_ALLOWED: Duration = Duration::from_millis(50); /// Delay syncinc for 1 second to avoid race condition @@ -362,7 +362,7 @@ impl PeerManagerActor { /// Receives list of edges that were verified, in a trigger every 20ms, and adds them to /// the routing table. - fn broadcast_edges_trigger(&mut self, ctx: &mut Context) { + fn broadcast_validated_edges_trigger(&mut self, ctx: &mut Context) { let start = Clock::instant(); let mut new_edges = Vec::new(); while let Some(edge) = self.routing_table_exchange_helper.edges_to_add_receiver.pop() { @@ -417,9 +417,9 @@ impl PeerManagerActor { near_performance_metrics::actix::run_later( ctx, - BROADCAST_EDGES_INTERVAL, + BROADCAST_VALIDATED_EDGES_INTERVAL, move |act, ctx| { - act.broadcast_edges_trigger(ctx); + act.broadcast_validated_edges_trigger(ctx); }, ); } @@ -1181,11 +1181,20 @@ impl PeerManagerActor { ); } - fn verify_edges(&mut self, _ctx: &mut Context, peer_id: PeerId, edges: Vec) { + /// Sends list of edges, from peer `peer_id` to check their signatures to `EdgeVerifierActor`. + /// Bans peer `peer_id` if an invalid edge is found. + /// `PeerManagerActor` periodically runs `broadcast_validated_edges_trigger`, which gets edges + /// from `EdgeVerifierActor` concurrent queue and sends edges to be added to `RoutingTableActor`. + fn validate_edges_and_add_to_routing_table( + &mut self, + _ctx: &mut Context, + peer_id: PeerId, + edges: Vec, + ) { if edges.is_empty() { return; } - self.routing_table_addr.do_send(EdgeList { + self.routing_table_addr.do_send(ValidateEdgeList { edges, edges_info_shared: self.routing_table_exchange_helper.edges_info_shared.clone(), sender: self.routing_table_exchange_helper.edges_to_add_sender.clone(), @@ -1518,19 +1527,19 @@ impl Actor for PeerManagerActor { )); } - // Periodically push network information to client + // Periodically push network information to client. self.push_network_info_trigger(ctx); - // Start peer monitoring. + // Periodically starts peer monitoring. self.monitor_peers_trigger(ctx); - // Start active peer stats querying. + // Periodically starts active peer stats querying. self.monitor_peer_stats_trigger(ctx); - // Read verified edges and broadcast them. - self.broadcast_edges_trigger(ctx); + // Periodically reads valid edges from `EdgesVerifierActor` and broadcast. + self.broadcast_validated_edges_trigger(ctx); - // Update routing table and prune edges that are no longer reachable. + // Periodically updates routing table and prune edges that are no longer reachable. self.update_routing_table_trigger(ctx); } @@ -1827,7 +1836,7 @@ impl PeerManagerActor { actix::fut::ready(()) }).spawn(ctx); - self.verify_edges(ctx, peer_id, edges); + self.validate_edges_and_add_to_routing_table(ctx, peer_id, edges); NetworkResponses::NoResponse } @@ -2333,7 +2342,7 @@ impl PeerManagerActor { ) { let mut edges: Vec = Vec::new(); std::mem::swap(&mut edges, &mut ibf_msg.edges); - self.verify_edges(ctx, peer_id.clone(), edges); + self.validate_edges_and_add_to_routing_table(ctx, peer_id.clone(), edges); self.routing_table_addr .send(RoutingTableMessages::ProcessIbfMessage { peer_id: peer_id.clone(), ibf_msg }) .into_actor(self) diff --git a/chain/network/src/routing/edge_verifier_actor.rs b/chain/network/src/routing/edge_verifier_actor.rs index 172262a4133..aca06866951 100644 --- a/chain/network/src/routing/edge_verifier_actor.rs +++ b/chain/network/src/routing/edge_verifier_actor.rs @@ -1,5 +1,5 @@ use crate::routing::edge::Edge; -use crate::types::{EdgeList, StopMsg}; +use crate::types::{StopMsg, ValidateEdgeList}; use actix::{Actor, Handler, SyncContext, System}; use conqueue::{QueueReceiver, QueueSender}; use near_performance_metrics_macros::perf; @@ -21,11 +21,17 @@ impl Handler for EdgeVerifierActor { } } -impl Handler for EdgeVerifierActor { +/// EdgeListToValidate contains list of Edges, and it's associated with a connected peer. +/// Check signatures of all edges in `EdgeListToValidate` and if any signature is not valid, +/// we will ban the peer, who sent us incorrect edges. +/// +/// TODO(#5230): This code needs to be rewritten to fix memory leak - there is a cache that stores +/// all edges `edges_info_shared` forever in memory. +impl Handler for EdgeVerifierActor { type Result = bool; #[perf] - fn handle(&mut self, msg: EdgeList, _ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: ValidateEdgeList, _ctx: &mut Self::Context) -> Self::Result { for edge in msg.edges { let key = edge.key(); if msg.edges_info_shared.lock().unwrap().get(key).cloned().unwrap_or(0u64) diff --git a/chain/network/src/routing/routing_table_actor.rs b/chain/network/src/routing/routing_table_actor.rs index d4a6d22e941..88285d2d1f9 100644 --- a/chain/network/src/routing/routing_table_actor.rs +++ b/chain/network/src/routing/routing_table_actor.rs @@ -2,7 +2,7 @@ use crate::metrics; use crate::routing::edge::{Edge, EdgeType}; use crate::routing::edge_verifier_actor::EdgeVerifierActor; use crate::routing::routing::{Graph, SAVE_PEERS_MAX_TIME}; -use crate::types::{EdgeList, StopMsg}; +use crate::types::{StopMsg, ValidateEdgeList}; use actix::dev::MessageResponse; use actix::{ Actor, ActorFuture, Addr, Context, ContextFutureSpawner, Handler, Message, Running, @@ -414,11 +414,11 @@ impl Handler for RoutingTableActor { } } -impl Handler for RoutingTableActor { +impl Handler for RoutingTableActor { type Result = bool; #[perf] - fn handle(&mut self, msg: EdgeList, ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: ValidateEdgeList, ctx: &mut Self::Context) -> Self::Result { self.edge_verifier_requests_in_progress += 1; let mut msg = msg; msg.edges.retain(|x| self.is_edge_newer(x.key(), x.nonce())); diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 4ad12f7c33b..5645eb1ff15 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -837,7 +837,7 @@ pub enum NetworkRequests { }, } -pub struct EdgeList { +pub struct ValidateEdgeList { pub(crate) edges: Vec, pub(crate) edges_info_shared: Arc>>, pub(crate) sender: QueueSender, @@ -846,7 +846,7 @@ pub struct EdgeList { pub(crate) peer_id: PeerId, } -impl Message for EdgeList { +impl Message for ValidateEdgeList { type Result = bool; }