Skip to content

feat(argus): internal interfaces and actor model #2659

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 175 additions & 61 deletions apps/argus/Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions apps/argus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ hex = "0.4.3"
prometheus-client = { version = "0.21.2" }
pythnet-sdk = { path = "../../pythnet/pythnet_sdk", features = ["strum"] }
rand = "0.8.5"
ractor = "0.15.3"
reqwest = { version = "0.11.22", features = ["json", "blocking"] }
serde = { version = "1.0.188", features = ["derive"] }
serde_qs = { version = "0.12.0", features = ["axum"] }
Expand All @@ -41,6 +42,7 @@ backoff = { version = "0.4.0", features = ["futures", "tokio"] }
thiserror = "1.0.61"
futures-locks = "0.7.1"
async-trait = "0.1.88"
tokio-stream = "0.1.17"


[dev-dependencies]
Expand Down
13 changes: 13 additions & 0 deletions apps/argus/src/actors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
pub mod subscription_listener;
pub mod pyth_price_listener;
pub mod chain_price_listener;
pub mod controller;
pub mod price_pusher;
pub mod types;

pub use subscription_listener::SubscriptionListener;
pub use pyth_price_listener::PythPriceListener;
pub use chain_price_listener::ChainPriceListener;
pub use controller::Controller;
pub use price_pusher::PricePusher;
pub use types::*;
96 changes: 96 additions & 0 deletions apps/argus/src/actors/chain_price_listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use {
crate::{
actors::types::*,
adapters::{
contract::GetChainPrices,
types::{Price, PriceId},
},
},
anyhow::Result,
ractor::{Actor, ActorProcessingErr, ActorRef},
std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
},
tokio::{sync::RwLock, time},
tracing,
};

#[allow(dead_code)]
pub struct ChainPriceListenerState {
chain_id: String,
contract: Arc<dyn GetChainPrices + Send + Sync>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we make a type alias for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and for ChainId also)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've got one actually, let me use that here

feed_ids: HashSet<PriceId>,
latest_prices: Arc<RwLock<HashMap<PriceId, Price>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider using DashMap as a thread-safe hashmap (instead of RwLock<HashMap<...>>)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oo very cool, thanks for sharing

poll_interval: Duration,
}

pub struct ChainPriceListener;
impl Actor for ChainPriceListener {
type Msg = ChainPriceListenerMessage;
type State = ChainPriceListenerState;
type Arguments = (String, Arc<dyn GetChainPrices + Send + Sync>, Duration);

async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
(chain_id, contract, poll_interval): Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let state = ChainPriceListenerState {
chain_id: chain_id.clone(),
contract,
feed_ids: HashSet::new(),
latest_prices: Arc::new(RwLock::new(HashMap::new())),
poll_interval,
};

if let Err(e) = state.contract.subscribe_to_price_events().await {
tracing::error!(
chain_id = state.chain_id,
error = %e,
"Failed to subscribe to price events"
);
}

let poll_interval = state.poll_interval;
tokio::spawn(async move {
let mut interval = time::interval(poll_interval);
loop {
interval.tick().await;
tracing::debug!(
chain_id = chain_id.clone(),
"Polling for on-chain price updates"
);
// todo!()
}
});

Ok(state)
}

async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
ChainPriceListenerMessage::GetLatestPrice(feed_id, reply_port) => {
let price = _state.get_latest_price(&feed_id).await;
reply_port.send(price)?;
}
ChainPriceListenerMessage::UpdateFeedIdSet(_) => {
todo!()
}
}
Ok(())
}
}

impl ChainPriceListenerState {
pub async fn get_latest_price(&self, feed_id: &PriceId) -> Option<Price> {
let latest_prices = self.latest_prices.read().await;
latest_prices.get(feed_id).cloned()
}
}
94 changes: 94 additions & 0 deletions apps/argus/src/actors/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use {
crate::{
actors::types::*,
adapters::ethereum::pyth_pulse::SubscriptionParams,
adapters::types::{PriceId, SubscriptionId},
},
anyhow::Result,
ractor::{Actor, ActorProcessingErr, ActorRef},
std::{
collections::{HashMap, HashSet},
time::Duration,
},
tokio::sync::watch,
};

#[allow(dead_code)]
pub struct ControllerState {
chain_id: String,
subscription_listener: ActorRef<SubscriptionListenerMessage>,
pyth_price_listener: ActorRef<PythPriceListenerMessage>,
chain_price_listener: ActorRef<ChainPriceListenerMessage>,
price_pusher: ActorRef<PricePusherMessage>,
update_interval: Duration,
update_loop_running: bool,
stop_sender: Option<watch::Sender<bool>>,
active_subscriptions: HashMap<SubscriptionId, SubscriptionParams>,
feed_ids: HashSet<PriceId>,
}

pub struct Controller;
impl Actor for Controller {
type Msg = ControllerMessage;
type State = ControllerState;
type Arguments = (
String,
ActorRef<SubscriptionListenerMessage>,
ActorRef<PythPriceListenerMessage>,
ActorRef<ChainPriceListenerMessage>,
ActorRef<PricePusherMessage>,
Duration,
);

async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
(
chain_id,
subscription_listener,
pyth_price_listener,
chain_price_listener,
price_pusher,
update_interval,
): Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let state = ControllerState {
chain_id,
subscription_listener,
pyth_price_listener,
chain_price_listener,
price_pusher,
update_interval,
update_loop_running: false,
stop_sender: None,
active_subscriptions: HashMap::new(),
feed_ids: HashSet::new(),
};

// Start the update loop
tokio::spawn(async move {
let mut interval = tokio::time::interval(update_interval);
loop {
interval.tick().await;
let _ = myself.cast(ControllerMessage::PerformUpdate);
}
});

Ok(state)
}

async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
ControllerMessage::PerformUpdate => {
// Main processing logic. Keep active subscriptions up-to-date, check for price updates, and push them to the chain.
tracing::info!("Performing update (todo)");
}
}
Ok(())
}
}
120 changes: 120 additions & 0 deletions apps/argus/src/actors/price_pusher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use {
super::PricePusherMessage,
crate::adapters::types::{ReadPythPrices, UpdateChainPrices},
anyhow::Result,
backoff::{backoff::Backoff, ExponentialBackoff},
ractor::{Actor, ActorProcessingErr, ActorRef},
std::sync::Arc,
tokio::time,
tracing,
};

pub struct PricePusherState {
chain_name: String,
contract: Arc<dyn UpdateChainPrices + Send + Sync>,
pyth_price_client: Arc<dyn ReadPythPrices + Send + Sync>,
backoff_policy: ExponentialBackoff,
}

pub struct PricePusher;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're going to get in trouble with this Actor for two reasons: (1) the controller is running on a separate frequency, and so its checks for when to push an update are not synchronized with the actions of PricePusher. That is, the PricePusher can be trying to push an update (possibly stuck in backoff) and meanwhile the Controller is repeatedly detecting that the update needs to occur and is sending more PushPriceUpdate messages. (2) this approach serializes the updates for all subscriptions on a chain, so if we can't update subscription 1 for some reason, that blocks subscription 2 as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good point. We can definitely handle this better in PricePusher. I'm thinking we can spawn a task to attempt the update with backoff and track the handles. If a newer push for a subscription comes in while the last one is stuck in backoff, we can cancel the outstanding task and spawn a new one. This also means the actor's handle function doesn't block, and we can multiple push attempts for different subscriptions active at a time.

impl Actor for PricePusher {
type Msg = PricePusherMessage;
type State = PricePusherState;
type Arguments = (
String,
Arc<dyn UpdateChainPrices + Send + Sync>,
Arc<dyn ReadPythPrices + Send + Sync>,
ExponentialBackoff,
);

async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
(chain_name, contract, hermes_client, backoff_policy): Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let state = PricePusherState {
chain_name,
contract,
pyth_price_client: hermes_client,
backoff_policy,
};

Ok(state)
}

async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
PricePusherMessage::PushPriceUpdates(push_request) => {
let price_ids = push_request.price_ids.clone();
match state.pyth_price_client.get_latest_prices(&price_ids).await {
Ok(update_data) => {
let mut backoff = state.backoff_policy.clone();
let mut attempt = 0;

// TODO: gas escalation policy
loop {
attempt += 1;

match state
.contract
.update_price_feeds(
push_request.subscription_id,
&price_ids,
&update_data,
)
.await
{
Ok(tx_hash) => {
tracing::info!(
chain_id = state.chain_name,
subscription_id = push_request.subscription_id.to_string(),
tx_hash = tx_hash.to_string(),
attempt = attempt,
"Successfully pushed price updates"
);
break;
}
Err(e) => {
if let Some(duration) = backoff.next_backoff() {
tracing::warn!(
chain_id = state.chain_name,
subscription_id = push_request.subscription_id.to_string(),
error = %e,
attempt = attempt,
retry_after_ms = duration.as_millis(),
"Failed to push price updates, retrying"
);
time::sleep(duration).await;
} else {
tracing::error!(
chain_id = state.chain_name,
subscription_id = push_request.subscription_id.to_string(),
error = %e,
attempt = attempt,
"Failed to push price updates, giving up"
);
break;
}
}
}
}
}
Err(e) => {
tracing::error!(
chain_id = state.chain_name,
subscription_id = push_request.subscription_id.to_string(),
error = %e,
"Failed to get Pyth price update data"
);
}
}
}
}
Ok(())
}
}
Loading