Skip to content

Commit 36523e7

Browse files
committed
refactor(agent): convert global store to an Api
1 parent dfebb4d commit 36523e7

File tree

11 files changed

+511
-581
lines changed

11 files changed

+511
-581
lines changed

src/agent.rs

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ pub mod store;
7373
use {
7474
self::{
7575
config::Config,
76+
metrics::PROMETHEUS_REGISTRY,
7677
pythd::{
7778
adapter::notifier,
7879
api::rpc,
@@ -118,26 +119,29 @@ impl Agent {
118119
// Create the channels
119120
// TODO: make all components listen to shutdown signal
120121
let (shutdown_tx, _) = broadcast::channel(self.config.channel_capacities.shutdown);
121-
let (primary_oracle_updates_tx, primary_oracle_updates_rx) =
122-
mpsc::channel(self.config.channel_capacities.primary_oracle_updates);
123-
let (secondary_oracle_updates_tx, secondary_oracle_updates_rx) =
124-
mpsc::channel(self.config.channel_capacities.secondary_oracle_updates);
125-
let (global_store_lookup_tx, global_store_lookup_rx) =
126-
mpsc::channel(self.config.channel_capacities.global_store_lookup);
127122
let (local_store_tx, local_store_rx) =
128123
mpsc::channel(self.config.channel_capacities.local_store);
129124
let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10);
130125
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);
131126

127+
let mut registry = PROMETHEUS_REGISTRY.lock().await;
128+
129+
// Create the Pythd Adapter.
130+
let adapter = Arc::new(pythd::adapter::Adapter::new(
131+
self.config.pythd_adapter.clone(),
132+
local_store_tx.clone(),
133+
logger.clone(),
134+
&mut registry,
135+
));
136+
132137
// Spawn the primary network
133138
jhs.extend(network::spawn_network(
134139
self.config.primary_network.clone(),
135140
network::Network::Primary,
136141
local_store_tx.clone(),
137-
global_store_lookup_tx.clone(),
138-
primary_oracle_updates_tx,
139142
primary_keypair_loader_tx,
140143
logger.new(o!("primary" => true)),
144+
adapter.clone(),
141145
)?);
142146

143147
// Spawn the secondary network, if needed
@@ -146,53 +150,35 @@ impl Agent {
146150
config.clone(),
147151
network::Network::Secondary,
148152
local_store_tx.clone(),
149-
global_store_lookup_tx.clone(),
150-
secondary_oracle_updates_tx,
151153
secondary_keypair_loader_tx,
152154
logger.new(o!("primary" => false)),
155+
adapter.clone(),
153156
)?);
154157
}
155158

156-
// Create the Pythd Adapter.
157-
let adapter = Arc::new(pythd::adapter::Adapter::new(
158-
self.config.pythd_adapter.clone(),
159-
global_store_lookup_tx.clone(),
160-
local_store_tx.clone(),
161-
logger.clone(),
162-
));
163-
164159
// Create the Notifier task for the Pythd RPC.
165160
jhs.push(tokio::spawn(notifier(
166161
adapter.clone(),
167162
shutdown_tx.subscribe(),
168163
)));
169164

170-
// Spawn the Global Store
171-
jhs.push(store::global::spawn_store(
172-
global_store_lookup_rx,
173-
primary_oracle_updates_rx,
174-
secondary_oracle_updates_rx,
175-
adapter.clone(),
176-
logger.clone(),
177-
));
178-
179165
// Spawn the Local Store
180166
jhs.push(store::local::spawn_store(local_store_rx, logger.clone()));
181167

182168
// Spawn the Pythd API Server
183169
jhs.push(tokio::spawn(rpc::run(
184170
self.config.pythd_api_server.clone(),
185171
logger.clone(),
186-
adapter,
172+
adapter.clone(),
187173
shutdown_tx.subscribe(),
188174
)));
189175

190176
// Spawn the metrics server
191177
jhs.push(tokio::spawn(metrics::MetricsServer::spawn(
192178
self.config.metrics_server.bind_address,
193179
local_store_tx,
194-
global_store_lookup_tx,
195180
logger.clone(),
181+
adapter,
196182
)));
197183

198184
// Spawn the remote keypair loader endpoint for both networks

src/agent/dashboard.rs

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
use {
22
super::{
3-
solana::oracle::PriceEntry,
4-
store::{
5-
global::{
6-
AllAccountsData,
7-
AllAccountsMetadata,
8-
Lookup,
9-
PriceAccountMetadata,
10-
},
11-
local::{
12-
Message,
13-
PriceInfo,
14-
},
3+
pythd::adapter::global::GlobalStore,
4+
solana::{
5+
network::Network,
6+
oracle::PriceEntry,
7+
},
8+
store::local::{
9+
Message,
10+
PriceInfo,
11+
},
12+
},
13+
crate::agent::{
14+
metrics::MetricsServer,
15+
pythd::adapter::global::{
16+
AllAccountsData,
17+
AllAccountsMetadata,
18+
PriceAccountMetadata,
1519
},
1620
},
17-
crate::agent::metrics::MetricsServer,
1821
chrono::DateTime,
1922
pyth_sdk::{
2023
Identifier,
@@ -44,8 +47,6 @@ impl MetricsServer {
4447
pub async fn render_dashboard(&self) -> Result<String, Box<dyn std::error::Error>> {
4548
// Prepare response channel for requests
4649
let (local_tx, local_rx) = oneshot::channel();
47-
let (global_data_tx, global_data_rx) = oneshot::channel();
48-
let (global_metadata_tx, global_metadata_rx) = oneshot::channel();
4950

5051
// Request price data from local and global store
5152
self.local_store_tx
@@ -54,23 +55,11 @@ impl MetricsServer {
5455
})
5556
.await?;
5657

57-
self.global_store_lookup_tx
58-
.send(Lookup::LookupAllAccountsData {
59-
network: super::solana::network::Network::Primary,
60-
result_tx: global_data_tx,
61-
})
62-
.await?;
63-
64-
self.global_store_lookup_tx
65-
.send(Lookup::LookupAllAccountsMetadata {
66-
result_tx: global_metadata_tx,
67-
})
68-
.await?;
58+
let global_data = GlobalStore::accounts_data(&*self.adapter, Network::Primary).await?;
59+
let global_metadata = GlobalStore::accounts_metadata(&*self.adapter).await?;
6960

7061
// Await the results
7162
let local_data = local_rx.await?;
72-
let global_data = global_data_rx.await??;
73-
let global_metadata = global_metadata_rx.await??;
7463

7564
let symbol_view =
7665
build_dashboard_data(local_data, global_data, global_metadata, &self.logger);

src/agent/metrics.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use {
2-
super::store::{
3-
global::Lookup,
4-
local::Message,
2+
super::{
3+
pythd::adapter::Adapter,
4+
store::local::Message,
55
},
66
crate::agent::{
77
solana::oracle::PriceEntry,
@@ -76,25 +76,25 @@ lazy_static! {
7676
/// dashboard and metrics.
7777
pub struct MetricsServer {
7878
/// Used to pull the state of all symbols in local store
79-
pub local_store_tx: mpsc::Sender<Message>,
80-
pub global_store_lookup_tx: mpsc::Sender<Lookup>,
81-
pub start_time: Instant,
82-
pub logger: Logger,
79+
pub local_store_tx: mpsc::Sender<Message>,
80+
pub start_time: Instant,
81+
pub logger: Logger,
82+
pub adapter: Arc<Adapter>,
8383
}
8484

8585
impl MetricsServer {
8686
/// Instantiate a metrics API with a dashboard
8787
pub async fn spawn(
8888
addr: impl Into<SocketAddr> + 'static,
8989
local_store_tx: mpsc::Sender<Message>,
90-
global_store_lookup_tx: mpsc::Sender<Lookup>,
9190
logger: Logger,
91+
adapter: Arc<Adapter>,
9292
) {
9393
let server = MetricsServer {
9494
local_store_tx,
95-
global_store_lookup_tx,
9695
start_time: Instant::now(),
9796
logger,
97+
adapter,
9898
};
9999

100100
let shared_state = Arc::new(Mutex::new(server));
@@ -111,7 +111,7 @@ impl MetricsServer {
111111
.await
112112
.unwrap_or_else(|e| {
113113
// Add logging here
114-
error!(locked_state.logger,"Dashboard: Rendering failed"; "error" => e.to_string());
114+
error!(locked_state.logger,"Dashboard: Rendering failed"; "error" => e.to_string());
115115

116116
// Withhold failure details from client
117117
"Could not render dashboard! See the logs for details".to_owned()

0 commit comments

Comments
 (0)