Skip to content
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "3.0.2"
version = "3.0.3"
edition = "2024"

[[bin]]
Expand All @@ -10,6 +10,7 @@ path = "src/bin/agent.rs"
[dependencies]
anyhow = "1.0.81"
backoff = "0.4.0"
base64 = "0.22.1"
ed25519-dalek = "2.1.1"
serde = { version = "1.0.197", features = ["derive", "rc"] }
async-trait = "0.1.79"
Expand Down
169 changes: 100 additions & 69 deletions src/agent/services/lazer_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use {
crate::agent::state,
anyhow::{
Context,
Result,
anyhow,
bail,
Expand All @@ -9,6 +10,11 @@ use {
ExponentialBackoffBuilder,
backoff::Backoff,
},
base64::{
Engine,
prelude::BASE64_STANDARD,
},
ed25519_dalek::SigningKey,
futures_util::{
SinkExt,
stream::{
Expand All @@ -25,6 +31,7 @@ use {
Deserialize,
Serialize,
},
solana_sdk::signature::keypair,
std::{
path::PathBuf,
sync::Arc,
Expand Down Expand Up @@ -59,18 +66,23 @@ pub const RELAYER_CHANNEL_CAPACITY: usize = 1000;

#[derive(Clone, Debug, Deserialize)]
pub struct Config {
pub history_url: Url,
pub relayer_urls: Vec<Url>,
pub authorization_token: String,
pub publish_keypair_path: PathBuf,
pub history_url: Url,
pub relayer_urls: Vec<Url>,
pub publish_keypair_path: PathBuf,
#[serde(with = "humantime_serde", default = "default_publish_interval")]
pub publish_interval_duration: Duration,
pub publish_interval_duration: Duration,
#[serde(with = "humantime_serde", default = "default_symbol_fetch_interval")]
pub symbol_fetch_interval_duration: Duration,
}

fn default_publish_interval() -> Duration {
Duration::from_millis(200)
}

fn default_symbol_fetch_interval() -> Duration {
Duration::from_secs(60 * 60)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend using 1 minute. imagine we have a same-day listing. we want publishers to publish to it as fast as possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated!

}

struct RelayerWsSession {
ws_sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>,
}
Expand Down Expand Up @@ -145,8 +157,8 @@ impl RelayerSessionTask {

failure_count += 1;
let next_backoff = backoff.next_backoff().unwrap_or(max_interval);
tracing::error!(
"relayer session failed with error: {:?}, failure_count: {}; retrying in {:?}",
tracing::warn!(
"relayer session ended with error: {:?}, failure_count: {}; retrying in {:?}",
e,
failure_count,
next_backoff
Expand Down Expand Up @@ -199,7 +211,7 @@ impl RelayerSessionTask {
tracing::error!("Error receiving message from at relayer: {e:?}");
}
None => {
tracing::error!("relayer connection closed");
tracing::warn!("relayer connection closed");
bail!("relayer connection closed");
}
}
Expand Down Expand Up @@ -240,25 +252,53 @@ struct SymbolResponse {

async fn fetch_symbols(history_url: &Url) -> Result<Vec<SymbolResponse>> {
let mut url = history_url.clone();
url.set_scheme("http").map_err(|_| anyhow!("invalid url"))?;
url.set_path("/history/v1/symbols");
let client = Client::new();
let response = client.get(url).send().await?.error_for_status()?;
let data = response.json().await?;
Ok(data)
}

fn get_signing_key(config: &Config) -> Result<SigningKey> {
// Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) {
Ok(k) => k,
Err(e) => {
tracing::error!(
error = ?e,
publish_keypair_path = config.publish_keypair_path.display().to_string(),
"Reading publish keypair returned an error. ",
);
bail!("Reading publish keypair returned an error. ");
}
};

SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
.context("Failed to create signing key from keypair")
}

#[instrument(skip(config, state))]
pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandle<()>> {
let mut handles = vec![];

let signing_key = match get_signing_key(&config) {
Ok(signing_key) => signing_key,
Err(e) => {
// This is fatal as we can't publish without the key.
tracing::error!("failed to get Lazer signing key: {e:?}");
panic!("failed to get Lazer signing key")
}
};
let pubkey_base64 = BASE64_STANDARD.encode(signing_key.verifying_key().to_bytes());
tracing::info!("Loaded Lazer signing key; pubkey in base64: {pubkey_base64}");

// can safely drop first receiver for ease of iteration
let (relayer_sender, _) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);

for url in config.relayer_urls.iter() {
let mut task = RelayerSessionTask {
url: url.clone(),
token: config.authorization_token.to_owned(),
token: pubkey_base64.clone(),
receiver: relayer_sender.subscribe(),
};
handles.push(tokio::spawn(async move { task.run().await }));
Expand All @@ -268,6 +308,7 @@ pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandl
config.clone(),
state,
relayer_sender,
signing_key,
)));

handles
Expand All @@ -284,11 +325,6 @@ mod lazer_exporter {
},
state::local::LocalStore,
},
anyhow::{
Context,
Result,
bail,
},
ed25519_dalek::{
Signer,
SigningKey,
Expand All @@ -314,74 +350,39 @@ mod lazer_exporter {
signature_data::Data::Ed25519,
},
},
solana_sdk::signer::keypair,
std::{
collections::HashMap,
sync::Arc,
},
tokio::sync::broadcast::Sender,
url::Url,
};

fn get_signing_key(config: &Config) -> Result<SigningKey> {
// Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) {
Ok(k) => k,
Err(e) => {
tracing::error!(
error = ?e,
publish_keypair_path = config.publish_keypair_path.display().to_string(),
"Reading publish keypair returned an error. ",
);
bail!("Reading publish keypair returned an error. ");
}
};

SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
.context("Failed to create signing key from keypair")
}

pub async fn lazer_exporter<S>(
config: Config,
state: Arc<S>,
relayer_sender: Sender<SignedLazerTransaction>,
signing_key: SigningKey,
) where
S: LocalStore,
S: Send + Sync + 'static,
{
let signing_key = match get_signing_key(&config) {
Ok(signing_key) => signing_key,
Err(e) => {
tracing::error!("lazer_exporter signing key failure: {e:?}");
return;
}
};

// TODO: Re-fetch on an interval?
let lazer_symbols: HashMap<pyth_sdk::Identifier, SymbolResponse> =
match fetch_symbols(&config.history_url).await {
Ok(symbols) => symbols
.into_iter()
.filter_map(|symbol| {
let hermes_id = symbol.hermes_id.clone()?;
match pyth_sdk::Identifier::from_hex(hermes_id.clone()) {
Ok(id) => Some((id, symbol)),
Err(e) => {
tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id);
None
}
}
})
.collect(),
Err(e) => {
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
return;
}
};
let mut lazer_symbols = get_lazer_symbol_map(&config.history_url).await;
tracing::info!(
"Retrieved {} Lazer feeds with hermes symbols from symbols endpoint: {}",
lazer_symbols.len(),
&config.history_url
);

let mut publish_interval = tokio::time::interval(config.publish_interval_duration);
let mut symbol_fetch_interval =
tokio::time::interval(config.symbol_fetch_interval_duration);

loop {
tokio::select! {
_ = symbol_fetch_interval.tick() => {
lazer_symbols = get_lazer_symbol_map(&config.history_url).await;
},
_ = publish_interval.tick() => {
let publisher_timestamp = MessageField::some(Timestamp::now());
let mut publisher_update = PublisherUpdate {
Expand Down Expand Up @@ -452,6 +453,30 @@ mod lazer_exporter {
}
}
}

async fn get_lazer_symbol_map(
history_url: &Url,
) -> HashMap<pyth_sdk::Identifier, SymbolResponse> {
match fetch_symbols(history_url).await {
Ok(symbols) => symbols
.into_iter()
.filter_map(|symbol| {
let hermes_id = symbol.hermes_id.clone()?;
match pyth_sdk::Identifier::from_hex(hermes_id.clone()) {
Ok(id) => Some((id, symbol)),
Err(e) => {
tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id);
None
}
}
})
.collect(),
Err(e) => {
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
HashMap::new()
}
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -604,15 +629,21 @@ mod tests {
let state = Arc::new(local::Store::new(&mut Registry::default()));
let (relayer_sender, mut relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);
let private_key_file = get_private_key_file();
let private_key = get_private_key();

let config = Config {
history_url: Url::parse("http://127.0.0.1:12345").unwrap(),
relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()],
authorization_token: "token1".to_string(),
publish_keypair_path: PathBuf::from(private_key_file.path()),
publish_interval_duration: Duration::from_secs(1),
history_url: Url::parse("http://127.0.0.1:12345").unwrap(),
relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()],
publish_keypair_path: PathBuf::from(private_key_file.path()),
publish_interval_duration: Duration::from_secs(1),
symbol_fetch_interval_duration: Duration::from_secs(60 * 60),
};
tokio::spawn(lazer_exporter(config, state.clone(), relayer_sender));
tokio::spawn(lazer_exporter(
config,
state.clone(),
relayer_sender,
private_key,
));

tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
match relayer_receiver.try_recv() {
Expand Down
Loading