Skip to content

Commit 348b1af

Browse files
feat(argus): internal interfaces and shared memory model (#2682)
* feat: argus skeleton and internal interfaces * doc: add module docs * feat: use dashmap instead of rwlock<hashmap> * fix: pr comments * fix: make interval values configurable * fix: remove streaming in GetChainPrices
1 parent 7330900 commit 348b1af

26 files changed

+2520
-1298
lines changed

apps/argus/Cargo.lock

Lines changed: 1293 additions & 866 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/argus/Cargo.toml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@ anyhow = "1.0.75"
88
axum = { version = "0.6.20", features = ["json", "ws", "macros"] }
99
axum-macros = { version = "0.3.8" }
1010
base64 = { version = "0.21.0" }
11-
bincode = "1.3.3"
12-
byteorder = "1.5.0"
1311
clap = { version = "4.4.6", features = ["derive", "cargo", "env"] }
14-
ethabi = "18.0.0"
1512
ethers = { version = "2.0.14", features = ["ws"] }
1613
fortuna = { path = "../fortuna" }
1714
futures = { version = "0.3.28" }
@@ -21,7 +18,6 @@ pythnet-sdk = { path = "../../pythnet/pythnet_sdk", features = ["strum"] }
2118
rand = "0.8.5"
2219
reqwest = { version = "0.11.22", features = ["json", "blocking"] }
2320
serde = { version = "1.0.188", features = ["derive"] }
24-
serde_qs = { version = "0.12.0", features = ["axum"] }
2521
serde_json = "1.0.107"
2622
serde_with = { version = "3.4.0", features = ["hex", "base64"] }
2723
serde_yaml = "0.9.25"
@@ -41,8 +37,11 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] }
4137
thiserror = "1.0.61"
4238
futures-locks = "0.7.1"
4339
async-trait = "0.1.88"
40+
tokio-stream = "0.1.17"
41+
dashmap = "6.1.0"
42+
pyth-sdk = "0.8.0"
43+
humantime-serde = "1.1.1"
4444

4545

4646
[dev-dependencies]
4747
mockall = "0.13.1"
48-
axum-test = "13.1.1"

apps/argus/config.sample.yaml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ chains:
33
geth_rpc_addr: https://replicator.pegasus.lightlink.io/rpc/v1
44
contract_addr: 0x8250f4aF4B972684F7b336503E2D6dFeDeB1487a
55

6-
76
# Multiplier for the priority fee estimate, as a percentage (i.e., 100 = no change).
87
# Defaults to 100 if the field is omitted.
98
priority_fee_multiplier_pct: 100
@@ -33,3 +32,16 @@ keeper:
3332
value: 0xabcd
3433
# For production, you can store the private key in a file.
3534
# file: keeper-key.txt
35+
36+
# Service polling intervals (e.g. 1m, 1min, 1m30s, 1min 30sec). See format here: https://docs.rs/humantime/latest/humantime/
37+
# These control how frequently different services poll for updates
38+
subscription_poll_interval: 1m # How often to check for new subscriptions
39+
chain_price_poll_interval: 10s # How often to check chain prices
40+
pyth_price_poll_interval: 10s # How often to check Pyth prices
41+
controller_update_interval: 10s # How often to update the controller
42+
43+
# Backoff policy configuration for retrying failed operations
44+
backoff_initial_interval: 1s # Initial wait time between retries
45+
backoff_max_interval: 60s # Maximum wait time between retries
46+
backoff_multiplier: 2.0 # Multiply wait time by this factor on each retry
47+
backoff_max_elapsed_time: 300s # Maximum total time to keep retrying

apps/argus/src/adapters.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
pub mod contract;
2+
pub mod ethereum;
3+
pub mod hermes;
4+
pub mod types;

apps/argus/src/adapters/contract.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use super::ethereum::PythPulse;
2+
use super::types::*;
3+
use crate::adapters::ethereum::SubscriptionParams;
4+
use anyhow::Result;
5+
use async_trait::async_trait;
6+
use ethers::providers::Middleware;
7+
use ethers::types::H256;
8+
use pyth_sdk::Price;
9+
use std::collections::HashMap;
10+
11+
#[async_trait]
12+
pub trait GetChainPrices {
13+
async fn get_price_unsafe(
14+
&self,
15+
subscription_id: SubscriptionId,
16+
feed_id: &PriceId,
17+
) -> Result<Option<Price>>;
18+
}
19+
20+
#[async_trait]
21+
impl<M: Middleware + 'static> GetChainPrices for PythPulse<M> {
22+
async fn get_price_unsafe(
23+
&self,
24+
_subscription_id: SubscriptionId,
25+
_feed_id: &PriceId,
26+
) -> Result<Option<Price>> {
27+
todo!()
28+
}
29+
}
30+
#[async_trait]
31+
pub trait UpdateChainPrices {
32+
async fn update_price_feeds(
33+
&self,
34+
subscription_id: SubscriptionId,
35+
price_ids: &[PriceId],
36+
update_data: &[Vec<u8>],
37+
) -> Result<H256>;
38+
}
39+
#[async_trait]
40+
impl<M: Middleware + 'static> UpdateChainPrices for PythPulse<M> {
41+
async fn update_price_feeds(
42+
&self,
43+
subscription_id: SubscriptionId,
44+
price_ids: &[PriceId],
45+
update_data: &[Vec<u8>],
46+
) -> Result<H256> {
47+
tracing::debug!(
48+
subscription_id = subscription_id.to_string(),
49+
price_ids_count = price_ids.len(),
50+
update_data_count = update_data.len(),
51+
"Updating price feeds on-chain via PythPulse"
52+
);
53+
todo!()
54+
}
55+
}
56+
#[async_trait]
57+
pub trait ReadChainSubscriptions {
58+
async fn get_active_subscriptions(&self)
59+
-> Result<HashMap<SubscriptionId, SubscriptionParams>>;
60+
}
61+
62+
#[async_trait]
63+
impl<M: Middleware + 'static> ReadChainSubscriptions for PythPulse<M> {
64+
async fn get_active_subscriptions(
65+
&self,
66+
) -> Result<HashMap<SubscriptionId, SubscriptionParams>> {
67+
tracing::debug!("Getting active subscriptions via PythPulse");
68+
Ok(HashMap::new())
69+
}
70+
}

apps/argus/src/chain/ethereum.rs renamed to apps/argus/src/adapters/ethereum.rs

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
use {
2-
crate::{
3-
api::ChainId,
4-
chain::reader::{BlockNumber, BlockStatus},
5-
config::EthereumConfig,
6-
},
2+
crate::config::EthereumConfig,
3+
crate::state::ChainName,
74
anyhow::{Error, Result},
85
ethers::{
96
contract::abigen,
@@ -92,7 +89,7 @@ impl InstrumentedSignablePythContract {
9289
pub async fn from_config(
9390
chain_config: &EthereumConfig,
9491
private_key: &str,
95-
chain_id: ChainId,
92+
chain_id: ChainName,
9693
metrics: Arc<RpcMetrics>,
9794
) -> Result<Self> {
9895
let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?;
@@ -114,7 +111,7 @@ impl PythContract {
114111
impl InstrumentedPythContract {
115112
pub fn from_config(
116113
chain_config: &EthereumConfig,
117-
chain_id: ChainId,
114+
chain_id: ChainName,
118115
metrics: Arc<RpcMetrics>,
119116
) -> Result<Self> {
120117
let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?;
@@ -144,3 +141,30 @@ impl<M: Middleware + 'static> PythPulse<M> {
144141
.as_u64())
145142
}
146143
}
144+
145+
// TODO: extract to a SDK
146+
147+
pub type BlockNumber = u64;
148+
149+
#[derive(
150+
Copy, Clone, Debug, Default, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize,
151+
)]
152+
pub enum BlockStatus {
153+
/// Latest block
154+
#[default]
155+
Latest,
156+
/// Finalized block accepted as canonical
157+
Finalized,
158+
/// Safe head block
159+
Safe,
160+
}
161+
162+
impl From<BlockStatus> for EthersBlockNumber {
163+
fn from(val: BlockStatus) -> Self {
164+
match val {
165+
BlockStatus::Latest => EthersBlockNumber::Latest,
166+
BlockStatus::Finalized => EthersBlockNumber::Finalized,
167+
BlockStatus::Safe => EthersBlockNumber::Safe,
168+
}
169+
}
170+
}

apps/argus/src/adapters/hermes.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use super::types::*;
2+
use anyhow::Result;
3+
use async_trait::async_trait;
4+
5+
pub struct HermesClient;
6+
7+
#[async_trait]
8+
pub trait ReadPythPrices {
9+
async fn get_latest_prices(&self, feed_ids: &[PriceId]) -> Result<Vec<Vec<u8>>>;
10+
async fn subscribe_to_price_updates(&self, feed_ids: &[PriceId]) -> Result<()>; // TODO: return a stream
11+
}
12+
#[async_trait]
13+
impl ReadPythPrices for HermesClient {
14+
async fn get_latest_prices(&self, _feed_ids: &[PriceId]) -> Result<Vec<Vec<u8>>> {
15+
todo!()
16+
}
17+
18+
async fn subscribe_to_price_updates(&self, _feed_ids: &[PriceId]) -> Result<()> {
19+
todo!()
20+
}
21+
}

apps/argus/src/adapters/types.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
use ethers::types::U256;
2+
use pyth_sdk::PriceIdentifier;
3+
4+
pub type PriceId = PriceIdentifier;
5+
pub type SubscriptionId = U256;

apps/argus/src/api.rs

Lines changed: 45 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,106 +1,61 @@
1+
//! API server for Prometheus metrics and health checks
2+
13
use {
2-
crate::chain::reader::BlockStatus,
3-
axum::{
4-
body::Body,
5-
http::StatusCode,
6-
response::{IntoResponse, Response},
7-
routing::get,
8-
Router,
9-
},
10-
prometheus_client::{
11-
encoding::EncodeLabelSet,
12-
metrics::{counter::Counter, family::Family},
13-
registry::Registry,
14-
},
15-
std::sync::Arc,
16-
tokio::sync::RwLock,
4+
anyhow::{anyhow, Result},
5+
axum::{body::Body, routing::get, Router},
6+
index::index,
7+
live::live,
8+
metrics::metrics,
9+
prometheus_client::registry::Registry,
10+
ready::ready,
11+
std::{net::SocketAddr, sync::Arc},
12+
tokio::sync::{watch, RwLock},
13+
tower_http::cors::CorsLayer,
1714
};
18-
pub use {index::*, live::*, metrics::*, ready::*};
19-
2015
mod index;
2116
mod live;
2217
mod metrics;
2318
mod ready;
24-
25-
pub type ChainId = String;
26-
27-
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
28-
pub struct RequestLabel {
29-
pub value: String,
30-
}
31-
32-
pub struct ApiMetrics {
33-
pub http_requests: Family<RequestLabel, Counter>,
34-
}
35-
3619
#[derive(Clone)]
3720
pub struct ApiState {
3821
pub metrics_registry: Arc<RwLock<Registry>>,
39-
40-
/// Prometheus metrics
41-
pub metrics: Arc<ApiMetrics>,
42-
}
43-
44-
impl ApiState {
45-
pub async fn new(metrics_registry: Arc<RwLock<Registry>>) -> ApiState {
46-
let metrics = ApiMetrics {
47-
http_requests: Family::default(),
48-
};
49-
50-
let http_requests = metrics.http_requests.clone();
51-
metrics_registry.write().await.register(
52-
"http_requests",
53-
"Number of HTTP requests received",
54-
http_requests,
55-
);
56-
57-
ApiState {
58-
metrics: Arc::new(metrics),
59-
metrics_registry,
60-
}
61-
}
62-
}
63-
64-
/// The state of the service for a single blockchain.
65-
#[derive(Clone)]
66-
pub struct BlockchainState {
67-
/// The chain id for this blockchain, useful for logging
68-
pub id: ChainId,
69-
/// The BlockStatus of the block that is considered to be confirmed on the blockchain.
70-
/// For eg., Finalized, Safe
71-
pub confirmed_block_status: BlockStatus,
7222
}
7323

74-
pub enum RestError {
75-
/// The server cannot currently communicate with the blockchain, so is not able to verify
76-
/// which random values have been requested.
77-
TemporarilyUnavailable,
78-
/// A catch-all error for all other types of errors that could occur during processing.
79-
Unknown,
80-
}
81-
82-
impl IntoResponse for RestError {
83-
fn into_response(self) -> Response {
84-
match self {
85-
RestError::TemporarilyUnavailable => (
86-
StatusCode::SERVICE_UNAVAILABLE,
87-
"This service is temporarily unavailable",
88-
)
89-
.into_response(),
90-
RestError::Unknown => (
91-
StatusCode::INTERNAL_SERVER_ERROR,
92-
"An unknown error occurred processing the request",
93-
)
94-
.into_response(),
95-
}
96-
}
97-
}
98-
99-
pub fn routes(state: ApiState) -> Router<(), Body> {
24+
pub fn routes(api_state: ApiState) -> Router<(), Body> {
10025
Router::new()
10126
.route("/", get(index))
10227
.route("/live", get(live))
103-
.route("/metrics", get(metrics))
10428
.route("/ready", get(ready))
105-
.with_state(state)
29+
.route("/metrics", get(metrics))
30+
.with_state(api_state)
31+
}
32+
33+
pub async fn run_api_server(
34+
socket_addr: SocketAddr,
35+
metrics_registry: Arc<RwLock<Registry>>,
36+
mut exit_rx: watch::Receiver<bool>,
37+
) -> Result<()> {
38+
let api_state = ApiState {
39+
metrics_registry: metrics_registry.clone(),
40+
};
41+
42+
let app = Router::new();
43+
let app = app
44+
.merge(routes(api_state))
45+
// Permissive CORS layer to allow all origins
46+
.layer(CorsLayer::permissive());
47+
48+
tracing::info!("Starting API server on: {:?}", &socket_addr);
49+
axum::Server::try_bind(&socket_addr)
50+
.map_err(|e| anyhow!("Failed to bind to address {}: {}", &socket_addr, e))?
51+
.serve(app.into_make_service())
52+
.with_graceful_shutdown(async {
53+
// Ctrl+c signal received
54+
let _ = exit_rx.changed().await;
55+
56+
tracing::info!("Shutting down API server...");
57+
})
58+
.await?;
59+
60+
Ok(())
10661
}

apps/argus/src/chain.rs

Lines changed: 0 additions & 2 deletions
This file was deleted.

0 commit comments

Comments
 (0)