Skip to content

fix: hermes id handling #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 13, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions src/agent/services/lazer_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use {
Result,
anyhow,
},
ed25519_dalek::SecretKey,
futures_util::{
SinkExt,
stream::{
Expand Down Expand Up @@ -54,14 +53,6 @@ pub struct Config {
pub publish_interval_duration: Duration,
}

#[derive(Clone, Deserialize)]
struct PublisherSecretKey(SecretKey);
impl std::fmt::Debug for PublisherSecretKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PublisherSecretKey(redacted)")
}
}

fn default_publish_interval() -> Duration {
Duration::from_millis(200)
}
Expand Down Expand Up @@ -129,16 +120,27 @@ async fn connect_to_relayers(
#[derive(Deserialize)]
struct SymbolResponse {
pub pyth_lazer_id: u32,
#[serde(rename = "name")]
pub _name: String,
#[serde(rename = "symbol")]
pub _symbol: String,
#[serde(rename = "description")]
pub _description: String,
#[serde(rename = "asset_type")]
pub _asset_type: String,
#[serde(rename = "exponent")]
pub _exponent: i16,
#[serde(rename = "cmc_id")]
pub _cmc_id: Option<u32>,
#[serde(rename = "interval")]
pub _interval: Option<String>,
#[serde(rename = "min_publishers")]
pub _min_publishers: u16,
#[serde(rename = "min_channel")]
pub _min_channel: String,
#[serde(rename = "state")]
pub _state: String,
#[serde(rename = "schedule")]
pub _schedule: String,
pub hermes_id: Option<String>,
}
Expand Down Expand Up @@ -245,11 +247,20 @@ mod lazer_exporter {
S: Send + Sync + 'static,
{
// TODO: Re-fetch on an interval?
let lazer_symbols: HashMap<String, SymbolResponse> =
let lazer_symbols: HashMap<pyth_sdk::Identifier, SymbolResponse> =
match fetch_symbols(&config.history_url).await {
Ok(symbols) => symbols
.into_iter()
.filter_map(|symbol| symbol.hermes_id.clone().map(|id| (id, symbol)))
.filter_map(|symbol| {
let hermes_id = symbol.hermes_id.clone()?;
match pyth_sdk::Identifier::from_hex(hermes_id.clone()) {
Ok(id) => Some((id, symbol)),
Err(e) => {
tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id);
None
}
}
})
.collect(),
Err(e) => {
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
Expand Down Expand Up @@ -296,7 +307,7 @@ mod lazer_exporter {

// TODO: This read locks and clones local::Store::prices, which may not meet performance needs.
for (identifier, price_info) in state.get_all_price_infos().await {
if let Some(symbol) = lazer_symbols.get(&identifier.to_string()) {
if let Some(symbol) = lazer_symbols.get(&identifier) {
let source_timestamp_micros = price_info.timestamp.and_utc().timestamp_micros();
let source_timestamp = MessageField::some(Timestamp {
seconds: source_timestamp_micros / 1_000_000,
Expand Down Expand Up @@ -353,7 +364,6 @@ mod lazer_exporter {
tracing::error!("Error receiving message from at relayer {relayer_url}: {e:?}");
}
None => {
// TODO: Probably still appropriate to return here, but retry in caller.
tracing::error!("relayer connection closed");
bail!("relayer connection closed");
}
Expand Down
Loading