Skip to content

Commit af91c2f

Browse files
authored
feat: RPC redundancy (#157)
1 parent 430ee06 commit af91c2f

15 files changed

+347
-110
lines changed

Cargo.lock

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ solana-account-decoder = "1.18.8"
3030
solana-client = "1.18.8"
3131
solana-pubkey = "2.3.0"
3232
solana-sdk = "1.18.8"
33+
solana-transaction-status = "1.18.26"
3334
bincode = { version = "2.0.1", features = ["serde"] }
3435
rand = "0.8.5"
3536
config = "0.14.0"

config/config.sample.pythnet.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ listen_address = "127.0.0.1:8910"
33

44
[primary_network]
55

6-
# HTTP(S) endpoint of the RPC node. Public Pythnet RPC endpoints are usually
6+
# HTTP(S) endpoints of the RPC node. Public Pythnet RPC endpoints are usually
77
# rate-limited, so a private endpoint should be used in most cases.
8-
rpc_url = "https://api2.pythnet.pyth.network"
8+
# API calls will cycle through each on failure.
9+
rpc_urls = ["https://api2.pythnet.pyth.network"]
910

1011
# WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network.
1112
# This can be omitted when oracle.subscriber_enabled is set to false.

config/config.sample.pythtest.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ listen_address = "127.0.0.1:8910"
33

44
[primary_network]
55

6-
# HTTP(S) endpoint of the RPC node.
7-
rpc_url = "https://api.pythtest.pyth.network"
6+
# HTTP(S) endpoints of the RPC node.
7+
# API calls will cycle through each on failure.
8+
rpc_urls = ["https://api.pythtest.pyth.network"]
89

910
# WS(S) endpoint of the RRC node. This is used to subscribe to account changes
1011
# on the network. This can be omitted when oracle.subscriber_enabled is set to

config/config.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ listen_address = "127.0.0.1:8910"
2929
[primary_network]
3030
### Required fields ###
3131

32-
# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually
32+
# HTTP(S) endpoints of the RPC node. Public RPC endpoints are usually
3333
# rate-limited for Pythnet, and so a private endpoint should be used in most
3434
# cases. For Pythtest, the public endpoint can be used.
35-
rpc_url = "https://api.pythtest.pyth.network"
35+
# API calls will cycle through each on failure.
36+
rpc_urls = ["https://api.pythtest.pyth.network"]
3637

3738
# WS(S) endpoint of the RRC node. This is used to subscribe to account changes on the network.
3839
# This can be omitted when oracle.subscriber_enabled is set to false.

src/agent.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ pub mod pyth;
8383
pub mod services;
8484
pub mod solana;
8585
pub mod state;
86+
pub mod utils;
8687

8788
lazy_static! {
8889
/// A static exit flag to indicate to running threads that we're shutting down. This is used to
@@ -183,11 +184,11 @@ impl Agent {
183184
// Spawn the remote keypair loader endpoint for both networks
184185
handles.extend(
185186
services::keypairs(
186-
self.config.primary_network.rpc_url.clone(),
187+
self.config.primary_network.rpc_urls.clone(),
187188
self.config
188189
.secondary_network
189190
.as_ref()
190-
.map(|c| c.rpc_url.clone()),
191+
.map(|c| c.rpc_urls.clone()),
191192
self.config.remote_keypair_loader.clone(),
192193
state,
193194
)

src/agent/services/exporter.rs

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ use {
88
exporter::Exporter,
99
transactions::Transactions,
1010
},
11+
utils::rpc_multi_client::RpcMultiClient,
1112
},
1213
anyhow::Result,
1314
futures_util::future,
1415
serde::{
1516
Deserialize,
1617
Serialize,
1718
},
18-
solana_client::nonblocking::rpc_client::RpcClient,
1919
solana_sdk::commitment_config::CommitmentConfig,
2020
std::{
2121
sync::Arc,
@@ -27,6 +27,7 @@ use {
2727
time::Interval,
2828
},
2929
tracing::instrument,
30+
url::Url,
3031
};
3132

3233
#[derive(Clone, Serialize, Deserialize, Debug)]
@@ -111,7 +112,7 @@ pub struct NetworkState {
111112
/// fetching the blockhash and slot number.
112113
struct NetworkStateQuerier {
113114
/// The RPC client
114-
rpc_client: RpcClient,
115+
rpc_multi_client: RpcMultiClient,
115116

116117
/// The interval with which to query the network state
117118
query_interval: Interval,
@@ -122,20 +123,21 @@ struct NetworkStateQuerier {
122123

123124
impl NetworkStateQuerier {
124125
#[instrument(
125-
skip(rpc_endpoint, rpc_timeout, query_interval),
126+
skip(rpc_urls, rpc_timeout, query_interval),
126127
fields(
127128
rpc_timeout = rpc_timeout.as_millis(),
128129
query_interval = query_interval.period().as_millis(),
129130
)
130131
)]
131132
pub fn new(
132-
rpc_endpoint: &str,
133+
rpc_urls: &[Url],
133134
rpc_timeout: Duration,
134135
query_interval: Interval,
135136
network_state_tx: watch::Sender<NetworkState>,
136137
) -> Self {
138+
let rpc_multi_client = RpcMultiClient::new_with_timeout(rpc_urls.to_vec(), rpc_timeout);
137139
NetworkStateQuerier {
138-
rpc_client: RpcClient::new_with_timeout(rpc_endpoint.to_string(), rpc_timeout),
140+
rpc_multi_client,
139141
query_interval,
140142
network_state_tx,
141143
}
@@ -154,9 +156,9 @@ impl NetworkStateQuerier {
154156
async fn query_network_state(&mut self) -> Result<()> {
155157
// Fetch the blockhash and current slot in parallel
156158
let current_slot_future = self
157-
.rpc_client
159+
.rpc_multi_client
158160
.get_slot_with_commitment(CommitmentConfig::confirmed());
159-
let latest_blockhash_future = self.rpc_client.get_latest_blockhash();
161+
let latest_blockhash_future = self.rpc_multi_client.get_latest_blockhash();
160162

161163
let (current_slot_result, latest_blockhash_result) =
162164
future::join(current_slot_future, latest_blockhash_future).await;
@@ -183,7 +185,7 @@ where
183185
// Create and spawn the network state querier
184186
let (network_state_tx, network_state_rx) = watch::channel(Default::default());
185187
let mut network_state_querier = NetworkStateQuerier::new(
186-
&config.rpc_url,
188+
&config.rpc_urls,
187189
config.rpc_timeout,
188190
tokio::time::interval(config.exporter.refresh_network_state_interval_duration),
189191
network_state_tx,
@@ -224,8 +226,9 @@ mod exporter {
224226
publish_batches,
225227
Exporter,
226228
},
229+
utils::rpc_multi_client::RpcMultiClient,
227230
},
228-
solana_client::nonblocking::rpc_client::RpcClient,
231+
solana_sdk::commitment_config::CommitmentConfig,
229232
std::sync::Arc,
230233
tokio::sync::watch,
231234
};
@@ -243,10 +246,14 @@ mod exporter {
243246
let mut dynamic_compute_unit_price_update_interval =
244247
tokio::time::interval(config.exporter.publish_interval_duration);
245248

246-
let client = Arc::new(RpcClient::new_with_timeout(
247-
config.rpc_url.to_string(),
248-
config.rpc_timeout,
249-
));
249+
let rpc_multi_client: Arc<RpcMultiClient> =
250+
Arc::new(RpcMultiClient::new_with_timeout_and_commitment(
251+
config.rpc_urls.clone(),
252+
config.rpc_timeout,
253+
CommitmentConfig {
254+
commitment: config.oracle.commitment,
255+
},
256+
));
250257
let Ok(key_store) = KeyStore::new(config.key_store.clone()) else {
251258
tracing::warn!("Key store not available, Exporter won't start.");
252259
return;
@@ -265,7 +272,7 @@ mod exporter {
265272
let publisher_buffer_key = Exporter::get_publisher_buffer_key(&*state).await;
266273
if let Err(err) = publish_batches(
267274
state.clone(),
268-
client.clone(),
275+
rpc_multi_client.clone(),
269276
network,
270277
&network_state_rx,
271278
key_store.accumulator_key,
@@ -293,7 +300,7 @@ mod exporter {
293300
if let Err(err) = Exporter::update_recent_compute_unit_price(
294301
&*state,
295302
&publish_keypair,
296-
&client,
303+
&rpc_multi_client,
297304
config.exporter.staleness_threshold,
298305
config.exporter.unchanged_publish_threshold,
299306
).await {
@@ -312,12 +319,12 @@ mod transaction_monitor {
312319
crate::agent::{
313320
solana::network,
314321
state::transactions::Transactions,
322+
utils::rpc_multi_client::RpcMultiClient,
315323
},
316324
serde::{
317325
Deserialize,
318326
Serialize,
319327
},
320-
solana_client::nonblocking::rpc_client::RpcClient,
321328
std::{
322329
sync::Arc,
323330
time::Duration,
@@ -352,13 +359,16 @@ mod transaction_monitor {
352359
where
353360
S: Transactions,
354361
{
355-
let client = RpcClient::new_with_timeout(config.rpc_url.to_string(), config.rpc_timeout);
362+
let rpc_multi_client =
363+
RpcMultiClient::new_with_timeout(config.rpc_urls.clone(), config.rpc_timeout);
356364
let mut poll_interval =
357365
tokio::time::interval(config.exporter.transaction_monitor.poll_interval_duration);
358366

359367
loop {
360368
poll_interval.tick().await;
361-
if let Err(err) = Transactions::poll_transactions_status(&*state, &client).await {
369+
if let Err(err) =
370+
Transactions::poll_transactions_status(&*state, &rpc_multi_client).await
371+
{
362372
tracing::error!(err = ?err, "Transaction monitor failed.");
363373
}
364374
}

src/agent/services/keypairs.rs

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ use {
66
crate::agent::{
77
solana::network::Network,
88
state::keypairs::Keypairs,
9+
utils::rpc_multi_client::RpcMultiClient,
910
},
1011
anyhow::{
11-
Context,
12+
bail,
1213
Result,
1314
},
1415
serde::Deserialize,
15-
solana_client::nonblocking::rpc_client::RpcClient,
1616
solana_sdk::{
1717
commitment_config::CommitmentConfig,
1818
signature::Keypair,
@@ -23,6 +23,7 @@ use {
2323
sync::Arc,
2424
},
2525
tokio::task::JoinHandle,
26+
url::Url,
2627
warp::{
2728
hyper::StatusCode,
2829
reject::Rejection,
@@ -61,8 +62,8 @@ impl Default for Config {
6162
}
6263

6364
pub async fn keypairs<S>(
64-
primary_rpc_url: String,
65-
secondary_rpc_url: Option<String>,
65+
primary_rpc_urls: Vec<Url>,
66+
secondary_rpc_urls: Option<Vec<Url>>,
6667
config: Config,
6768
state: Arc<S>,
6869
) -> Vec<JoinHandle<()>>
@@ -81,7 +82,7 @@ where
8182

8283
let primary_upload_route = {
8384
let state = state.clone();
84-
let rpc_url = primary_rpc_url.clone();
85+
let rpc_urls = primary_rpc_urls.clone();
8586
let min_balance = config.primary_min_keypair_balance_sol;
8687
warp::path!("primary" / "load_keypair")
8788
.and(warp::post())
@@ -90,14 +91,14 @@ where
9091
.and(warp::path::end())
9192
.and_then(move |kp: Vec<u8>| {
9293
let state = state.clone();
93-
let rpc_url = rpc_url.clone();
94+
let rpc_urls = rpc_urls.clone();
9495
async move {
9596
let response = handle_new_keypair(
9697
state,
9798
Network::Primary,
9899
kp,
99100
min_balance,
100-
rpc_url,
101+
rpc_urls,
101102
"primary",
102103
)
103104
.await;
@@ -113,16 +114,16 @@ where
113114
.and(warp::path::end())
114115
.and_then(move |kp: Vec<u8>| {
115116
let state = state.clone();
116-
let rpc_url = secondary_rpc_url.clone();
117+
let rpc_urls = secondary_rpc_urls.clone();
117118
async move {
118-
if let Some(rpc_url) = rpc_url {
119+
if let Some(rpc_urls) = rpc_urls {
119120
let min_balance = config.secondary_min_keypair_balance_sol;
120121
let response = handle_new_keypair(
121122
state,
122123
Network::Secondary,
123124
kp,
124125
min_balance,
125-
rpc_url,
126+
rpc_urls,
126127
"secondary",
127128
)
128129
.await;
@@ -160,15 +161,15 @@ async fn handle_new_keypair<'a, 'b: 'a, S>(
160161
network: Network,
161162
new_keypair_bytes: Vec<u8>,
162163
min_keypair_balance_sol: u64,
163-
rpc_url: String,
164+
rpc_urls: Vec<Url>,
164165
network_name: &'b str,
165166
) -> WithStatus<&'static str>
166167
where
167168
S: Keypairs,
168169
{
169170
let mut upload_ok = true;
170171
match Keypair::from_bytes(&new_keypair_bytes) {
171-
Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_url.clone()).await {
172+
Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_urls.clone()).await {
172173
Ok(()) => {
173174
Keypairs::update_keypair(&*state, network, kp).await;
174175
}
@@ -205,14 +206,14 @@ where
205206
pub async fn validate_keypair(
206207
kp: &Keypair,
207208
min_keypair_balance_sol: u64,
208-
rpc_url: String,
209+
rpc_urls: Vec<Url>,
209210
) -> Result<()> {
210-
let c = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());
211-
212-
let balance_lamports = c
213-
.get_balance(&kp.pubkey())
214-
.await
215-
.context("Could not check keypair's balance")?;
211+
let rpc_multi_client =
212+
RpcMultiClient::new_with_commitment(rpc_urls, CommitmentConfig::confirmed());
213+
let balance_lamports = match rpc_multi_client.get_balance(kp).await {
214+
Ok(balance_lamports) => balance_lamports,
215+
Err(_) => bail!("Could not check keypair's balance"),
216+
};
216217

217218
let lamports_in_sol = 1_000_000_000;
218219

0 commit comments

Comments
 (0)