Skip to content

feat(argus): internal interfaces and shared memory model #2682

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,159 changes: 1,293 additions & 866 deletions apps/argus/Cargo.lock

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions apps/argus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ anyhow = "1.0.75"
axum = { version = "0.6.20", features = ["json", "ws", "macros"] }
axum-macros = { version = "0.3.8" }
base64 = { version = "0.21.0" }
bincode = "1.3.3"
byteorder = "1.5.0"
clap = { version = "4.4.6", features = ["derive", "cargo", "env"] }
ethabi = "18.0.0"
ethers = { version = "2.0.14", features = ["ws"] }
fortuna = { path = "../fortuna" }
futures = { version = "0.3.28" }
Expand All @@ -21,7 +18,6 @@ pythnet-sdk = { path = "../../pythnet/pythnet_sdk", features = ["strum"] }
rand = "0.8.5"
reqwest = { version = "0.11.22", features = ["json", "blocking"] }
serde = { version = "1.0.188", features = ["derive"] }
serde_qs = { version = "0.12.0", features = ["axum"] }
serde_json = "1.0.107"
serde_with = { version = "3.4.0", features = ["hex", "base64"] }
serde_yaml = "0.9.25"
Expand All @@ -41,8 +37,11 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] }
thiserror = "1.0.61"
futures-locks = "0.7.1"
async-trait = "0.1.88"
tokio-stream = "0.1.17"
dashmap = "6.1.0"
pyth-sdk = "0.8.0"
humantime-serde = "1.1.1"


[dev-dependencies]
mockall = "0.13.1"
axum-test = "13.1.1"
14 changes: 13 additions & 1 deletion apps/argus/config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ chains:
geth_rpc_addr: https://replicator.pegasus.lightlink.io/rpc/v1
contract_addr: 0x8250f4aF4B972684F7b336503E2D6dFeDeB1487a


# Multiplier for the priority fee estimate, as a percentage (i.e., 100 = no change).
# Defaults to 100 if the field is omitted.
priority_fee_multiplier_pct: 100
Expand Down Expand Up @@ -33,3 +32,16 @@ keeper:
value: 0xabcd
# For production, you can store the private key in a file.
# file: keeper-key.txt

# Service polling intervals (e.g. 1m, 1min, 1m30s, 1min 30sec). See format here: https://docs.rs/humantime/latest/humantime/
# These control how frequently different services poll for updates
subscription_poll_interval: 1m # How often to check for new subscriptions
chain_price_poll_interval: 10s # How often to check chain prices
pyth_price_poll_interval: 10s # How often to check Pyth prices
controller_update_interval: 10s # How often to update the controller

# Backoff policy configuration for retrying failed operations
backoff_initial_interval: 1s # Initial wait time between retries
backoff_max_interval: 60s # Maximum wait time between retries
backoff_multiplier: 2.0 # Multiply wait time by this factor on each retry
backoff_max_elapsed_time: 300s # Maximum total time to keep retrying
4 changes: 4 additions & 0 deletions apps/argus/src/adapters.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod contract;
pub mod ethereum;
pub mod hermes;
pub mod types;
70 changes: 70 additions & 0 deletions apps/argus/src/adapters/contract.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use super::ethereum::PythPulse;
use super::types::*;
use crate::adapters::ethereum::SubscriptionParams;
use anyhow::Result;
use async_trait::async_trait;
use ethers::providers::Middleware;
use ethers::types::H256;
use pyth_sdk::Price;
use std::collections::HashMap;

#[async_trait]
pub trait GetChainPrices {
async fn get_price_unsafe(
&self,
subscription_id: SubscriptionId,
feed_id: &PriceId,
) -> Result<Option<Price>>;
}

#[async_trait]
impl<M: Middleware + 'static> GetChainPrices for PythPulse<M> {
async fn get_price_unsafe(
&self,
_subscription_id: SubscriptionId,
_feed_id: &PriceId,
) -> Result<Option<Price>> {
todo!()
}
}
#[async_trait]
pub trait UpdateChainPrices {
async fn update_price_feeds(
&self,
subscription_id: SubscriptionId,
price_ids: &[PriceId],
update_data: &[Vec<u8>],
) -> Result<H256>;
}
#[async_trait]
impl<M: Middleware + 'static> UpdateChainPrices for PythPulse<M> {
async fn update_price_feeds(
&self,
subscription_id: SubscriptionId,
price_ids: &[PriceId],
update_data: &[Vec<u8>],
) -> Result<H256> {
tracing::debug!(
subscription_id = subscription_id.to_string(),
price_ids_count = price_ids.len(),
update_data_count = update_data.len(),
"Updating price feeds on-chain via PythPulse"
);
todo!()
}
}
#[async_trait]
pub trait ReadChainSubscriptions {
async fn get_active_subscriptions(&self)
-> Result<HashMap<SubscriptionId, SubscriptionParams>>;
}

#[async_trait]
impl<M: Middleware + 'static> ReadChainSubscriptions for PythPulse<M> {
async fn get_active_subscriptions(
&self,
) -> Result<HashMap<SubscriptionId, SubscriptionParams>> {
tracing::debug!("Getting active subscriptions via PythPulse");
Ok(HashMap::new())
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use {
crate::{
api::ChainId,
chain::reader::{BlockNumber, BlockStatus},
config::EthereumConfig,
},
crate::config::EthereumConfig,
crate::state::ChainName,
anyhow::{Error, Result},
ethers::{
contract::abigen,
Expand Down Expand Up @@ -92,7 +89,7 @@ impl InstrumentedSignablePythContract {
pub async fn from_config(
chain_config: &EthereumConfig,
private_key: &str,
chain_id: ChainId,
chain_id: ChainName,
metrics: Arc<RpcMetrics>,
) -> Result<Self> {
let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?;
Expand All @@ -114,7 +111,7 @@ impl PythContract {
impl InstrumentedPythContract {
pub fn from_config(
chain_config: &EthereumConfig,
chain_id: ChainId,
chain_id: ChainName,
metrics: Arc<RpcMetrics>,
) -> Result<Self> {
let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?;
Expand Down Expand Up @@ -144,3 +141,30 @@ impl<M: Middleware + 'static> PythPulse<M> {
.as_u64())
}
}

// TODO: extract to a SDK

pub type BlockNumber = u64;

#[derive(
Copy, Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize,
)]
pub enum BlockStatus {
/// Latest block
#[default]
Latest,
/// Finalized block accepted as canonical
Finalized,
/// Safe head block
Safe,
}

impl From<BlockStatus> for EthersBlockNumber {
fn from(val: BlockStatus) -> Self {
match val {
BlockStatus::Latest => EthersBlockNumber::Latest,
BlockStatus::Finalized => EthersBlockNumber::Finalized,
BlockStatus::Safe => EthersBlockNumber::Safe,
}
}
}
21 changes: 21 additions & 0 deletions apps/argus/src/adapters/hermes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use super::types::*;
use anyhow::Result;
use async_trait::async_trait;

pub struct HermesClient;

#[async_trait]
pub trait ReadPythPrices {
async fn get_latest_prices(&self, feed_ids: &[PriceId]) -> Result<Vec<Vec<u8>>>;
async fn subscribe_to_price_updates(&self, feed_ids: &[PriceId]) -> Result<()>; // TODO: return a stream
}
#[async_trait]
impl ReadPythPrices for HermesClient {
async fn get_latest_prices(&self, _feed_ids: &[PriceId]) -> Result<Vec<Vec<u8>>> {
todo!()
}

async fn subscribe_to_price_updates(&self, _feed_ids: &[PriceId]) -> Result<()> {
todo!()
}
}
5 changes: 5 additions & 0 deletions apps/argus/src/adapters/types.rs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i recommend moving some of these in to the files to reduce indirection if they are only used in one place.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use ethers::types::U256;
use pyth_sdk::PriceIdentifier;

pub type PriceId = PriceIdentifier;
pub type SubscriptionId = U256;
135 changes: 45 additions & 90 deletions apps/argus/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,106 +1,61 @@
//! API server for Prometheus metrics and health checks

use {
crate::chain::reader::BlockStatus,
axum::{
body::Body,
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
Router,
},
prometheus_client::{
encoding::EncodeLabelSet,
metrics::{counter::Counter, family::Family},
registry::Registry,
},
std::sync::Arc,
tokio::sync::RwLock,
anyhow::{anyhow, Result},
axum::{body::Body, routing::get, Router},
index::index,
live::live,
metrics::metrics,
prometheus_client::registry::Registry,
ready::ready,
std::{net::SocketAddr, sync::Arc},
tokio::sync::{watch, RwLock},
tower_http::cors::CorsLayer,
};
pub use {index::*, live::*, metrics::*, ready::*};

mod index;
mod live;
mod metrics;
mod ready;

pub type ChainId = String;

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct RequestLabel {
pub value: String,
}

pub struct ApiMetrics {
pub http_requests: Family<RequestLabel, Counter>,
}

#[derive(Clone)]
pub struct ApiState {
pub metrics_registry: Arc<RwLock<Registry>>,

/// Prometheus metrics
pub metrics: Arc<ApiMetrics>,
}

impl ApiState {
pub async fn new(metrics_registry: Arc<RwLock<Registry>>) -> ApiState {
let metrics = ApiMetrics {
http_requests: Family::default(),
};

let http_requests = metrics.http_requests.clone();
metrics_registry.write().await.register(
"http_requests",
"Number of HTTP requests received",
http_requests,
);

ApiState {
metrics: Arc::new(metrics),
metrics_registry,
}
}
}

/// The state of the service for a single blockchain.
#[derive(Clone)]
pub struct BlockchainState {
/// The chain id for this blockchain, useful for logging
pub id: ChainId,
/// The BlockStatus of the block that is considered to be confirmed on the blockchain.
/// For eg., Finalized, Safe
pub confirmed_block_status: BlockStatus,
}

pub enum RestError {
/// The server cannot currently communicate with the blockchain, so is not able to verify
/// which random values have been requested.
TemporarilyUnavailable,
/// A catch-all error for all other types of errors that could occur during processing.
Unknown,
}

impl IntoResponse for RestError {
fn into_response(self) -> Response {
match self {
RestError::TemporarilyUnavailable => (
StatusCode::SERVICE_UNAVAILABLE,
"This service is temporarily unavailable",
)
.into_response(),
RestError::Unknown => (
StatusCode::INTERNAL_SERVER_ERROR,
"An unknown error occurred processing the request",
)
.into_response(),
}
}
}

pub fn routes(state: ApiState) -> Router<(), Body> {
pub fn routes(api_state: ApiState) -> Router<(), Body> {
Router::new()
.route("/", get(index))
.route("/live", get(live))
.route("/metrics", get(metrics))
.route("/ready", get(ready))
.with_state(state)
.route("/metrics", get(metrics))
.with_state(api_state)
}

pub async fn run_api_server(
socket_addr: SocketAddr,
metrics_registry: Arc<RwLock<Registry>>,
mut exit_rx: watch::Receiver<bool>,
) -> Result<()> {
let api_state = ApiState {
metrics_registry: metrics_registry.clone(),
};

let app = Router::new();
let app = app
.merge(routes(api_state))
// Permissive CORS layer to allow all origins
.layer(CorsLayer::permissive());

tracing::info!("Starting API server on: {:?}", &socket_addr);
axum::Server::try_bind(&socket_addr)
.map_err(|e| anyhow!("Failed to bind to address {}: {}", &socket_addr, e))?
.serve(app.into_make_service())
.with_graceful_shutdown(async {
// Ctrl+c signal received
let _ = exit_rx.changed().await;

tracing::info!("Shutting down API server...");
})
.await?;

Ok(())
}
2 changes: 0 additions & 2 deletions apps/argus/src/chain.rs

This file was deleted.

Loading