From 0c2a0fd9e75530356656ab8027b4d7f665cc48fc Mon Sep 17 00:00:00 2001 From: Robert Klotzner Date: Mon, 27 Sep 2021 16:30:02 +0200 Subject: [PATCH] Log info about low connectivity and unreachable validators (#3916) * Attempt to add log stats to gossip-support. * WIP: Keep track of connected validators. * Clarify metric. * WIP: Make gossip support report connectivity. * WIP: Fixing tests. * Fix network bridge + integrate in overseer. * Consistent naming. * Fix logic error * cargo fmt * Pretty logs. * cargo fmt * Use `Delay` to trigger periodic checks. * fmt * Fix warning for authority set size of 1. * More correct ratio report if there are no resolved validators. * Prettier rendering of empty set. * Fix typo. * Another typo. * Don't check on every leaf update. * Make compatible with older rustc. * Fix tests. * Demote warning. --- Cargo.lock | 5 + node/network/bridge/src/lib.rs | 23 +- node/network/bridge/src/tests.rs | 16 +- .../network/bridge/src/validator_discovery.rs | 66 ++- node/network/gossip-support/Cargo.toml | 5 + node/network/gossip-support/src/lib.rs | 454 +++++++++++------- node/network/gossip-support/src/tests.rs | 342 ++++++++----- node/network/protocol/src/lib.rs | 20 + node/overseer/src/lib.rs | 2 +- node/service/src/overseer.rs | 7 +- node/subsystem-types/src/messages.rs | 18 +- 11 files changed, 639 insertions(+), 319 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 995711fabae8..6d3227cc50f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6063,7 +6063,10 @@ name = "polkadot-gossip-support" version = "0.9.9" dependencies = [ "assert_matches", + "async-trait", "futures 0.3.17", + "futures-timer 3.0.2", + "lazy_static", "polkadot-node-network-protocol", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", @@ -6071,11 +6074,13 @@ dependencies = [ "polkadot-primitives", "rand 0.8.4", "rand_chacha 0.3.1", + "sc-network", "sp-application-crypto", "sp-consensus-babe", "sp-core", "sp-keyring", "sp-keystore", + "sp-tracing", "tracing", ] diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 608a790d2931..0ca63f027fa2 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -198,7 +198,7 @@ impl metrics::Metrics for Metrics { prometheus::GaugeVec::new( prometheus::Opts::new( "parachain_desired_peer_count", - "The number of peers that the local node is expected to connect to on a parachain-related peer-set", + "The number of peers that the local node is expected to connect to on a parachain-related peer-set (either including or not including unresolvable authorities, depending on whether `ConnectToValidators` or `ConnectToValidatorsResolved` was used.)", ), &["protocol"] )?, @@ -552,6 +552,27 @@ where network_service = ns; authority_discovery_service = ads; } + NetworkBridgeMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set, + } => { + tracing::trace!( + target: LOG_TARGET, + action = "ConnectToPeers", + peer_set = ?peer_set, + ?validator_addrs, + "Received a resolved validator connection request", + ); + + metrics.note_desired_peer_count(peer_set, validator_addrs.len()); + + let all_addrs = validator_addrs.into_iter().flatten().collect(); + network_service = validator_discovery.on_resolved_request( + all_addrs, + peer_set, + network_service, + ).await; + } NetworkBridgeMessage::NewGossipTopology { our_neighbors, } => { diff --git a/node/network/bridge/src/tests.rs b/node/network/bridge/src/tests.rs index 89551de89c25..40ade08df2b0 100644 --- a/node/network/bridge/src/tests.rs +++ b/node/network/bridge/src/tests.rs @@ -37,7 +37,8 @@ use polkadot_primitives::v1::AuthorityDiscoveryId; use polkadot_subsystem::{ jaeger, messages::{ - ApprovalDistributionMessage, BitfieldDistributionMessage, StatementDistributionMessage, + ApprovalDistributionMessage, BitfieldDistributionMessage, GossipSupportMessage, + StatementDistributionMessage, }, ActiveLeavesUpdate, FromOverseer, LeafStatus, OverseerSignal, }; @@ -337,6 +338,13 @@ async fn assert_sends_validation_event_to_all( ApprovalDistributionMessage::NetworkBridgeUpdateV1(e) ) if e == event.focus().expect("could not focus message") ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::GossipSupport( + GossipSupportMessage::NetworkBridgeUpdateV1(e) + ) if e == event.focus().expect("could not focus message") + ); } async fn assert_sends_collation_event_to_all( @@ -1189,7 +1197,7 @@ fn send_messages_to_peers() { fn spread_event_to_subsystems_is_up_to_date() { // Number of subsystems expected to be interested in a network event, // and hence the network event broadcasted to. - const EXPECTED_COUNT: usize = 3; + const EXPECTED_COUNT: usize = 4; let mut cnt = 0_usize; for msg in AllMessages::dispatch_iter(NetworkBridgeEvent::PeerDisconnected(PeerId::random())) { @@ -1219,7 +1227,9 @@ fn spread_event_to_subsystems_is_up_to_date() { AllMessages::ApprovalDistribution(_) => { cnt += 1; }, - AllMessages::GossipSupport(_) => unreachable!("Not interested in network events"), + AllMessages::GossipSupport(_) => { + cnt += 1; + }, AllMessages::DisputeCoordinator(_) => unreachable!("Not interested in network events"), AllMessages::DisputeParticipation(_) => unreachable!("Not interested in network events"), diff --git a/node/network/bridge/src/validator_discovery.rs b/node/network/bridge/src/validator_discovery.rs index 2d6d21668983..7f768044ad50 100644 --- a/node/network/bridge/src/validator_discovery.rs +++ b/node/network/bridge/src/validator_discovery.rs @@ -47,6 +47,44 @@ impl Service { Self { state: Default::default(), _phantom: PhantomData } } + /// Connect to already resolved addresses: + pub async fn on_resolved_request( + &mut self, + newly_requested: HashSet, + peer_set: PeerSet, + mut network_service: N, + ) -> N { + let state = &mut self.state[peer_set]; + // clean up revoked requests + let multiaddr_to_remove: HashSet<_> = + state.previously_requested.difference(&newly_requested).cloned().collect(); + let multiaddr_to_add: HashSet<_> = + newly_requested.difference(&state.previously_requested).cloned().collect(); + state.previously_requested = newly_requested; + + tracing::debug!( + target: LOG_TARGET, + ?peer_set, + added = multiaddr_to_add.len(), + removed = multiaddr_to_remove.len(), + "New ConnectToValidators resolved request", + ); + // ask the network to connect to these nodes and not disconnect + // from them until removed from the set + if let Err(e) = network_service + .add_to_peers_set(peer_set.into_protocol_name(), multiaddr_to_add) + .await + { + tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress"); + } + // the addresses are known to be valid + let _ = network_service + .remove_from_peers_set(peer_set.into_protocol_name(), multiaddr_to_remove) + .await; + + network_service + } + /// On a new connection request, a peer set update will be issued. /// It will ask the network to connect to the validators and not disconnect /// from them at least until the next request is issued for the same peer set. @@ -59,7 +97,7 @@ impl Service { validator_ids: Vec, peer_set: PeerSet, failed: oneshot::Sender, - mut network_service: N, + network_service: N, mut authority_discovery_service: AD, ) -> (N, AD) { // collect multiaddress of validators @@ -82,39 +120,19 @@ impl Service { } } - let state = &mut self.state[peer_set]; - // clean up revoked requests - let multiaddr_to_remove: HashSet<_> = - state.previously_requested.difference(&newly_requested).cloned().collect(); - let multiaddr_to_add: HashSet<_> = - newly_requested.difference(&state.previously_requested).cloned().collect(); - state.previously_requested = newly_requested; - tracing::debug!( target: LOG_TARGET, ?peer_set, ?requested, - added = multiaddr_to_add.len(), - removed = multiaddr_to_remove.len(), ?failed_to_resolve, "New ConnectToValidators request", ); - // ask the network to connect to these nodes and not disconnect - // from them until removed from the set - if let Err(e) = network_service - .add_to_peers_set(peer_set.into_protocol_name(), multiaddr_to_add) - .await - { - tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress"); - } - // the addresses are known to be valid - let _ = network_service - .remove_from_peers_set(peer_set.into_protocol_name(), multiaddr_to_remove) - .await; + + let r = self.on_resolved_request(newly_requested, peer_set, network_service).await; let _ = failed.send(failed_to_resolve); - (network_service, authority_discovery_service) + (r, authority_discovery_service) } } diff --git a/node/network/gossip-support/Cargo.toml b/node/network/gossip-support/Cargo.toml index cd31b66cc16f..398f5532125d 100644 --- a/node/network/gossip-support/Cargo.toml +++ b/node/network/gossip-support/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-node-network-protocol = { path = "../protocol" } polkadot-node-subsystem = { path = "../../subsystem" } @@ -15,6 +16,7 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-primitives = { path = "../../../primitives" } futures = "0.3.17" +futures-timer = "3.0.2" rand = { version = "0.8.3", default-features = false } rand_chacha = { version = "0.3.1", default-features = false } tracing = "0.1.28" @@ -22,7 +24,10 @@ tracing = "0.1.28" [dev-dependencies] sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } assert_matches = "1.4.0" +async-trait = "0.1.51" +lazy_static = "1.4.0" diff --git a/node/network/gossip-support/src/lib.rs b/node/network/gossip-support/src/lib.rs index 52c6ffc165be..65af6cb12756 100644 --- a/node/network/gossip-support/src/lib.rs +++ b/node/network/gossip-support/src/lib.rs @@ -24,20 +24,35 @@ //! in this graph will be forwarded to the network bridge with //! the `NetworkBridgeMessage::NewGossipTopology` message. -use futures::{channel::oneshot, FutureExt as _}; -use polkadot_node_network_protocol::peer_set::PeerSet; +use std::{ + collections::HashMap, + fmt, + time::{Duration, Instant}, +}; + +use futures::{channel::oneshot, select, FutureExt as _}; +use futures_timer::Delay; +use rand::{seq::SliceRandom as _, SeedableRng}; +use rand_chacha::ChaCha20Rng; + +use sc_network::Multiaddr; +use sp_application_crypto::{AppKey, Public}; +use sp_keystore::{CryptoStore, SyncCryptoStorePtr}; + +use polkadot_node_network_protocol::{ + authority_discovery::AuthorityDiscovery, peer_set::PeerSet, v1::GossipSuppportNetworkMessage, + PeerId, +}; use polkadot_node_subsystem::{ - messages::{GossipSupportMessage, NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest}, + messages::{ + GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeMessage, RuntimeApiMessage, + RuntimeApiRequest, + }, overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, SubsystemError, }; use polkadot_node_subsystem_util as util; use polkadot_primitives::v1::{AuthorityDiscoveryId, Hash, SessionIndex}; -use rand::{seq::SliceRandom as _, SeedableRng}; -use rand_chacha::ChaCha20Rng; -use sp_application_crypto::{AppKey, Public}; -use sp_keystore::{CryptoStore, SyncCryptoStorePtr}; -use std::time::{Duration, Instant}; #[cfg(test)] mod tests; @@ -56,13 +71,13 @@ const BACKOFF_DURATION: Duration = Duration::from_secs(5); /// https://github.com/paritytech/substrate/blob/fc49802f263529160635471c8a17888846035f5d/client/authority-discovery/src/lib.rs#L88 const LOW_CONNECTIVITY_WARN_DELAY: Duration = Duration::from_secs(600); +/// If connectivity is lower than this in percent, issue warning in logs. +const LOW_CONNECTIVITY_WARN_THRESHOLD: usize = 90; + /// The Gossip Support subsystem. -pub struct GossipSupport { +pub struct GossipSupport { keystore: SyncCryptoStorePtr, -} -#[derive(Default)] -struct State { last_session_index: Option, // Some(timestamp) if we failed to resolve // at least a third of authorities the last time. @@ -75,43 +90,73 @@ struct State { /// potential sequence of failed attempts. It will be cleared once we reached >2/3 /// connectivity. failure_start: Option, + + /// Successfully resolved connections + /// + /// waiting for actual connection. + resolved_authorities: HashMap>, + + /// Actually connected authorities. + connected_authorities: HashMap, + /// By `PeerId`. + /// + /// Needed for efficient handling of disconnect events. + connected_authorities_by_peer_id: HashMap, + /// Authority discovery service. + authority_discovery: AD, } -impl GossipSupport { +impl GossipSupport +where + AD: AuthorityDiscovery, +{ /// Create a new instance of the [`GossipSupport`] subsystem. - pub fn new(keystore: SyncCryptoStorePtr) -> Self { - Self { keystore } - } - - async fn run(self, ctx: Context) - where - Context: SubsystemContext, - Context: overseer::SubsystemContext, - { - let mut state = State::default(); - self.run_inner(ctx, &mut state).await; + pub fn new(keystore: SyncCryptoStorePtr, authority_discovery: AD) -> Self { + Self { + keystore, + last_session_index: None, + last_failure: None, + failure_start: None, + resolved_authorities: HashMap::new(), + connected_authorities: HashMap::new(), + connected_authorities_by_peer_id: HashMap::new(), + authority_discovery, + } } - async fn run_inner(self, mut ctx: Context, state: &mut State) + async fn run(mut self, mut ctx: Context) -> Self where Context: SubsystemContext, Context: overseer::SubsystemContext, { - let Self { keystore } = self; + fn get_connectivity_check_delay() -> Delay { + Delay::new(LOW_CONNECTIVITY_WARN_DELAY) + } + let mut next_connectivity_check = get_connectivity_check_delay().fuse(); loop { - let message = match ctx.recv().await { - Ok(message) => message, - Err(e) => { - tracing::debug!( - target: LOG_TARGET, - err = ?e, - "Failed to receive a message from Overseer, exiting", - ); - return - }, - }; + let message = select!( + _ = next_connectivity_check => { + self.check_connectivity(); + next_connectivity_check = get_connectivity_check_delay().fuse(); + continue + } + result = ctx.recv().fuse() => + match result { + Ok(message) => message, + Err(e) => { + tracing::debug!( + target: LOG_TARGET, + err = ?e, + "Failed to receive a message from Overseer, exiting", + ); + return self + }, + } + ); match message { - FromOverseer::Communication { .. } => {}, + FromOverseer::Communication { + msg: GossipSupportMessage::NetworkBridgeUpdateV1(ev), + } => self.handle_connect_disconnect(ev), FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. @@ -119,14 +164,190 @@ impl GossipSupport { tracing::trace!(target: LOG_TARGET, "active leaves signal"); let leaves = activated.into_iter().map(|a| a.hash); - if let Err(e) = state.handle_active_leaves(&mut ctx, &keystore, leaves).await { + if let Err(e) = self.handle_active_leaves(&mut ctx, leaves).await { tracing::debug!(target: LOG_TARGET, error = ?e); } }, FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, _number)) => {}, - FromOverseer::Signal(OverseerSignal::Conclude) => return, + FromOverseer::Signal(OverseerSignal::Conclude) => return self, + } + } + } + + /// 1. Determine if the current session index has changed. + /// 2. If it has, determine relevant validators + /// and issue a connection request. + async fn handle_active_leaves( + &mut self, + ctx: &mut Context, + leaves: impl Iterator, + ) -> Result<(), util::Error> + where + Context: SubsystemContext, + Context: overseer::SubsystemContext, + { + for leaf in leaves { + let current_index = + util::request_session_index_for_child(leaf, ctx.sender()).await.await??; + let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default(); + let force_request = since_failure >= BACKOFF_DURATION; + let leaf_session = Some((current_index, leaf)); + let maybe_new_session = match self.last_session_index { + Some(i) if current_index <= i => None, + _ => leaf_session, + }; + + let maybe_issue_connection = + if force_request { leaf_session } else { maybe_new_session }; + + if let Some((session_index, relay_parent)) = maybe_issue_connection { + let is_new_session = maybe_new_session.is_some(); + if is_new_session { + tracing::debug!( + target: LOG_TARGET, + %session_index, + "New session detected", + ); + } + + let all_authorities = determine_relevant_authorities(ctx, relay_parent).await?; + let our_index = ensure_i_am_an_authority(&self.keystore, &all_authorities).await?; + let other_authorities = { + let mut authorities = all_authorities.clone(); + authorities.swap_remove(our_index); + authorities + }; + + self.issue_connection_request(ctx, other_authorities).await?; + + if is_new_session { + self.last_session_index = Some(session_index); + update_gossip_topology(ctx, our_index, all_authorities, relay_parent).await?; + } + } + } + + Ok(()) + } + + async fn issue_connection_request( + &mut self, + ctx: &mut Context, + authorities: Vec, + ) -> Result<(), util::Error> + where + Context: SubsystemContext, + Context: overseer::SubsystemContext, + { + let num = authorities.len(); + let mut validator_addrs = Vec::with_capacity(authorities.len()); + let mut failures = 0; + let mut resolved = HashMap::with_capacity(authorities.len()); + for authority in authorities { + if let Some(addrs) = + self.authority_discovery.get_addresses_by_authority_id(authority.clone()).await + { + validator_addrs.push(addrs.clone()); + resolved.insert(authority, addrs); + } else { + failures += 1; + tracing::debug!( + target: LOG_TARGET, + "Couldn't resolve addresses of authority: {:?}", + authority + ); } } + self.resolved_authorities = resolved; + tracing::debug!(target: LOG_TARGET, %num, "Issuing a connection request"); + + ctx.send_message(NetworkBridgeMessage::ConnectToResolvedValidators { + validator_addrs, + peer_set: PeerSet::Validation, + }) + .await; + + // issue another request for the same session + // if at least a third of the authorities were not resolved. + if 3 * failures >= num { + let timestamp = Instant::now(); + match self.failure_start { + None => self.failure_start = Some(timestamp), + Some(first) if first.elapsed() >= LOW_CONNECTIVITY_WARN_DELAY => { + tracing::warn!( + target: LOG_TARGET, + connected = ?(num - failures), + target = ?num, + "Low connectivity - authority lookup failed for too many validators." + ); + }, + Some(_) => { + tracing::debug!( + target: LOG_TARGET, + connected = ?(num - failures), + target = ?num, + "Low connectivity (due to authority lookup failures) - expected on startup." + ); + }, + } + self.last_failure = Some(timestamp); + } else { + self.last_failure = None; + self.failure_start = None; + }; + + Ok(()) + } + + fn handle_connect_disconnect(&mut self, ev: NetworkBridgeEvent) { + match ev { + NetworkBridgeEvent::PeerConnected(peer_id, _, o_authority) => { + if let Some(authority) = o_authority { + self.connected_authorities.insert(authority.clone(), peer_id); + self.connected_authorities_by_peer_id.insert(peer_id, authority); + } + }, + NetworkBridgeEvent::PeerDisconnected(peer_id) => { + if let Some(authority) = self.connected_authorities_by_peer_id.remove(&peer_id) { + self.connected_authorities.remove(&authority); + } + }, + NetworkBridgeEvent::OurViewChange(_) => {}, + NetworkBridgeEvent::PeerViewChange(_, _) => {}, + NetworkBridgeEvent::NewGossipTopology(_) => {}, + NetworkBridgeEvent::PeerMessage(_, v) => { + match v {}; + }, + } + } + + /// Check connectivity and report on it in logs. + fn check_connectivity(&mut self) { + let absolute_connected = self.connected_authorities.len(); + let absolute_resolved = self.resolved_authorities.len(); + let connected_ratio = + (100 * absolute_connected).checked_div(absolute_resolved).unwrap_or(100); + let unconnected_authorities = self + .resolved_authorities + .iter() + .filter(|(a, _)| !self.connected_authorities.contains_key(a)); + // TODO: Make that warning once connectivity issues are fixed (no point in warning, if + // we already know it is broken. + // https://github.com/paritytech/polkadot/issues/3921 + if connected_ratio <= LOW_CONNECTIVITY_WARN_THRESHOLD { + tracing::debug!( + target: LOG_TARGET, + "Connectivity seems low, we are only connected to {}% of available validators (see debug logs for details)", connected_ratio + ); + } + tracing::debug!( + target: LOG_TARGET, + ?connected_ratio, + ?absolute_connected, + ?absolute_resolved, + unconnected_authorities = %PrettyAuthorities(unconnected_authorities), + "Connectivity Report" + ); } } @@ -161,22 +382,6 @@ async fn ensure_i_am_an_authority( Err(util::Error::NotAValidator) } -/// A helper function for making a `ConnectToValidators` request. -async fn connect_to_authorities( - ctx: &mut Context, - validator_ids: Vec, - peer_set: PeerSet, -) -> oneshot::Receiver -where - Context: SubsystemContext, - Context: overseer::SubsystemContext, -{ - let (failed, failed_rx) = oneshot::channel(); - ctx.send_message(NetworkBridgeMessage::ConnectToValidators { validator_ids, peer_set, failed }) - .await; - failed_rx -} - /// We partition the list of all sorted `authorities` into `sqrt(len)` groups of `sqrt(len)` size /// and form a matrix where each validator is connected to all validators in its row and column. /// This is similar to `[web3]` research proposed topology, except for the groups are not parachain @@ -253,119 +458,11 @@ fn matrix_neighbors(our_index: usize, len: usize) -> impl Iterator row_neighbors.chain(column_neighbors).filter(move |i| *i != our_index) } -impl State { - /// 1. Determine if the current session index has changed. - /// 2. If it has, determine relevant validators - /// and issue a connection request. - async fn handle_active_leaves( - &mut self, - ctx: &mut Context, - keystore: &SyncCryptoStorePtr, - leaves: impl Iterator, - ) -> Result<(), util::Error> - where - Context: SubsystemContext, - Context: overseer::SubsystemContext, - { - for leaf in leaves { - let current_index = - util::request_session_index_for_child(leaf, ctx.sender()).await.await??; - let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default(); - let force_request = since_failure >= BACKOFF_DURATION; - let leaf_session = Some((current_index, leaf)); - let maybe_new_session = match self.last_session_index { - Some(i) if current_index <= i => None, - _ => leaf_session, - }; - - let maybe_issue_connection = - if force_request { leaf_session } else { maybe_new_session }; - - if let Some((session_index, relay_parent)) = maybe_issue_connection { - let is_new_session = maybe_new_session.is_some(); - if is_new_session { - tracing::debug!( - target: LOG_TARGET, - %session_index, - "New session detected", - ); - } - - let all_authorities = determine_relevant_authorities(ctx, relay_parent).await?; - let our_index = ensure_i_am_an_authority(keystore, &all_authorities).await?; - let other_authorities = { - let mut authorities = all_authorities.clone(); - authorities.swap_remove(our_index); - authorities - }; - - self.issue_connection_request(ctx, other_authorities).await?; - - if is_new_session { - self.last_session_index = Some(session_index); - update_gossip_topology(ctx, our_index, all_authorities, relay_parent).await?; - } - } - } - - Ok(()) - } - - async fn issue_connection_request( - &mut self, - ctx: &mut Context, - authorities: Vec, - ) -> Result<(), util::Error> - where - Context: SubsystemContext, - Context: overseer::SubsystemContext, - { - let num = authorities.len(); - tracing::debug!(target: LOG_TARGET, %num, "Issuing a connection request"); - - let failures = connect_to_authorities(ctx, authorities, PeerSet::Validation).await; - - // we await for the request to be processed - // this is fine, it should take much less time than one session - let failures = failures.await.unwrap_or(num); - - // issue another request for the same session - // if at least a third of the authorities were not resolved - if failures >= num / 3 { - let timestamp = Instant::now(); - match self.failure_start { - None => self.failure_start = Some(timestamp), - Some(first) if first.elapsed() >= LOW_CONNECTIVITY_WARN_DELAY => { - tracing::warn!( - target: LOG_TARGET, - connected = ?(num - failures), - target = ?num, - "Low connectivity - authority lookup failed for too many validators." - ); - }, - Some(_) => { - tracing::debug!( - target: LOG_TARGET, - connected = ?(num - failures), - target = ?num, - "Low connectivity (due to authority lookup failures) - expected on startup." - ); - }, - } - self.last_failure = Some(timestamp); - } else { - self.last_failure = None; - self.failure_start = None; - }; - - Ok(()) - } -} - -impl overseer::Subsystem for GossipSupport +impl overseer::Subsystem for GossipSupport where Context: SubsystemContext, Context: overseer::SubsystemContext, + AD: AuthorityDiscovery + Clone, { fn start(self, ctx: Context) -> SpawnedSubsystem { let future = self.run(ctx).map(|_| Ok(())).boxed(); @@ -373,3 +470,28 @@ where SpawnedSubsystem { name: "gossip-support-subsystem", future } } } + +/// Helper struct to get a nice rendering of unreachable authorities. +struct PrettyAuthorities(I); + +impl<'a, I> fmt::Display for PrettyAuthorities +where + I: Iterator)> + Clone, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut authorities = self.0.clone().peekable(); + if authorities.peek().is_none() { + write!(f, "None")?; + } else { + write!(f, "\n")?; + } + for (authority, addrs) in authorities { + write!(f, "{}:\n", authority)?; + for addr in addrs { + write!(f, " {}\n", addr)?; + } + write!(f, "\n")?; + } + Ok(()) + } +} diff --git a/node/network/gossip-support/src/tests.rs b/node/network/gossip-support/src/tests.rs index 1ca76728adb9..fc25b137d3c7 100644 --- a/node/network/gossip-support/src/tests.rs +++ b/node/network/gossip-support/src/tests.rs @@ -16,7 +16,17 @@ //! Unit tests for Gossip Support Subsystem. -use super::*; +use std::{sync::Arc, time::Duration}; + +use assert_matches::assert_matches; +use async_trait::async_trait; +use futures::{executor, future, Future}; +use lazy_static::lazy_static; + +use sc_network::multiaddr::Protocol; +use sp_consensus_babe::{AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch}; +use sp_keyring::Sr25519Keyring; + use polkadot_node_subsystem::{ jaeger, messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest}, @@ -24,47 +34,124 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::TimeoutExt as _; -use sp_consensus_babe::{AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch}; -use sp_keyring::Sr25519Keyring; use test_helpers::mock::make_ferdie_keystore; -use assert_matches::assert_matches; -use futures::{executor, future, Future}; -use std::{sync::Arc, time::Duration}; +use super::*; + +lazy_static! { + static ref MOCK_AUTHORITY_DISCOVERY: MockAuthorityDiscovery = MockAuthorityDiscovery::new(); + static ref AUTHORITIES: Vec = { + let mut authorities = OTHER_AUTHORITIES.clone(); + authorities.push(Sr25519Keyring::Ferdie.public().into()); + authorities + }; + static ref OTHER_AUTHORITIES: Vec = vec![ + Sr25519Keyring::Alice.public().into(), + Sr25519Keyring::Bob.public().into(), + Sr25519Keyring::Charlie.public().into(), + Sr25519Keyring::Eve.public().into(), + Sr25519Keyring::One.public().into(), + Sr25519Keyring::Two.public().into(), + ]; + static ref NEIGHBORS: Vec = vec![ + Sr25519Keyring::Two.public().into(), + Sr25519Keyring::Charlie.public().into(), + Sr25519Keyring::Eve.public().into(), + ]; +} type VirtualOverseer = test_helpers::TestSubsystemContextHandle; -fn test_harness>( - mut state: State, +#[derive(Debug, Clone)] +struct MockAuthorityDiscovery { + addrs: HashMap>, + authorities: HashMap, +} + +impl MockAuthorityDiscovery { + fn new() -> Self { + let authorities: HashMap<_, _> = + AUTHORITIES.clone().into_iter().map(|a| (PeerId::random(), a)).collect(); + let addrs = authorities + .clone() + .into_iter() + .map(|(p, a)| { + let multiaddr = Multiaddr::empty().with(Protocol::P2p(p.into())); + (a, vec![multiaddr]) + }) + .collect(); + Self { addrs, authorities } + } +} + +#[async_trait] +impl AuthorityDiscovery for MockAuthorityDiscovery { + async fn get_addresses_by_authority_id( + &mut self, + authority: polkadot_primitives::v1::AuthorityDiscoveryId, + ) -> Option> { + self.addrs.get(&authority).cloned() + } + async fn get_authority_id_by_peer_id( + &mut self, + peer_id: polkadot_node_network_protocol::PeerId, + ) -> Option { + self.authorities.get(&peer_id).cloned() + } +} + +async fn get_other_authorities_addrs() -> Vec> { + let mut addrs = Vec::with_capacity(OTHER_AUTHORITIES.len()); + let mut discovery = MOCK_AUTHORITY_DISCOVERY.clone(); + for authority in OTHER_AUTHORITIES.iter().cloned() { + if let Some(addr) = discovery.get_addresses_by_authority_id(authority).await { + addrs.push(addr); + } + } + addrs +} + +async fn get_other_authorities_addrs_map() -> HashMap> { + let mut addrs = HashMap::with_capacity(OTHER_AUTHORITIES.len()); + let mut discovery = MOCK_AUTHORITY_DISCOVERY.clone(); + for authority in OTHER_AUTHORITIES.iter().cloned() { + if let Some(addr) = discovery.get_addresses_by_authority_id(authority.clone()).await { + addrs.insert(authority, addr); + } + } + addrs +} + +fn make_subsystem() -> GossipSupport { + GossipSupport::new(make_ferdie_keystore(), MOCK_AUTHORITY_DISCOVERY.clone()) +} + +fn test_harness, AD: AuthorityDiscovery>( + subsystem: GossipSupport, test_fn: impl FnOnce(VirtualOverseer) -> T, -) -> State { +) -> GossipSupport { let pool = sp_core::testing::TaskExecutor::new(); let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); - let keystore = make_ferdie_keystore(); - let subsystem = GossipSupport::new(keystore); - { - let subsystem = subsystem.run_inner(context, &mut state); - - let test_fut = test_fn(virtual_overseer); - - futures::pin_mut!(test_fut); - futures::pin_mut!(subsystem); - - executor::block_on(future::join( - async move { - let mut overseer = test_fut.await; - overseer - .send(FromOverseer::Signal(OverseerSignal::Conclude)) - .timeout(TIMEOUT) - .await - .expect("Conclude send timeout"); - }, - subsystem, - )); - } - - state + let subsystem = subsystem.run(context); + + let test_fut = test_fn(virtual_overseer); + + futures::pin_mut!(test_fut); + futures::pin_mut!(subsystem); + + let (_, subsystem) = executor::block_on(future::join( + async move { + let mut overseer = test_fut.await; + overseer + .send(FromOverseer::Signal(OverseerSignal::Conclude)) + .timeout(TIMEOUT) + .await + .expect("Conclude send timeout"); + }, + subsystem, + )); + subsystem } const TIMEOUT: Duration = Duration::from_millis(100); @@ -91,32 +178,6 @@ async fn overseer_recv(overseer: &mut VirtualOverseer) -> AllMessages { msg } -fn authorities() -> Vec { - let mut authorities = other_authorities(); - authorities.push(Sr25519Keyring::Ferdie.public().into()); - authorities -} - -// Authorities other than ourselves: -fn other_authorities() -> Vec { - vec![ - Sr25519Keyring::Alice.public().into(), - Sr25519Keyring::Bob.public().into(), - Sr25519Keyring::Charlie.public().into(), - Sr25519Keyring::Eve.public().into(), - Sr25519Keyring::One.public().into(), - Sr25519Keyring::Two.public().into(), - ] -} - -fn neighbors() -> Vec { - vec![ - Sr25519Keyring::Two.public().into(), - Sr25519Keyring::Charlie.public().into(), - Sr25519Keyring::Eve.public().into(), - ] -} - async fn test_neighbors(overseer: &mut VirtualOverseer) { assert_matches!( overseer_recv(overseer).await, @@ -145,7 +206,7 @@ async fn test_neighbors(overseer: &mut VirtualOverseer) { }) => { let mut got: Vec<_> = our_neighbors.into_iter().collect(); got.sort(); - assert_eq!(got, neighbors()); + assert_eq!(got, NEIGHBORS.clone()); } ); } @@ -153,7 +214,7 @@ async fn test_neighbors(overseer: &mut VirtualOverseer) { #[test] fn issues_a_connection_request_on_new_session() { let hash = Hash::repeat_byte(0xAA); - let state = test_harness(State::default(), |mut virtual_overseer| async move { + let state = test_harness(make_subsystem(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; overseer_signal_active_leaves(overseer, hash).await; assert_matches!( @@ -173,20 +234,18 @@ fn issues_a_connection_request_on_new_session() { RuntimeApiRequest::Authorities(tx), )) => { assert_eq!(relay_parent, hash); - tx.send(Ok(authorities())).unwrap(); + tx.send(Ok(AUTHORITIES.clone())).unwrap(); } ); assert_matches!( overseer_recv(overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators { - validator_ids, + AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators { + validator_addrs, peer_set, - failed, }) => { - assert_eq!(validator_ids, other_authorities()); + assert_eq!(validator_addrs, get_other_authorities_addrs().await); assert_eq!(peer_set, PeerSet::Validation); - failed.send(0).unwrap(); } ); @@ -241,20 +300,18 @@ fn issues_a_connection_request_on_new_session() { RuntimeApiRequest::Authorities(tx), )) => { assert_eq!(relay_parent, hash); - tx.send(Ok(authorities())).unwrap(); + tx.send(Ok(AUTHORITIES.clone())).unwrap(); } ); assert_matches!( overseer_recv(overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators { - validator_ids, + AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators { + validator_addrs, peer_set, - failed, }) => { - assert_eq!(validator_ids, other_authorities()); + assert_eq!(validator_addrs, get_other_authorities_addrs().await); assert_eq!(peer_set, PeerSet::Validation); - failed.send(0).unwrap(); } ); @@ -266,54 +323,96 @@ fn issues_a_connection_request_on_new_session() { assert!(state.last_failure.is_none()); } +#[test] +fn test_log_output() { + sp_tracing::try_init_simple(); + let alice: AuthorityDiscoveryId = Sr25519Keyring::Alice.public().into(); + let bob = Sr25519Keyring::Bob.public().into(); + let unconnected_authorities = { + let mut m = HashMap::new(); + let peer_id = PeerId::random(); + let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into())); + let addrs = vec![addr.clone(), addr]; + m.insert(alice, addrs); + let peer_id = PeerId::random(); + let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into())); + let addrs = vec![addr.clone(), addr]; + m.insert(bob, addrs); + m + }; + tracing::debug!( + target: LOG_TARGET, + unconnected_authorities = %PrettyAuthorities(unconnected_authorities.iter()), + "Connectivity Report" + ); +} + #[test] fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { let hash = Hash::repeat_byte(0xAA); - let mut state = test_harness(State::default(), |mut virtual_overseer| async move { - let overseer = &mut virtual_overseer; - overseer_signal_active_leaves(overseer, hash).await; - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::SessionIndexForChild(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(1)).unwrap(); - } - ); - assert_matches!( - overseer_recv(overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - relay_parent, - RuntimeApiRequest::Authorities(tx), - )) => { - assert_eq!(relay_parent, hash); - tx.send(Ok(authorities())).unwrap(); - } - ); - - assert_matches!( - overseer_recv(overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators { - validator_ids, - peer_set, - failed, - }) => { - assert_eq!(validator_ids, other_authorities()); - assert_eq!(peer_set, PeerSet::Validation); - failed.send(2).unwrap(); - } - ); - - test_neighbors(overseer).await; - - virtual_overseer - }); + let mut state = make_subsystem(); + // There will be two lookup failures: + let alice = Sr25519Keyring::Alice.public().into(); + let bob = Sr25519Keyring::Bob.public().into(); + let alice_addr = state.authority_discovery.addrs.remove(&alice); + state.authority_discovery.addrs.remove(&bob); + + let mut state = { + let alice = alice.clone(); + let bob = bob.clone(); + + test_harness(state, |mut virtual_overseer| async move { + let overseer = &mut virtual_overseer; + overseer_signal_active_leaves(overseer, hash).await; + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(1)).unwrap(); + } + ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Authorities(tx), + )) => { + assert_eq!(relay_parent, hash); + tx.send(Ok(AUTHORITIES.clone())).unwrap(); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators { + mut validator_addrs, + peer_set, + }) => { + let mut expected = get_other_authorities_addrs_map().await; + expected.remove(&alice); + expected.remove(&bob); + let mut expected: Vec> = expected.into_iter().map(|(_,v)| v).collect(); + validator_addrs.sort(); + expected.sort(); + assert_eq!(validator_addrs, expected); + assert_eq!(peer_set, PeerSet::Validation); + } + ); + + test_neighbors(overseer).await; + + virtual_overseer + }) + }; assert_eq!(state.last_session_index, Some(1)); assert!(state.last_failure.is_some()); state.last_failure = state.last_failure.and_then(|i| i.checked_sub(BACKOFF_DURATION)); + // One error less: + state.authority_discovery.addrs.insert(alice, alice_addr.unwrap()); let hash = Hash::repeat_byte(0xBB); let state = test_harness(state, |mut virtual_overseer| async move { @@ -336,20 +435,23 @@ fn issues_a_connection_request_when_last_request_was_mostly_unresolved() { RuntimeApiRequest::Authorities(tx), )) => { assert_eq!(relay_parent, hash); - tx.send(Ok(authorities())).unwrap(); + tx.send(Ok(AUTHORITIES.clone())).unwrap(); } ); assert_matches!( overseer_recv(overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators { - validator_ids, + AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToResolvedValidators { + mut validator_addrs, peer_set, - failed, }) => { - assert_eq!(validator_ids, other_authorities()); + let mut expected = get_other_authorities_addrs_map().await; + expected.remove(&bob); + let mut expected: Vec> = expected.into_iter().map(|(_,v)| v).collect(); + expected.sort(); + validator_addrs.sort(); + assert_eq!(validator_addrs, expected); assert_eq!(peer_set, PeerSet::Validation); - failed.send(1).unwrap(); } ); diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index 1ed915f1879d..e0fb2d2849d1 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -294,6 +294,8 @@ pub mod v1 { UncheckedSignedFullStatement, }; + use crate::WrongVariant; + /// Network messages used by the bitfield distribution subsystem. #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum BitfieldDistributionMessage { @@ -386,6 +388,10 @@ pub mod v1 { Approvals(Vec), } + /// Dummy network message type, so we will receive connect/disconnect events. + #[derive(Debug, Clone, PartialEq, Eq)] + pub enum GossipSuppportNetworkMessage {} + /// Network messages used by the collator protocol subsystem #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum CollatorProtocolMessage { @@ -420,6 +426,20 @@ pub mod v1 { impl_try_from!(ValidationProtocol, StatementDistribution, StatementDistributionMessage); impl_try_from!(ValidationProtocol, ApprovalDistribution, ApprovalDistributionMessage); + impl TryFrom for GossipSuppportNetworkMessage { + type Error = WrongVariant; + fn try_from(_: ValidationProtocol) -> Result { + Err(WrongVariant) + } + } + + impl<'a> TryFrom<&'a ValidationProtocol> for &'a GossipSuppportNetworkMessage { + type Error = WrongVariant; + fn try_from(_: &'a ValidationProtocol) -> Result { + Err(WrongVariant) + } + } + /// All network messages on the collation peer-set. #[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] pub enum CollationProtocol { diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 2fb9ffd359b2..f8e935a7eda8 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -403,7 +403,7 @@ pub struct Overseer { #[subsystem(no_dispatch, ApprovalVotingMessage)] approval_voting: ApprovalVoting, - #[subsystem(no_dispatch, GossipSupportMessage)] + #[subsystem(GossipSupportMessage)] gossip_support: GossipSupport, #[subsystem(no_dispatch, DisputeCoordinatorMessage)] diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs index 66156fd18298..1751812babdc 100644 --- a/node/service/src/overseer.rs +++ b/node/service/src/overseer.rs @@ -147,7 +147,7 @@ pub fn create_default_subsystems<'a, Spawner, RuntimeClient>( CollatorProtocolSubsystem, ApprovalDistributionSubsystem, ApprovalVotingSubsystem, - GossipSupportSubsystem, + GossipSupportSubsystem, DisputeCoordinatorSubsystem, DisputeParticipationSubsystem, DisputeDistributionSubsystem, @@ -236,7 +236,10 @@ where Box::new(network_service.clone()), Metrics::register(registry)?, ), - gossip_support: GossipSupportSubsystem::new(keystore.clone()), + gossip_support: GossipSupportSubsystem::new( + keystore.clone(), + authority_discovery_service.clone(), + ), dispute_coordinator: DisputeCoordinatorSubsystem::new( parachains_db.clone(), dispute_coordinator_config, diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 691289614d14..3822591c0fa9 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -23,6 +23,7 @@ //! Subsystems' APIs are defined separately from their implementation, leading to easier mocking. use futures::channel::oneshot; +use sc_network::Multiaddr; use thiserror::Error; pub use sc_network::IfDisconnected; @@ -345,6 +346,14 @@ pub enum NetworkBridgeMessage { /// authority discovery has failed to resolve. failed: oneshot::Sender, }, + /// Alternative to `ConnectToValidators` in case you already know the `Multiaddrs` you want to be + /// connected to. + ConnectToResolvedValidators { + /// Each entry corresponds to the addresses of an already resolved validator. + validator_addrs: Vec>, + /// The peer set we want the connection on. + peer_set: PeerSet, + }, /// Inform the distribution subsystems about the new /// gossip network topology formed. NewGossipTopology { @@ -365,6 +374,7 @@ impl NetworkBridgeMessage { Self::SendValidationMessages(_) => None, Self::SendCollationMessages(_) => None, Self::ConnectToValidators { .. } => None, + Self::ConnectToResolvedValidators { .. } => None, Self::SendRequests { .. } => None, Self::NewGossipTopology { .. } => None, } @@ -850,5 +860,9 @@ pub enum ApprovalDistributionMessage { } /// Message to the Gossip Support subsystem. -#[derive(Debug)] -pub enum GossipSupportMessage {} +#[derive(Debug, derive_more::From)] +pub enum GossipSupportMessage { + /// Dummy constructor, so we can receive networking events. + #[from] + NetworkBridgeUpdateV1(NetworkBridgeEvent), +}