From 03eb0d62161e8a3e69d1d6abd74ee55f27046c2f Mon Sep 17 00:00:00 2001 From: pompon0 Date: Mon, 5 Dec 2022 19:32:13 +0100 Subject: [PATCH] TIER1 implementation (#8141) Implemented TIER1 connections: * added TIER1 support to PeerActor * added logic connecting a TIER1 node to its proxies before broadcasting its AccountData. * added logic making TIER1 nodes connect to other TIER1 nodes (or proxies) based on the collected AccountData. * made TIER1 nodes send some specific message types over TIER1 connections (with a fallback to TIER2). * made TIER1 proxies route the TIER1 messages. * added e2e tests of the TIER1 functionality Monitoring of the TIER1 performance will come in the next PR. --- chain/network/src/config.rs | 32 +- chain/network/src/config_json.rs | 37 ++ .../network/src/network_protocol/testonly.rs | 4 + chain/network/src/peer/peer_actor.rs | 184 ++++++++-- chain/network/src/peer/tests/communication.rs | 8 +- chain/network/src/peer/tests/stream.rs | 2 +- .../src/peer_manager/connection/mod.rs | 31 +- .../src/peer_manager/connection/tests.rs | 13 +- .../src/peer_manager/network_state/mod.rs | 195 +++++++--- .../src/peer_manager/network_state/tier1.rs | 285 ++++++++++++++- .../src/peer_manager/peer_manager_actor.rs | 36 +- chain/network/src/peer_manager/testonly.rs | 31 +- .../src/peer_manager/tests/accounts_data.rs | 21 +- .../src/peer_manager/tests/connection_pool.rs | 8 +- chain/network/src/peer_manager/tests/mod.rs | 1 + chain/network/src/peer_manager/tests/nonce.rs | 4 +- .../network/src/peer_manager/tests/routing.rs | 71 ++-- chain/network/src/peer_manager/tests/tier1.rs | 343 ++++++++++++++++++ chain/network/src/private_actix.rs | 2 + chain/network/src/tcp.rs | 29 +- chain/network/src/testonly/fake_client.rs | 5 +- chain/network/src/types.rs | 3 +- integration-tests/src/tests/network/runner.rs | 2 +- 23 files changed, 1159 insertions(+), 188 deletions(-) create mode 100644 chain/network/src/peer_manager/tests/tier1.rs diff --git 
a/chain/network/src/config.rs b/chain/network/src/config.rs index 4924ed3d20d..5fb320bad60 100644 --- a/chain/network/src/config.rs +++ b/chain/network/src/config.rs @@ -12,6 +12,7 @@ use near_crypto::{KeyType, SecretKey}; use near_primitives::network::PeerId; use near_primitives::types::AccountId; use near_primitives::validator_signer::{InMemoryValidatorSigner, ValidatorSigner}; +use std::collections::HashSet; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::Arc; @@ -67,9 +68,23 @@ impl ValidatorConfig { #[derive(Clone)] pub struct Tier1 { + /// Interval between attempts to connect to proxies of other TIER1 nodes. + pub connect_interval: time::Duration, + /// Maximal number of new connections established every connect_interval. + /// TIER1 can consist of hundreds of nodes, so it is not feasible to connect to all of them at + /// once. + pub new_connections_per_attempt: u64, /// Interval between broacasts of the list of validator's proxies. /// Before the broadcast, validator tries to establish all the missing connections to proxies. pub advertise_proxies_interval: time::Duration, + /// Support for gradual TIER1 feature rollout: + /// - establishing connection to node's own proxies is always enabled (it is a part of peer + /// discovery mechanism). Note that unless the proxy has enable_inbound set, establishing + /// those connections will fail anyway. + /// - a node will start accepting TIER1 inbound connections iff `enable_inbound` is true. + /// - a node will try to start outbound TIER1 connections iff `enable_outbound` is true. + pub enable_inbound: bool, + pub enable_outbound: bool, } /// Validated configuration for the peer-to-peer manager. 
@@ -164,7 +179,12 @@ impl NetworkConfig { if cfg.public_addrs.len() > 0 && cfg.trusted_stun_servers.len() > 0 { anyhow::bail!("you cannot specify both public_addrs and trusted_stun_servers"); } + let mut proxies = HashSet::new(); for proxy in &cfg.public_addrs { + if proxies.contains(&proxy.peer_id) { + anyhow::bail!("public_addrs: found multiple entries with peer_id {}. Only 1 entry per peer_id is supported.",proxy.peer_id); + } + proxies.insert(proxy.peer_id.clone()); let ip = proxy.addr.ip(); if cfg.allow_private_ip_in_public_addrs { if ip.is_unspecified() { @@ -253,7 +273,13 @@ impl NetworkConfig { archive, accounts_data_broadcast_rate_limit: rate::Limit { qps: 0.1, burst: 1 }, routing_table_update_rate_limit: rate::Limit { qps: 1., burst: 1 }, - tier1: Some(Tier1 { advertise_proxies_interval: time::Duration::minutes(15) }), + tier1: Some(Tier1 { + connect_interval: cfg.experimental.tier1_connect_interval.try_into()?, + new_connections_per_attempt: cfg.experimental.tier1_new_connections_per_attempt, + advertise_proxies_interval: time::Duration::minutes(15), + enable_inbound: cfg.experimental.tier1_enable_inbound, + enable_outbound: cfg.experimental.tier1_enable_outbound, + }), inbound_disabled: cfg.experimental.inbound_disabled, skip_tombstones: if cfg.experimental.skip_sending_tombstones_seconds > 0 { Some(time::Duration::seconds(cfg.experimental.skip_sending_tombstones_seconds)) @@ -321,7 +347,11 @@ impl NetworkConfig { tier1: Some(Tier1 { // Interval is very large, so that it doesn't happen spontaneously in tests. // It should rather be triggered manually in tests. 
+ connect_interval: time::Duration::hours(1000), + new_connections_per_attempt: 10000, advertise_proxies_interval: time::Duration::hours(1000), + enable_inbound: true, + enable_outbound: true, }), skip_tombstones: None, event_sink: Sink::null(), diff --git a/chain/network/src/config_json.rs b/chain/network/src/config_json.rs index a86c446a576..696d03fda6b 100644 --- a/chain/network/src/config_json.rs +++ b/chain/network/src/config_json.rs @@ -176,6 +176,23 @@ pub struct Config { pub experimental: ExperimentalConfig, } +fn default_tier1_enable_inbound() -> bool { + true +} +/// This default will be changed over the next releases. +/// It allows us to gradually roll out the TIER1 feature. +fn default_tier1_enable_outbound() -> bool { + false +} + +fn default_tier1_connect_interval() -> Duration { + Duration::from_secs(60) +} + +fn default_tier1_new_connections_per_attempt() -> u64 { + 50 +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct ExperimentalConfig { // If true - don't allow any inbound connections. @@ -192,6 +209,22 @@ pub struct ExperimentalConfig { // compatibility. #[serde(default = "default_skip_tombstones")] pub skip_sending_tombstones_seconds: i64, + + /// See `near_network::config::Tier1::enable_inbound`. + #[serde(default = "default_tier1_enable_inbound")] + pub tier1_enable_inbound: bool, + + /// See `near_network::config::Tier1::enable_outbound`. + #[serde(default = "default_tier1_enable_outbound")] + pub tier1_enable_outbound: bool, + + /// See `near_network::config::Tier1::connect_interval`. + #[serde(default = "default_tier1_connect_interval")] + pub tier1_connect_interval: Duration, + + /// See `near_network::config::Tier1::new_connections_per_attempt`. 
+ #[serde(default = "default_tier1_new_connections_per_attempt")] + pub tier1_new_connections_per_attempt: u64, } impl Default for ExperimentalConfig { @@ -200,6 +233,10 @@ impl Default for ExperimentalConfig { inbound_disabled: false, connect_only_to_boot_nodes: false, skip_sending_tombstones_seconds: default_skip_tombstones(), + tier1_enable_inbound: default_tier1_enable_inbound(), + tier1_enable_outbound: default_tier1_enable_outbound(), + tier1_connect_interval: default_tier1_connect_interval(), + tier1_new_connections_per_attempt: default_tier1_new_connections_per_attempt(), } } } diff --git a/chain/network/src/network_protocol/testonly.rs b/chain/network/src/network_protocol/testonly.rs index 63033b118af..555325ed8b6 100644 --- a/chain/network/src/network_protocol/testonly.rs +++ b/chain/network/src/network_protocol/testonly.rs @@ -229,6 +229,10 @@ impl ChunkSet { } } +pub fn make_hash(rng: &mut R) -> CryptoHash { + CryptoHash::hash_bytes(&rng.gen::<[u8; 19]>()) +} + pub fn make_account_keys(signers: &[InMemoryValidatorSigner]) -> AccountKeys { let mut account_keys = AccountKeys::new(); for s in signers { diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 074bab0ef2b..dea5d2c8407 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -4,7 +4,7 @@ use crate::concurrency::atomic_cell::AtomicCell; use crate::concurrency::demux; use crate::network_protocol::{ Edge, EdgeState, Encoding, OwnedAccount, ParsePeerMessageError, PartialEdgeInfo, - PeerChainInfoV2, PeerIdOrHash, PeerInfo, RawRoutedMessage, RoutedMessageBody, + PeerChainInfoV2, PeerIdOrHash, PeerInfo, RawRoutedMessage, RoutedMessageBody, RoutedMessageV2, RoutingTableUpdate, SyncAccountsData, }; use crate::peer::stream; @@ -60,6 +60,8 @@ const ROUTED_MESSAGE_CACHE_SIZE: usize = 1000; const DROP_DUPLICATED_MESSAGES_PERIOD: time::Duration = time::Duration::milliseconds(50); /// How often to send the latest block to peers. 
const SYNC_LATEST_BLOCK_INTERVAL: time::Duration = time::Duration::seconds(60); +/// How often to perform a full sync of AccountsData with the peer. +const ACCOUNTS_DATA_FULL_SYNC_INTERVAL: time::Duration = time::Duration::minutes(10); #[derive(Debug, Clone, PartialEq, Eq)] pub struct ConnectionClosedEvent { @@ -76,6 +78,7 @@ pub struct HandshakeStartedEvent { pub struct HandshakeCompletedEvent { pub(crate) stream_id: tcp::StreamId, pub(crate) edge: Edge, + pub(crate) tier: tcp::Tier, } #[derive(thiserror::Error, Clone, PartialEq, Eq, Debug)] @@ -93,6 +96,10 @@ pub(crate) enum ClosingReason { RejectedByPeerManager(RegisterPeerError), #[error("stream error")] StreamError, + /// Read through `tcp::Tier::is_allowed()` to see which message types + /// are allowed for a connection of each tier. + #[error("Received a message of type not allowed on this connection.")] + DisallowedMessage, #[error("PeerManager requested to close the connection")] PeerManager, #[error("Received DisconnectMessage from peer")] @@ -155,6 +162,7 @@ impl Debug for PeerActor { struct HandshakeSpec { /// ID of the peer on the other side of the connection. peer_id: PeerId, + tier: tcp::Tier, protocol_version: ProtocolVersion, partial_edge_info: PartialEdgeInfo, } @@ -215,10 +223,13 @@ impl PeerActor { .try_acquire_owned() .map_err(|_| ClosingReason::TooManyInbound)?, ), - tcp::StreamType::Outbound { peer_id } => ConnectingStatus::Outbound { - _permit: { - // This block will be executed for TIER2 only. - { + tcp::StreamType::Outbound { tier, peer_id } => ConnectingStatus::Outbound { + _permit: match tier { + tcp::Tier::T1 => network_state + .tier1 + .start_outbound(peer_id.clone()) + .map_err(ClosingReason::OutboundNotAllowed)?, + tcp::Tier::T2 => { // A loop connection is not allowed on TIER2 // (it is allowed on TIER1 to verify node's public IP). // TODO(gprusak): try to make this more consistent. 
@@ -236,11 +247,19 @@ impl PeerActor { handshake_spec: HandshakeSpec { partial_edge_info: network_state.propose_edge(&clock, peer_id, None), protocol_version: PROTOCOL_VERSION, + tier: *tier, peer_id: peer_id.clone(), }, }, }; - + // Override force_encoding for outbound Tier1 connections, + // since Tier1Handshake is supported only with proto encoding. + let force_encoding = match &stream.type_ { + tcp::StreamType::Outbound { tier, .. } if tier == &tcp::Tier::T1 => { + Some(Encoding::Proto) + } + _ => force_encoding, + }; let my_node_info = PeerInfo { id: network_state.config.node_id(), addr: network_state.config.node_addr.clone(), @@ -276,7 +295,7 @@ impl PeerActor { force_encoding, peer_info: match &stream_type { tcp::StreamType::Inbound => None, - tcp::StreamType::Outbound { peer_id } => Some(PeerInfo { + tcp::StreamType::Outbound { peer_id, .. } => Some(PeerInfo { id: peer_id.clone(), addr: Some(peer_addr), account_id: None, @@ -393,7 +412,10 @@ impl PeerActor { .sign(vc.signer.as_ref()) }), }; - let msg = PeerMessage::Tier2Handshake(handshake); + let msg = match spec.tier { + tcp::Tier::T1 => PeerMessage::Tier1Handshake(handshake), + tcp::Tier::T2 => PeerMessage::Tier2Handshake(handshake), + }; self.send_message_or_log(&msg); } @@ -418,6 +440,7 @@ impl PeerActor { fn process_handshake( &mut self, ctx: &mut ::Context, + tier: tcp::Tier, handshake: Handshake, ) { tracing::debug!(target: "network", "{:?}: Received handshake {:?}", self.my_node_info.id, handshake); @@ -442,6 +465,14 @@ impl PeerActor { self.stop(ctx, ClosingReason::HandshakeFailed); return; } + // This can happen only in case of a malicious node. + // Outbound peer requests a connection of a given TIER, the inbound peer can just + // confirm the TIER or drop connection. TIER is not negotiable during handshake. + if tier != spec.tier { + tracing::warn!(target: "network", "Connection TIER mismatch. 
Disconnecting peer {}", handshake.sender_peer_id); + self.stop(ctx, ClosingReason::HandshakeFailed); + return; + } if handshake.partial_edge_info.nonce != spec.partial_edge_info.nonce { tracing::warn!(target: "network", "Nonce mismatch. Disconnecting peer {}", handshake.sender_peer_id); self.stop(ctx, ClosingReason::HandshakeFailed); return; } @@ -552,6 +583,7 @@ impl PeerActor { let now = self.clock.now(); let conn = Arc::new(connection::Connection { + tier, addr: ctx.address(), peer_info: peer_info.clone(), edge: ArcMutex::new(edge), @@ -576,6 +608,7 @@ impl PeerActor { let tracker = self.tracker.clone(); let clock = self.clock.clone(); + let mut interval = time::Interval::new(clock.now(), self.network_state.config.peer_stats_period); ctx.spawn({ @@ -636,19 +669,39 @@ impl PeerActor { if act.peer_type == PeerType::Inbound { act.send_handshake(HandshakeSpec{ peer_id: handshake.sender_peer_id.clone(), + tier, protocol_version: handshake.protocol_version, partial_edge_info: partial_edge_info, }); } - // TODO(gprusak): This block will be executed for TIER2 only. - { + // TIER1 is strictly reserved for BFT consensus messages, + // so all kinds of periodical syncs happen only on TIER2 connections. + if tier==tcp::Tier::T2 { + // Trigger a full accounts data sync periodically. + // Note that AccountsData is used to establish TIER1 network, + // it is broadcasted over TIER2 network. This is a bootstrapping + // mechanism, because TIER2 is established before TIER1. + // + // TODO(gprusak): consider whether it wouldn't be more uniform to just + // send full sync from both sides of the connection independently. Or + // perhaps make the full sync request a separate message which doesn't + // carry the accounts_data at all. if conn.peer_type == PeerType::Outbound { - // Outbound peer triggers the inital full accounts data sync. - // TODO(gprusak): implement triggering the periodic full sync. 
- act.send_message_or_log(&PeerMessage::SyncAccountsData(SyncAccountsData{ - accounts_data: act.network_state.accounts_data.load().data.values().cloned().collect(), - incremental: false, - requesting_full_sync: true, + ctx.spawn(wrap_future({ + let clock = act.clock.clone(); + let conn = conn.clone(); + let network_state = act.network_state.clone(); + let mut interval = time::Interval::new(clock.now(),ACCOUNTS_DATA_FULL_SYNC_INTERVAL); + async move { + loop { + interval.tick(&clock).await; + conn.send_message(Arc::new(PeerMessage::SyncAccountsData(SyncAccountsData{ + accounts_data: network_state.accounts_data.load().data.values().cloned().collect(), + incremental: false, + requesting_full_sync: true, + }))); + } + } })); } // Exchange peers periodically. @@ -714,6 +767,7 @@ impl PeerActor { act.network_state.config.event_sink.push(Event::HandshakeCompleted(HandshakeCompletedEvent{ stream_id: act.stream_id, edge: conn.edge.load().as_ref().clone(), + tier: conn.tier, })); }, Err(err) => { @@ -821,8 +875,11 @@ impl PeerActor { actix::fut::ready(()) })); } + (PeerStatus::Connecting { .. }, PeerMessage::Tier1Handshake(msg)) => { + self.process_handshake(ctx, tcp::Tier::T1, msg) + } (PeerStatus::Connecting { .. 
}, PeerMessage::Tier2Handshake(msg)) => { - self.process_handshake(ctx, msg) + self.process_handshake(ctx, tcp::Tier::T2, msg) } (_, msg) => { tracing::warn!(target:"network","unexpected message during handshake: {}",msg) @@ -913,7 +970,7 @@ impl PeerActor { .network_state .config .event_sink - .delayed_push(|| Event::MessageProcessed(msg.clone())); + .delayed_push(|| Event::MessageProcessed(conn.tier, msg.clone())); let was_requested = match &msg { PeerMessage::Block(block) => { self.network_state.txns_since_last_block.store(0, Ordering::Release); @@ -987,6 +1044,26 @@ impl PeerActor { ); } + fn add_route_back(&self, conn: &connection::Connection, msg: &RoutedMessageV2) { + if !msg.expect_response() { + return; + } + tracing::trace!(target: "network", route_back = ?msg.clone(), "Received peer message that requires response"); + let from = &conn.peer_info.id; + match conn.tier { + tcp::Tier::T1 => self.network_state.tier1_route_back.lock().insert( + &self.clock, + msg.hash(), + from.clone(), + ), + tcp::Tier::T2 => self.network_state.graph.routing_table.add_route_back( + &self.clock, + msg.hash(), + from.clone(), + ), + } + } + fn handle_msg_ready( &mut self, ctx: &mut actix::Context, @@ -1003,7 +1080,7 @@ impl PeerActor { tracing::debug!(target: "network", "Disconnect signal. Me: {:?} Peer: {:?}", self.my_node_info.id, self.other_peer_id()); self.stop(ctx, ClosingReason::DisconnectMessage); } - PeerMessage::Tier2Handshake(_) => { + PeerMessage::Tier1Handshake(_) | PeerMessage::Tier2Handshake(_) => { // Received handshake after already have seen handshake from this peer. 
tracing::debug!(target: "network", "Duplicate handshake from {}", self.peer_info); } @@ -1016,7 +1093,10 @@ impl PeerActor { tracing::debug!(target: "network", "Peers request from {}: sending {} peers.", self.peer_info, peers.len()); self.send_message_or_log(&PeerMessage::PeersResponse(peers)); } - self.network_state.config.event_sink.push(Event::MessageProcessed(peer_msg)); + self.network_state + .config + .event_sink + .push(Event::MessageProcessed(conn.tier, peer_msg)); } PeerMessage::PeersResponse(peers) => { tracing::debug!(target: "network", "Received peers from {}: {} peers.", self.peer_info, peers.len()); @@ -1027,7 +1107,10 @@ impl PeerActor { ) { tracing::error!(target: "network", ?err, "Fail to update peer store"); }; - self.network_state.config.event_sink.push(Event::MessageProcessed(peer_msg)); + self.network_state + .config + .event_sink + .push(Event::MessageProcessed(conn.tier, peer_msg)); } PeerMessage::RequestUpdateNonce(edge_info) => { let clock = self.clock.clone(); @@ -1055,15 +1138,23 @@ impl PeerActor { conn.send_message(Arc::new(PeerMessage::SyncRoutingTable( RoutingTableUpdate::from_edges(vec![edge]), ))); - network_state.config.event_sink.push(Event::MessageProcessed(peer_msg)); + network_state + .config + .event_sink + .push(Event::MessageProcessed(conn.tier, peer_msg)); })); } PeerMessage::SyncRoutingTable(rtu) => { let clock = self.clock.clone(); + let conn = conn.clone(); let network_state = self.network_state.clone(); ctx.spawn(wrap_future(async move { - Self::handle_sync_routing_table(&clock, &network_state, conn, rtu).await; - network_state.config.event_sink.push(Event::MessageProcessed(peer_msg)); + Self::handle_sync_routing_table(&clock, &network_state, conn.clone(), rtu) + .await; + network_state + .config + .event_sink + .push(Event::MessageProcessed(conn.tier, peer_msg)); })); } PeerMessage::SyncAccountsData(msg) => { @@ -1085,7 +1176,10 @@ impl PeerActor { } // Early exit, if there is no data in the message. 
if msg.accounts_data.is_empty() { - network_state.config.event_sink.push(Event::MessageProcessed(peer_msg)); + network_state + .config + .event_sink + .push(Event::MessageProcessed(conn.tier, peer_msg)); return; } let network_state = self.network_state.clone(); @@ -1101,7 +1195,10 @@ impl PeerActor { } })); } - network_state.config.event_sink.push(Event::MessageProcessed(peer_msg)); + network_state + .config + .event_sink + .push(Event::MessageProcessed(conn.tier, peer_msg)); })); } PeerMessage::Routed(mut msg) => { @@ -1146,44 +1243,42 @@ impl PeerActor { self.stop(ctx, ClosingReason::Ban(ReasonForBan::InvalidSignature)); return; } - let from = &conn.peer_info.id; - if msg.expect_response() { - tracing::trace!(target: "network", route_back = ?msg.clone(), "Received peer message that requires response"); - self.network_state.graph.routing_table.add_route_back( - &self.clock, - msg.hash(), - from.clone(), - ); - } + + self.add_route_back(&conn, msg.as_ref()); if for_me { // Handle Ping and Pong message if they are for us without sending to client. // i.e. Return false in case of Ping and Pong match &msg.body { RoutedMessageBody::Ping(ping) => { - self.network_state.send_pong(&self.clock, ping.nonce, msg.hash()); + self.network_state.send_pong( + &self.clock, + conn.tier, + ping.nonce, + msg.hash(), + ); // TODO(gprusak): deprecate Event::Ping/Pong in favor of // MessageProcessed. 
self.network_state.config.event_sink.push(Event::Ping(ping.clone())); self.network_state .config .event_sink - .push(Event::MessageProcessed(PeerMessage::Routed(msg))); + .push(Event::MessageProcessed(conn.tier, PeerMessage::Routed(msg))); } RoutedMessageBody::Pong(pong) => { self.network_state.config.event_sink.push(Event::Pong(pong.clone())); self.network_state .config .event_sink - .push(Event::MessageProcessed(PeerMessage::Routed(msg))); + .push(Event::MessageProcessed(conn.tier, PeerMessage::Routed(msg))); } _ => self.receive_message(ctx, &conn, PeerMessage::Routed(msg.clone())), } } else { if msg.decrease_ttl() { - self.network_state.send_message_to_peer(&self.clock, msg); + self.network_state.send_message_to_peer(&self.clock, conn.tier, msg); } else { self.network_state.config.event_sink.push(Event::RoutedMessageDropped); - tracing::warn!(target: "network", ?msg, ?from, "Message dropped because TTL reached 0."); + tracing::warn!(target: "network", ?msg, from = ?conn.peer_info.id, "Message dropped because TTL reached 0."); metrics::ROUTED_MESSAGE_DROPPED .with_label_values(&[msg.body_variant()]) .inc(); @@ -1393,6 +1488,15 @@ impl actix::Handler for PeerActor { return; } conn.last_time_received_message.store(self.clock.now()); + // Check if the message type is allowed given the TIER of the connection: + // TIER1 connections are reserved exclusively for BFT consensus messages. + if !conn.tier.is_allowed(&peer_msg) { + tracing::warn!(target: "network", "Received {} on {:?} connection, disconnecting",peer_msg.msg_variant(),conn.tier); + // TODO(gprusak): this is abusive behavior. Consider banning for it. + self.stop(ctx, ClosingReason::DisallowedMessage); + return; + } + // Optionally, ignore any received tombstones after startup. This is to // prevent overload from too much accumulated deleted edges. 
// diff --git a/chain/network/src/peer/tests/communication.rs b/chain/network/src/peer/tests/communication.rs index 7a3963c6346..7334a2f102b 100644 --- a/chain/network/src/peer/tests/communication.rs +++ b/chain/network/src/peer/tests/communication.rs @@ -35,7 +35,8 @@ async fn test_peer_communication( network: chain.make_config(&mut rng), force_encoding: outbound_encoding, }; - let (outbound_stream, inbound_stream) = tcp::Stream::loopback(inbound_cfg.id()).await; + let (outbound_stream, inbound_stream) = + tcp::Stream::loopback(inbound_cfg.id(), tcp::Tier::T2).await; let mut inbound = PeerHandle::start_endpoint(clock.clock(), inbound_cfg, inbound_stream).await; let mut outbound = PeerHandle::start_endpoint(clock.clock(), outbound_cfg, outbound_stream).await; @@ -45,7 +46,7 @@ async fn test_peer_communication( let message_processed = |want| { move |ev| match ev { - Event::Network(PME::MessageProcessed(got)) if got == want => Some(()), + Event::Network(PME::MessageProcessed(_, got)) if got == want => Some(()), _ => None, } }; @@ -190,7 +191,8 @@ async fn test_handshake(outbound_encoding: Option, inbound_encoding: O chain: chain.clone(), force_encoding: outbound_encoding, }; - let (outbound_stream, inbound_stream) = tcp::Stream::loopback(inbound_cfg.id()).await; + let (outbound_stream, inbound_stream) = + tcp::Stream::loopback(inbound_cfg.id(), tcp::Tier::T2).await; let inbound = PeerHandle::start_endpoint(clock.clock(), inbound_cfg, inbound_stream).await; let outbound_port = outbound_stream.local_addr.port(); let mut outbound = Stream::new(outbound_encoding, outbound_stream); diff --git a/chain/network/src/peer/tests/stream.rs b/chain/network/src/peer/tests/stream.rs index ce9961b7b43..a28e5da41c1 100644 --- a/chain/network/src/peer/tests/stream.rs +++ b/chain/network/src/peer/tests/stream.rs @@ -67,7 +67,7 @@ impl Actor { #[tokio::test] async fn send_recv() { let mut rng = make_rng(98324532); - let (s1, s2) = tcp::Stream::loopback(data::make_peer_id(&mut 
rng)).await; + let (s1, s2) = tcp::Stream::loopback(data::make_peer_id(&mut rng), tcp::Tier::T2).await; let a1 = Actor::spawn(s1).await; let mut a2 = Actor::spawn(s2).await; diff --git a/chain/network/src/peer_manager/connection/mod.rs b/chain/network/src/peer_manager/connection/mod.rs index bf66bc654cd..89231af4cce 100644 --- a/chain/network/src/peer_manager/connection/mod.rs +++ b/chain/network/src/peer_manager/connection/mod.rs @@ -2,12 +2,14 @@ use crate::concurrency::arc_mutex::ArcMutex; use crate::concurrency::atomic_cell::AtomicCell; use crate::concurrency::demux; use crate::network_protocol::{ - Edge, PeerInfo, PeerMessage, SignedAccountData, SignedOwnedAccount, SyncAccountsData, + Edge, PeerInfo, PeerMessage, RoutedMessageBody, SignedAccountData, SignedOwnedAccount, + SyncAccountsData, }; use crate::peer::peer_actor; use crate::peer::peer_actor::PeerActor; use crate::private_actix::SendMessage; use crate::stats::metrics; +use crate::tcp; use crate::time; use crate::types::{BlockInfo, FullPeerInfo, PeerChainInfo, PeerType, ReasonForBan}; use arc_swap::ArcSwap; @@ -25,6 +27,31 @@ use std::sync::{Arc, Weak}; #[cfg(test)] mod tests; +impl tcp::Tier { + /// Checks if the given message type is allowed on a connection of the given Tier. + /// TIER1 is reserved exclusively for BFT consensus messages. + /// Each validator establishes a lot of TIER1 connections, so bandwidth shouldn't be + /// wasted on broadcasting or periodic state syncs on TIER1 connections. + pub(crate) fn is_allowed(self, msg: &PeerMessage) -> bool { + match msg { + PeerMessage::Tier1Handshake(_) => self == tcp::Tier::T1, + PeerMessage::Tier2Handshake(_) => self == tcp::Tier::T2, + PeerMessage::HandshakeFailure(_, _) => true, + PeerMessage::LastEdge(_) => true, + PeerMessage::Routed(msg) => self.is_allowed_routed(&msg.body), + _ => self == tcp::Tier::T2, + } + } + + pub(crate) fn is_allowed_routed(self, body: &RoutedMessageBody) -> bool { + match body { + RoutedMessageBody::BlockApproval(..) 
=> true, + RoutedMessageBody::VersionedPartialEncodedChunk(..) => true, + _ => self == tcp::Tier::T2, + } + } +} + #[derive(Default)] pub(crate) struct Stats { /// Number of messages received since the last reset of the counter. @@ -44,6 +71,8 @@ pub(crate) struct Stats { /// Contains information relevant to a connected peer. pub(crate) struct Connection { + // TODO(gprusak): add rate limiting on TIER1 connections for defence in-depth. + pub tier: tcp::Tier, // TODO(gprusak): addr should be internal, so that Connection will become an API of the // PeerActor. pub addr: actix::Addr, diff --git a/chain/network/src/peer_manager/connection/tests.rs b/chain/network/src/peer_manager/connection/tests.rs index e2aec85b7ae..0af99d16368 100644 --- a/chain/network/src/peer_manager/connection/tests.rs +++ b/chain/network/src/peer_manager/connection/tests.rs @@ -3,6 +3,7 @@ use crate::peer::peer_actor::ClosingReason; use crate::peer_manager; use crate::peer_manager::connection; use crate::private_actix::RegisterPeerError; +use crate::tcp; use crate::testonly::make_rng; use crate::time; use near_o11y::testonly::init_test_logger; @@ -28,7 +29,7 @@ async fn connection_tie_break() { .await; // pm.id is lower - let outbound_conn = pm.start_outbound(chain.clone(), cfgs[2].clone()).await; + let outbound_conn = pm.start_outbound(chain.clone(), cfgs[2].clone(), tcp::Tier::T2).await; let inbound_conn = pm.start_inbound(chain.clone(), cfgs[2].clone()).await; // inbound should be rejected, outbound accepted. assert_eq!( @@ -40,7 +41,7 @@ async fn connection_tie_break() { outbound_conn.handshake(&clock.clock()).await; // pm.id is higher - let outbound_conn = pm.start_outbound(chain.clone(), cfgs[0].clone()).await; + let outbound_conn = pm.start_outbound(chain.clone(), cfgs[0].clone(), tcp::Tier::T2).await; let inbound_conn = pm.start_inbound(chain.clone(), cfgs[0].clone()).await; // inbound should be accepted, outbound rejected by PM. 
let inbound = inbound_conn.handshake(&clock.clock()).await; @@ -71,8 +72,8 @@ async fn duplicate_connections() { // Double outbound. let cfg = chain.make_config(rng); - let conn1 = pm.start_outbound(chain.clone(), cfg.clone()).await; - let conn2 = pm.start_outbound(chain.clone(), cfg.clone()).await; + let conn1 = pm.start_outbound(chain.clone(), cfg.clone(), tcp::Tier::T2).await; + let conn2 = pm.start_outbound(chain.clone(), cfg.clone(), tcp::Tier::T2).await; // conn2 shouldn't even be started, so it should fail before conn1 completes. assert_eq!( ClosingReason::OutboundNotAllowed(connection::PoolError::AlreadyStartedConnecting), @@ -99,7 +100,7 @@ async fn duplicate_connections() { let cfg = chain.make_config(rng); let conn1 = pm.start_inbound(chain.clone(), cfg.clone()).await; let conn1 = conn1.handshake(&clock.clock()).await; - let conn2 = pm.start_outbound(chain.clone(), cfg.clone()).await; + let conn2 = pm.start_outbound(chain.clone(), cfg.clone(), tcp::Tier::T2).await; assert_eq!( ClosingReason::OutboundNotAllowed(connection::PoolError::AlreadyConnected), conn2.manager_fail_handshake(&clock.clock()).await, @@ -108,7 +109,7 @@ async fn duplicate_connections() { // Outbound then inbound. 
let cfg = chain.make_config(rng); - let conn1 = pm.start_outbound(chain.clone(), cfg.clone()).await; + let conn1 = pm.start_outbound(chain.clone(), cfg.clone(), tcp::Tier::T2).await; let conn1 = conn1.handshake(&clock.clock()).await; let conn2 = pm.start_inbound(chain.clone(), cfg.clone()).await; assert_eq!( diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index 03d5fcb5795..67a2cfd3d38 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -12,6 +12,7 @@ use crate::peer_manager::connection; use crate::peer_manager::peer_manager_actor::Event; use crate::peer_manager::peer_store; use crate::private_actix::RegisterPeerError; +use crate::routing::route_back_cache::RouteBackCache; use crate::stats::metrics; use crate::store; use crate::tcp; @@ -89,6 +90,7 @@ pub(crate) struct NetworkState { pub accounts_data: Arc, /// Connected peers (inbound and outbound) with their full peer information. pub tier2: connection::Pool, + pub tier1: connection::Pool, /// Semaphore limiting inflight inbound handshakes. pub inbound_handshake_permits: Arc, /// Peer store that provides read/write access to peers. @@ -96,6 +98,15 @@ pub(crate) struct NetworkState { /// A graph of the whole NEAR network. pub graph: Arc, + /// Hash of messages that requires routing back to respective previous hop. + /// Currently unused, as TIER1 messages do not require a response. + /// Also TIER1 connections are direct by design (except for proxies), + /// so routing shouldn't really be needed. + /// TODO(gprusak): consider removing it altogether. + /// + /// Note that the route_back table for TIER2 is stored in graph.routing_table_view. + pub tier1_route_back: Mutex, + /// Shared counter across all PeerActors, which counts number of `RoutedMessageBody::ForwardTx` /// messages sincce last block. 
pub txns_since_last_block: AtomicUsize, @@ -110,6 +121,8 @@ pub(crate) struct NetworkState { /// in the first place. pub max_num_peers: AtomicU32, + /// Mutex which prevents overlapping calls to tier1_advertise_proxies. + tier1_advertise_proxies_mutex: tokio::sync::Mutex<()>, /// Demultiplexer aggregating calls to add_edges(). add_edges_demux: demux::Demux, ()>, @@ -142,9 +155,11 @@ impl NetworkState { client, chain_info: Default::default(), tier2: connection::Pool::new(config.node_id()), + tier1: connection::Pool::new(config.node_id()), inbound_handshake_permits: Arc::new(tokio::sync::Semaphore::new(LIMIT_PENDING_PEERS)), peer_store, accounts_data: Arc::new(accounts_data::Cache::new()), + tier1_route_back: Mutex::new(RouteBackCache::default()), txns_since_last_block: AtomicUsize::new(0), whitelist_nodes, max_num_peers: AtomicU32::new(config.max_num_peers), @@ -152,6 +167,7 @@ impl NetworkState { set_chain_info_mutex: Mutex::new(()), config, created_at: clock.now(), + tier1_advertise_proxies_mutex: tokio::sync::Mutex::new(()), } } @@ -243,28 +259,48 @@ impl NetworkState { return Err(RegisterPeerError::Banned); } - if conn.peer_type == PeerType::Inbound { - if !this.is_inbound_allowed(&peer_info) { - // TODO(1896): Gracefully drop inbound connection for other peer. - let tier2 = this.tier2.load(); - tracing::debug!(target: "network", - tier2 = tier2.ready.len(), outgoing_peers = tier2.outbound_handshakes.len(), - max_num_peers = this.max_num_peers.load(Ordering::Relaxed), - "Dropping handshake (network at max capacity)." - ); - return Err(RegisterPeerError::ConnectionLimitExceeded); + match conn.tier { + tcp::Tier::T1 => { + if conn.peer_type == PeerType::Inbound { + if !this.config.tier1.as_ref().map_or(false, |c| c.enable_inbound) { + return Err(RegisterPeerError::Tier1InboundDisabled); + } + // Allow for inbound TIER1 connections only directly from a TIER1 peers. 
+ let owned_account = match &conn.owned_account { + Some(it) => it, + None => return Err(RegisterPeerError::NotTier1Peer), + }; + if !this.accounts_data.load().keys.contains(&owned_account.account_key) { + return Err(RegisterPeerError::NotTier1Peer); + } + } + this.tier1.insert_ready(conn).map_err(RegisterPeerError::PoolError)?; + } + tcp::Tier::T2 => { + if conn.peer_type == PeerType::Inbound { + if !this.is_inbound_allowed(&peer_info) { + // TODO(1896): Gracefully drop inbound connection for other peer. + let tier2 = this.tier2.load(); + tracing::debug!(target: "network", + tier2 = tier2.ready.len(), outgoing_peers = tier2.outbound_handshakes.len(), + max_num_peers = this.max_num_peers.load(Ordering::Relaxed), + "Dropping handshake (network at max capacity)." + ); + return Err(RegisterPeerError::ConnectionLimitExceeded); + } + } + // Verify and broadcast the edge of the connection. Only then insert the new + // connection to TIER2 pool, so that nothing is broadcasted to conn. + // TODO(gprusak): consider actually banning the peer for consistency. + this.add_edges(&clock, vec![conn.edge.load().as_ref().clone()]) + .await + .map_err(|_: ReasonForBan| RegisterPeerError::InvalidEdge)?; + this.tier2.insert_ready(conn.clone()).map_err(RegisterPeerError::PoolError)?; + // Best effort write to DB. + if let Err(err) = this.peer_store.peer_connected(&clock, peer_info) { + tracing::error!(target: "network", ?err, "Failed to save peer data"); + } } - } - // Verify and broadcast the edge of the connection. Only then insert the new - // connection to TIER2 pool, so that nothing is broadcasted to conn. - // TODO(gprusak): consider actually banning the peer for consistency. - this.add_edges(&clock, vec![conn.edge.load().as_ref().clone()]) - .await - .map_err(|_: ReasonForBan| RegisterPeerError::InvalidEdge)?; - this.tier2.insert_ready(conn.clone()).map_err(RegisterPeerError::PoolError)?; - // Best effort write to DB. 
- if let Err(err) = this.peer_store.peer_connected(&clock, peer_info) { - tracing::error!(target: "network", ?err, "Failed to save peer data"); } Ok(()) }).await.unwrap() @@ -286,6 +322,12 @@ impl NetworkState { let conn = conn.clone(); self.spawn(async move { let peer_id = conn.peer_info.id.clone(); + if conn.tier == tcp::Tier::T1 { + // There is no banning or routing table for TIER1. + // Just remove the connection from the network_state. + this.tier1.remove(&conn); + return; + } this.tier2.remove(&conn); // If the last edge we have with this peer represent a connection addition, create the edge @@ -325,16 +367,16 @@ impl NetworkState { } } - pub fn send_ping(&self, clock: &time::Clock, nonce: u64, target: PeerId) { + pub fn send_ping(&self, clock: &time::Clock, tier: tcp::Tier, nonce: u64, target: PeerId) { let body = RoutedMessageBody::Ping(Ping { nonce, source: self.config.node_id() }); let msg = RawRoutedMessage { target: PeerIdOrHash::PeerId(target), body }; - self.send_message_to_peer(clock, self.sign_message(clock, msg)); + self.send_message_to_peer(clock, tier, self.sign_message(clock, msg)); } - pub fn send_pong(&self, clock: &time::Clock, nonce: u64, target: CryptoHash) { + pub fn send_pong(&self, clock: &time::Clock, tier: tcp::Tier, nonce: u64, target: CryptoHash) { let body = RoutedMessageBody::Pong(Pong { nonce, source: self.config.node_id() }); let msg = RawRoutedMessage { target: PeerIdOrHash::Hash(target), body }; - self.send_message_to_peer(clock, self.sign_message(clock, msg)); + self.send_message_to_peer(clock, tier, self.sign_message(clock, msg)); } pub fn sign_message(&self, clock: &time::Clock, msg: RawRoutedMessage) -> Box { @@ -347,7 +389,12 @@ impl NetworkState { /// Route signed message to target peer. /// Return whether the message is sent or not. 
- pub fn send_message_to_peer(&self, clock: &time::Clock, msg: Box) -> bool { + pub fn send_message_to_peer( + &self, + clock: &time::Clock, + tier: tcp::Tier, + msg: Box, + ) -> bool { let my_peer_id = self.config.node_id(); // Check if the message is for myself and don't try to send it in that case. @@ -358,40 +405,87 @@ impl NetworkState { return false; } } - match self.graph.routing_table.find_route(&clock, &msg.target) { - Ok(peer_id) => { - // Remember if we expect a response for this message. - if msg.author == my_peer_id && msg.expect_response() { - tracing::trace!(target: "network", ?msg, "initiate route back"); - self.graph.routing_table.add_route_back(&clock, msg.hash(), my_peer_id); - } - return self.tier2.send_message(peer_id, Arc::new(PeerMessage::Routed(msg))); + match tier { + tcp::Tier::T1 => { + let peer_id = match &msg.target { + // If a message is a response, we try to load the target from the route back + // cache. + PeerIdOrHash::Hash(hash) => { + match self.tier1_route_back.lock().remove(clock, hash) { + Some(peer_id) => peer_id, + None => return false, + } + } + PeerIdOrHash::PeerId(peer_id) => peer_id.clone(), + }; + return self.tier1.send_message(peer_id, Arc::new(PeerMessage::Routed(msg))); } - Err(find_route_error) => { - // TODO(MarX, #1369): Message is dropped here. Define policy for this case. - metrics::MessageDropped::NoRouteFound.inc(&msg.body); + tcp::Tier::T2 => match self.graph.routing_table.find_route(&clock, &msg.target) { + Ok(peer_id) => { + // Remember if we expect a response for this message. + if msg.author == my_peer_id && msg.expect_response() { + tracing::trace!(target: "network", ?msg, "initiate route back"); + self.graph.routing_table.add_route_back(&clock, msg.hash(), my_peer_id); + } + return self.tier2.send_message(peer_id, Arc::new(PeerMessage::Routed(msg))); + } + Err(find_route_error) => { + // TODO(MarX, #1369): Message is dropped here. Define policy for this case. 
+ metrics::MessageDropped::NoRouteFound.inc(&msg.body); - tracing::debug!(target: "network", - account_id = ?self.config.validator.as_ref().map(|v|v.account_id()), - to = ?msg.target, - reason = ?find_route_error, - known_peers = ?self.graph.routing_table.reachable_peers(), - msg = ?msg.body, - "Drop signed message" - ); - return false; - } + tracing::debug!(target: "network", + account_id = ?self.config.validator.as_ref().map(|v|v.account_id()), + to = ?msg.target, + reason = ?find_route_error, + known_peers = ?self.graph.routing_table.reachable_peers(), + msg = ?msg.body, + "Drop signed message" + ); + return false; + } + }, } } /// Send message to specific account. /// Return whether the message is sent or not. + /// The message might be sent over TIER1 and/or TIER2 connection depending on the message type. pub fn send_message_to_account( &self, clock: &time::Clock, account_id: &AccountId, msg: RoutedMessageBody, ) -> bool { + let mut success = false; + // All TIER1 messages are being sent over both TIER1 and TIER2 connections for now, + // so that we can actually observe the latency/reliability improvements in practice: + // for each message we track over which network tier it arrived faster? + if tcp::Tier::T1.is_allowed_routed(&msg) { + let accounts_data = self.accounts_data.load(); + for key in accounts_data.keys_by_id.get(account_id).iter().flat_map(|keys| keys.iter()) + { + let data = match accounts_data.data.get(key) { + Some(data) => data, + None => continue, + }; + let conn = match self.get_tier1_proxy(data) { + Some(conn) => conn, + None => continue, + }; + // TODO(gprusak): in case of PartialEncodedChunk, consider stripping everything + // but the header. 
This will bound the message size + conn.send_message(Arc::new(PeerMessage::Routed(self.sign_message( + clock, + RawRoutedMessage { + target: PeerIdOrHash::PeerId(data.peer_id.clone()), + body: msg.clone(), + }, + )))); + success |= true; + break; + } + } + let target = match self.graph.routing_table.account_owner(account_id) { Some(peer_id) => peer_id, None => { @@ -410,14 +504,13 @@ impl NetworkState { let msg = RawRoutedMessage { target: PeerIdOrHash::PeerId(target), body: msg }; let msg = self.sign_message(clock, msg); if msg.body.is_important() { - let mut success = false; for _ in 0..IMPORTANT_MESSAGE_RESENT_COUNT { - success |= self.send_message_to_peer(clock, msg.clone()); + success |= self.send_message_to_peer(clock, tcp::Tier::T2, msg.clone()); } - success } else { - self.send_message_to_peer(clock, msg) + success |= self.send_message_to_peer(clock, tcp::Tier::T2, msg) } + success } pub async fn add_accounts_data( diff --git a/chain/network/src/peer_manager/network_state/tier1.rs b/chain/network/src/peer_manager/network_state/tier1.rs index f9ecbe6e486..23d29065333 100644 --- a/chain/network/src/peer_manager/network_state/tier1.rs +++ b/chain/network/src/peer_manager/network_state/tier1.rs @@ -1,8 +1,20 @@ use crate::accounts_data; use crate::config; -use crate::network_protocol::{AccountData, PeerMessage, SignedAccountData, SyncAccountsData}; +use crate::network_protocol::{ + AccountData, PeerAddr, PeerInfo, PeerMessage, SignedAccountData, SyncAccountsData, +}; +use crate::peer::peer_actor::PeerActor; +use crate::peer_manager::connection; use crate::peer_manager::peer_manager_actor::Event; +use crate::tcp; use crate::time; +use crate::types::PeerType; +use near_crypto::PublicKey; +use near_o11y::log_assert; +use near_primitives::network::PeerId; +use rand::seq::IteratorRandom as _; +use rand::seq::SliceRandom as _; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; impl super::NetworkState { @@ -20,6 +32,40 @@ impl super::NetworkState { 
.filter(|cfg| accounts_data.keys.contains(&cfg.signer.public_key())) } + async fn tier1_connect_to_my_proxies( + self: &Arc, + clock: &time::Clock, + proxies: &[PeerAddr], + ) { + let tier1 = self.tier1.load(); + // Try to connect to all proxies in parallel. + let mut handles = vec![]; + for proxy in proxies { + // Skip the proxies we are already connected to. + if tier1.ready.contains_key(&proxy.peer_id) { + continue; + } + handles.push(async move { + let res = async { + let stream = tcp::Stream::connect( + &PeerInfo { + id: proxy.peer_id.clone(), + addr: Some(proxy.addr), + account_id: None, + }, + tcp::Tier::T1, + ) + .await?; + anyhow::Ok(PeerActor::spawn_and_handshake(clock.clone(), stream, None, self.clone()).await?) + }.await; + if let Err(err) = res { + tracing::warn!(target:"network", ?err, "failed to establish connection to TIER1 proxy {:?}",proxy); + } + }); + } + futures_util::future::join_all(handles).await; + } + /// Tries to connect to ALL trusted proxies from the config, then broadcasts AccountData with /// the set of proxies it managed to connect to. This way other TIER1 nodes can just connect /// to ANY proxy of this node. @@ -27,16 +73,80 @@ impl super::NetworkState { self: &Arc, clock: &time::Clock, ) -> Vec> { + // Tier1 advertise proxies calls should be disjoint, + // to avoid a race condition while connecting to the proxies. + // TODO(gprusak): there are more corner cases to cover, because + // tier1_connect may also spawn TIER1 connections conflicting with + // tier1_advertise_proxies. It would be better to be able to await + // handshake on connection attempts, even if another call spawned them. 
+ let _lock = self.tier1_advertise_proxies_mutex.lock().await; let accounts_data = self.accounts_data.load(); - let Some(vc) = self.tier1_validator_config(&accounts_data) else { - return vec![]; + let vc = match self.tier1_validator_config(&accounts_data) { + Some(it) => it, + None => { + return vec![]; + } + }; + let proxies = match &vc.proxies { + config::ValidatorProxies::Dynamic(_) => { + // TODO(gprusak): If Dynamic are specified, + // it means that this node is its own proxy. + // Resolve the public IP of this node using those STUN servers, + // then connect to yourself (to verify the public IP). + vec![] + } + config::ValidatorProxies::Static(peer_addrs) => peer_addrs.clone(), }; - // TODO(gprusak): for now we just blindly broadcast the static list of proxies, however - // here we should try to connect to the TIER1 proxies, before broadcasting them. + self.tier1_connect_to_my_proxies(clock, &proxies).await; + + // Snapshot tier1 connections again before broadcasting. + let tier1 = self.tier1.load(); + let my_proxies = match &vc.proxies { - config::ValidatorProxies::Dynamic(_) => vec![], - config::ValidatorProxies::Static(proxies) => proxies.clone(), + // In case of dynamic configuration, only the node itself can be its proxy, + // so we look for a loop connection which would prove our node's address. + config::ValidatorProxies::Dynamic(_) => match tier1.ready.get(&self.config.node_id()) { + Some(conn) => { + log_assert!(PeerType::Outbound == conn.peer_type); + log_assert!(conn.peer_info.addr.is_some()); + match conn.peer_info.addr { + Some(addr) => vec![PeerAddr { peer_id: self.config.node_id(), addr }], + None => vec![], + } + } + None => vec![], + }, + // In case of static configuration, we look for connections to proxies matching the config. 
+ config::ValidatorProxies::Static(proxies) => { + let mut connected_proxies = vec![]; + for proxy in proxies { + match tier1.ready.get(&proxy.peer_id) { + // Here we compare the address from the config with the + // address of the connection (which is the IP, to which the + // TCP socket is connected + port indicated by the peer). + // We will broadcast only those addresses which we confirmed are + // valid (i.e. we managed to connect to them). + // + // TODO(gprusak): It may happen that a single peer will be + // available under multiple IPs, in which case, we should + // prefer to connect to the IP from the config, however + // that would require having separate inbound and outbound + // pools, so that both endpoints can keep a connection + // to the IP that they prefer. This is a corner case which can happen + // only if 2 TIER1 validators are proxies for some other validator. + Some(conn) if conn.peer_info.addr == Some(proxy.addr) => { + connected_proxies.push(proxy.clone()); + } + Some(conn) => { + tracing::info!(target:"network", "connected to {}, but got addr {:?}, while want {}",conn.peer_info.id,conn.peer_info.addr,proxy.addr) + } + _ => {} + } + } + connected_proxies + } }; + tracing::info!(target:"network","connected to proxies {my_proxies:?}"); let now = clock.now_utc(); let version = self.accounts_data.load().data.get(&vc.signer.public_key()).map_or(0, |d| d.version) @@ -77,4 +187,165 @@ impl super::NetworkState { self.config.event_sink.push(Event::Tier1AdvertiseProxies(new_data.clone())); new_data } + + /// Closes TIER1 connections from nodes which are not TIER1 any more. + /// If this node is TIER1, it additionally connects to proxies of other TIER1 nodes. 
+ pub async fn tier1_connect(self: &Arc, clock: &time::Clock) { + let tier1_cfg = match &self.config.tier1 { + Some(it) => it, + None => return, + }; + if !tier1_cfg.enable_outbound { + return; + } + let accounts_data = self.accounts_data.load(); + let validator_cfg = self.tier1_validator_config(&accounts_data); + + // Construct indices on accounts_data. + let mut accounts_by_proxy = HashMap::<_, Vec<_>>::new(); + let mut proxies_by_account = HashMap::<_, Vec<_>>::new(); + for d in accounts_data.data.values() { + proxies_by_account.entry(&d.account_key).or_default().extend(d.proxies.iter()); + for p in &d.proxies { + accounts_by_proxy.entry(&p.peer_id).or_default().push(&d.account_key); + } + } + + // Browse the connections from newest to oldest. + let tier1 = self.tier1.load(); + let mut ready: Vec<_> = tier1.ready.values().collect(); + ready.sort_unstable_by_key(|c| c.established_time); + ready.reverse(); + + // Select the oldest TIER1 connection for each account. + let mut safe = HashMap::<&PublicKey, &PeerId>::new(); + + match validator_cfg { + // TIER1 nodes can establish outbound connections to other TIER1 nodes and TIER1 proxies. + // TIER1 nodes can also accept inbound connections from TIER1 nodes. + Some(_) => { + for conn in &ready { + if conn.peer_type != PeerType::Outbound { + continue; + } + let peer_id = &conn.peer_info.id; + for key in accounts_by_proxy.get(peer_id).into_iter().flatten() { + safe.insert(key, peer_id); + } + } + // Direct TIER1 connections have priority over proxy connections. + for key in &accounts_data.keys { + if let Some(conn) = tier1.ready_by_account_key.get(&key) { + safe.insert(key, &conn.peer_info.id); + } + } + } + // All the other nodes should accept inbound connections from TIER1 nodes + // (to act as a TIER1 proxy). 
+ None => { + for key in &accounts_data.keys { + if let Some(conn) = tier1.ready_by_account_key.get(&key) { + if conn.peer_type == PeerType::Inbound { + safe.insert(key, &conn.peer_info.id); + } + } + } + } + } + + // Construct a safe set of connections. + let mut safe_set: HashSet = safe.values().map(|v| (*v).clone()).collect(); + // Add proxies of our node to the safe set. + if let Some(vc) = validator_cfg { + match &vc.proxies { + config::ValidatorProxies::Dynamic(_) => { + safe_set.insert(self.config.node_id()); + } + config::ValidatorProxies::Static(peer_addrs) => { + // TODO(gprusak): here we add peer_id to a safe set, even if + // the conn.peer_addr doesn't match the address from the validator config + // (so we cannot advertise it as our proxy). Consider making it more precise. + safe_set.extend(peer_addrs.iter().map(|pa| pa.peer_id.clone())); + } + } + } + // Close all other connections, as they are redundant or are no longer TIER1. + for conn in tier1.ready.values() { + if !safe_set.contains(&conn.peer_info.id) { + conn.stop(None); + } + } + if let Some(vc) = validator_cfg { + // Try to establish new TIER1 connections to accounts in random order. + let mut handles = vec![]; + let mut account_keys: Vec<_> = proxies_by_account.keys().copied().collect(); + account_keys.shuffle(&mut rand::thread_rng()); + for account_key in account_keys { + // tier1_connect() is responsible for connecting to proxies + // of this node. tier1_connect() connects only to proxies + // of other TIER1 nodes. + if account_key == &vc.signer.public_key() { + continue; + } + // Bound the number of connections established at a single call to + // tier1_connect(). + if handles.len() as u64 >= tier1_cfg.new_connections_per_attempt { + break; + } + // If we are already connected to some proxy of account_key, then + // don't establish another connection. + if safe.contains_key(account_key) { + continue; + } + // Find addresses of proxies of account_key. 
+ let proxies: Vec<&PeerAddr> = + proxies_by_account.get(account_key).into_iter().flatten().map(|x| *x).collect(); + // Select a random proxy of the account_key and try to connect to it. + let proxy = proxies.iter().choose(&mut rand::thread_rng()); + if let Some(proxy) = proxy { + let proxy = (*proxy).clone(); + handles.push(async move { + let stream = tcp::Stream::connect( + &PeerInfo { + id: proxy.peer_id, + addr: Some(proxy.addr), + account_id: None, + }, + tcp::Tier::T1, + ) + .await?; + PeerActor::spawn_and_handshake(clock.clone(), stream, None, self.clone()) + .await + }); + } + } + tracing::debug!(target:"network","{}: establishing {} new connections",self.config.node_id(),handles.len()); + for res in futures_util::future::join_all(handles).await { + if let Err(err) = res { + tracing::info!(target:"network", ?err, "{}: failed to establish a TIER1 connection",self.config.node_id()); + } + } + tracing::debug!(target:"network","{}: establishing new connections DONE",self.config.node_id()); + } + } + + /// Finds a TIER1 connection for the given SignedAccountData. + /// It is expected to perform <10 lookups total on average, + /// so the call latency should be negligible wrt sending a TCP packet. + // TODO(gprusak): If not, consider precomputing the AccountKey -> Connection mapping. + pub fn get_tier1_proxy(&self, data: &SignedAccountData) -> Option> { + let tier1 = self.tier1.load(); + // Prefer direct connections. + if let Some(conn) = tier1.ready_by_account_key.get(&data.account_key) { + return Some(conn.clone()); + } + // In case there is no direct connection and our node is a TIER1 validator, use a proxy. + // TODO(gprusak): add a check that our node is actually a TIER1 validator. 
+ for proxy in &data.proxies { + if let Some(conn) = tier1.ready.get(&proxy.peer_id) { + return Some(conn.clone()); + } + } + None + } } diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 33f3bda4248..351705fa8be 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -2,8 +2,8 @@ use crate::client; use crate::config; use crate::debug::{DebugStatus, GetDebugStatus}; use crate::network_protocol::{ - AccountOrPeerIdOrHash, Edge, PeerMessage, Ping, Pong, RawRoutedMessage, RoutedMessageBody, - SignedAccountData, StateResponseInfo, + AccountOrPeerIdOrHash, Edge, PeerIdOrHash, PeerMessage, Ping, Pong, RawRoutedMessage, + RoutedMessageBody, SignedAccountData, StateResponseInfo, }; use crate::peer::peer_actor::PeerActor; use crate::peer_manager::connection; @@ -15,8 +15,8 @@ use crate::tcp; use crate::time; use crate::types::{ ConnectedPeerInfo, GetNetworkInfo, HighestHeightPeerInfo, KnownProducer, NetworkInfo, - NetworkRequests, NetworkResponses, PeerIdOrHash, PeerManagerMessageRequest, - PeerManagerMessageResponse, PeerType, SetChainInfo, + NetworkRequests, NetworkResponses, PeerManagerMessageRequest, PeerManagerMessageResponse, + PeerType, SetChainInfo, }; use actix::fut::future::wrap_future; use actix::{Actor as _, AsyncContext as _}; @@ -25,9 +25,7 @@ use near_o11y::{handler_debug_span, handler_trace_span, OpenTelemetrySpanExt, Wi use near_performance_metrics_macros::perf; use near_primitives::block::GenesisId; use near_primitives::network::{AnnounceAccount, PeerId}; -use near_primitives::views::EdgeView; -use near_primitives::views::NetworkGraphView; -use near_primitives::views::{KnownPeerStateView, PeerStoreView}; +use near_primitives::views::{EdgeView, KnownPeerStateView, NetworkGraphView, PeerStoreView}; use rand::seq::IteratorRandom; use rand::thread_rng; use rand::Rng; @@ -78,6 +76,7 @@ pub struct PeerManagerActor { 
my_peer_id: PeerId, /// Flag that track whether we started attempts to establish outbound connections. started_connect_attempts: bool, + /// State that is shared between multiple threads (including PeerActors). pub(crate) state: Arc, } @@ -106,7 +105,7 @@ pub enum Event { // it is hard to pinpoint all the places when the processing of a message is // actually complete. Currently this event is reported only for some message types, // feel free to add support for more. - MessageProcessed(PeerMessage), + MessageProcessed(tcp::Tier, PeerMessage), // Reported every time a new list of proxies has been constructed. Tier1AdvertiseProxies(Vec>), // Reported when a handshake has been started. @@ -248,6 +247,19 @@ impl PeerManagerActor { } } }); + // Update TIER1 connections periodically. + arbiter.spawn({ + let clock = clock.clone(); + let state = state.clone(); + let mut interval = tokio::time::interval(cfg.connect_interval.try_into().unwrap()); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + async move { + loop { + interval.tick().await; + state.tier1_connect(&clock).await; + } + } + }); } } }); @@ -516,7 +528,7 @@ impl PeerManagerActor { let clock = self.clock.clone(); async move { let result = async { - let stream = tcp::Stream::connect(&peer_info).await.context("tcp::Stream::connect()")?; + let stream = tcp::Stream::connect(&peer_info, tcp::Tier::T2).await.context("tcp::Stream::connect()")?; PeerActor::spawn_and_handshake(clock.clone(),stream,None,state.clone()).await.context("PeerActor::spawn()")?; anyhow::Ok(()) }.await; @@ -571,6 +583,7 @@ impl PeerManagerActor { self.state.send_message_to_peer( &self.clock, + tcp::Tier::T2, self.state.sign_message(&self.clock, RawRoutedMessage { target, body: msg }), ) } @@ -719,6 +732,7 @@ impl PeerManagerActor { }; if self.state.send_message_to_peer( &self.clock, + tcp::Tier::T2, self.state.sign_message( &self.clock, RawRoutedMessage { target: PeerIdOrHash::Hash(route_back), body }, @@ -776,6 +790,7 
@@ impl PeerManagerActor { { if self.state.send_message_to_peer( &self.clock, + tcp::Tier::T2, self.state.sign_message( &self.clock, RawRoutedMessage { @@ -805,6 +820,7 @@ impl PeerManagerActor { NetworkRequests::PartialEncodedChunkResponse { route_back, response } => { if self.state.send_message_to_peer( &self.clock, + tcp::Tier::T2, self.state.sign_message( &self.clock, RawRoutedMessage { @@ -908,7 +924,7 @@ impl PeerManagerActor { } // TEST-ONLY PeerManagerMessageRequest::PingTo { nonce, target } => { - self.state.send_ping(&self.clock, nonce, target); + self.state.send_ping(&self.clock, tcp::Tier::T2, nonce, target); PeerManagerMessageResponse::PingTo } } diff --git a/chain/network/src/peer_manager/testonly.rs b/chain/network/src/peer_manager/testonly.rs index be51154dd98..653a0d92f73 100644 --- a/chain/network/src/peer_manager/testonly.rs +++ b/chain/network/src/peer_manager/testonly.rs @@ -15,7 +15,6 @@ use crate::testonly::fake_client; use crate::time; use crate::types::{ AccountKeys, ChainInfo, KnownPeerStatus, NetworkRequests, PeerManagerMessageRequest, - SetChainInfo, }; use crate::PeerManagerActor; use near_o11y::WithSpanContextExt; @@ -57,7 +56,10 @@ pub(crate) struct ActorHandler { pub fn unwrap_sync_accounts_data_processed(ev: Event) -> Option { match ev { - Event::PeerManager(PME::MessageProcessed(PeerMessage::SyncAccountsData(msg))) => Some(msg), + Event::PeerManager(PME::MessageProcessed( + tcp::Tier::T2, + PeerMessage::SyncAccountsData(msg), + )) => Some(msg), _ => None, } } @@ -135,12 +137,16 @@ impl ActorHandler { } } - pub fn connect_to(&self, peer_info: &PeerInfo) -> impl 'static + Send + Future { + pub fn connect_to( + &self, + peer_info: &PeerInfo, + tier: tcp::Tier, + ) -> impl 'static + Send + Future { let addr = self.actix.addr.clone(); let events = self.events.clone(); let peer_info = peer_info.clone(); async move { - let stream = tcp::Stream::connect(&peer_info).await.unwrap(); + let stream = tcp::Stream::connect(&peer_info, 
tier).await.unwrap(); let mut events = events.from_now(); let stream_id = stream.id(); addr.do_send(PeerManagerMessageRequest::OutboundTcpConnect(stream).with_span_context()); @@ -186,7 +192,7 @@ impl ActorHandler { // 3. establish connection. let socket = tcp::Socket::bind_v4(); let events = self.events.from_now(); - let stream = socket.connect(&self.peer_info()).await; + let stream = socket.connect(&self.peer_info(), tcp::Tier::T2).await; let stream_id = stream.id(); let conn = RawConnection { events, @@ -218,8 +224,10 @@ impl ActorHandler { &self, chain: Arc, network_cfg: config::NetworkConfig, + tier: tcp::Tier, ) -> RawConnection { - let (outbound_stream, inbound_stream) = tcp::Stream::loopback(network_cfg.node_id()).await; + let (outbound_stream, inbound_stream) = + tcp::Stream::loopback(network_cfg.node_id(), tier).await; let stream_id = outbound_stream.id(); let events = self.events.from_now(); self.actix.addr.do_send( @@ -382,6 +390,15 @@ impl ActorHandler { .await; } } + + /// Executes `NetworkState::tier1_connect` method. + pub async fn tier1_connect(&self, clock: &time::Clock) { + let clock = clock.clone(); + self.with_state(move |s| async move { + s.tier1_connect(&clock).await; + }) + .await; + } } pub(crate) async fn start( @@ -405,6 +422,6 @@ pub(crate) async fn start( let mut h = ActorHandler { cfg, actix, events: recv }; // Wait for the server to start. 
assert_eq!(Event::PeerManager(PME::ServerStarted), h.events.recv().await); - h.actix.addr.send(SetChainInfo(chain.get_chain_info()).with_span_context()).await.unwrap(); + h.set_chain_info(chain.get_chain_info()).await; h } diff --git a/chain/network/src/peer_manager/tests/accounts_data.rs b/chain/network/src/peer_manager/tests/accounts_data.rs index babf946666c..1a1a7b62c97 100644 --- a/chain/network/src/peer_manager/tests/accounts_data.rs +++ b/chain/network/src/peer_manager/tests/accounts_data.rs @@ -5,6 +5,7 @@ use crate::peer; use crate::peer_manager; use crate::peer_manager::peer_manager_actor::Event as PME; use crate::peer_manager::testonly; +use crate::tcp; use crate::testonly::{make_rng, AsSet as _}; use crate::time; use crate::types::PeerMessage; @@ -42,15 +43,17 @@ async fn broadcast() { .await; let take_incremental_sync = |ev| match ev { - peer::testonly::Event::Network(PME::MessageProcessed(PeerMessage::SyncAccountsData( - msg, - ))) if msg.incremental => Some(msg), + peer::testonly::Event::Network(PME::MessageProcessed( + tcp::Tier::T2, + PeerMessage::SyncAccountsData(msg), + )) if msg.incremental => Some(msg), _ => None, }; let take_full_sync = |ev| match ev { - peer::testonly::Event::Network(PME::MessageProcessed(PeerMessage::SyncAccountsData( - msg, - ))) if !msg.incremental => Some(msg), + peer::testonly::Event::Network(PME::MessageProcessed( + tcp::Tier::T2, + PeerMessage::SyncAccountsData(msg), + )) if !msg.incremental => Some(msg), _ => None, }; @@ -130,8 +133,8 @@ async fn gradual_epoch_change() { // 0 <-> 1 <-> 2 let pm1 = pms[1].peer_info(); let pm2 = pms[2].peer_info(); - pms[0].connect_to(&pm1).await; - pms[1].connect_to(&pm2).await; + pms[0].connect_to(&pm1, tcp::Tier::T2).await; + pms[1].connect_to(&pm2, tcp::Tier::T2).await; // For every order of nodes. 
for ids in (0..pms.len()).permutations(pms.len()) { @@ -204,7 +207,7 @@ async fn rate_limiting() { for j in 0..m { for k in 0..m { let pi = pms[(i + 1) * m + k].peer_info(); - tasks.push(tokio::spawn(pms[i * m + j].connect_to(&pi))); + tasks.push(tokio::spawn(pms[i * m + j].connect_to(&pi, tcp::Tier::T2))); connections += 1; } } diff --git a/chain/network/src/peer_manager/tests/connection_pool.rs b/chain/network/src/peer_manager/tests/connection_pool.rs index 35b006f21a2..4258425964b 100644 --- a/chain/network/src/peer_manager/tests/connection_pool.rs +++ b/chain/network/src/peer_manager/tests/connection_pool.rs @@ -75,15 +75,15 @@ async fn loop_connection() { let mut cfg = chain.make_config(rng); cfg.node_key = pm.cfg.node_key.clone(); - // Starting an outbound loop connection should be stopped without sending the handshake. - let conn = pm.start_outbound(chain.clone(), cfg).await; + // Starting an outbound loop connection on TIER2 should be stopped without sending the handshake. + let conn = pm.start_outbound(chain.clone(), cfg, tcp::Tier::T2).await; assert_eq!( ClosingReason::OutboundNotAllowed(connection::PoolError::UnexpectedLoopConnection), conn.manager_fail_handshake(&clock.clock()).await ); // An inbound connection pretending to be a loop should be rejected. - let stream = tcp::Stream::connect(&pm.peer_info()).await.unwrap(); + let stream = tcp::Stream::connect(&pm.peer_info(), tcp::Tier::T2).await.unwrap(); let stream_id = stream.id(); let port = stream.local_addr.port(); let mut events = pm.events.from_now(); @@ -141,7 +141,7 @@ async fn owned_account_mismatch() { .await; // An inbound connection pretending to be a loop should be rejected. 
- let stream = tcp::Stream::connect(&pm.peer_info()).await.unwrap(); + let stream = tcp::Stream::connect(&pm.peer_info(), tcp::Tier::T2).await.unwrap(); let stream_id = stream.id(); let port = stream.local_addr.port(); let mut events = pm.events.from_now(); diff --git a/chain/network/src/peer_manager/tests/mod.rs b/chain/network/src/peer_manager/tests/mod.rs index f555e7d18f6..e6254d60f42 100644 --- a/chain/network/src/peer_manager/tests/mod.rs +++ b/chain/network/src/peer_manager/tests/mod.rs @@ -2,3 +2,4 @@ mod accounts_data; mod connection_pool; mod nonce; mod routing; +mod tier1; diff --git a/chain/network/src/peer_manager/tests/nonce.rs b/chain/network/src/peer_manager/tests/nonce.rs index fe094cb10e1..d447af00416 100644 --- a/chain/network/src/peer_manager/tests/nonce.rs +++ b/chain/network/src/peer_manager/tests/nonce.rs @@ -60,7 +60,7 @@ async fn test_nonces() { ) .await; - let stream = tcp::Stream::connect(&pm.peer_info()).await.unwrap(); + let stream = tcp::Stream::connect(&pm.peer_info(), tcp::Tier::T2).await.unwrap(); let mut stream = stream::Stream::new(Some(Encoding::Proto), stream); let peer_key = data::make_secret_key(rng); let peer_id = PeerId::new(peer_key.public_key()); @@ -129,7 +129,7 @@ async fn test_nonce_refresh() { ) .await; - pm2.connect_to(&pm.peer_info()).await; + pm2.connect_to(&pm.peer_info(), tcp::Tier::T2).await; let edge = wait_for_edge(&mut pm2).await; let start_time = clock.now_utc(); diff --git a/chain/network/src/peer_manager/tests/routing.rs b/chain/network/src/peer_manager/tests/routing.rs index 797722faeba..38b34291014 100644 --- a/chain/network/src/peer_manager/tests/routing.rs +++ b/chain/network/src/peer_manager/tests/routing.rs @@ -46,7 +46,7 @@ async fn simple() { pm1.wait_for_routing_table(&[]).await; tracing::info!(target:"test", "connect the nodes"); - pm0.connect_to(&pm1.peer_info()).await; + pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; tracing::info!(target:"test", "wait for {id0} routing table"); 
pm0.wait_for_routing_table(&[(id1.clone(), vec![id1.clone()])]).await; @@ -68,8 +68,8 @@ async fn three_nodes_path() { let pm1 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; let pm2 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; - pm0.connect_to(&pm1.peer_info()).await; - pm1.connect_to(&pm2.peer_info()).await; + pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; + pm1.connect_to(&pm2.peer_info(), tcp::Tier::T2).await; let id0 = pm0.cfg.node_id(); let id1 = pm1.cfg.node_id(); @@ -109,8 +109,8 @@ async fn three_nodes_star() { let pm1 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; let pm2 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; - pm0.connect_to(&pm1.peer_info()).await; - pm1.connect_to(&pm2.peer_info()).await; + pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; + pm1.connect_to(&pm2.peer_info(), tcp::Tier::T2).await; let id0 = pm0.cfg.node_id(); let id1 = pm1.cfg.node_id(); @@ -136,7 +136,7 @@ async fn three_nodes_star() { .await; tracing::info!(target:"test", "connect {id0} and {id2}"); - pm0.connect_to(&pm2.peer_info()).await; + pm0.connect_to(&pm2.peer_info(), tcp::Tier::T2).await; tracing::info!(target:"test", "wait for {id0} routing table"); pm0.wait_for_routing_table(&[ @@ -179,8 +179,8 @@ async fn join_components() { let id2 = pm2.cfg.node_id(); let id3 = pm3.cfg.node_id(); - pm0.connect_to(&pm1.peer_info()).await; - pm2.connect_to(&pm3.peer_info()).await; + pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; + pm2.connect_to(&pm3.peer_info(), tcp::Tier::T2).await; tracing::info!(target:"test", "wait for {id0} routing table"); pm0.wait_for_routing_table(&[(id1.clone(), vec![id1.clone()])]).await; @@ -193,8 +193,8 @@ async fn join_components() { pm3.wait_for_routing_table(&[(id2.clone(), vec![id2.clone()])]).await; tracing::info!(target:"test", "join the two components into a square"); 
- pm0.connect_to(&pm2.peer_info()).await; - pm3.connect_to(&pm1.peer_info()).await; + pm0.connect_to(&pm2.peer_info(), tcp::Tier::T2).await; + pm3.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; tracing::info!(target:"test", "wait for {id0} routing table"); pm0.wait_for_routing_table(&[ @@ -244,8 +244,8 @@ async fn simple_remove() { let pm1 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; let pm2 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; - pm0.connect_to(&pm1.peer_info()).await; - pm1.connect_to(&pm2.peer_info()).await; + pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; + pm1.connect_to(&pm2.peer_info(), tcp::Tier::T2).await; let id0 = pm0.cfg.node_id(); let id1 = pm1.cfg.node_id(); @@ -337,7 +337,7 @@ async fn ping_simple() { let id0 = pm0.cfg.node_id(); let id1 = pm1.cfg.node_id(); - pm0.connect_to(&pm1.peer_info()).await; + pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; tracing::info!(target:"test", "wait for {id0} routing table"); pm0.wait_for_routing_table(&[(id1.clone(), vec![id1.clone()])]).await; @@ -375,8 +375,8 @@ async fn ping_jump() { let id2 = pm2.cfg.node_id(); tracing::info!(target:"test", "connect nodes in a line"); - pm0.connect_to(&pm1.peer_info()).await; - pm1.connect_to(&pm2.peer_info()).await; + pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; + pm1.connect_to(&pm2.peer_info(), tcp::Tier::T2).await; tracing::info!(target:"test", "wait for {id0} routing table"); pm0.wait_for_routing_table(&[ @@ -432,8 +432,8 @@ async fn test_dont_drop_after_ttl() { let id2 = pm2.cfg.node_id(); tracing::info!(target:"test", "connect nodes in a line"); - pm0.connect_to(&pm1.peer_info()).await; - pm1.connect_to(&pm2.peer_info()).await; + pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; + pm1.connect_to(&pm2.peer_info(), tcp::Tier::T2).await; tracing::info!(target:"test", "wait for {id0} routing table"); pm0.wait_for_routing_table(&[ @@ -489,8 +489,8 @@ 
async fn test_drop_after_ttl() { let id2 = pm2.cfg.node_id(); tracing::info!(target:"test", "connect nodes in a line"); - pm0.connect_to(&pm1.peer_info()).await; - pm1.connect_to(&pm2.peer_info()).await; + pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; + pm1.connect_to(&pm2.peer_info(), tcp::Tier::T2).await; tracing::info!(target:"test", "wait for {id0} routing table"); pm0.wait_for_routing_table(&[ @@ -540,8 +540,8 @@ async fn test_dropping_duplicate_messages() { let id2 = pm2.cfg.node_id(); tracing::info!(target:"test", "connect nodes in a line"); - pm0.connect_to(&pm1.peer_info()).await; - pm1.connect_to(&pm2.peer_info()).await; + pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; + pm1.connect_to(&pm2.peer_info(), tcp::Tier::T2).await; tracing::info!(target:"test", "wait for {id0} routing table"); pm0.wait_for_routing_table(&[ @@ -880,7 +880,7 @@ async fn ttl() { chain, force_encoding: Some(Encoding::Proto), }; - let stream = tcp::Stream::connect(&pm.peer_info()).await.unwrap(); + let stream = tcp::Stream::connect(&pm.peer_info(), tcp::Tier::T2).await.unwrap(); let mut peer = peer::testonly::PeerHandle::start_endpoint(clock.clock(), cfg, stream).await; peer.complete_handshake().await; pm.wait_for_routing_table(&[(peer.cfg.id(), vec![peer.cfg.id()])]).await; @@ -901,9 +901,10 @@ async fn ttl() { let got = peer .events .recv_until(|ev| match ev { - peer::testonly::Event::Network(PME::MessageProcessed(PeerMessage::Routed( - msg, - ))) => Some(msg), + peer::testonly::Event::Network(PME::MessageProcessed( + tcp::Tier::T2, + PeerMessage::Routed(msg), + )) => Some(msg), _ => None, }) .await; @@ -934,7 +935,7 @@ async fn repeated_data_in_sync_routing_table() { chain, force_encoding: Some(Encoding::Proto), }; - let stream = tcp::Stream::connect(&pm.peer_info()).await.unwrap(); + let stream = tcp::Stream::connect(&pm.peer_info(), tcp::Tier::T2).await.unwrap(); let mut peer = peer::testonly::PeerHandle::start_endpoint(clock.clock(), cfg, stream).await; 
peer.complete_handshake().await; @@ -955,6 +956,7 @@ async fn repeated_data_in_sync_routing_table() { while edges_got != edges_want || accounts_got != accounts_want { match peer.events.recv().await { peer::testonly::Event::Network(PME::MessageProcessed( + tcp::Tier::T2, PeerMessage::SyncRoutingTable(got), )) => { for a in got.accounts { @@ -1002,6 +1004,7 @@ async fn wait_for_edges( while &got != want { match events.recv().await { peer::testonly::Event::Network(PME::MessageProcessed( + tcp::Tier::T2, PeerMessage::SyncRoutingTable(msg), )) => { tracing::info!(target: "test", "got edges: {:?}",msg.edges.iter().map(|e|e.hash()).collect::>()); @@ -1093,10 +1096,10 @@ async fn square() { let pm1 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; let pm2 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; let pm3 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; - pm0.connect_to(&pm1.peer_info()).await; - pm1.connect_to(&pm2.peer_info()).await; - pm2.connect_to(&pm3.peer_info()).await; - pm3.connect_to(&pm0.peer_info()).await; + pm0.connect_to(&pm1.peer_info(), tcp::Tier::T2).await; + pm1.connect_to(&pm2.peer_info(), tcp::Tier::T2).await; + pm2.connect_to(&pm3.peer_info(), tcp::Tier::T2).await; + pm3.connect_to(&pm0.peer_info(), tcp::Tier::T2).await; let id0 = pm0.cfg.node_id(); let id1 = pm1.cfg.node_id(); let id2 = pm2.cfg.node_id(); @@ -1165,7 +1168,7 @@ async fn fix_local_edges() { conn.send(msg.clone()).await; events .recv_until(|ev| match ev { - Event::PeerManager(PME::MessageProcessed(got)) if got == msg => Some(()), + Event::PeerManager(PME::MessageProcessed(tcp::Tier::T2, got)) if got == msg => Some(()), _ => None, }) .await; @@ -1202,7 +1205,7 @@ async fn do_not_block_announce_account_broadcast() { tracing::info!(target:"test", "spawn 2 nodes and announce the account."); let pm0 = start_pm(clock.clock(), db0.clone(), chain.make_config(rng), 
chain.clone()).await; let pm1 = start_pm(clock.clock(), db1.clone(), chain.make_config(rng), chain.clone()).await; - pm1.connect_to(&pm0.peer_info()).await; + pm1.connect_to(&pm0.peer_info(), tcp::Tier::T2).await; pm1.announce_account(aa.clone()).await; assert_eq!(&aa.peer_id, &pm0.wait_for_account_owner(&aa.account_id).await); drop(pm0); @@ -1214,8 +1217,8 @@ async fn do_not_block_announce_account_broadcast() { let pm0 = start_pm(clock.clock(), db0, chain.make_config(rng), chain.clone()).await; let pm1 = start_pm(clock.clock(), db1, chain.make_config(rng), chain.clone()).await; let pm2 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; - pm1.connect_to(&pm0.peer_info()).await; - pm2.connect_to(&pm0.peer_info()).await; + pm1.connect_to(&pm0.peer_info(), tcp::Tier::T2).await; + pm2.connect_to(&pm0.peer_info(), tcp::Tier::T2).await; pm1.announce_account(aa.clone()).await; assert_eq!(&aa.peer_id, &pm2.wait_for_account_owner(&aa.account_id).await); } diff --git a/chain/network/src/peer_manager/tests/tier1.rs b/chain/network/src/peer_manager/tests/tier1.rs new file mode 100644 index 00000000000..2367fccb3e3 --- /dev/null +++ b/chain/network/src/peer_manager/tests/tier1.rs @@ -0,0 +1,343 @@ +use crate::config; +use crate::network_protocol::testonly as data; +use crate::network_protocol::{PeerAddr, PeerMessage, RoutedMessageBody}; +use crate::peer_manager; +use crate::peer_manager::peer_manager_actor::Event as PME; +use crate::peer_manager::testonly::start as start_pm; +use crate::peer_manager::testonly::Event; +use crate::tcp; +use crate::testonly::{make_rng, Rng}; +use crate::time; +use crate::types::{NetworkRequests, NetworkResponses, PeerManagerMessageRequest}; +use near_o11y::testonly::init_test_logger; +use near_o11y::WithSpanContextExt; +use near_primitives::block_header::{Approval, ApprovalInner, ApprovalMessage}; +use near_primitives::validator_signer::ValidatorSigner; +use near_store::db::TestDB; +use rand::Rng as _; +use 
std::collections::HashSet; +use std::sync::Arc; + +/// Constructs a random TIER1 message. +fn make_block_approval(rng: &mut Rng, signer: &dyn ValidatorSigner) -> Approval { + let inner = ApprovalInner::Endorsement(data::make_hash(rng)); + let target_height = rng.gen_range(0..100000); + Approval { + signature: signer.sign_approval(&inner, target_height), + account_id: signer.validator_id().clone(), + target_height, + inner, + } +} + +async fn establish_connections(clock: &time::Clock, pms: &[&peer_manager::testonly::ActorHandler]) { + // Make TIER1 validators connect to proxies. + let mut data = HashSet::new(); + for pm in pms { + data.extend(pm.tier1_advertise_proxies(clock).await); + } + tracing::info!(target:"test", "tier1_advertise_proxies() DONE"); + + // Wait for accounts data to propagate. + for pm in pms { + tracing::info!(target:"test", "{}: wait_for_accounts_data()",pm.cfg.node_id()); + pm.wait_for_accounts_data(&data).await; + tracing::info!(target:"test", "{}: wait_for_accounts_data() DONE",pm.cfg.node_id()); + pm.tier1_connect(clock).await; + tracing::info!(target:"test", "{}: tier1_connect() DONE",pm.cfg.node_id()); + } +} + +async fn send_tier1_message( + rng: &mut Rng, + from: &peer_manager::testonly::ActorHandler, + to: &peer_manager::testonly::ActorHandler, +) { + let from_signer = from.cfg.validator.as_ref().unwrap().signer.clone(); + let to_signer = to.cfg.validator.as_ref().unwrap().signer.clone(); + let target = to_signer.validator_id().clone(); + let want = make_block_approval(rng, from_signer.as_ref()); + let req = NetworkRequests::Approval { + approval_message: ApprovalMessage { approval: want.clone(), target }, + }; + let mut events = to.events.from_now(); + let resp = from + .actix + .addr + .send(PeerManagerMessageRequest::NetworkRequests(req).with_span_context()) + .await + .unwrap(); + assert_eq!(NetworkResponses::NoResponse, resp.as_network_response()); + let got = events + .recv_until(|ev| match ev { + 
Event::PeerManager(PME::MessageProcessed(tcp::Tier::T1, PeerMessage::Routed(got))) => { + Some(got) + } + _ => None, + }) + .await; + assert_eq!(from.cfg.node_id(), got.author); + assert_eq!(RoutedMessageBody::BlockApproval(want), got.body); +} + +/// Send a message over each connection. +async fn test_clique(rng: &mut Rng, pms: &[&peer_manager::testonly::ActorHandler]) { + for from in pms { + for to in pms { + if from.cfg.node_id() == to.cfg.node_id() { + continue; + } + send_tier1_message(rng, from, to).await; + } + } +} + +// In case a node is its own proxy, it should advertise its address as soon as +// it becomes a TIER1 node. +#[tokio::test] +async fn first_proxy_advertisement() { + init_test_logger(); + let mut rng = make_rng(921853233); + let rng = &mut rng; + let mut clock = time::FakeClock::default(); + let chain = Arc::new(data::Chain::make(&mut clock, rng, 10)); + let pm = peer_manager::testonly::start( + clock.clock(), + near_store::db::TestDB::new(), + chain.make_config(rng), + chain.clone(), + ) + .await; + let chain_info = peer_manager::testonly::make_chain_info(&chain, &[&pm]); + tracing::info!(target:"test", "set_chain_info()"); + // TODO(gprusak): The default config constructed via chain.make_config(), + // currently returns a validator config with its own server addr in the list of TIER1 proxies. + // You might want to set it explicitly within this test to not rely on defaults. 
+ pm.set_chain_info(chain_info).await; + let got = pm.tier1_advertise_proxies(&clock.clock()).await; + tracing::info!(target:"test", "awaiting for Tier1AdvertiseProxies"); + assert_eq!( + got[0].proxies, + vec![PeerAddr { peer_id: pm.cfg.node_id(), addr: pm.cfg.node_addr.unwrap() }] + ); +} + +#[tokio::test] +async fn direct_connections() { + init_test_logger(); + let mut rng = make_rng(921853233); + let rng = &mut rng; + let mut clock = time::FakeClock::default(); + let chain = Arc::new(data::Chain::make(&mut clock, rng, 10)); + + let mut pms = vec![]; + for _ in 0..5 { + pms.push( + peer_manager::testonly::start( + clock.clock(), + near_store::db::TestDB::new(), + chain.make_config(rng), + chain.clone(), + ) + .await, + ); + } + let pms: Vec<_> = pms.iter().collect(); + + tracing::info!(target:"test", "Connect peers serially."); + for i in 1..pms.len() { + pms[i - 1].connect_to(&pms[i].peer_info(), tcp::Tier::T2).await; + } + + tracing::info!(target:"test", "Set chain info."); + let chain_info = peer_manager::testonly::make_chain_info(&chain, &pms[..]); + for pm in &pms { + pm.set_chain_info(chain_info.clone()).await; + } + tracing::info!(target:"test", "Establish connections."); + establish_connections(&clock.clock(), &pms[..]).await; + tracing::info!(target:"test", "Test clique."); + test_clique(rng, &pms[..]).await; +} + +/// Test which spawns N validators, each with 1 proxy. +/// All the nodes are connected in TIER2 star topology. +/// Then all validators connect to the proxy of each other validator. 
+#[tokio::test] +async fn proxy_connections() { + init_test_logger(); + let mut rng = make_rng(921853233); + let rng = &mut rng; + let mut clock = time::FakeClock::default(); + let chain = Arc::new(data::Chain::make(&mut clock, rng, 10)); + + const N: usize = 5; + + let mut proxies = vec![]; + for _ in 0..N { + proxies.push( + peer_manager::testonly::start( + clock.clock(), + near_store::db::TestDB::new(), + chain.make_config(rng), + chain.clone(), + ) + .await, + ); + } + let proxies: Vec<_> = proxies.iter().collect(); + + let mut validators = vec![]; + for i in 0..N { + let mut cfg = chain.make_config(rng); + cfg.validator.as_mut().unwrap().proxies = + config::ValidatorProxies::Static(vec![PeerAddr { + peer_id: proxies[i].cfg.node_id(), + addr: proxies[i].cfg.node_addr.unwrap(), + }]); + validators.push( + peer_manager::testonly::start( + clock.clock(), + near_store::db::TestDB::new(), + cfg, + chain.clone(), + ) + .await, + ); + } + let validators: Vec<_> = validators.iter().collect(); + + // Connect validators and proxies in a star topology. Any connected graph would do. 
+ let hub = peer_manager::testonly::start( + clock.clock(), + near_store::db::TestDB::new(), + chain.make_config(rng), + chain.clone(), + ) + .await; + for pm in &validators { + pm.connect_to(&hub.peer_info(), tcp::Tier::T2).await; + } + for pm in &proxies { + pm.connect_to(&hub.peer_info(), tcp::Tier::T2).await; + } + + let mut all = vec![]; + all.extend(validators.clone()); + all.extend(proxies.clone()); + all.push(&hub); + + let chain_info = peer_manager::testonly::make_chain_info(&chain, &validators[..]); + for pm in &all { + pm.set_chain_info(chain_info.clone()).await; + } + establish_connections(&clock.clock(), &all[..]).await; + test_clique(rng, &validators[..]).await; +} + +#[tokio::test] +async fn account_keys_change() { + init_test_logger(); + let mut rng = make_rng(921853233); + let rng = &mut rng; + let mut clock = time::FakeClock::default(); + let chain = Arc::new(data::Chain::make(&mut clock, rng, 10)); + + let v0 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; + let v1 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; + let v2 = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; + let hub = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; + hub.connect_to(&v0.peer_info(), tcp::Tier::T2).await; + hub.connect_to(&v1.peer_info(), tcp::Tier::T2).await; + hub.connect_to(&v2.peer_info(), tcp::Tier::T2).await; + + // TIER1 nodes in 1st epoch are {v0,v1}. + let chain_info = peer_manager::testonly::make_chain_info(&chain, &[&v0, &v1]); + for pm in [&v0, &v1, &v2, &hub] { + pm.set_chain_info(chain_info.clone()).await; + } + establish_connections(&clock.clock(), &[&v0, &v1, &v2, &hub]).await; + test_clique(rng, &[&v0, &v1]).await; + + // TIER1 nodes in 2nd epoch are {v0,v2}. 
+ let chain_info = peer_manager::testonly::make_chain_info(&chain, &[&v0, &v2]); + for pm in [&v0, &v1, &v2, &hub] { + pm.set_chain_info(chain_info.clone()).await; + } + establish_connections(&clock.clock(), &[&v0, &v1, &v2, &hub]).await; + test_clique(rng, &[&v0, &v2]).await; + + drop(v0); + drop(v1); + drop(v2); + drop(hub); +} + +// Let's say that a validator has 2 proxies configured. At first proxy0 is available and proxy1 is not, +// then proxy1 is available and proxy0 is not. In both situations validator should be reachable, +// as long as it manages to advertise the currently available proxy and the TIER1 nodes connect to +// that proxy. +#[tokio::test] +async fn proxy_change() { + init_test_logger(); + let mut rng = make_rng(921853233); + let rng = &mut rng; + let mut clock = time::FakeClock::default(); + let chain = Arc::new(data::Chain::make(&mut clock, rng, 10)); + + // v0 has proxies {p0,p1} + // v1 has no proxies. + let p0cfg = chain.make_config(rng); + let p1cfg = chain.make_config(rng); + let mut v0cfg = chain.make_config(rng); + v0cfg.validator.as_mut().unwrap().proxies = config::ValidatorProxies::Static(vec![ + PeerAddr { peer_id: p0cfg.node_id(), addr: p0cfg.node_addr.unwrap() }, + PeerAddr { peer_id: p1cfg.node_id(), addr: p1cfg.node_addr.unwrap() }, + ]); + let mut v1cfg = chain.make_config(rng); + v1cfg.validator.as_mut().unwrap().proxies = config::ValidatorProxies::Static(vec![]); + + tracing::info!(target:"test", "Start all nodes."); + let p0 = start_pm(clock.clock(), TestDB::new(), p0cfg.clone(), chain.clone()).await; + let p1 = start_pm(clock.clock(), TestDB::new(), p1cfg.clone(), chain.clone()).await; + let v0 = start_pm(clock.clock(), TestDB::new(), v0cfg.clone(), chain.clone()).await; + let v1 = start_pm(clock.clock(), TestDB::new(), v1cfg.clone(), chain.clone()).await; + let hub = start_pm(clock.clock(), TestDB::new(), chain.make_config(rng), chain.clone()).await; + hub.connect_to(&p0.peer_info(), tcp::Tier::T2).await; + 
hub.connect_to(&p1.peer_info(), tcp::Tier::T2).await; + hub.connect_to(&v0.peer_info(), tcp::Tier::T2).await; + hub.connect_to(&v1.peer_info(), tcp::Tier::T2).await; + tracing::info!(target:"dupa","p0 = {}",p0cfg.node_id()); + tracing::info!(target:"dupa","hub = {}",hub.cfg.node_id()); + + tracing::info!(target:"test", "p0 goes down"); + drop(p0); + tracing::info!(target:"test", "remaining nodes learn that [v0,v1] are TIER1 nodes"); + let chain_info = peer_manager::testonly::make_chain_info(&chain, &[&v0, &v1]); + for pm in [&v0, &v1, &p1, &hub] { + pm.set_chain_info(chain_info.clone()).await; + } + tracing::info!(target:"test", "TIER1 connections get established: v0 -> p1 <- v1."); + establish_connections(&clock.clock(), &[&v0, &v1, &p1, &hub]).await; + tracing::info!(target:"test", "Send message v1 -> v0 over TIER1."); + send_tier1_message(rng, &v1, &v0).await; + + // Advance time, so that the new AccountsData has newer timestamp. + clock.advance(time::Duration::hours(1)); + + tracing::info!(target:"test", "p1 goes down."); + drop(p1); + tracing::info!(target:"test", "p0 goes up and learns that [v0,v1] are TIER1 nodes."); + let p0 = start_pm(clock.clock(), TestDB::new(), p0cfg.clone(), chain.clone()).await; + p0.set_chain_info(chain_info).await; + hub.connect_to(&p0.peer_info(), tcp::Tier::T2).await; + tracing::info!(target:"test", "TIER1 connections get established: v0 -> p0 <- v1."); + establish_connections(&clock.clock(), &[&v0, &v1, &p0, &hub]).await; + tracing::info!(target:"test", "Send message v1 -> v0 over TIER1."); + send_tier1_message(rng, &v1, &v0).await; + + drop(hub); + drop(v0); + drop(v1); + drop(p0); +} diff --git a/chain/network/src/private_actix.rs b/chain/network/src/private_actix.rs index 89eccca7478..aa2aa08a806 100644 --- a/chain/network/src/private_actix.rs +++ b/chain/network/src/private_actix.rs @@ -11,6 +11,8 @@ pub(crate) enum RegisterPeerError { Banned, PoolError(connection::PoolError), ConnectionLimitExceeded, + NotTier1Peer, + 
Tier1InboundDisabled, InvalidEdge, } diff --git a/chain/network/src/tcp.rs b/chain/network/src/tcp.rs index 125fe74a5b5..8b330ebcfe0 100644 --- a/chain/network/src/tcp.rs +++ b/chain/network/src/tcp.rs @@ -2,10 +2,24 @@ use crate::network_protocol::PeerInfo; use anyhow::{anyhow, Context as _}; use near_primitives::network::PeerId; +/// TCP connections established by a node belong to different logical networks (aka tiers), +/// which serve different purpose. +// TODO(gprusak): add a link to the design on github docs (but first write those docs). +#[derive(Clone, Copy, Debug, PartialEq, Eq, strum::AsRefStr)] +pub enum Tier { + /// Tier1 connections are established between the BFT consensus participants (or their proxies) + /// and are reserved exclusively for exchanging BFT consensus messages. + T1, + /// Tier2 connections form a P2P gossip network, which is used for everything, except the BFT + /// consensus messages. Also, Tier1 peer discovery actually happens on Tier2 network, i.e. + /// Tier2 network is necessary to bootstrap Tier1 connections. + T2, +} + #[derive(Clone, Debug)] pub(crate) enum StreamType { Inbound, - Outbound { peer_id: PeerId }, + Outbound { peer_id: PeerId, tier: Tier }, } #[derive(Debug)] @@ -42,13 +56,13 @@ impl Socket { Self(socket) } - pub async fn connect(self, peer_info: &PeerInfo) -> Stream { + pub async fn connect(self, peer_info: &PeerInfo, tier: Tier) -> Stream { // TODO(gprusak): this could replace Stream::connect, // however this means that we will have to replicate everything // that tokio::net::TcpStream sets on the socket. // As long as Socket::connect is test-only we may ignore that. 
let stream = self.0.connect(peer_info.addr.unwrap()).await.unwrap(); - Stream::new(stream, StreamType::Outbound { peer_id: peer_info.id.clone() }).unwrap() + Stream::new(stream, StreamType::Outbound { peer_id: peer_info.id.clone(), tier }).unwrap() } } @@ -57,7 +71,7 @@ impl Stream { Ok(Self { peer_addr: stream.peer_addr()?, local_addr: stream.local_addr()?, stream, type_ }) } - pub async fn connect(peer_info: &PeerInfo) -> anyhow::Result<Stream> { + pub async fn connect(peer_info: &PeerInfo, tier: Tier) -> anyhow::Result<Stream> { let addr = peer_info.addr.ok_or(anyhow!("Trying to connect to peer with no public address"))?; // The `connect` may take several minutes. This happens when the @@ -74,13 +88,13 @@ impl Stream { ) .await? .context("TcpStream::connect()")?; - Ok(Stream::new(stream, StreamType::Outbound { peer_id: peer_info.id.clone() })?) + Ok(Stream::new(stream, StreamType::Outbound { peer_id: peer_info.id.clone(), tier })?) } /// Establishes a loopback TCP connection to localhost with random ports. /// Returns a pair of streams: (outbound,inbound). 
#[cfg(test)] - pub async fn loopback(peer_id: PeerId) -> (Stream, Stream) { + pub async fn loopback(peer_id: PeerId, tier: Tier) -> (Stream, Stream) { let localhost = std::net::SocketAddr::new(std::net::Ipv4Addr::LOCALHOST.into(), 0); let mut listener = Listener::bind(localhost).await.unwrap(); let peer_info = PeerInfo { @@ -88,7 +102,8 @@ impl Stream { addr: Some(listener.0.local_addr().unwrap()), account_id: None, }; - let (outbound, inbound) = tokio::join!(Stream::connect(&peer_info), listener.accept(),); + let (outbound, inbound) = + tokio::join!(Stream::connect(&peer_info, tier), listener.accept()); (outbound.unwrap(), inbound.unwrap()) } diff --git a/chain/network/src/testonly/fake_client.rs b/chain/network/src/testonly/fake_client.rs index 36c41d49644..051b8c86b1f 100644 --- a/chain/network/src/testonly/fake_client.rs +++ b/chain/network/src/testonly/fake_client.rs @@ -20,6 +20,7 @@ pub enum Event { AnnounceAccount(Vec<(AnnounceAccount, Option<EpochId>)>), Block(Block), BlockHeaders(Vec<BlockHeader>), + BlockApproval(Approval, PeerId), BlockHeadersRequest(Vec<CryptoHash>), BlockRequest(CryptoHash), Challenge(Challenge), @@ -73,8 +74,8 @@ impl client::Client for Fake { unimplemented!(); } - async fn block_approval(&self, _approval: Approval, _peer_id: PeerId) { - unimplemented!(); + async fn block_approval(&self, approval: Approval, peer_id: PeerId) { + self.event_sink.push(Event::BlockApproval(approval, peer_id)); } async fn transaction(&self, transaction: SignedTransaction, _is_forwarded: bool) { diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 47f7fdf4ec4..dd47517b116 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -366,7 +366,7 @@ pub struct NetworkInfo { pub tier1_accounts: Vec<Arc<SignedAccountData>>, } -#[derive(Debug, actix::MessageResponse)] +#[derive(Debug, actix::MessageResponse, PartialEq, Eq)] pub enum NetworkResponses { NoResponse, PingPongInfo { pings: Vec<Ping>, pongs: Vec<Pong> }, @@ -488,7 +488,6 @@ mod tests { fn test_enum_size() { assert_size!(PeerType); 
assert_size!(RoutedMessageBody); - assert_size!(PeerIdOrHash); assert_size!(KnownPeerStatus); assert_size!(ReasonForBan); } diff --git a/integration-tests/src/tests/network/runner.rs b/integration-tests/src/tests/network/runner.rs index 72d16c3a2a4..c23a01b7d8c 100644 --- a/integration-tests/src/tests/network/runner.rs +++ b/integration-tests/src/tests/network/runner.rs @@ -223,7 +223,7 @@ impl StateMachine { debug!(target: "network", num_prev_actions, action = ?action_clone, "runner.rs: Action"); let pm = info.get_node(from)?.actix.addr.clone(); let peer_info = info.runner.test_config[to].peer_info(); - match tcp::Stream::connect(&peer_info).await { + match tcp::Stream::connect(&peer_info, tcp::Tier::T2).await { Ok(stream) => { pm.send(PeerManagerMessageRequest::OutboundTcpConnect(stream).with_span_context()).await?; }, Err(err) => tracing::debug!("tcp::Stream::connect({peer_info}): {err}"), }