Skip to content

Commit

Permalink
Auto exchange transactions relay metrics (paritytech#237)
Browse files Browse the repository at this point in the history
* auto exchange tx relay dashboard

* cargo fmt --all

* single metrics startup fn
  • Loading branch information
svyatonik authored and serban300 committed Apr 8, 2024
1 parent 67eeeac commit cbd2ee2
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 50 deletions.
3 changes: 3 additions & 0 deletions bridges/relays/ethereum/src/cli.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,6 @@ subcommands:
- sub-port: *sub-port
- sub-signer: *sub-signer
- sub-signer-password: *sub-signer-password
- no-prometheus: *no-prometheus
- prometheus-host: *prometheus-host
- prometheus-port: *prometheus-port
5 changes: 5 additions & 0 deletions bridges/relays/ethereum/src/ethereum_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::exchange::{
TransactionProofPipeline,
};
use crate::exchange_loop::{run as run_loop, InMemoryStorage};
use crate::metrics::MetricsParams;
use crate::rpc::{EthereumRpc, SubstrateRpc};
use crate::rpc_errors::RpcError;
use crate::substrate_client::{
Expand Down Expand Up @@ -64,6 +65,8 @@ pub struct EthereumExchangeParams {
pub sub_sign: SubstrateSigningParams,
/// Relay working mode.
pub mode: ExchangeRelayMode,
/// Metrics parameters.
pub metrics_params: Option<MetricsParams>,
}

/// Ethereum to Substrate exchange pipeline.
Expand Down Expand Up @@ -257,6 +260,7 @@ impl Default for EthereumExchangeParams {
sub: Default::default(),
sub_sign: Default::default(),
mode: ExchangeRelayMode::Auto(None),
metrics_params: Some(Default::default()),
}
}
}
Expand Down Expand Up @@ -335,6 +339,7 @@ fn run_auto_transactions_relay_loop(params: EthereumExchangeParams, eth_start_wi
client: sub_client,
sign_params: params.sub_sign,
},
params.metrics_params,
futures::future::pending(),
);

Expand Down
9 changes: 8 additions & 1 deletion bridges/relays/ethereum/src/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ pub trait SourceBlock {
/// Block hash type.
type Hash: Clone + Debug + Display;
/// Block number type.
type Number: Debug + Display + Clone + Copy + std::cmp::Ord + std::ops::Add<Output = Self::Number> + num_traits::One;
type Number: Debug
+ Display
+ Clone
+ Copy
+ Into<u64>
+ std::cmp::Ord
+ std::ops::Add<Output = Self::Number>
+ num_traits::One;
/// Block transaction.
type Transaction: SourceTransaction;

Expand Down
30 changes: 29 additions & 1 deletion bridges/relays/ethereum/src/exchange_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::exchange::{
relay_block_transactions, BlockNumberOf, RelayedBlockTransactions, SourceClient, TargetClient,
TransactionProofPipeline,
};
use crate::exchange_loop_metrics::ExchangeLoopMetrics;
use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams};
use crate::utils::retry_backoff;

use backoff::backoff::Backoff;
Expand Down Expand Up @@ -83,6 +85,7 @@ pub fn run<P: TransactionProofPipeline>(
mut storage: impl TransactionProofsRelayStorage<BlockNumber = BlockNumberOf<P>>,
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_params: Option<MetricsParams>,
exit_signal: impl Future<Output = ()>,
) {
let mut local_pool = futures::executor::LocalPool::new();
Expand All @@ -92,6 +95,11 @@ pub fn run<P: TransactionProofPipeline>(
let mut state = storage.state();
let mut current_finalized_block = None;

let mut metrics_global = GlobalMetrics::new();
let mut metrics_exch = ExchangeLoopMetrics::new();
let metrics_enabled = metrics_params.is_some();
metrics_start(metrics_params, &metrics_global, &metrics_exch);

let exit_signal = exit_signal.fuse();

futures::pin_mut!(exit_signal);
Expand All @@ -103,9 +111,14 @@ pub fn run<P: TransactionProofPipeline>(
&target_client,
&mut state,
&mut current_finalized_block,
if metrics_enabled { Some(&mut metrics_exch) } else { None },
)
.await;

if metrics_enabled {
metrics_global.update();
}

match iteration_result {
Ok(_) => {
retry_backoff.reset();
Expand Down Expand Up @@ -135,6 +148,7 @@ async fn run_loop_iteration<P: TransactionProofPipeline>(
target_client: &impl TargetClient<P>,
state: &mut TransactionProofsRelayState<BlockNumberOf<P>>,
current_finalized_block: &mut Option<(P::Block, RelayedBlockTransactions)>,
mut exchange_loop_metrics: Option<&mut ExchangeLoopMetrics>,
) -> Result<(), ()> {
let best_finalized_header_id = match target_client.best_finalized_header_id().await {
Ok(best_finalized_header_id) => {
Expand Down Expand Up @@ -181,6 +195,14 @@ async fn run_loop_iteration<P: TransactionProofPipeline>(
state.best_processed_header_number = state.best_processed_header_number + One::one();
storage.set_state(state);

if let Some(exchange_loop_metrics) = exchange_loop_metrics.as_mut() {
exchange_loop_metrics.update::<P>(
state.best_processed_header_number,
best_finalized_header_id.0,
relayed_transactions,
);
}

// we have just updated state => proceed to next block retrieval
}
Err(relayed_transactions) => {
Expand Down Expand Up @@ -262,6 +284,12 @@ mod tests {
}
}));

run(storage, source, target, exit_receiver.into_future().map(|(_, _)| ()));
run(
storage,
source,
target,
None,
exit_receiver.into_future().map(|(_, _)| ()),
);
}
}
84 changes: 84 additions & 0 deletions bridges/relays/ethereum/src/exchange_loop_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.

// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.

use crate::exchange::{BlockNumberOf, RelayedBlockTransactions, TransactionProofPipeline};
use crate::metrics::{register, Counter, CounterVec, GaugeVec, Metrics, Opts, Registry, U64};

/// Exchange transactions relay metrics.
pub struct ExchangeLoopMetrics {
/// Best finalized block numbers - "processed" and "known".
best_block_numbers: GaugeVec<U64>,
/// Number of processed blocks ("total").
processed_blocks: Counter<U64>,
/// Number of processed transactions ("total", "relayed" and "failed").
processed_transactions: CounterVec<U64>,
}

impl Metrics for ExchangeLoopMetrics {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.best_block_numbers.clone(), registry).map_err(|e| e.to_string())?;
register(self.processed_blocks.clone(), registry).map_err(|e| e.to_string())?;
register(self.processed_transactions.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
}

impl ExchangeLoopMetrics {
/// Creates sync loop metrics.
pub fn new() -> Self {
ExchangeLoopMetrics {
best_block_numbers: GaugeVec::new(
Opts::new("best_block_numbers", "Best finalized block numbers"),
&["type"],
)
.expect("metric is static and thus valid; qed"),
processed_blocks: Counter::new("processed_blocks", "Total number of processed blocks")
.expect("metric is static and thus valid; qed"),
processed_transactions: CounterVec::new(
Opts::new("processed_transactions", "Total number of processed transactions"),
&["type"],
)
.expect("metric is static and thus valid; qed"),
}
}

/// Update metrics when single block is relayed.
pub fn update<P: TransactionProofPipeline>(
&mut self,
best_processed_block_number: BlockNumberOf<P>,
best_known_block_number: BlockNumberOf<P>,
relayed_transactions: RelayedBlockTransactions,
) {
self.best_block_numbers
.with_label_values(&["processed"])
.set(best_processed_block_number.into());
self.best_block_numbers
.with_label_values(&["known"])
.set(best_known_block_number.into());

self.processed_blocks.inc();

self.processed_transactions
.with_label_values(&["total"])
.inc_by(relayed_transactions.processed as _);
self.processed_transactions
.with_label_values(&["relayed"])
.inc_by(relayed_transactions.relayed as _);
self.processed_transactions
.with_label_values(&["failed"])
.inc_by(relayed_transactions.failed as _);
}
}
2 changes: 2 additions & 0 deletions bridges/relays/ethereum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod ethereum_sync_loop;
mod ethereum_types;
mod exchange;
mod exchange_loop;
mod exchange_loop_metrics;
mod headers;
mod metrics;
mod rpc;
Expand Down Expand Up @@ -260,6 +261,7 @@ fn ethereum_exchange_params(matches: &clap::ArgMatches) -> Result<ethereum_excha
params.eth = ethereum_connection_params(matches)?;
params.sub = substrate_connection_params(matches)?;
params.sub_sign = substrate_signing_params(matches)?;
params.metrics_params = metrics_params(matches)?;

params.mode = match matches.value_of("eth-tx-hash") {
Some(eth_tx_hash) => ethereum_exchange::ExchangeRelayMode::Single(
Expand Down
62 changes: 45 additions & 17 deletions bridges/relays/ethereum/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.

pub use substrate_prometheus_endpoint::{register, Gauge, GaugeVec, Opts, Registry, F64, U64};
pub use substrate_prometheus_endpoint::{register, Counter, CounterVec, Gauge, GaugeVec, Opts, Registry, F64, U64};

use std::net::SocketAddr;
use substrate_prometheus_endpoint::init_prometheus;
Expand All @@ -29,6 +29,12 @@ pub struct MetricsParams {
pub port: u16,
}

/// Metrics API.
pub trait Metrics {
/// Register metrics in the registry.
fn register(&self, registry: &Registry) -> Result<(), String>;
}

/// Global Prometheus metrics.
#[derive(Debug)]
pub struct GlobalMetrics {
Expand All @@ -39,19 +45,40 @@ pub struct GlobalMetrics {
}

/// Start Prometheus endpoint with given metrics registry.
pub async fn start(params: MetricsParams, registry: Registry) -> Result<(), String> {
init_prometheus(
SocketAddr::new(
pub fn start(params: Option<MetricsParams>, global_metrics: &GlobalMetrics, extra_metrics: &impl Metrics) {
let params = match params {
Some(params) => params,
None => return,
};

let do_start = move || {
let prometheus_socket_addr = SocketAddr::new(
params
.host
.parse()
.map_err(|err| format!("Invalid Prometheus host {}: {}", params.host, err))?,
params.port,
),
registry,
)
.await
.map_err(|err| format!("Error starting Prometheus endpoint: {}", err))
);
let metrics_registry = Registry::new();
global_metrics.register(&metrics_registry)?;
extra_metrics.register(&metrics_registry)?;
async_std::task::spawn(async move {
init_prometheus(prometheus_socket_addr, metrics_registry)
.await
.map_err(|err| format!("Error starting Prometheus endpoint: {}", err))
});

Ok(())
};

let result: Result<(), String> = do_start();
if let Err(err) = result {
log::warn!(
target: "bridge",
"Failed to expose metrics: {}",
err,
);
}
}

impl Default for MetricsParams {
Expand All @@ -63,6 +90,15 @@ impl Default for MetricsParams {
}
}

impl Metrics for GlobalMetrics {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?;
register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?;
register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
}

impl GlobalMetrics {
/// Creates global metrics.
pub fn new() -> Self {
Expand All @@ -80,14 +116,6 @@ impl GlobalMetrics {
}
}

/// Registers global metrics in the metrics registry.
pub fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?;
register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?;
register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}

/// Update metrics.
pub fn update(&mut self) {
// update system-wide metrics
Expand Down
25 changes: 2 additions & 23 deletions bridges/relays/ethereum/src/sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.

use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams, Registry as MetricsRegistry};
use crate::metrics::{start as metrics_start, GlobalMetrics, MetricsParams};
use crate::sync::HeadersSyncParams;
use crate::sync_loop_metrics::SyncLoopMetrics;
use crate::sync_types::{
Expand Down Expand Up @@ -124,15 +124,7 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
let mut metrics_global = GlobalMetrics::new();
let mut metrics_sync = SyncLoopMetrics::new();
let metrics_enabled = metrics_params.is_some();
if let Some(metrics_params) = metrics_params {
if let Err(err) = expose_metrics(metrics_params, &metrics_global, &metrics_sync).await {
log::warn!(
target: "bridge",
"Failed to expose metrics: {}",
err,
);
}
}
metrics_start(metrics_params, &metrics_global, &metrics_sync);

let mut source_retry_backoff = retry_backoff();
let mut source_client_is_online = false;
Expand Down Expand Up @@ -546,19 +538,6 @@ pub fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
});
}

/// Expose sync loop metrics.
async fn expose_metrics(
metrics_params: MetricsParams,
metrics_global: &GlobalMetrics,
metrics_sync: &SyncLoopMetrics,
) -> Result<(), String> {
let metrics_registry = MetricsRegistry::new();
metrics_global.register(&metrics_registry)?;
metrics_sync.register(&metrics_registry)?;
async_std::task::spawn(metrics_start(metrics_params, metrics_registry));
Ok(())
}

/// Stream that emits item every `timeout_ms` milliseconds.
fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
futures::stream::unfold((), move |_| async move {
Expand Down
Loading

0 comments on commit cbd2ee2

Please sign in to comment.