Skip to content

fix(hermes): improve TWAP reliability - non-optional price selection and consistent time windows #2521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.8.3"
version = "0.8.4"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down
2 changes: 1 addition & 1 deletion apps/hermes/server/src/api/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ mod tests {
async fn get_twaps_with_update_data(
&self,
_price_ids: &[PriceIdentifier],
_start_time: RequestTime,
_window_seconds: u64,
_end_time: RequestTime,
) -> Result<TwapsWithUpdateData> {
unimplemented!("Not needed for this test")
Expand Down
14 changes: 3 additions & 11 deletions apps/hermes/server/src/api/rest/v2/latest_twaps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
Json,
},
base64::{engine::general_purpose::STANDARD as base64_standard_engine, Engine as _},
pyth_sdk::{DurationInSeconds, PriceIdentifier, UnixTimestamp},
pyth_sdk::{DurationInSeconds, PriceIdentifier},
serde::Deserialize,
serde_qs::axum::QsQuery,
utoipa::IntoParams,
Expand Down Expand Up @@ -105,20 +105,12 @@ where
let price_ids: Vec<PriceIdentifier> =
validate_price_ids(&state, &price_id_inputs, params.ignore_invalid_price_ids).await?;

// Collect start and end bounds for the TWAP window
let window_seconds = path_params.window_seconds as i64;
let current_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as UnixTimestamp;
let start_time = current_time - window_seconds;

// Calculate the average
let twaps_with_update_data = Aggregates::get_twaps_with_update_data(
&*state.state,
&price_ids,
RequestTime::FirstAfter(start_time),
RequestTime::Latest,
path_params.window_seconds,
RequestTime::LatestTimeEarliestSlot,
)
.await
.map_err(|e| {
Expand Down
164 changes: 146 additions & 18 deletions apps/hermes/server/src/state/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use log::warn;
#[cfg(test)]
use mock_instant::{SystemTime, UNIX_EPOCH};
use pythnet_sdk::messages::TwapMessage;
Expand Down Expand Up @@ -60,6 +61,7 @@ pub type UnixTimestamp = i64;
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum RequestTime {
Latest,
LatestTimeEarliestSlot,
FirstAfter(UnixTimestamp),
AtSlot(Slot),
}
Expand Down Expand Up @@ -242,7 +244,7 @@ where
async fn get_twaps_with_update_data(
&self,
price_ids: &[PriceIdentifier],
start_time: RequestTime,
window_seconds: u64,
end_time: RequestTime,
) -> Result<TwapsWithUpdateData>;
}
Expand Down Expand Up @@ -410,16 +412,11 @@ where
async fn get_twaps_with_update_data(
&self,
price_ids: &[PriceIdentifier],
start_time: RequestTime,
window_seconds: u64,
end_time: RequestTime,
) -> Result<TwapsWithUpdateData> {
match get_verified_twaps_with_update_data(
self,
price_ids,
start_time.clone(),
end_time.clone(),
)
.await
match get_verified_twaps_with_update_data(self, price_ids, window_seconds, end_time.clone())
.await
{
Ok(twaps_with_update_data) => Ok(twaps_with_update_data),
Err(e) => {
Expand Down Expand Up @@ -637,33 +634,68 @@ where
async fn get_verified_twaps_with_update_data<S>(
state: &S,
price_ids: &[PriceIdentifier],
start_time: RequestTime,
window_seconds: u64,
end_time: RequestTime,
) -> Result<TwapsWithUpdateData>
where
S: Cache,
{
// Get all start messages for all price IDs
let start_messages = state
// Get all end messages for all price IDs
let end_messages = state
.fetch_message_states(
price_ids.iter().map(|id| id.to_bytes()).collect(),
start_time.clone(),
end_time.clone(),
MessageStateFilter::Only(MessageType::TwapMessage),
)
.await?;

// Get all end messages for all price IDs
let end_messages = state
// Calculate start_time based on the publish time of the end messages
// to guarantee that the start and end messages are window_seconds apart
let start_timestamp = if end_messages.is_empty() {
// If there are no end messages, we can't calculate a TWAP
tracing::warn!(
price_ids = ?price_ids,
time = ?end_time,
"Could not find TWAP messages"
);
return Err(anyhow!(
"Update data not found for the specified timestamps"
));
} else {
// Use the publish time from the first end message
end_messages[0].message.publish_time() - window_seconds as i64
};
let start_time = RequestTime::FirstAfter(start_timestamp);

// Get all start messages for all price IDs
let start_messages = state
.fetch_message_states(
price_ids.iter().map(|id| id.to_bytes()).collect(),
end_time.clone(),
start_time.clone(),
MessageStateFilter::Only(MessageType::TwapMessage),
)
.await?;

if start_messages.is_empty() {
tracing::warn!(
price_ids = ?price_ids,
time = ?start_time,
"Could not find TWAP messages"
);
return Err(anyhow!(
"Update data not found for the specified timestamps"
));
}

// Verify we have matching start and end messages.
// The cache should throw an error earlier, but checking just in case.
if start_messages.len() != end_messages.len() {
tracing::warn!(
price_ids = ?price_ids,
start_message_length = ?price_ids,
end_message_length = ?start_time,
"Start and end messages length mismatch"
);
return Err(anyhow!(
"Update data not found for the specified timestamps"
));
Expand Down Expand Up @@ -695,6 +727,11 @@ where
});
}
Err(e) => {
tracing::error!(
feed_id = ?start_twap.feed_id,
error = %e,
"Failed to calculate TWAP for price feed"
);
return Err(anyhow!(
"Failed to calculate TWAP for price feed {:?}: {}",
start_twap.feed_id,
Expand Down Expand Up @@ -1295,7 +1332,7 @@ mod test {
PriceIdentifier::new(feed_id_1),
PriceIdentifier::new(feed_id_2),
],
RequestTime::FirstAfter(100), // Start time
100, // window seconds
RequestTime::FirstAfter(200), // End time
)
.await
Expand Down Expand Up @@ -1329,6 +1366,97 @@ mod test {
// update_data should have 2 elements, one for the start block and one for the end block.
assert_eq!(result.update_data.len(), 2);
}

#[tokio::test]
/// Tests that the TWAP calculation correctly selects TWAP messages that are the first ones
/// for their timestamp (non-optional prices). This is important because if a message such that
/// `publish_time == prev_publish_time`is chosen, the TWAP calculation will fail due to the optionality check.
async fn test_get_verified_twaps_with_update_data_uses_non_optional_prices() {
let (state, _update_rx) = setup_state(10).await;
let feed_id = [1u8; 32];

// Store start TWAP message
store_multiple_concurrent_valid_updates(
state.clone(),
generate_update(
vec![create_basic_twap_message(
feed_id, 100, // cumulative_price
0, // num_down_slots
100, // publish_time
99, // prev_publish_time
1000, // publish_slot
)],
1000,
20,
),
)
.await;

// Store end TWAP messages

// This first message has the latest publish_time and earliest slot,
// so it should be chosen as the end_message to calculate TWAP with.
store_multiple_concurrent_valid_updates(
state.clone(),
generate_update(
vec![create_basic_twap_message(
feed_id, 300, // cumulative_price
50, // num_down_slots
200, // publish_time
180, // prev_publish_time
1100, // publish_slot
)],
1100,
21,
),
)
.await;

// This second message has the same publish_time as the previous one and a later slot.
// It will fail the optionality check since publish_time == prev_publish_time.
// Thus, it should not be chosen to calculate TWAP with.
store_multiple_concurrent_valid_updates(
state.clone(),
generate_update(
vec![create_basic_twap_message(
feed_id, 900, // cumulative_price
50, // num_down_slots
200, // publish_time
200, // prev_publish_time
1101, // publish_slot
)],
1101,
22,
),
)
.await;

// Get TWAPs over timestamp window 100 -> 200
let result = get_verified_twaps_with_update_data(
&*state,
&[PriceIdentifier::new(feed_id)],
100, // window seconds
RequestTime::LatestTimeEarliestSlot, // End time
)
.await
.unwrap();

// Verify that the first end message was chosen to calculate the TWAP
// and that the calculation is accurate
assert_eq!(result.twaps.len(), 1);
let twap_1 = result
.twaps
.iter()
.find(|t| t.id == PriceIdentifier::new(feed_id))
.unwrap();
assert_eq!(twap_1.twap.price, 2); // (300-100)/(1100-1000) = 2
assert_eq!(twap_1.down_slots_ratio, Decimal::from_f64(0.5).unwrap()); // (50-0)/(1100-1000) = 0.5
assert_eq!(twap_1.start_timestamp, 100);
assert_eq!(twap_1.end_timestamp, 200);

// update_data should have 2 elements, one for the start block and one for the end block.
assert_eq!(result.update_data.len(), 2);
}
#[tokio::test]

async fn test_get_verified_twaps_with_missing_messages_throws_error() {
Expand Down Expand Up @@ -1385,7 +1513,7 @@ mod test {
PriceIdentifier::new(feed_id_1),
PriceIdentifier::new(feed_id_2),
],
RequestTime::FirstAfter(100),
100,
RequestTime::FirstAfter(200),
)
.await;
Expand Down
Loading
Loading