Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metabased-sequencer/interceptor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ serde = "1.0.210"
hyper-util = "0.1.8"
alloy-primitives = "0.8.7"
url = { version = "2", features = ["serde"] }
prometheus-client = "0.22"
87 changes: 87 additions & 0 deletions metabased-sequencer/interceptor/src/application/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
pub use prometheus::PrometheusMetrics;

use std::fmt::Write;

pub fn metrics(metrics: &impl Metrics) -> String {
let mut response = String::new();

metrics
.encode(&mut response)
.expect("Formatting to string should be infallible");

response
}

pub trait Metrics {
fn inc_send_raw_transaction(&self);
fn encode(&self, writer: &mut impl Write) -> std::fmt::Result;
}

mod noop {
use super::*;

impl Metrics for () {
fn inc_send_raw_transaction(&self) {}

fn encode(&self, _writer: &mut impl Write) -> std::fmt::Result {
Ok(())
}
}
}

mod prometheus {
use super::*;
use prometheus_client::encoding::text::encode;
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::registry::Registry;

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct Labels {
rpc_method: &'static str,
}

#[derive(Debug)]
pub struct PrometheusMetrics {
registry: Registry,
rpc_calls: Family<Labels, Counter>,
}

impl Default for PrometheusMetrics {
fn default() -> Self {
Self::new()
}
}

impl PrometheusMetrics {
pub fn new() -> Self {
let mut registry = <Registry>::default();
let rpc_calls = Family::<Labels, Counter>::default();
registry.register(
"rpc_calls",
"Number of RPC method calls received",
rpc_calls.clone(),
);

Self {
registry,
rpc_calls,
}
}
}

impl Metrics for PrometheusMetrics {
fn inc_send_raw_transaction(&self) {
self.rpc_calls
.get_or_create(&Labels {
rpc_method: "eth_sendRawTransaction",
})
.inc();
}

fn encode(&self, writer: &mut impl Write) -> std::fmt::Result {
encode(writer, &self.registry)
}
}
}
2 changes: 2 additions & 0 deletions metabased-sequencer/interceptor/src/application/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod metrics;
mod send_raw_transaction;
#[cfg(test)]
mod tests;

pub use metrics::{metrics, Metrics, PrometheusMetrics};
pub use send_raw_transaction::send_raw_transaction;
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::application::Metrics;
use crate::domain::primitives::Bytes;
use crate::domain::MetabasedSequencerChainService;
use crate::presentation::json_rpc_errors::Error;
Expand All @@ -10,11 +11,17 @@ use alloy::primitives::TxHash;
use alloy::primitives::U256;

/// Sends serialized and signed transaction `tx` using `chain`.
pub async fn send_raw_transaction<Chain>(encoded: Bytes, chain: &Chain) -> Result<TxHash, Error>
pub async fn send_raw_transaction<Chain, M: Metrics>(
encoded: Bytes,
chain: &Chain,
metrics: &M,
) -> Result<TxHash, Error>
where
Chain: MetabasedSequencerChainService,
Error: From<<Chain as MetabasedSequencerChainService>::Error>,
{
metrics.inc_send_raw_transaction();

// TODO remove or move up to function comment
// 1. Decoding
// 2. Validation
Expand Down
2 changes: 1 addition & 1 deletion metabased-sequencer/interceptor/src/application/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn test_send_raw_transaction_writes_tx_to_metabased_chain() {
let transactions = Arc::new(RwLock::new(Vec::new()));
let chain = InMemoryMetabasedSequencerChain::new(transactions.clone());

super::send_raw_transaction(encoded_tx.clone(), &chain)
super::send_raw_transaction(encoded_tx.clone(), &chain, &())
.await
.unwrap();

Expand Down
23 changes: 20 additions & 3 deletions metabased-sequencer/interceptor/src/presentation/jsonrpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::application;
use crate::application::Metrics;
use crate::domain::primitives::Bytes;
use crate::domain::MetabasedSequencerChainService;
use crate::presentation::json_rpc_errors::Error;
Expand Down Expand Up @@ -88,13 +89,14 @@ impl JsonRpcError<()> {
///
/// * Data: hex encoded string that contains signed and serialized transaction with an optional `0x`
/// prefix.
pub async fn send_raw_transaction<Chain>(
pub async fn send_raw_transaction<Chain, M>(
params: Params<'static>,
ctx: Arc<Services<Chain>>,
ctx: Arc<Services<Chain, M>>,
_ext: http::Extensions,
) -> Result<String, JsonRpcError<()>>
where
Chain: MetabasedSequencerChainService,
M: Metrics,
Error: From<<Chain as MetabasedSequencerChainService>::Error>,
{
let mut json: serde_json::Value = serde_json::from_str(params.as_str().unwrap())?;
Expand All @@ -107,8 +109,23 @@ where
let bytes = hex::decode(str)?;
let bytes = Bytes::from(bytes);
let chain = ctx.chain_service();
let metrics = ctx.metrics_service();

Ok(application::send_raw_transaction(bytes, chain)
Ok(application::send_raw_transaction(bytes, chain, metrics)
.await?
.encode_hex_with_prefix())
}

/// The JSON-RPC endpoint for Prometheus metrics scraper to collect data from.
pub fn metrics<Chain, M>(
_params: Params,
ctx: &Services<Chain, M>,
_ext: &http::Extensions,
) -> String
where
Chain: MetabasedSequencerChainService,
M: Metrics,
Error: From<<Chain as MetabasedSequencerChainService>::Error>,
{
application::metrics(ctx.metrics_service())
}
19 changes: 15 additions & 4 deletions metabased-sequencer/interceptor/src/presentation/server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::application::{Metrics, PrometheusMetrics};
use crate::domain::primitives::Address;
use crate::domain::MetabasedSequencerChainService;
use crate::infrastructure::SolMetabasedSequencerChainService;
Expand All @@ -22,30 +23,40 @@ pub async fn run(
let addr = server.local_addr()?;
let rpc = ProviderBuilder::new().on_http(chain_rpc_address);
let chain = SolMetabasedSequencerChainService::new(chain_contract_address, rpc);
let services = Services { chain };
let metrics = PrometheusMetrics::new();
let services = Services { chain, metrics };
let module = create_eth_module(services)?;
let handle = server.start(module);

Ok((addr, handle))
}

fn create_eth_module<Chain>(services: Services<Chain>) -> anyhow::Result<RpcModule<Services<Chain>>>
fn create_eth_module<Chain, M>(
services: Services<Chain, M>,
) -> anyhow::Result<RpcModule<Services<Chain, M>>>
where
Chain: MetabasedSequencerChainService + Send + Sync + 'static,
M: Metrics + Send + Sync + 'static,
Error: From<<Chain as MetabasedSequencerChainService>::Error>,
{
let mut module = RpcModule::new(services);
module.register_async_method("eth_sendRawTransaction", jsonrpc::send_raw_transaction)?;
module.register_method("metrics", jsonrpc::metrics)?;
Ok(module)
}

#[derive(Debug)]
pub struct Services<Chain> {
pub struct Services<Chain, M> {
chain: Chain,
metrics: M,
}

impl<Chain: MetabasedSequencerChainService> Services<Chain> {
impl<Chain: MetabasedSequencerChainService, M: Metrics> Services<Chain, M> {
pub fn chain_service(&self) -> &Chain {
&self.chain
}

pub fn metrics_service(&self) -> &M {
&self.metrics
}
}
Loading