Skip to content

Commit

Permalink
re: Document EdgeVerifierActor pt1 (near#5429)
Browse files Browse the repository at this point in the history
Document methods as requires in near#5230 (comment) near#5230 (comment)
  • Loading branch information
pmnoxx authored Nov 25, 2021
1 parent 9f2060f commit feb0acd
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 24 deletions.
41 changes: 25 additions & 16 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use crate::stats::metrics::NetworkMetrics;
#[cfg(feature = "test_features")]
use crate::types::SetAdvOptions;
use crate::types::{
Consolidate, ConsolidateResponse, EdgeList, GetPeerId, GetPeerIdResult, NetworkInfo,
Consolidate, ConsolidateResponse, GetPeerId, GetPeerIdResult, NetworkInfo,
PeerManagerMessageRequest, PeerManagerMessageResponse, PeerMessage, PeerRequest, PeerResponse,
PeersRequest, PeersResponse, SendMessage, StopMsg, SyncData, Unregister,
PeersRequest, PeersResponse, SendMessage, StopMsg, SyncData, Unregister, ValidateEdgeList,
};
use crate::types::{FullPeerInfo, NetworkClientMessages, NetworkRequests, NetworkResponses};
#[cfg(feature = "protocol_feature_routing_exchange_algorithm")]
Expand Down Expand Up @@ -87,7 +87,7 @@ const EXPONENTIAL_BACKOFF_LIMIT: u64 = 91;
/// Limit number of pending Peer actors to avoid OOM.
const LIMIT_PENDING_PEERS: usize = 60;
/// How ofter should we broadcast edges.
const BROADCAST_EDGES_INTERVAL: Duration = Duration::from_millis(50);
const BROADCAST_VALIDATED_EDGES_INTERVAL: Duration = Duration::from_millis(50);
/// Maximum amount of time spend processing edges.
const BROAD_CAST_EDGES_MAX_WORK_ALLOWED: Duration = Duration::from_millis(50);
/// Delay syncinc for 1 second to avoid race condition
Expand Down Expand Up @@ -362,7 +362,7 @@ impl PeerManagerActor {

/// Receives list of edges that were verified, in a trigger every 20ms, and adds them to
/// the routing table.
fn broadcast_edges_trigger(&mut self, ctx: &mut Context<PeerManagerActor>) {
fn broadcast_validated_edges_trigger(&mut self, ctx: &mut Context<PeerManagerActor>) {
let start = Clock::instant();
let mut new_edges = Vec::new();
while let Some(edge) = self.routing_table_exchange_helper.edges_to_add_receiver.pop() {
Expand Down Expand Up @@ -417,9 +417,9 @@ impl PeerManagerActor {

near_performance_metrics::actix::run_later(
ctx,
BROADCAST_EDGES_INTERVAL,
BROADCAST_VALIDATED_EDGES_INTERVAL,
move |act, ctx| {
act.broadcast_edges_trigger(ctx);
act.broadcast_validated_edges_trigger(ctx);
},
);
}
Expand Down Expand Up @@ -1181,11 +1181,20 @@ impl PeerManagerActor {
);
}

fn verify_edges(&mut self, _ctx: &mut Context<Self>, peer_id: PeerId, edges: Vec<Edge>) {
/// Sends list of edges, from peer `peer_id` to check their signatures to `EdgeVerifierActor`.
/// Bans peer `peer_id` if an invalid edge is found.
/// `PeerManagerActor` periodically runs `broadcast_validated_edges_trigger`, which gets edges
/// from `EdgeVerifierActor` concurrent queue and sends edges to be added to `RoutingTableActor`.
fn validate_edges_and_add_to_routing_table(
&mut self,
_ctx: &mut Context<Self>,
peer_id: PeerId,
edges: Vec<Edge>,
) {
if edges.is_empty() {
return;
}
self.routing_table_addr.do_send(EdgeList {
self.routing_table_addr.do_send(ValidateEdgeList {
edges,
edges_info_shared: self.routing_table_exchange_helper.edges_info_shared.clone(),
sender: self.routing_table_exchange_helper.edges_to_add_sender.clone(),
Expand Down Expand Up @@ -1518,19 +1527,19 @@ impl Actor for PeerManagerActor {
));
}

// Periodically push network information to client
// Periodically push network information to client.
self.push_network_info_trigger(ctx);

// Start peer monitoring.
// Periodically starts peer monitoring.
self.monitor_peers_trigger(ctx);

// Start active peer stats querying.
// Periodically starts active peer stats querying.
self.monitor_peer_stats_trigger(ctx);

// Read verified edges and broadcast them.
self.broadcast_edges_trigger(ctx);
// Periodically reads valid edges from `EdgesVerifierActor` and broadcast.
self.broadcast_validated_edges_trigger(ctx);

// Update routing table and prune edges that are no longer reachable.
// Periodically updates routing table and prune edges that are no longer reachable.
self.update_routing_table_trigger(ctx);
}

Expand Down Expand Up @@ -1827,7 +1836,7 @@ impl PeerManagerActor {
actix::fut::ready(())
}).spawn(ctx);

self.verify_edges(ctx, peer_id, edges);
self.validate_edges_and_add_to_routing_table(ctx, peer_id, edges);

NetworkResponses::NoResponse
}
Expand Down Expand Up @@ -2333,7 +2342,7 @@ impl PeerManagerActor {
) {
let mut edges: Vec<Edge> = Vec::new();
std::mem::swap(&mut edges, &mut ibf_msg.edges);
self.verify_edges(ctx, peer_id.clone(), edges);
self.validate_edges_and_add_to_routing_table(ctx, peer_id.clone(), edges);
self.routing_table_addr
.send(RoutingTableMessages::ProcessIbfMessage { peer_id: peer_id.clone(), ibf_msg })
.into_actor(self)
Expand Down
12 changes: 9 additions & 3 deletions chain/network/src/routing/edge_verifier_actor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::routing::edge::Edge;
use crate::types::{EdgeList, StopMsg};
use crate::types::{StopMsg, ValidateEdgeList};
use actix::{Actor, Handler, SyncContext, System};
use conqueue::{QueueReceiver, QueueSender};
use near_performance_metrics_macros::perf;
Expand All @@ -21,11 +21,17 @@ impl Handler<StopMsg> for EdgeVerifierActor {
}
}

impl Handler<EdgeList> for EdgeVerifierActor {
/// EdgeListToValidate contains list of Edges, and it's associated with a connected peer.
/// Check signatures of all edges in `EdgeListToValidate` and if any signature is not valid,
/// we will ban the peer, who sent us incorrect edges.
///
/// TODO(#5230): This code needs to be rewritten to fix memory leak - there is a cache that stores
/// all edges `edges_info_shared` forever in memory.
impl Handler<ValidateEdgeList> for EdgeVerifierActor {
type Result = bool;

#[perf]
fn handle(&mut self, msg: EdgeList, _ctx: &mut Self::Context) -> Self::Result {
fn handle(&mut self, msg: ValidateEdgeList, _ctx: &mut Self::Context) -> Self::Result {
for edge in msg.edges {
let key = edge.key();
if msg.edges_info_shared.lock().unwrap().get(key).cloned().unwrap_or(0u64)
Expand Down
6 changes: 3 additions & 3 deletions chain/network/src/routing/routing_table_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::metrics;
use crate::routing::edge::{Edge, EdgeType};
use crate::routing::edge_verifier_actor::EdgeVerifierActor;
use crate::routing::routing::{Graph, SAVE_PEERS_MAX_TIME};
use crate::types::{EdgeList, StopMsg};
use crate::types::{StopMsg, ValidateEdgeList};
use actix::dev::MessageResponse;
use actix::{
Actor, ActorFuture, Addr, Context, ContextFutureSpawner, Handler, Message, Running,
Expand Down Expand Up @@ -414,11 +414,11 @@ impl Handler<StopMsg> for RoutingTableActor {
}
}

impl Handler<EdgeList> for RoutingTableActor {
impl Handler<ValidateEdgeList> for RoutingTableActor {
type Result = bool;

#[perf]
fn handle(&mut self, msg: EdgeList, ctx: &mut Self::Context) -> Self::Result {
fn handle(&mut self, msg: ValidateEdgeList, ctx: &mut Self::Context) -> Self::Result {
self.edge_verifier_requests_in_progress += 1;
let mut msg = msg;
msg.edges.retain(|x| self.is_edge_newer(x.key(), x.nonce()));
Expand Down
4 changes: 2 additions & 2 deletions chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ pub enum NetworkRequests {
},
}

pub struct EdgeList {
pub struct ValidateEdgeList {
pub(crate) edges: Vec<Edge>,
pub(crate) edges_info_shared: Arc<Mutex<HashMap<(PeerId, PeerId), u64>>>,
pub(crate) sender: QueueSender<Edge>,
Expand All @@ -846,7 +846,7 @@ pub struct EdgeList {
pub(crate) peer_id: PeerId,
}

impl Message for EdgeList {
impl Message for ValidateEdgeList {
type Result = bool;
}

Expand Down

0 comments on commit feb0acd

Please sign in to comment.