Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ ENV PATH="${PATH}:/root/.local/bin"
RUN poetry config virtualenvs.in-project true

# Install Solana Tool Suite
RUN sh -c "$(curl -sSfL https://release.solana.com/v1.14.11/install)"
RUN sh -c "$(curl -sSfL https://release.solana.com/v1.14.17/install)"
ENV PATH="${PATH}:/root/.local/share/solana/install/active_release/bin"

ADD . /agent
Expand Down
43 changes: 14 additions & 29 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,29 @@ impl Agent {
// Create the channels
// TODO: make all components listen to shutdown signal
let (shutdown_tx, _) = broadcast::channel(self.config.channel_capacities.shutdown);
let (primary_oracle_updates_tx, primary_oracle_updates_rx) =
mpsc::channel(self.config.channel_capacities.primary_oracle_updates);
let (secondary_oracle_updates_tx, secondary_oracle_updates_rx) =
mpsc::channel(self.config.channel_capacities.secondary_oracle_updates);
let (global_store_lookup_tx, global_store_lookup_rx) =
mpsc::channel(self.config.channel_capacities.global_store_lookup);
let (local_store_tx, local_store_rx) =
mpsc::channel(self.config.channel_capacities.local_store);
let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10);
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);

// Create the Pythd Adapter.
let adapter = Arc::new(
pythd::adapter::Adapter::new(
self.config.pythd_adapter.clone(),
local_store_tx.clone(),
logger.clone(),
)
.await,
);

// Spawn the primary network
jhs.extend(network::spawn_network(
self.config.primary_network.clone(),
network::Network::Primary,
local_store_tx.clone(),
global_store_lookup_tx.clone(),
primary_oracle_updates_tx,
primary_keypair_loader_tx,
logger.new(o!("primary" => true)),
adapter.clone(),
)?);

// Spawn the secondary network, if needed
Expand All @@ -146,53 +149,35 @@ impl Agent {
config.clone(),
network::Network::Secondary,
local_store_tx.clone(),
global_store_lookup_tx.clone(),
secondary_oracle_updates_tx,
secondary_keypair_loader_tx,
logger.new(o!("primary" => false)),
adapter.clone(),
)?);
}

// Create the Pythd Adapter.
let adapter = Arc::new(pythd::adapter::Adapter::new(
self.config.pythd_adapter.clone(),
global_store_lookup_tx.clone(),
local_store_tx.clone(),
logger.clone(),
));

// Create the Notifier task for the Pythd RPC.
jhs.push(tokio::spawn(notifier(
adapter.clone(),
shutdown_tx.subscribe(),
)));

// Spawn the Global Store
jhs.push(store::global::spawn_store(
global_store_lookup_rx,
primary_oracle_updates_rx,
secondary_oracle_updates_rx,
adapter.clone(),
logger.clone(),
));

// Spawn the Local Store
jhs.push(store::local::spawn_store(local_store_rx, logger.clone()));

// Spawn the Pythd API Server
jhs.push(tokio::spawn(rpc::run(
self.config.pythd_api_server.clone(),
logger.clone(),
adapter,
adapter.clone(),
shutdown_tx.subscribe(),
)));

// Spawn the metrics server
jhs.push(tokio::spawn(metrics::MetricsServer::spawn(
self.config.metrics_server.bind_address,
local_store_tx,
global_store_lookup_tx,
logger.clone(),
adapter,
)));

// Spawn the remote keypair loader endpoint for both networks
Expand Down
47 changes: 18 additions & 29 deletions src/agent/dashboard.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
use {
super::{
solana::oracle::PriceEntry,
store::{
global::{
AllAccountsData,
AllAccountsMetadata,
Lookup,
PriceAccountMetadata,
},
local::{
Message,
PriceInfo,
},
pythd::adapter::global::GlobalStore,
solana::{
network::Network,
oracle::PriceEntry,
},
store::local::{
Message,
PriceInfo,
},
},
crate::agent::{
metrics::MetricsServer,
pythd::adapter::global::{
AllAccountsData,
AllAccountsMetadata,
PriceAccountMetadata,
},
},
crate::agent::metrics::MetricsServer,
chrono::DateTime,
pyth_sdk::{
Identifier,
Expand Down Expand Up @@ -44,8 +47,6 @@ impl MetricsServer {
pub async fn render_dashboard(&self) -> Result<String, Box<dyn std::error::Error>> {
// Prepare response channel for requests
let (local_tx, local_rx) = oneshot::channel();
let (global_data_tx, global_data_rx) = oneshot::channel();
let (global_metadata_tx, global_metadata_rx) = oneshot::channel();

// Request price data from local and global store
self.local_store_tx
Expand All @@ -54,23 +55,11 @@ impl MetricsServer {
})
.await?;

self.global_store_lookup_tx
.send(Lookup::LookupAllAccountsData {
network: super::solana::network::Network::Primary,
result_tx: global_data_tx,
})
.await?;

self.global_store_lookup_tx
.send(Lookup::LookupAllAccountsMetadata {
result_tx: global_metadata_tx,
})
.await?;
let global_data = GlobalStore::accounts_data(&*self.adapter, Network::Primary).await?;
let global_metadata = GlobalStore::accounts_metadata(&*self.adapter).await?;

// Await the results
let local_data = local_rx.await?;
let global_data = global_data_rx.await??;
let global_metadata = global_metadata_rx.await??;

let symbol_view =
build_dashboard_data(local_data, global_data, global_metadata, &self.logger);
Expand Down
20 changes: 10 additions & 10 deletions src/agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
super::store::{
global::Lookup,
local::Message,
super::{
pythd::adapter::Adapter,
store::local::Message,
},
crate::agent::{
solana::oracle::PriceEntry,
Expand Down Expand Up @@ -74,25 +74,25 @@ lazy_static! {
/// dashboard and metrics.
pub struct MetricsServer {
/// Used to pull the state of all symbols in local store
pub local_store_tx: mpsc::Sender<Message>,
pub global_store_lookup_tx: mpsc::Sender<Lookup>,
pub start_time: Instant,
pub logger: Logger,
pub local_store_tx: mpsc::Sender<Message>,
pub start_time: Instant,
pub logger: Logger,
pub adapter: Arc<Adapter>,
}

impl MetricsServer {
/// Instantiate a metrics API with a dashboard
pub async fn spawn(
addr: impl Into<SocketAddr> + 'static,
local_store_tx: mpsc::Sender<Message>,
global_store_lookup_tx: mpsc::Sender<Lookup>,
logger: Logger,
adapter: Arc<Adapter>,
) {
let server = MetricsServer {
local_store_tx,
global_store_lookup_tx,
start_time: Instant::now(),
logger,
adapter,
};

let shared_state = Arc::new(Mutex::new(server));
Expand All @@ -109,7 +109,7 @@ impl MetricsServer {
.await
.unwrap_or_else(|e| {
// Add logging here
error!(locked_state.logger,"Dashboard: Rendering failed"; "error" => e.to_string());
error!(locked_state.logger,"Dashboard: Rendering failed"; "error" => e.to_string());

// Withhold failure details from client
"Could not render dashboard! See the logs for details".to_owned()
Expand Down
Loading