Skip to content

Commit

Permalink
Merge pull request #905 from subspace/dsn-metrics
Browse files Browse the repository at this point in the history
DSN. Add libp2p metrics.
  • Loading branch information
shamil-gadelshin authored Nov 17, 2022
2 parents c6ddade + 6fd4a63 commit 10adfd4
Show file tree
Hide file tree
Showing 10 changed files with 456 additions and 7 deletions.
329 changes: 328 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions crates/sc-piece-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub trait PieceCache: Clone {
fn get_piece_by_key(&self, key: Vec<u8>) -> Result<Option<Piece>, Box<dyn Error>>;
}

// TODO: Refactor AuxPieceCache once we remove RPC endpoint.
/// Cache of pieces in aux storage
#[derive(Debug)]
pub struct AuxPieceCache<AS> {
Expand Down
3 changes: 3 additions & 0 deletions crates/subspace-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ include = [
]

[dependencies]
actix-web = "4.2.1"
anyhow = "1.0.61"
async-trait = "0.1.57"
bytes = "1.2.1"
Expand All @@ -31,6 +32,7 @@ parity-db = "0.3.17"
parity-scale-codec = "3.1.5"
parking_lot = "0.12.1"
pin-project = "1.0.11"
prometheus-client = "0.18.1"
serde = { version = "1.0.143", features = ["derive"] }
serde_json = "1.0.83"
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
Expand All @@ -48,6 +50,7 @@ features = [
"gossipsub",
"identify",
"kad",
"metrics",
"mplex",
"noise",
"ping",
Expand Down
27 changes: 25 additions & 2 deletions crates/subspace-networking/examples/requests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use futures::channel::oneshot;
use libp2p::metrics::Metrics;
use libp2p::multiaddr::Protocol;
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::sync::Arc;
use std::time::Duration;
use subspace_networking::{
BootstrappedNetworkingParameters, Config, GenericRequest, GenericRequestHandler,
start_prometheus_metrics_server, BootstrappedNetworkingParameters, Config, GenericRequest,
GenericRequestHandler,
};
use tracing::error;

#[derive(Encode, Decode)]
struct ExampleRequest;
Expand All @@ -24,17 +28,36 @@ struct ExampleResponse;
async fn main() {
tracing_subscriber::fmt::init();

let mut metric_registry = Registry::default();
let metrics = Metrics::new(&mut metric_registry);

let config_1 = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_globals_in_dht: true,
request_response_protocols: vec![GenericRequestHandler::create(|&ExampleRequest| {
println!("Request handler for request");
Some(ExampleResponse)
})],
metrics: Some(metrics),
..Config::with_generated_keypair()
};
let (node_1, mut node_runner_1) = subspace_networking::create(config_1).await.unwrap();

// Init prometheus
let prometheus_metrics_server_address = "127.0.0.1:63000".parse().unwrap();
tokio::task::spawn(async move {
if let Err(err) =
start_prometheus_metrics_server(prometheus_metrics_server_address, metric_registry)
.await
{
error!(
?prometheus_metrics_server_address,
?err,
"Prometheus metrics server failed to start."
)
}
});

println!("Node 1 ID is {}", node_1.id());

let (node_1_address_sender, node_1_address_receiver) = oneshot::channel();
Expand Down Expand Up @@ -88,5 +111,5 @@ async fn main() {
println!("Response: {:?}", resp);
});

tokio::time::sleep(Duration::from_secs(5)).await;
tokio::time::sleep(Duration::from_secs(50)).await;
}
11 changes: 8 additions & 3 deletions crates/subspace-networking/src/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use libp2p::gossipsub::{
use libp2p::identify::Config as IdentifyConfig;
use libp2p::identity::Keypair;
use libp2p::kad::{KademliaBucketInserts, KademliaCaching, KademliaConfig, KademliaStoreInserts};
use libp2p::metrics::Metrics;
use libp2p::mplex::MplexConfig;
use libp2p::multiaddr::Protocol;
use libp2p::noise::NoiseConfig;
Expand All @@ -34,7 +35,7 @@ use std::time::Duration;
use std::{fmt, io};
use subspace_core_primitives::{crypto, PIECE_SIZE};
use thiserror::Error;
use tracing::info;
use tracing::{error, info};

const KADEMLIA_PROTOCOL: &[u8] = b"/subspace/kad/0.1.0";
const GOSSIPSUB_PROTOCOL_PREFIX: &str = "subspace/gossipsub";
Expand Down Expand Up @@ -82,7 +83,6 @@ impl RelayMode {
}

/// [`Node`] configuration.
#[derive(Clone)]
pub struct Config<RecordStore = CustomRecordStore> {
/// Identity keypair of a node used for authenticated connections.
pub keypair: identity::Keypair,
Expand Down Expand Up @@ -119,6 +119,8 @@ pub struct Config<RecordStore = CustomRecordStore> {
pub max_established_incoming_connections: u32,
/// Outgoing swarm connection limit.
pub max_established_outgoing_connections: u32,
/// Optional external prometheus metrics. None will disable metrics gathering.
pub metrics: Option<Metrics>,
}

impl fmt::Debug for Config {
Expand Down Expand Up @@ -174,8 +176,8 @@ impl Config {
.expect("Default config for gossipsub is always correct; qed");

let keypair = identity::Keypair::Ed25519(keypair);

let identify = IdentifyConfig::new("ipfs/0.1.0".to_string(), keypair.public());

Self {
keypair,
listen_on: vec![],
Expand All @@ -194,6 +196,7 @@ impl Config {
reserved_peers: Vec::new(),
max_established_incoming_connections: SWARM_MAX_ESTABLISHED_INCOMING_CONNECTIONS,
max_established_outgoing_connections: SWARM_MAX_ESTABLISHED_OUTGOING_CONNECTIONS,
metrics: None,
}
}
}
Expand Down Expand Up @@ -246,6 +249,7 @@ where
reserved_peers,
max_established_incoming_connections,
max_established_outgoing_connections,
metrics,
} = config;
let local_peer_id = peer_id(&keypair);

Expand Down Expand Up @@ -305,6 +309,7 @@ where
reserved_peers: convert_multiaddresses(reserved_peers).into_iter().collect(),
max_established_incoming_connections,
max_established_outgoing_connections,
metrics,
});

Ok((node, node_runner))
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@ pub use request_handlers::pieces_by_range::{
PiecesByRangeRequest, PiecesByRangeRequestHandler, PiecesByRangeResponse, PiecesToPlot,
};
pub use utils::multihash::ToMultihash;
pub use utils::prometheus::start_prometheus_metrics_server;
36 changes: 35 additions & 1 deletion crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ use libp2p::kad::{
GetProvidersOk, GetRecordError, GetRecordOk, InboundRequest, KademliaEvent, PutRecordOk,
QueryId, QueryResult, Quorum, Record,
};
use libp2p::metrics::{Metrics, Recorder};
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::{DialError, SwarmEvent};
use libp2p::{futures, Multiaddr, PeerId, Swarm};
use nohash_hasher::IntMap;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::pin::Pin;
use std::sync::Weak;
use std::time::Duration;
Expand Down Expand Up @@ -77,6 +79,8 @@ where
max_established_incoming_connections: u32,
/// Outgoing swarm connection limit.
max_established_outgoing_connections: u32,
/// Prometheus metrics.
metrics: Option<Metrics>,
}

// Helper struct for NodeRunner configuration (clippy requirement).
Expand All @@ -93,6 +97,7 @@ where
pub reserved_peers: HashMap<PeerId, Multiaddr>,
pub max_established_incoming_connections: u32,
pub max_established_outgoing_connections: u32,
pub metrics: Option<Metrics>,
}

impl<RecordStore> NodeRunner<RecordStore>
Expand All @@ -110,6 +115,7 @@ where
reserved_peers,
max_established_incoming_connections,
max_established_outgoing_connections,
metrics,
}: NodeRunnerConfig<RecordStore>,
) -> Self {
Self {
Expand All @@ -129,6 +135,7 @@ where
reserved_peers,
max_established_incoming_connections,
max_established_outgoing_connections,
metrics,
}
}

Expand All @@ -145,6 +152,7 @@ where
},
swarm_event = self.swarm.next() => {
if let Some(swarm_event) = swarm_event {
self.register_event_metrics(&swarm_event);
self.handle_swarm_event(swarm_event).await;
} else {
break;
Expand Down Expand Up @@ -255,7 +263,7 @@ where
.get_closest_peers(random_peer_id);
}

async fn handle_swarm_event<E: std::fmt::Debug>(&mut self, swarm_event: SwarmEvent<Event, E>) {
async fn handle_swarm_event<E: Debug>(&mut self, swarm_event: SwarmEvent<Event, E>) {
match swarm_event {
SwarmEvent::Behaviour(Event::Identify(event)) => {
self.handle_identify_event(event).await;
Expand Down Expand Up @@ -862,4 +870,30 @@ where
}
}
}

fn register_event_metrics<E: Debug>(&mut self, swarm_event: &SwarmEvent<Event, E>) {
if let Some(ref mut metrics) = self.metrics {
match swarm_event {
SwarmEvent::Behaviour(Event::Ping(ping_event)) => {
metrics.record(ping_event);
}
SwarmEvent::Behaviour(Event::Identify(identify_event)) => {
metrics.record(identify_event);
}
SwarmEvent::Behaviour(Event::Kademlia(kademlia_event)) => {
metrics.record(kademlia_event);
}
SwarmEvent::Behaviour(Event::Gossipsub(gossipsub_event)) => {
metrics.record(gossipsub_event);
}
// TODO: implement in the upstream repository
// SwarmEvent::Behaviour(Event::RequestResponse(request_response_event)) => {
// self.metrics.record(request_response_event);
// }
swarm_event => {
metrics.record(swarm_event);
}
}
}
}
}
1 change: 1 addition & 0 deletions crates/subspace-networking/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod multihash;
pub(crate) mod prometheus;
#[cfg(test)]
mod tests;

Expand Down
53 changes: 53 additions & 0 deletions crates/subspace-networking/src/utils/prometheus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use actix_web::http::StatusCode;
use actix_web::web::Data;
use actix_web::{get, App, HttpResponse, HttpServer};
use parking_lot::Mutex;
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use tracing::{error, info};

type SharedRegistry = Arc<Mutex<Registry>>;

#[get("/metrics")]
async fn metrics(registry: Data<SharedRegistry>) -> Result<HttpResponse, Box<dyn Error>> {
let mut encoded: Vec<u8> = Vec::new();
encode(&mut encoded, &registry.lock())?;

let resp = HttpResponse::build(StatusCode::OK).body(encoded);

Ok(resp)
}

/// Start prometheus metrics server on the provided address.
pub async fn start_prometheus_metrics_server(
address: SocketAddr,
registry: Registry,
) -> std::io::Result<()> {
let shared_registry = Arc::new(Mutex::new(registry));
let data = Data::new(shared_registry);

info!("Starting metrics server on {} ...", address);

let server = HttpServer::new(move || App::new().app_data(data.clone()).service(metrics))
.bind(address)?
.run();

// Actix-web will reuse existing tokio runtime.
let runtime = tokio::runtime::Runtime::new()?;

// We need a dedicated thread because actix-web App is !Send and won't work with tokio.
thread::spawn(move || {
if let Err(err) = runtime.block_on(server) {
error!(
?err,
"block_on returns an error for prometheus metrics server"
)
}
});

Ok(())
}
1 change: 1 addition & 0 deletions crates/subspace-service/src/dsn/piece_record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub type SegmentIndexGetter = Arc<dyn Fn() -> u64 + Send + Sync + 'static>;

pub struct AuxRecordStorage<AS> {
piece_cache: AuxPieceCache<AS>,
// TODO: Remove it when we delete RPC-endpoint for farmers.
last_segment_index_getter: SegmentIndexGetter,
}

Expand Down

0 comments on commit 10adfd4

Please sign in to comment.