Skip to content

feat(apollo_monitoring_endpoint): add getting mempool snapshot from the monitoring #5088

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/apollo_monitoring_endpoint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ workspace = true
[dependencies]
apollo_config.workspace = true
apollo_infra_utils.workspace = true
apollo_mempool_types.workspace = true
apollo_sequencer_infra.workspace = true
apollo_sequencer_metrics.workspace = true
axum.workspace = true
Expand All @@ -28,9 +29,11 @@ tracing.workspace = true
validator.workspace = true

[dev-dependencies]
apollo_mempool_types = { workspace = true, features = ["testing"] }
metrics.workspace = true
num-traits.workspace = true
pretty_assertions.workspace = true
starknet_api = { workspace = true, features = ["testing"] }
thiserror.workspace = true
tokio.workspace = true
tower.workspace = true
41 changes: 36 additions & 5 deletions crates/apollo_monitoring_endpoint/src/monitoring_endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::net::SocketAddr;

use apollo_infra_utils::type_name::short_type_name;
use apollo_mempool_types::communication::SharedMempoolClient;
use apollo_mempool_types::mempool_types::MempoolSnapshot;
use apollo_sequencer_infra::component_definitions::ComponentStarter;
use apollo_sequencer_metrics::metrics::COLLECT_SEQUENCER_PROFILING_METRICS;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::routing::get;
use axum::{async_trait, Router, Server};
use axum::{async_trait, Json, Router, Server};
use hyper::Error;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use tracing::{info, instrument};
use tracing::{error, info, instrument};

use crate::config::MonitoringEndpointConfig;

Expand All @@ -22,15 +24,21 @@ pub(crate) const ALIVE: &str = "alive";
pub(crate) const READY: &str = "ready";
pub(crate) const VERSION: &str = "nodeVersion";
pub(crate) const METRICS: &str = "metrics";
pub(crate) const MEMPOOL_SNAPSHOT: &str = "mempoolSnapshot";

pub struct MonitoringEndpoint {
config: MonitoringEndpointConfig,
version: &'static str,
prometheus_handle: Option<PrometheusHandle>,
mempool_client: Option<SharedMempoolClient>,
}

impl MonitoringEndpoint {
pub fn new(config: MonitoringEndpointConfig, version: &'static str) -> Self {
pub fn new(
config: MonitoringEndpointConfig,
version: &'static str,
mempool_client: Option<SharedMempoolClient>,
) -> Self {
// TODO(Tsabary): consider error handling
let prometheus_handle = if config.collect_metrics {
// TODO(Lev): add tests that show the metrics are collected / not collected based on the
Expand All @@ -47,7 +55,7 @@ impl MonitoringEndpoint {
} else {
None
};
MonitoringEndpoint { config, version, prometheus_handle }
MonitoringEndpoint { config, version, prometheus_handle, mempool_client }
}

#[instrument(
Expand All @@ -70,6 +78,7 @@ impl MonitoringEndpoint {
fn app(&self) -> Router {
let version = self.version.to_string();
let prometheus_handle = self.prometheus_handle.clone();
let mempool_client = self.mempool_client.clone();

Router::new()
.route(
Expand All @@ -88,14 +97,19 @@ impl MonitoringEndpoint {
format!("/{MONITORING_PREFIX}/{METRICS}").as_str(),
get(move || metrics(prometheus_handle)),
)
.route(
format!("/{MONITORING_PREFIX}/{MEMPOOL_SNAPSHOT}").as_str(),
get(move || mempool_snapshot(mempool_client)),
)
}
}

pub fn create_monitoring_endpoint(
config: MonitoringEndpointConfig,
version: &'static str,
mempool_client: Option<SharedMempoolClient>,
) -> MonitoringEndpoint {
MonitoringEndpoint::new(config, version)
MonitoringEndpoint::new(config, version, mempool_client)
}

#[async_trait]
Expand All @@ -117,3 +131,20 @@ async fn metrics(prometheus_handle: Option<PrometheusHandle>) -> Response {
None => StatusCode::METHOD_NOT_ALLOWED.into_response(),
}
}

// Returns Mempool snapshot
#[instrument(level = "debug", skip(mempool_client))]
async fn mempool_snapshot(
mempool_client: Option<SharedMempoolClient>,
) -> Result<Json<MempoolSnapshot>, StatusCode> {
match mempool_client {
Some(client) => match client.get_mempool_snapshot().await {
Ok(snapshot) => Ok(snapshot.into()),
Err(err) => {
error!("Failed to get mempool snapshot: {:?}", err);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
},
None => Err(StatusCode::METHOD_NOT_ALLOWED),
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::net::IpAddr;
use std::sync::Arc;

use apollo_mempool_types::communication::{MempoolClientResult, MockMempoolClient};
use apollo_mempool_types::mempool_types::MempoolSnapshot;
use axum::http::StatusCode;
use axum::response::Response;
use axum::Router;
use hyper::body::to_bytes;
use hyper::Client;
use metrics::{counter, describe_counter};
use pretty_assertions::assert_eq;
use starknet_api::tx_hash;
use tokio::spawn;
use tokio::task::yield_now;
use tower::ServiceExt;
Expand All @@ -17,6 +21,7 @@ use crate::monitoring_endpoint::{
create_monitoring_endpoint,
MonitoringEndpoint,
ALIVE,
MEMPOOL_SNAPSHOT,
METRICS,
READY,
VERSION,
Expand All @@ -36,7 +41,7 @@ const CONFIG_WITHOUT_METRICS: MonitoringEndpointConfig = MonitoringEndpointConfi

fn setup_monitoring_endpoint(config: Option<MonitoringEndpointConfig>) -> MonitoringEndpoint {
let config = config.unwrap_or(CONFIG_WITHOUT_METRICS);
create_monitoring_endpoint(config, TEST_VERSION)
create_monitoring_endpoint(config, TEST_VERSION, None)
}

async fn request_app(app: Router, method: &str) -> Response {
Expand Down Expand Up @@ -108,3 +113,41 @@ async fn endpoint_as_server() {
let body = to_bytes(response.into_body()).await.unwrap();
assert_eq!(&body[..], TEST_VERSION.as_bytes());
}

fn setup_monitoring_endpoint_with_mempool_client() -> MonitoringEndpoint {
let mut mock_mempool_client = MockMempoolClient::new();
mock_mempool_client.expect_get_mempool_snapshot().returning(return_mempool_snapshot);
let shared_mock_mempool_client = Arc::new(mock_mempool_client);

create_monitoring_endpoint(
CONFIG_WITHOUT_METRICS,
TEST_VERSION,
Some(shared_mock_mempool_client),
)
}

fn return_mempool_snapshot() -> MempoolClientResult<MempoolSnapshot> {
let expected_chronological_hashes = (1..10).map(|i| tx_hash!(i)).collect::<Vec<_>>();
Ok(MempoolSnapshot { transactions: expected_chronological_hashes })
}

#[tokio::test]
async fn mempool_snapshot() {
let app = setup_monitoring_endpoint_with_mempool_client().app();

let response = request_app(app, MEMPOOL_SNAPSHOT).await;
assert_eq!(response.status(), StatusCode::OK);
let body_bytes = hyper::body::to_bytes(response.into_body()).await.unwrap();
let body_string = String::from_utf8(body_bytes.to_vec()).unwrap();
let expected_prefix =
String::from(r#"{"transactions":["0x1","0x2","0x3","0x4","0x5","0x6","0x7","0x8","0x9"]}"#);

assert!(body_string.starts_with(&expected_prefix));
}

#[tokio::test]
async fn mempool_not_present() {
let app = setup_monitoring_endpoint(None).app();
let response = request_app(app, MEMPOOL_SNAPSHOT).await;
assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED);
}
20 changes: 16 additions & 4 deletions crates/apollo_sequencer_node/src/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,22 @@ pub async fn create_node_components(
};

let monitoring_endpoint = match config.components.monitoring_endpoint.execution_mode {
ActiveComponentExecutionMode::Enabled => Some(create_monitoring_endpoint(
config.monitoring_endpoint_config.clone(),
VERSION_FULL,
)),
ActiveComponentExecutionMode::Enabled => {
let mempool_client = if mempool.is_some() {
Some(
clients
.get_mempool_shared_client()
.expect("Mempool Client should be available"),
)
} else {
None
};
Some(create_monitoring_endpoint(
config.monitoring_endpoint_config.clone(),
VERSION_FULL,
mempool_client,
))
}
ActiveComponentExecutionMode::Disabled => None,
};

Expand Down
Loading