-
Notifications
You must be signed in to change notification settings - Fork 20
Lazer exporter fixes #166
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Lazer exporter fixes #166
Changes from 1 commit
44e7374
bddcc16
a51fbd9
28f71a5
75a3409
657e639
f61ab5e
f01b88b
fdbbc70
9d18b62
d21dd1f
7e7676d
e785797
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| use { | ||
| crate::agent::state, | ||
| anyhow::{ | ||
| Context, | ||
| Result, | ||
| anyhow, | ||
| bail, | ||
|
|
@@ -9,6 +10,11 @@ use { | |
| ExponentialBackoffBuilder, | ||
| backoff::Backoff, | ||
| }, | ||
| base64::{ | ||
| Engine, | ||
| prelude::BASE64_STANDARD, | ||
| }, | ||
| ed25519_dalek::SigningKey, | ||
| futures_util::{ | ||
| SinkExt, | ||
| stream::{ | ||
|
|
@@ -25,6 +31,7 @@ use { | |
| Deserialize, | ||
| Serialize, | ||
| }, | ||
| solana_sdk::signature::keypair, | ||
| std::{ | ||
| path::PathBuf, | ||
| sync::Arc, | ||
|
|
@@ -59,18 +66,23 @@ pub const RELAYER_CHANNEL_CAPACITY: usize = 1000; | |
|
|
||
| #[derive(Clone, Debug, Deserialize)] | ||
| pub struct Config { | ||
| pub history_url: Url, | ||
| pub relayer_urls: Vec<Url>, | ||
| pub authorization_token: String, | ||
| pub publish_keypair_path: PathBuf, | ||
| pub history_url: Url, | ||
| pub relayer_urls: Vec<Url>, | ||
| pub publish_keypair_path: PathBuf, | ||
| #[serde(with = "humantime_serde", default = "default_publish_interval")] | ||
| pub publish_interval_duration: Duration, | ||
| pub publish_interval_duration: Duration, | ||
| #[serde(with = "humantime_serde", default = "default_symbol_fetch_interval")] | ||
| pub symbol_fetch_interval_duration: Duration, | ||
| } | ||
|
|
||
| fn default_publish_interval() -> Duration { | ||
| Duration::from_millis(200) | ||
| } | ||
|
|
||
| fn default_symbol_fetch_interval() -> Duration { | ||
| Duration::from_secs(60 * 60) | ||
| } | ||
|
|
||
| struct RelayerWsSession { | ||
| ws_sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, TungsteniteMessage>, | ||
| } | ||
|
|
@@ -248,17 +260,44 @@ async fn fetch_symbols(history_url: &Url) -> Result<Vec<SymbolResponse>> { | |
| Ok(data) | ||
| } | ||
|
|
||
| fn get_signing_key(config: &Config) -> Result<SigningKey> { | ||
| // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher | ||
| let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) { | ||
| Ok(k) => k, | ||
| Err(e) => { | ||
| tracing::error!( | ||
| error = ?e, | ||
| publish_keypair_path = config.publish_keypair_path.display().to_string(), | ||
| "Reading publish keypair returned an error. ", | ||
| ); | ||
| bail!("Reading publish keypair returned an error. "); | ||
| } | ||
| }; | ||
|
|
||
| SigningKey::from_keypair_bytes(&publish_keypair.to_bytes()) | ||
| .context("Failed to create signing key from keypair") | ||
| } | ||
|
|
||
| #[instrument(skip(config, state))] | ||
| pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandle<()>> { | ||
| let mut handles = vec![]; | ||
|
|
||
| let signing_key = match get_signing_key(&config) { | ||
| Ok(signing_key) => signing_key, | ||
| Err(e) => { | ||
| // This is fatal as we can't publish without the key. | ||
| tracing::error!("failed to get Lazer signing key: {e:?}"); | ||
| panic!("failed to get Lazer signing key") | ||
| } | ||
| }; | ||
|
|
||
| // can safely drop first receiver for ease of iteration | ||
| let (relayer_sender, _) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); | ||
|
|
||
| for url in config.relayer_urls.iter() { | ||
| let mut task = RelayerSessionTask { | ||
| url: url.clone(), | ||
| token: config.authorization_token.to_owned(), | ||
| token: BASE64_STANDARD.encode(signing_key.verifying_key().to_bytes()), | ||
|
||
| receiver: relayer_sender.subscribe(), | ||
| }; | ||
| handles.push(tokio::spawn(async move { task.run().await })); | ||
|
|
@@ -268,6 +307,7 @@ pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandl | |
| config.clone(), | ||
| state, | ||
| relayer_sender, | ||
| signing_key, | ||
| ))); | ||
|
|
||
| handles | ||
|
|
@@ -284,11 +324,6 @@ mod lazer_exporter { | |
| }, | ||
| state::local::LocalStore, | ||
| }, | ||
| anyhow::{ | ||
| Context, | ||
| Result, | ||
| bail, | ||
| }, | ||
| ed25519_dalek::{ | ||
| Signer, | ||
| SigningKey, | ||
|
|
@@ -314,74 +349,33 @@ mod lazer_exporter { | |
| signature_data::Data::Ed25519, | ||
| }, | ||
| }, | ||
| solana_sdk::signer::keypair, | ||
| std::{ | ||
| collections::HashMap, | ||
| sync::Arc, | ||
| }, | ||
| tokio::sync::broadcast::Sender, | ||
| url::Url, | ||
| }; | ||
|
|
||
| fn get_signing_key(config: &Config) -> Result<SigningKey> { | ||
| // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher | ||
| let publish_keypair = match keypair::read_keypair_file(&config.publish_keypair_path) { | ||
| Ok(k) => k, | ||
| Err(e) => { | ||
| tracing::error!( | ||
| error = ?e, | ||
| publish_keypair_path = config.publish_keypair_path.display().to_string(), | ||
| "Reading publish keypair returned an error. ", | ||
| ); | ||
| bail!("Reading publish keypair returned an error. "); | ||
| } | ||
| }; | ||
|
|
||
| SigningKey::from_keypair_bytes(&publish_keypair.to_bytes()) | ||
| .context("Failed to create signing key from keypair") | ||
| } | ||
|
|
||
| pub async fn lazer_exporter<S>( | ||
| config: Config, | ||
| state: Arc<S>, | ||
| relayer_sender: Sender<SignedLazerTransaction>, | ||
| signing_key: SigningKey, | ||
| ) where | ||
| S: LocalStore, | ||
| S: Send + Sync + 'static, | ||
| { | ||
| let signing_key = match get_signing_key(&config) { | ||
| Ok(signing_key) => signing_key, | ||
| Err(e) => { | ||
| tracing::error!("lazer_exporter signing key failure: {e:?}"); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| // TODO: Re-fetch on an interval? | ||
| let lazer_symbols: HashMap<pyth_sdk::Identifier, SymbolResponse> = | ||
| match fetch_symbols(&config.history_url).await { | ||
| Ok(symbols) => symbols | ||
| .into_iter() | ||
| .filter_map(|symbol| { | ||
| let hermes_id = symbol.hermes_id.clone()?; | ||
| match pyth_sdk::Identifier::from_hex(hermes_id.clone()) { | ||
| Ok(id) => Some((id, symbol)), | ||
| Err(e) => { | ||
| tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id); | ||
| None | ||
| } | ||
| } | ||
| }) | ||
| .collect(), | ||
| Err(e) => { | ||
| tracing::error!("Failed to fetch Lazer symbols: {e:?}"); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| let mut lazer_symbols = get_lazer_symbol_map(&config.history_url).await; | ||
| let mut publish_interval = tokio::time::interval(config.publish_interval_duration); | ||
| let mut symbol_fetch_interval = | ||
| tokio::time::interval(config.symbol_fetch_interval_duration); | ||
|
|
||
| loop { | ||
| tokio::select! { | ||
| _ = symbol_fetch_interval.tick() => { | ||
| lazer_symbols = get_lazer_symbol_map(&config.history_url).await; | ||
| }, | ||
| _ = publish_interval.tick() => { | ||
| let publisher_timestamp = MessageField::some(Timestamp::now()); | ||
| let mut publisher_update = PublisherUpdate { | ||
|
|
@@ -452,6 +446,30 @@ mod lazer_exporter { | |
| } | ||
| } | ||
| } | ||
|
|
||
| async fn get_lazer_symbol_map( | ||
| history_url: &Url, | ||
| ) -> HashMap<pyth_sdk::Identifier, SymbolResponse> { | ||
| match fetch_symbols(history_url).await { | ||
| Ok(symbols) => symbols | ||
| .into_iter() | ||
| .filter_map(|symbol| { | ||
| let hermes_id = symbol.hermes_id.clone()?; | ||
| match pyth_sdk::Identifier::from_hex(hermes_id.clone()) { | ||
| Ok(id) => Some((id, symbol)), | ||
| Err(e) => { | ||
| tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id); | ||
| None | ||
| } | ||
| } | ||
| }) | ||
| .collect(), | ||
| Err(e) => { | ||
| tracing::error!("Failed to fetch Lazer symbols: {e:?}"); | ||
| HashMap::new() | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
|
|
@@ -604,15 +622,21 @@ mod tests { | |
| let state = Arc::new(local::Store::new(&mut Registry::default())); | ||
| let (relayer_sender, mut relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY); | ||
| let private_key_file = get_private_key_file(); | ||
| let private_key = get_private_key(); | ||
|
|
||
| let config = Config { | ||
| history_url: Url::parse("http://127.0.0.1:12345").unwrap(), | ||
| relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()], | ||
| authorization_token: "token1".to_string(), | ||
| publish_keypair_path: PathBuf::from(private_key_file.path()), | ||
| publish_interval_duration: Duration::from_secs(1), | ||
| history_url: Url::parse("http://127.0.0.1:12345").unwrap(), | ||
| relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()], | ||
| publish_keypair_path: PathBuf::from(private_key_file.path()), | ||
| publish_interval_duration: Duration::from_secs(1), | ||
| symbol_fetch_interval_duration: Duration::from_secs(60 * 60), | ||
| }; | ||
| tokio::spawn(lazer_exporter(config, state.clone(), relayer_sender)); | ||
| tokio::spawn(lazer_exporter( | ||
| config, | ||
| state.clone(), | ||
| relayer_sender, | ||
| private_key, | ||
| )); | ||
|
|
||
| tokio::time::sleep(std::time::Duration::from_millis(2000)).await; | ||
| match relayer_receiver.try_recv() { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend using 1 minute. imagine we have a same-day listing. we want publishers to publish to it as fast as possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated!