Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix peer count metrics #5404

Merged
merged 5 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 68 additions & 16 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ pub use libp2p::identity::Keypair;
#[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy
pub mod peerdb;

use crate::peer_manager::peerdb::client::ClientKind;
use libp2p::multiaddr;
pub use peerdb::peer_info::{
ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo,
};
use peerdb::score::{PeerAction, ReportSource};
pub use peerdb::sync_status::{SyncInfo, SyncStatus};
use std::collections::{hash_map::Entry, HashMap};
use std::net::IpAddr;
use strum::IntoEnumIterator;

pub mod config;
mod network_behaviour;

Expand Down Expand Up @@ -464,19 +468,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
"observed_address" => ?info.observed_addr,
"protocols" => ?info.protocols
);

// update the peer client kind metric if the peer is connected
if matches!(
peer_info.connection_status(),
PeerConnectionStatus::Connected { .. }
| PeerConnectionStatus::Disconnecting { .. }
) {
metrics::inc_gauge_vec(
&metrics::PEERS_PER_CLIENT,
&[peer_info.client().kind.as_ref()],
);
metrics::dec_gauge_vec(&metrics::PEERS_PER_CLIENT, &[previous_kind.as_ref()]);
}
}
} else {
error!(self.log, "Received an Identify response from an unknown peer"; "peer_id" => peer_id.to_string());
Expand Down Expand Up @@ -812,11 +803,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// start a ping and status timer for the peer
self.status_peers.insert(*peer_id);

let connected_peers = self.network_globals.connected_peers() as i64;

// increment prometheus metrics
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers);

true
}
Expand Down Expand Up @@ -1267,6 +1255,70 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
);
}
}

// Update peer count related metrics.
fn update_peer_count_metrics(&self) {
let mut peers_connected = 0;
let mut clients_per_peer = HashMap::new();
let mut peers_connected_mutli: HashMap<(&str, &str), i32> = HashMap::new();

for (_, peer_info) in self.network_globals.peers.read().connected_peers() {
peers_connected += 1;

*clients_per_peer
.entry(peer_info.client().kind.to_string())
.or_default() += 1;

let direction = match peer_info.connection_direction() {
Some(ConnectionDirection::Incoming) => "inbound",
Some(ConnectionDirection::Outgoing) => "outbound",
None => "none",
};
// Note: the `transport` is set to `unknown` if the `listening_addresses` list is empty.
// This situation occurs when the peer is initially registered in PeerDB, but the peer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for researching this Akihito! But since we get the Multiaddress as soon as the peer connects, can't we add it immediately? Don't worry doesn't need to be address in this PR, I think I plan to submit a subsequent PR cause we also know the direction as soon as the peer connects, so it doesn't make sense connection_direction() be None

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But since we get the Multiaddress as soon as the peer connects, can't we add it immediately?

Yes, I believe we can do that, but I'm not very familiar with PeerDB. However I've delved a little into the PeerDB implementation recently. During this exploration, I discovered that the Multiaddress obtained when a peer connects is stored in PeerInfo::seen_multiaddrs. The seen_multiaddrs is stored into PeerInfo here.

/// These are the multiaddrs we have physically seen and is what we use for banning/un-banning
/// peers.
seen_multiaddrs: HashSet<Multiaddr>,

Given this understanding, would it be appropriate to iterate over seen_multiaddrs in cases where listening_addresses() returns empty? This approach could potentially ensure that we always have an address, even immediately after a peer connects.
cc @AgeManning

// info has not yet been updated at `PeerManager::identify`.
let transport = peer_info
.listening_addresses()
.iter()
.find_map(|addr| {
addr.iter().find_map(|proto| match proto {
multiaddr::Protocol::QuicV1 => Some("quic"),
multiaddr::Protocol::Tcp(_) => Some("tcp"),
_ => None,
})
})
.unwrap_or("unknown");
*peers_connected_mutli
.entry((direction, transport))
.or_default() += 1;
}

// PEERS_CONNECTED
metrics::set_gauge(&metrics::PEERS_CONNECTED, peers_connected);

// PEERS_PER_CLIENT
for client_kind in ClientKind::iter() {
let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0);
metrics::set_gauge_vec(
&metrics::PEERS_PER_CLIENT,
&[client_kind.as_ref()],
*value as i64,
);
}

// PEERS_CONNECTED_MULTI
for direction in ["inbound", "outbound", "none"] {
for transport in ["quic", "tcp", "unknown"] {
metrics::set_gauge_vec(
&metrics::PEERS_CONNECTED_MULTI,
&[direction, transport],
*peers_connected_mutli
.get(&(direction, transport))
.unwrap_or(&0) as i64,
);
}
}
}
}

enum ConnectingType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::net::IpAddr;
use std::task::{Context, Poll};

use futures::StreamExt;
use libp2p::core::{multiaddr, ConnectedPoint};
use libp2p::core::ConnectedPoint;
use libp2p::identity::PeerId;
use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
Expand Down Expand Up @@ -243,35 +243,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.events.push(PeerManagerEvent::MetaData(peer_id));
}

// increment prometheus metrics
// Update the prometheus metrics
if self.metrics_enabled {
let remote_addr = endpoint.get_remote_address();
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};

match remote_addr.iter().find(|proto| {
matches!(
proto,
multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_)
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]);
}
Some(_) => unreachable!(),
None => {
error!(self.log, "Connection established via unknown transport"; "addr" => %remote_addr)
}
};

metrics::inc_gauge(&metrics::PEERS_CONNECTED);
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we need to revert back into iterating all the connected peers, as @AgeManning suggested let's have all the client metrics updates in the same place, i.e. remove the dec and inc of PEERS_CONNECTED and related wdyt @ackintosh? this way we only don't incur in similar situations as this PR is fixing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jxs Thanks for your comment, that makes sense. I will make an update.

self.update_peer_count_metrics();
}

// Count dialing peers in the limit if the peer dialed us.
Expand Down Expand Up @@ -309,7 +285,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
fn on_connection_closed(
&mut self,
peer_id: PeerId,
endpoint: &ConnectedPoint,
_endpoint: &ConnectedPoint,
remaining_established: usize,
) {
if remaining_established > 0 {
Expand Down Expand Up @@ -337,33 +313,12 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// reference so that peer manager can track this peer.
self.inject_disconnect(&peer_id);

let remote_addr = endpoint.get_remote_address();
// Update the prometheus metrics
if self.metrics_enabled {
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};

match remote_addr.iter().find(|proto| {
matches!(
proto,
multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_)
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]);
}
// If it's an unknown protocol we already logged when connection was established.
_ => {}
};
// Legacy standard metrics.
metrics::dec_gauge(&metrics::PEERS_CONNECTED);
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);

self.update_peer_count_metrics();
}
}

Expand Down