Skip to content

feat: RPC redundancy #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ solana-account-decoder = "1.18.8"
solana-client = "1.18.8"
solana-pubkey = "2.3.0"
solana-sdk = "1.18.8"
solana-transaction-status = "1.18.26"
bincode = { version = "2.0.1", features = ["serde"] }
rand = "0.8.5"
config = "0.14.0"
Expand Down
5 changes: 3 additions & 2 deletions config/config.sample.pythnet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ listen_address = "127.0.0.1:8910"

[primary_network]

# HTTP(S) endpoint of the RPC node. Public Pythnet RPC endpoints are usually
# HTTP(S) endpoints of the RPC nodes. Public Pythnet RPC endpoints are usually
# rate-limited, so a private endpoint should be used in most cases.
rpc_url = "https://api2.pythnet.pyth.network"
# API calls will cycle through each on failure.
rpc_urls = ["https://api2.pythnet.pyth.network"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks backward compatibility, let's bump the version as a major one. (we can ship it with lazer in one go)


# WS(S) endpoint of the RPC node. This is used to subscribe to account changes on the network.
# This can be omitted when oracle.subscriber_enabled is set to false.
Expand Down
5 changes: 3 additions & 2 deletions config/config.sample.pythtest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ listen_address = "127.0.0.1:8910"

[primary_network]

# HTTP(S) endpoint of the RPC node.
rpc_url = "https://api.pythtest.pyth.network"
# HTTP(S) endpoints of the RPC nodes.
# API calls will cycle through each on failure.
rpc_urls = ["https://api.pythtest.pyth.network"]

# WS(S) endpoint of the RPC node. This is used to subscribe to account changes
# on the network. This can be omitted when oracle.subscriber_enabled is set to
Expand Down
5 changes: 3 additions & 2 deletions config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ listen_address = "127.0.0.1:8910"
[primary_network]
### Required fields ###

# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually
# HTTP(S) endpoints of the RPC nodes. Public RPC endpoints are usually
# rate-limited for Pythnet, and so a private endpoint should be used in most
# cases. For Pythtest, the public endpoint can be used.
rpc_url = "https://api.pythtest.pyth.network"
# API calls will cycle through each on failure.
rpc_urls = ["https://api.pythtest.pyth.network"]

# WS(S) endpoint of the RPC node. This is used to subscribe to account changes on the network.
# This can be omitted when oracle.subscriber_enabled is set to false.
Expand Down
5 changes: 3 additions & 2 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub mod pyth;
pub mod services;
pub mod solana;
pub mod state;
pub mod utils;

lazy_static! {
/// A static exit flag to indicate to running threads that we're shutting down. This is used to
Expand Down Expand Up @@ -183,11 +184,11 @@ impl Agent {
// Spawn the remote keypair loader endpoint for both networks
handles.extend(
services::keypairs(
self.config.primary_network.rpc_url.clone(),
self.config.primary_network.rpc_urls.clone(),
self.config
.secondary_network
.as_ref()
.map(|c| c.rpc_url.clone()),
.map(|c| c.rpc_urls.clone()),
self.config.remote_keypair_loader.clone(),
state,
)
Expand Down
46 changes: 28 additions & 18 deletions src/agent/services/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use {
exporter::Exporter,
transactions::Transactions,
},
utils::rpc_multi_client::RpcMultiClient,
},
anyhow::Result,
futures_util::future,
serde::{
Deserialize,
Serialize,
},
solana_client::nonblocking::rpc_client::RpcClient,
solana_sdk::commitment_config::CommitmentConfig,
std::{
sync::Arc,
Expand All @@ -27,6 +27,7 @@ use {
time::Interval,
},
tracing::instrument,
url::Url,
};

#[derive(Clone, Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -111,7 +112,7 @@ pub struct NetworkState {
/// fetching the blockhash and slot number.
struct NetworkStateQuerier {
/// The RPC client
rpc_client: RpcClient,
rpc_multi_client: RpcMultiClient,

/// The interval with which to query the network state
query_interval: Interval,
Expand All @@ -122,20 +123,21 @@ struct NetworkStateQuerier {

impl NetworkStateQuerier {
#[instrument(
skip(rpc_endpoint, rpc_timeout, query_interval),
skip(rpc_urls, rpc_timeout, query_interval),
fields(
rpc_timeout = rpc_timeout.as_millis(),
query_interval = query_interval.period().as_millis(),
)
)]
pub fn new(
rpc_endpoint: &str,
rpc_urls: &[Url],
rpc_timeout: Duration,
query_interval: Interval,
network_state_tx: watch::Sender<NetworkState>,
) -> Self {
let rpc_multi_client = RpcMultiClient::new_with_timeout(rpc_urls.to_vec(), rpc_timeout);
NetworkStateQuerier {
rpc_client: RpcClient::new_with_timeout(rpc_endpoint.to_string(), rpc_timeout),
rpc_multi_client,
query_interval,
network_state_tx,
}
Expand All @@ -154,9 +156,9 @@ impl NetworkStateQuerier {
async fn query_network_state(&mut self) -> Result<()> {
// Fetch the blockhash and current slot in parallel
let current_slot_future = self
.rpc_client
.rpc_multi_client
.get_slot_with_commitment(CommitmentConfig::confirmed());
let latest_blockhash_future = self.rpc_client.get_latest_blockhash();
let latest_blockhash_future = self.rpc_multi_client.get_latest_blockhash();

let (current_slot_result, latest_blockhash_result) =
future::join(current_slot_future, latest_blockhash_future).await;
Expand All @@ -183,7 +185,7 @@ where
// Create and spawn the network state querier
let (network_state_tx, network_state_rx) = watch::channel(Default::default());
let mut network_state_querier = NetworkStateQuerier::new(
&config.rpc_url,
&config.rpc_urls,
config.rpc_timeout,
tokio::time::interval(config.exporter.refresh_network_state_interval_duration),
network_state_tx,
Expand Down Expand Up @@ -224,8 +226,9 @@ mod exporter {
publish_batches,
Exporter,
},
utils::rpc_multi_client::RpcMultiClient,
},
solana_client::nonblocking::rpc_client::RpcClient,
solana_sdk::commitment_config::CommitmentConfig,
std::sync::Arc,
tokio::sync::watch,
};
Expand All @@ -243,10 +246,14 @@ mod exporter {
let mut dynamic_compute_unit_price_update_interval =
tokio::time::interval(config.exporter.publish_interval_duration);

let client = Arc::new(RpcClient::new_with_timeout(
config.rpc_url.to_string(),
config.rpc_timeout,
));
let rpc_multi_client: Arc<RpcMultiClient> =
Arc::new(RpcMultiClient::new_with_timeout_and_commitment(
config.rpc_urls.clone(),
config.rpc_timeout,
CommitmentConfig {
commitment: config.oracle.commitment,
},
));
let Ok(key_store) = KeyStore::new(config.key_store.clone()) else {
tracing::warn!("Key store not available, Exporter won't start.");
return;
Expand All @@ -265,7 +272,7 @@ mod exporter {
let publisher_buffer_key = Exporter::get_publisher_buffer_key(&*state).await;
if let Err(err) = publish_batches(
state.clone(),
client.clone(),
rpc_multi_client.clone(),
network,
&network_state_rx,
key_store.accumulator_key,
Expand Down Expand Up @@ -293,7 +300,7 @@ mod exporter {
if let Err(err) = Exporter::update_recent_compute_unit_price(
&*state,
&publish_keypair,
&client,
&rpc_multi_client,
config.exporter.staleness_threshold,
config.exporter.unchanged_publish_threshold,
).await {
Expand All @@ -312,12 +319,12 @@ mod transaction_monitor {
crate::agent::{
solana::network,
state::transactions::Transactions,
utils::rpc_multi_client::RpcMultiClient,
},
serde::{
Deserialize,
Serialize,
},
solana_client::nonblocking::rpc_client::RpcClient,
std::{
sync::Arc,
time::Duration,
Expand Down Expand Up @@ -352,13 +359,16 @@ mod transaction_monitor {
where
S: Transactions,
{
let client = RpcClient::new_with_timeout(config.rpc_url.to_string(), config.rpc_timeout);
let rpc_multi_client =
RpcMultiClient::new_with_timeout(config.rpc_urls.clone(), config.rpc_timeout);
let mut poll_interval =
tokio::time::interval(config.exporter.transaction_monitor.poll_interval_duration);

loop {
poll_interval.tick().await;
if let Err(err) = Transactions::poll_transactions_status(&*state, &client).await {
if let Err(err) =
Transactions::poll_transactions_status(&*state, &rpc_multi_client).await
{
tracing::error!(err = ?err, "Transaction monitor failed.");
}
}
Expand Down
39 changes: 20 additions & 19 deletions src/agent/services/keypairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use {
crate::agent::{
solana::network::Network,
state::keypairs::Keypairs,
utils::rpc_multi_client::RpcMultiClient,
},
anyhow::{
Context,
bail,
Result,
},
serde::Deserialize,
solana_client::nonblocking::rpc_client::RpcClient,
solana_sdk::{
commitment_config::CommitmentConfig,
signature::Keypair,
Expand All @@ -23,6 +23,7 @@ use {
sync::Arc,
},
tokio::task::JoinHandle,
url::Url,
warp::{
hyper::StatusCode,
reject::Rejection,
Expand Down Expand Up @@ -61,8 +62,8 @@ impl Default for Config {
}

pub async fn keypairs<S>(
primary_rpc_url: String,
secondary_rpc_url: Option<String>,
primary_rpc_urls: Vec<Url>,
secondary_rpc_urls: Option<Vec<Url>>,
config: Config,
state: Arc<S>,
) -> Vec<JoinHandle<()>>
Expand All @@ -81,7 +82,7 @@ where

let primary_upload_route = {
let state = state.clone();
let rpc_url = primary_rpc_url.clone();
let rpc_urls = primary_rpc_urls.clone();
let min_balance = config.primary_min_keypair_balance_sol;
warp::path!("primary" / "load_keypair")
.and(warp::post())
Expand All @@ -90,14 +91,14 @@ where
.and(warp::path::end())
.and_then(move |kp: Vec<u8>| {
let state = state.clone();
let rpc_url = rpc_url.clone();
let rpc_urls = rpc_urls.clone();
async move {
let response = handle_new_keypair(
state,
Network::Primary,
kp,
min_balance,
rpc_url,
rpc_urls,
"primary",
)
.await;
Expand All @@ -113,16 +114,16 @@ where
.and(warp::path::end())
.and_then(move |kp: Vec<u8>| {
let state = state.clone();
let rpc_url = secondary_rpc_url.clone();
let rpc_urls = secondary_rpc_urls.clone();
async move {
if let Some(rpc_url) = rpc_url {
if let Some(rpc_urls) = rpc_urls {
let min_balance = config.secondary_min_keypair_balance_sol;
let response = handle_new_keypair(
state,
Network::Secondary,
kp,
min_balance,
rpc_url,
rpc_urls,
"secondary",
)
.await;
Expand Down Expand Up @@ -160,15 +161,15 @@ async fn handle_new_keypair<'a, 'b: 'a, S>(
network: Network,
new_keypair_bytes: Vec<u8>,
min_keypair_balance_sol: u64,
rpc_url: String,
rpc_urls: Vec<Url>,
network_name: &'b str,
) -> WithStatus<&'static str>
where
S: Keypairs,
{
let mut upload_ok = true;
match Keypair::from_bytes(&new_keypair_bytes) {
Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_url.clone()).await {
Ok(kp) => match validate_keypair(&kp, min_keypair_balance_sol, rpc_urls.clone()).await {
Ok(()) => {
Keypairs::update_keypair(&*state, network, kp).await;
}
Expand Down Expand Up @@ -205,14 +206,14 @@ where
pub async fn validate_keypair(
kp: &Keypair,
min_keypair_balance_sol: u64,
rpc_url: String,
rpc_urls: Vec<Url>,
) -> Result<()> {
let c = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed());

let balance_lamports = c
.get_balance(&kp.pubkey())
.await
.context("Could not check keypair's balance")?;
let rpc_multi_client =
RpcMultiClient::new_with_commitment(rpc_urls, CommitmentConfig::confirmed());
let balance_lamports = match rpc_multi_client.get_balance(kp).await {
Ok(balance_lamports) => balance_lamports,
Err(_) => bail!("Could not check keypair's balance"),
};

let lamports_in_sol = 1_000_000_000;

Expand Down
Loading
Loading