-
Notifications
You must be signed in to change notification settings - Fork 262
feat(argus): internal interfaces and actor model #2659
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7f152bb
970b63c
8a997c6
a767fee
5a66d0d
b7b9706
61f0030
12b5998
11a0dc6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
pub mod subscription_listener; | ||
pub mod pyth_price_listener; | ||
pub mod chain_price_listener; | ||
pub mod controller; | ||
pub mod price_pusher; | ||
pub mod types; | ||
|
||
pub use subscription_listener::SubscriptionListener; | ||
pub use pyth_price_listener::PythPriceListener; | ||
pub use chain_price_listener::ChainPriceListener; | ||
pub use controller::Controller; | ||
pub use price_pusher::PricePusher; | ||
pub use types::*; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
use { | ||
crate::{ | ||
actors::types::*, | ||
adapters::{ | ||
contract::GetChainPrices, | ||
types::{Price, PriceId}, | ||
}, | ||
}, | ||
anyhow::Result, | ||
ractor::{Actor, ActorProcessingErr, ActorRef}, | ||
std::{ | ||
collections::{HashMap, HashSet}, | ||
sync::Arc, | ||
time::Duration, | ||
}, | ||
tokio::{sync::RwLock, time}, | ||
tracing, | ||
}; | ||
|
||
#[allow(dead_code)] | ||
pub struct ChainPriceListenerState { | ||
chain_id: String, | ||
contract: Arc<dyn GetChainPrices + Send + Sync>, | ||
feed_ids: HashSet<PriceId>, | ||
latest_prices: Arc<RwLock<HashMap<PriceId, Price>>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. consider using DashMap as a thread-safe hashmap (instead of RwLock<HashMap<...>>) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oo very cool, thanks for sharing |
||
poll_interval: Duration, | ||
} | ||
|
||
pub struct ChainPriceListener; | ||
impl Actor for ChainPriceListener { | ||
type Msg = ChainPriceListenerMessage; | ||
type State = ChainPriceListenerState; | ||
type Arguments = (String, Arc<dyn GetChainPrices + Send + Sync>, Duration); | ||
|
||
async fn pre_start( | ||
&self, | ||
_myself: ActorRef<Self::Msg>, | ||
(chain_id, contract, poll_interval): Self::Arguments, | ||
) -> Result<Self::State, ActorProcessingErr> { | ||
let state = ChainPriceListenerState { | ||
chain_id: chain_id.clone(), | ||
contract, | ||
feed_ids: HashSet::new(), | ||
latest_prices: Arc::new(RwLock::new(HashMap::new())), | ||
poll_interval, | ||
}; | ||
|
||
if let Err(e) = state.contract.subscribe_to_price_events().await { | ||
tracing::error!( | ||
chain_id = state.chain_id, | ||
error = %e, | ||
"Failed to subscribe to price events" | ||
); | ||
} | ||
|
||
let poll_interval = state.poll_interval; | ||
tokio::spawn(async move { | ||
let mut interval = time::interval(poll_interval); | ||
loop { | ||
interval.tick().await; | ||
tracing::debug!( | ||
chain_id = chain_id.clone(), | ||
"Polling for on-chain price updates" | ||
); | ||
// todo!() | ||
} | ||
}); | ||
|
||
Ok(state) | ||
} | ||
|
||
async fn handle( | ||
&self, | ||
_myself: ActorRef<Self::Msg>, | ||
message: Self::Msg, | ||
_state: &mut Self::State, | ||
) -> Result<(), ActorProcessingErr> { | ||
match message { | ||
ChainPriceListenerMessage::GetLatestPrice(feed_id, reply_port) => { | ||
let price = _state.get_latest_price(&feed_id).await; | ||
reply_port.send(price)?; | ||
} | ||
ChainPriceListenerMessage::UpdateFeedIdSet(_) => { | ||
todo!() | ||
} | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
impl ChainPriceListenerState { | ||
pub async fn get_latest_price(&self, feed_id: &PriceId) -> Option<Price> { | ||
let latest_prices = self.latest_prices.read().await; | ||
latest_prices.get(feed_id).cloned() | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
use { | ||
crate::{ | ||
actors::types::*, | ||
adapters::ethereum::pyth_pulse::SubscriptionParams, | ||
adapters::types::{PriceId, SubscriptionId}, | ||
}, | ||
anyhow::Result, | ||
ractor::{Actor, ActorProcessingErr, ActorRef}, | ||
std::{ | ||
collections::{HashMap, HashSet}, | ||
time::Duration, | ||
}, | ||
tokio::sync::watch, | ||
}; | ||
|
||
#[allow(dead_code)] | ||
pub struct ControllerState { | ||
chain_id: String, | ||
subscription_listener: ActorRef<SubscriptionListenerMessage>, | ||
pyth_price_listener: ActorRef<PythPriceListenerMessage>, | ||
chain_price_listener: ActorRef<ChainPriceListenerMessage>, | ||
price_pusher: ActorRef<PricePusherMessage>, | ||
update_interval: Duration, | ||
update_loop_running: bool, | ||
stop_sender: Option<watch::Sender<bool>>, | ||
active_subscriptions: HashMap<SubscriptionId, SubscriptionParams>, | ||
feed_ids: HashSet<PriceId>, | ||
} | ||
|
||
pub struct Controller; | ||
impl Actor for Controller { | ||
type Msg = ControllerMessage; | ||
type State = ControllerState; | ||
type Arguments = ( | ||
String, | ||
ActorRef<SubscriptionListenerMessage>, | ||
ActorRef<PythPriceListenerMessage>, | ||
ActorRef<ChainPriceListenerMessage>, | ||
ActorRef<PricePusherMessage>, | ||
Duration, | ||
); | ||
|
||
async fn pre_start( | ||
&self, | ||
myself: ActorRef<Self::Msg>, | ||
( | ||
chain_id, | ||
subscription_listener, | ||
pyth_price_listener, | ||
chain_price_listener, | ||
price_pusher, | ||
update_interval, | ||
): Self::Arguments, | ||
) -> Result<Self::State, ActorProcessingErr> { | ||
let state = ControllerState { | ||
chain_id, | ||
subscription_listener, | ||
pyth_price_listener, | ||
chain_price_listener, | ||
price_pusher, | ||
update_interval, | ||
update_loop_running: false, | ||
stop_sender: None, | ||
active_subscriptions: HashMap::new(), | ||
feed_ids: HashSet::new(), | ||
}; | ||
|
||
// Start the update loop | ||
tokio::spawn(async move { | ||
let mut interval = tokio::time::interval(update_interval); | ||
loop { | ||
interval.tick().await; | ||
let _ = myself.cast(ControllerMessage::PerformUpdate); | ||
} | ||
}); | ||
|
||
Ok(state) | ||
} | ||
|
||
async fn handle( | ||
&self, | ||
_myself: ActorRef<Self::Msg>, | ||
message: Self::Msg, | ||
_state: &mut Self::State, | ||
) -> Result<(), ActorProcessingErr> { | ||
match message { | ||
ControllerMessage::PerformUpdate => { | ||
// Main processing logic. Keep active subscriptions up-to-date, check for price updates, and push them to the chain. | ||
tracing::info!("Performing update (todo)"); | ||
} | ||
} | ||
Ok(()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
use { | ||
super::PricePusherMessage, | ||
crate::adapters::types::{ReadPythPrices, UpdateChainPrices}, | ||
anyhow::Result, | ||
backoff::{backoff::Backoff, ExponentialBackoff}, | ||
ractor::{Actor, ActorProcessingErr, ActorRef}, | ||
std::sync::Arc, | ||
tokio::time, | ||
tracing, | ||
}; | ||
|
||
pub struct PricePusherState { | ||
chain_name: String, | ||
contract: Arc<dyn UpdateChainPrices + Send + Sync>, | ||
pyth_price_client: Arc<dyn ReadPythPrices + Send + Sync>, | ||
backoff_policy: ExponentialBackoff, | ||
} | ||
|
||
pub struct PricePusher; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you're going to get in trouble with this Actor for two reasons: (1) the controller is running on a separate frequency, and so its checks for when to push an update are not synchronized with the actions of PricePusher. That is, the PricePusher can be trying to push an update (possibly stuck in backoff) and meanwhile the Controller is repeatedly detecting that the update needs to occur and is sending more PushPriceUpdate messages. (2) this approach serializes the updates for all subscriptions on a chain, so if we can't update subscription 1 for some reason, that blocks subscription 2 as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah good point. We can definitely handle this better in PricePusher. I'm thinking we can |
||
impl Actor for PricePusher { | ||
type Msg = PricePusherMessage; | ||
type State = PricePusherState; | ||
type Arguments = ( | ||
String, | ||
Arc<dyn UpdateChainPrices + Send + Sync>, | ||
Arc<dyn ReadPythPrices + Send + Sync>, | ||
ExponentialBackoff, | ||
); | ||
|
||
async fn pre_start( | ||
&self, | ||
_myself: ActorRef<Self::Msg>, | ||
(chain_name, contract, hermes_client, backoff_policy): Self::Arguments, | ||
) -> Result<Self::State, ActorProcessingErr> { | ||
let state = PricePusherState { | ||
chain_name, | ||
contract, | ||
pyth_price_client: hermes_client, | ||
backoff_policy, | ||
}; | ||
|
||
Ok(state) | ||
} | ||
|
||
async fn handle( | ||
&self, | ||
_myself: ActorRef<Self::Msg>, | ||
message: Self::Msg, | ||
state: &mut Self::State, | ||
) -> Result<(), ActorProcessingErr> { | ||
match message { | ||
PricePusherMessage::PushPriceUpdates(push_request) => { | ||
let price_ids = push_request.price_ids.clone(); | ||
match state.pyth_price_client.get_latest_prices(&price_ids).await { | ||
Ok(update_data) => { | ||
let mut backoff = state.backoff_policy.clone(); | ||
let mut attempt = 0; | ||
|
||
// TODO: gas escalation policy | ||
loop { | ||
attempt += 1; | ||
|
||
match state | ||
.contract | ||
.update_price_feeds( | ||
push_request.subscription_id, | ||
&price_ids, | ||
&update_data, | ||
) | ||
.await | ||
{ | ||
Ok(tx_hash) => { | ||
tracing::info!( | ||
chain_id = state.chain_name, | ||
subscription_id = push_request.subscription_id.to_string(), | ||
tx_hash = tx_hash.to_string(), | ||
attempt = attempt, | ||
"Successfully pushed price updates" | ||
); | ||
break; | ||
} | ||
Err(e) => { | ||
if let Some(duration) = backoff.next_backoff() { | ||
tracing::warn!( | ||
chain_id = state.chain_name, | ||
subscription_id = push_request.subscription_id.to_string(), | ||
error = %e, | ||
attempt = attempt, | ||
retry_after_ms = duration.as_millis(), | ||
"Failed to push price updates, retrying" | ||
); | ||
time::sleep(duration).await; | ||
} else { | ||
tracing::error!( | ||
chain_id = state.chain_name, | ||
subscription_id = push_request.subscription_id.to_string(), | ||
error = %e, | ||
attempt = attempt, | ||
"Failed to push price updates, giving up" | ||
); | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
Err(e) => { | ||
tracing::error!( | ||
chain_id = state.chain_name, | ||
subscription_id = push_request.subscription_id.to_string(), | ||
error = %e, | ||
"Failed to get Pyth price update data" | ||
); | ||
} | ||
} | ||
} | ||
} | ||
Ok(()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we make a type alias for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(and for ChainId also)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We've got one actually, let me use that here