Skip to content

Some lazer exporter unit tests #165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ http = "1.3.1"
url = { version = "2.5.4", features = ["serde"] }
pyth-lazer-publisher-sdk = "0.1.5"

[dev-dependencies]
tempfile = "3.20.0"

[profile.release]
panic = 'abort'

Expand Down
313 changes: 311 additions & 2 deletions src/agent/services/lazer_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use {
protobuf::Message as ProtobufMessage,
pyth_lazer_publisher_sdk::transaction::SignedLazerTransaction,
reqwest::Client,
serde::Deserialize,
serde::{
Deserialize,
Serialize,
},
std::{
path::PathBuf,
sync::Arc,
Expand Down Expand Up @@ -207,7 +210,7 @@ impl RelayerSessionTask {
}

// TODO: This is copied from history-service; move to Lazer protocol sdk.
#[derive(Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
struct SymbolResponse {
pub pyth_lazer_id: u32,
#[serde(rename = "name")]
Expand Down Expand Up @@ -450,3 +453,309 @@ mod lazer_exporter {
}
}
}

#[cfg(test)]
mod tests {
use {
crate::agent::{
services::lazer_exporter::{
Config,
RELAYER_CHANNEL_CAPACITY,
RelayerSessionTask,
SymbolResponse,
lazer_exporter::lazer_exporter,
},
state::{
local,
local::{
LocalStore,
PriceInfo,
},
},
},
ed25519_dalek::{
Signer,
SigningKey,
},
futures_util::StreamExt,
prometheus_client::registry::Registry,
protobuf::{
Message,
MessageField,
well_known_types::timestamp::Timestamp,
},
pyth_lazer_publisher_sdk::{
publisher_update::{
FeedUpdate,
PriceUpdate,
PublisherUpdate,
feed_update::{
self,
Update,
},
},
transaction::{
Ed25519SignatureData,
LazerTransaction,
SignatureData,
SignedLazerTransaction,
lazer_transaction::{
self,
Payload,
},
signature_data::Data::Ed25519,
},
},
pyth_sdk_solana::state::PriceStatus,
std::{
io::Write,
net::SocketAddr,
path::PathBuf,
sync::{
Arc,
Once,
},
time::Duration,
},
tempfile::NamedTempFile,
tokio::{
net::TcpListener,
sync::{
broadcast::{
self,
error::TryRecvError,
},
mpsc,
},
},
url::Url,
warp::Filter,
};

static INIT: Once = Once::new();

fn init_tracing() {
INIT.call_once(|| {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_test_writer() // Send output to test output
.init();
});
}

pub async fn run_mock_history_server(addr: SocketAddr) {
let route = warp::path!("history" / "v1" / "symbols")
.and(warp::get())
.map(|| {
let response = vec![SymbolResponse {
pyth_lazer_id: 1,
_name: "BTCUSD".to_string(),
_symbol: "Crypto.BTC/USD".to_string(),
_description: "BITCOIN / US DOLLAR".to_string(),
_asset_type: "crypto".to_string(),
_exponent: -8,
_cmc_id: Some(1),
_interval: None,
_min_publishers: 1,
_min_channel: "real_time".to_string(),
_state: "stable".to_string(),
_schedule: "America/New_York;O,O,O,O,O,O,O;".to_string(),
hermes_id: Some(
"e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43"
.to_string(),
),
}];
warp::reply::json(&response)
});
warp::serve(route).run(addr).await;
}

fn get_private_key() -> SigningKey {
SigningKey::from_keypair_bytes(&[
105, 175, 146, 91, 32, 145, 164, 199, 37, 111, 139, 255, 44, 225, 5, 247, 154, 170,
238, 70, 47, 15, 9, 48, 102, 87, 180, 50, 50, 38, 148, 243, 62, 148, 219, 72, 222, 170,
8, 246, 176, 33, 205, 29, 118, 11, 220, 163, 214, 204, 46, 49, 132, 94, 170, 173, 244,
39, 179, 211, 177, 70, 252, 31,
])
Comment on lines +574 to +579
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's better to hard code these once as a const

.unwrap()
}

fn get_private_key_file() -> NamedTempFile {
let private_key_string = "[105,175,146,91,32,145,164,199,37,111,139,255,44,225,5,247,154,170,238,70,47,15,9,48,102,87,180,50,50,38,148,243,62,148,219,72,222,170,8,246,176,33,205,29,118,11,220,163,214,204,46,49,132,94,170,173,244,39,179,211,177,70,252,31]";
let mut temp_file = NamedTempFile::new().unwrap();
temp_file
.as_file_mut()
.write(private_key_string.as_bytes())
.unwrap();
temp_file.flush().unwrap();
temp_file
}

#[tokio::test]
async fn test_lazer_exporter() {
init_tracing();

let history_addr = "127.0.0.1:12345".parse().unwrap();
tokio::spawn(async move {
run_mock_history_server(history_addr).await;
});
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

let state = Arc::new(local::Store::new(&mut Registry::default()));
let (relayer_sender, mut relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);
let private_key_file = get_private_key_file();

let config = Config {
history_url: Url::parse("http://127.0.0.1:12345").unwrap(),
relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()],
authorization_token: "token1".to_string(),
publish_keypair_path: PathBuf::from(private_key_file.path()),
publish_interval_duration: Duration::from_secs(1),
};
tokio::spawn(lazer_exporter(config, state.clone(), relayer_sender));

tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
match relayer_receiver.try_recv() {
Err(TryRecvError::Empty) => (),
_ => panic!("channel should be empty"),
}

let btc_id = pyth_sdk::Identifier::from_hex(
"e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43",
)
.unwrap();
let price = PriceInfo {
status: PriceStatus::Trading,
price: 100_000_00000000i64,
conf: 1_00000000u64,
timestamp: Default::default(),
};
state.update(btc_id, price).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(2000)).await;
match relayer_receiver.try_recv() {
Ok(transaction) => {
let lazer_transaction =
LazerTransaction::parse_from_bytes(transaction.payload.unwrap().as_slice())
.unwrap();
let publisher_update =
if let lazer_transaction::Payload::PublisherUpdate(publisher_update) =
lazer_transaction.payload.unwrap()
{
publisher_update
} else {
panic!("expected publisher_update")
};
assert_eq!(publisher_update.updates.len(), 1);
let feed_update = &publisher_update.updates[0];
assert_eq!(feed_update.feed_id, Some(1u32));
let price_update = if let feed_update::Update::PriceUpdate(price_update) =
feed_update.clone().update.unwrap()
{
price_update
} else {
panic!("expected price_update")
};
assert_eq!(price_update.price, Some(100_000_00000000i64));
}
_ => panic!("channel should have a transaction waiting"),
}
}

pub async fn run_mock_relayer(
addr: SocketAddr,
back_sender: mpsc::Sender<SignedLazerTransaction>,
) {
let listener = TcpListener::bind(addr).await.unwrap();

tokio::spawn(async move {
let Ok((stream, _)) = listener.accept().await else {
panic!("failed to accept mock relayer websocket connection");
};
let ws_stream = tokio_tungstenite::accept_async(stream)
.await
.expect("handshake failed");
let (_, mut read) = ws_stream.split();
while let Some(msg) = read.next().await {
if let Ok(msg) = msg {
if msg.is_binary() {
tracing::info!("Received binary message: {msg:?}");
let transaction =
SignedLazerTransaction::parse_from_bytes(msg.into_data().as_ref())
.unwrap();
back_sender.clone().send(transaction).await.unwrap();
}
} else {
tracing::error!("Received a malformed message: {msg:?}");
}
}
});
}

#[tokio::test]
async fn test_relayer_session() {
init_tracing();

let (back_sender, mut back_receiver) = mpsc::channel(RELAYER_CHANNEL_CAPACITY);
let relayer_addr = "127.0.0.1:12346".parse().unwrap();
run_mock_relayer(relayer_addr, back_sender).await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

let (relayer_sender, relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);

let mut relayer_session_task = RelayerSessionTask {
// connection state
url: Url::parse("ws://127.0.0.1:12346").unwrap(),
token: "token1".to_string(),
receiver: relayer_receiver,
};
tokio::spawn(async move { relayer_session_task.run().await });
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

let transaction = get_signed_lazer_transaction();
relayer_sender
.send(transaction.clone())
.expect("relayer_sender.send failed");
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
let received_transaction = back_receiver
.recv()
.await
.expect("back_receiver.recv failed");
assert_eq!(transaction, received_transaction);
}

fn get_signed_lazer_transaction() -> SignedLazerTransaction {
let publisher_update = PublisherUpdate {
updates: vec![FeedUpdate {
feed_id: Some(1),
source_timestamp: MessageField::some(Timestamp::now()),
update: Some(Update::PriceUpdate(PriceUpdate {
price: Some(1_000_000_000i64),
..PriceUpdate::default()
})),
special_fields: Default::default(),
}],
publisher_timestamp: MessageField::some(Timestamp::now()),
special_fields: Default::default(),
};
let lazer_transaction = LazerTransaction {
payload: Some(Payload::PublisherUpdate(publisher_update)),
special_fields: Default::default(),
};
let buf = lazer_transaction.write_to_bytes().unwrap();
let signing_key = get_private_key();
let signature = signing_key.sign(&buf);
let signature_data = SignatureData {
data: Some(Ed25519(Ed25519SignatureData {
signature: Some(signature.to_bytes().into()),
public_key: Some(signing_key.verifying_key().to_bytes().into()),
special_fields: Default::default(),
})),
special_fields: Default::default(),
};
SignedLazerTransaction {
signature_data: MessageField::some(signature_data),
payload: Some(buf),
special_fields: Default::default(),
}
}
}
Loading