Skip to content

Commit

Permalink
Properly store gas payment expenditure data in agent rocks db (#2268)
Browse files Browse the repository at this point in the history
### Description

This PR fixes a bug that resulted in gas payment expenditures not being
properly stored in the rocks DB, making them unretrievable.

### Drive-by changes

- Introduces a macro to HyperlaneRocksDB to prevent similar bugs from
happening in the future
  • Loading branch information
asaj authored May 22, 2023
1 parent 63562c7 commit 73fb967
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 95 deletions.
2 changes: 1 addition & 1 deletion rust/agents/relayer/src/merkle_tree_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl MerkleTreeBuilder {
}

fn ingest_nonce(&mut self, nonce: u32) -> Result<(), MerkleTreeBuilderError> {
match self.db.message_id_by_nonce(nonce) {
match self.db.retrieve_message_id_by_nonce(&nonce) {
Ok(Some(leaf)) => {
debug!(nonce, "Ingesting leaf");
self.prover.ingest(leaf).expect("!tree full");
Expand Down
4 changes: 2 additions & 2 deletions rust/agents/relayer/src/msg/gas_payment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ impl GasPaymentEnforcer {
tx_cost_estimate: &TxCostEstimate,
) -> Result<Option<U256>> {
let msg_id = message.id();
let current_payment = self.db.retrieve_gas_payment_for_message_id(msg_id)?;
let current_expenditure = self.db.retrieve_gas_expenditure_for_message_id(msg_id)?;
let current_payment = self.db.retrieve_gas_payment_by_message_id(msg_id)?;
let current_expenditure = self.db.retrieve_gas_expenditure_by_message_id(msg_id)?;
for (policy, whitelist) in &self.policies {
if !whitelist.msg_matches(message, true) {
trace!(
Expand Down
2 changes: 1 addition & 1 deletion rust/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ impl PendingMessage {
fn record_message_process_success(&mut self) -> Result<()> {
self.ctx
.origin_db
.mark_nonce_as_processed(self.message.nonce)?;
.store_processed_by_nonce(&self.message.nonce, &true)?;
self.ctx.metrics.update_nonce(&self.message);
self.ctx.metrics.messages_processed.inc();
Ok(())
Expand Down
8 changes: 6 additions & 2 deletions rust/agents/relayer/src/msg/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl MessageProcessor {
fn try_get_unprocessed_message(&mut self) -> Result<Option<HyperlaneMessage>> {
loop {
// First, see if we can find the message so we can update the gauge.
if let Some(message) = self.db.message_by_nonce(self.message_nonce)? {
if let Some(message) = self.db.retrieve_message_by_nonce(self.message_nonce)? {
// Update the latest nonce gauges
self.metrics
.max_last_known_message_nonce_gauge
Expand All @@ -90,7 +90,11 @@ impl MessageProcessor {
}

// If this message has already been processed, on to the next one.
if !self.db.retrieve_message_processed(self.message_nonce)? {
if !self
.db
.retrieve_processed_by_nonce(&self.message_nonce)?
.unwrap_or(false)
{
return Ok(Some(message));
} else {
debug!(nonce=?self.message_nonce, "Message already marked as processed in DB");
Expand Down
150 changes: 68 additions & 82 deletions rust/hyperlane-base/src/db/rocks/hyperlane_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use std::time::Duration;

use async_trait::async_trait;
use eyre::Result;
use paste::paste;
use tokio::time::sleep;
use tracing::{debug, trace};

use hyperlane_core::{
HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage, HyperlaneMessageStore,
HyperlaneWatermarkedLogStore, InterchainGasExpenditure, InterchainGasPayment,
InterchainGasPaymentMeta, LogMeta, H256, U256,
InterchainGasPaymentMeta, LogMeta, H256,
};

use super::{
Expand All @@ -26,7 +27,7 @@ const MESSAGE: &str = "message_";
const NONCE_PROCESSED: &str = "nonce_processed_";
const GAS_PAYMENT_FOR_MESSAGE_ID: &str = "gas_payment_for_message_id_v2_";
const GAS_PAYMENT_META_PROCESSED: &str = "gas_payment_meta_processed_v2_";
const GAS_EXPENDITURE_FOR_MESSAGE_ID: &str = "gas_expenditure_for_message_id_";
const GAS_EXPENDITURE_FOR_MESSAGE_ID: &str = "gas_expenditure_for_message_id_v2_";
const LATEST_INDEXED_GAS_PAYMENT_BLOCK: &str = "latest_indexed_gas_payment_block";

type DbResult<T> = std::result::Result<T, DbError>;
Expand Down Expand Up @@ -77,7 +78,7 @@ impl HyperlaneRocksDB {
message: &HyperlaneMessage,
dispatched_block_number: u64,
) -> DbResult<bool> {
if let Ok(Some(_)) = self.message_id_by_nonce(message.nonce) {
if let Ok(Some(_)) = self.retrieve_message_id_by_nonce(&message.nonce) {
trace!(msg=?message, "Message already stored in db");
return Ok(false);
}
Expand All @@ -86,34 +87,20 @@ impl HyperlaneRocksDB {
debug!(msg=?message, "Storing new message in db",);

// - `id` --> `message`
self.store_keyed_encodable(MESSAGE, &id, message)?;
self.store_message_by_id(&id, message)?;
// - `nonce` --> `id`
self.store_keyed_encodable(MESSAGE_ID, &message.nonce, &id)?;
self.store_message_id_by_nonce(&message.nonce, &id)?;
// - `nonce` --> `dispatched block number`
self.store_keyed_encodable(
MESSAGE_DISPATCHED_BLOCK_NUMBER,
&message.nonce,
&dispatched_block_number,
)?;
self.store_dispatched_block_number_by_nonce(&message.nonce, &dispatched_block_number)?;
Ok(true)
}

/// Retrieve a message by its id
pub fn message_by_id(&self, id: H256) -> DbResult<Option<HyperlaneMessage>> {
self.retrieve_keyed_decodable(MESSAGE, &id)
}

/// Retrieve the message id keyed by nonce
pub fn message_id_by_nonce(&self, nonce: u32) -> DbResult<Option<H256>> {
self.retrieve_keyed_decodable(MESSAGE_ID, &nonce)
}

/// Retrieve a message by its nonce
pub fn message_by_nonce(&self, nonce: u32) -> DbResult<Option<HyperlaneMessage>> {
let id: Option<H256> = self.message_id_by_nonce(nonce)?;
pub fn retrieve_message_by_nonce(&self, nonce: u32) -> DbResult<Option<HyperlaneMessage>> {
let id = self.retrieve_message_id_by_nonce(&nonce)?;
match id {
None => Ok(None),
Some(id) => self.message_by_id(id),
Some(id) => self.retrieve_message_by_id(&id),
}
}

Expand All @@ -123,27 +110,14 @@ impl HyperlaneRocksDB {
let slf = self.clone();
async move {
loop {
if let Some(id) = slf.message_id_by_nonce(nonce)? {
if let Some(id) = slf.retrieve_message_id_by_nonce(&nonce)? {
return Ok(id);
}
sleep(Duration::from_millis(100)).await
}
}
}

/// Mark nonce as processed
pub fn mark_nonce_as_processed(&self, nonce: u32) -> DbResult<()> {
debug!(?nonce, "mark nonce as processed");
self.store_keyed_encodable(NONCE_PROCESSED, &nonce, &true)
}

/// Retrieve nonce processed status
pub fn retrieve_message_processed(&self, nonce: u32) -> Result<bool> {
Ok(self
.retrieve_keyed_decodable(NONCE_PROCESSED, &nonce)?
.unwrap_or(false))
}

/// If the provided gas payment, identified by its metadata, has not been
/// processed, processes the gas payment and records it as processed.
/// Returns whether the gas payment was processed for the first time.
Expand All @@ -154,7 +128,10 @@ impl HyperlaneRocksDB {
) -> DbResult<bool> {
let payment_meta = log_meta.into();
// If the gas payment has already been processed, do nothing
if self.retrieve_gas_payment_meta_processed(&payment_meta)? {
if self
.retrieve_processed_by_gas_payment_meta(&payment_meta)?
.unwrap_or(false)
{
trace!(
?payment,
?log_meta,
Expand All @@ -164,10 +141,10 @@ impl HyperlaneRocksDB {
return Ok(false);
}
// Set the gas payment as processed
self.store_gas_payment_meta_processed(&payment_meta)?;
self.store_processed_by_gas_payment_meta(&payment_meta, &true)?;

// Update the total gas payment for the message to include the payment
self.update_gas_payment_for_message_id(payment)?;
self.update_gas_payment_by_message_id(payment)?;

// Return true to indicate the gas payment was processed for the first time
Ok(true)
Expand All @@ -177,82 +154,57 @@ impl HyperlaneRocksDB {
/// message.
pub fn process_gas_expenditure(&self, expenditure: InterchainGasExpenditure) -> DbResult<()> {
// Update the total gas expenditure for the message to include the payment
self.update_gas_expenditure_for_message_id(expenditure)
}

/// Record a gas payment, identified by its metadata, as processed
fn store_gas_payment_meta_processed(&self, meta: &InterchainGasPaymentMeta) -> DbResult<()> {
self.store_keyed_encodable(GAS_PAYMENT_META_PROCESSED, meta, &true)
}

/// Get whether a gas payment, identified by its metadata, has been
/// processed already
fn retrieve_gas_payment_meta_processed(
&self,
meta: &InterchainGasPaymentMeta,
) -> DbResult<bool> {
Ok(self
.retrieve_keyed_decodable(GAS_PAYMENT_META_PROCESSED, meta)?
.unwrap_or(false))
self.update_gas_expenditure_by_message_id(expenditure)
}

/// Update the total gas payment for a message to include gas_payment
fn update_gas_payment_for_message_id(&self, event: InterchainGasPayment) -> DbResult<()> {
let existing_payment = self.retrieve_gas_payment_for_message_id(event.message_id)?;
fn update_gas_payment_by_message_id(&self, event: InterchainGasPayment) -> DbResult<()> {
let existing_payment = self.retrieve_gas_payment_by_message_id(event.message_id)?;
let total = existing_payment + event;

debug!(?event, new_total_gas_payment=?total, "Storing gas payment");
self.store_keyed_encodable::<_, InterchainGasPaymentData>(
GAS_PAYMENT_FOR_MESSAGE_ID,
&total.message_id,
&total.into(),
)?;
self.store_interchain_gas_payment_data_by_message_id(&total.message_id, &total.into())?;

Ok(())
}

/// Update the total gas spent for a message
fn update_gas_expenditure_for_message_id(
fn update_gas_expenditure_by_message_id(
&self,
event: InterchainGasExpenditure,
) -> DbResult<()> {
let existing_payment = self.retrieve_gas_expenditure_for_message_id(event.message_id)?;
let existing_payment = self.retrieve_gas_expenditure_by_message_id(event.message_id)?;
let total = existing_payment + event;

debug!(?event, new_total_gas_payment=?total, "Storing gas payment");
self.store_keyed_encodable::<_, U256>(
GAS_EXPENDITURE_FOR_MESSAGE_ID,
self.store_interchain_gas_expenditure_data_by_message_id(
&total.message_id,
&total.tokens_used,
&InterchainGasExpenditureData {
tokens_used: total.tokens_used,
gas_used: total.gas_used,
},
)?;

Ok(())
}

/// Retrieve the total gas payment for a message
pub fn retrieve_gas_payment_for_message_id(
pub fn retrieve_gas_payment_by_message_id(
&self,
message_id: H256,
) -> DbResult<InterchainGasPayment> {
Ok(self
.retrieve_keyed_decodable::<_, InterchainGasPaymentData>(
GAS_PAYMENT_FOR_MESSAGE_ID,
&message_id,
)?
.retrieve_interchain_gas_payment_data_by_message_id(&message_id)?
.unwrap_or_default()
.complete(message_id))
}

/// Retrieve the total gas payment for a message
pub fn retrieve_gas_expenditure_for_message_id(
pub fn retrieve_gas_expenditure_by_message_id(
&self,
message_id: H256,
) -> DbResult<InterchainGasExpenditure> {
Ok(self
.retrieve_keyed_decodable::<_, InterchainGasExpenditureData>(
GAS_EXPENDITURE_FOR_MESSAGE_ID,
&message_id,
)?
.retrieve_interchain_gas_expenditure_data_by_message_id(&message_id)?
.unwrap_or_default()
.complete(message_id))
}
Expand Down Expand Up @@ -291,13 +243,13 @@ impl HyperlaneLogStore<InterchainGasPayment> for HyperlaneRocksDB {
impl HyperlaneMessageStore for HyperlaneRocksDB {
/// Gets a message by nonce.
async fn retrieve_message_by_nonce(&self, nonce: u32) -> Result<Option<HyperlaneMessage>> {
let message = self.message_by_nonce(nonce)?;
let message = self.retrieve_message_by_nonce(nonce)?;
Ok(message)
}

/// Retrieve dispatched block number by message nonce
async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Result<Option<u64>> {
let number = self.retrieve_keyed_decodable(MESSAGE_DISPATCHED_BLOCK_NUMBER, &nonce)?;
let number = self.retrieve_dispatched_block_number_by_nonce(&nonce)?;
Ok(number)
}
}
Expand All @@ -320,3 +272,37 @@ where
Ok(result)
}
}

/// Generate a call to ChainSetup for the given builder
macro_rules! make_store_and_retrieve {
($vis:vis, $name_suffix:ident, $key_prefix: ident, $key_ty:ty, $val_ty:ty$(,)?) => {
impl HyperlaneRocksDB {
paste! {
/// Stores a key value pair in the DB
$vis fn [<store_ $name_suffix>] (
&self,
key: &$key_ty,
val: &$val_ty,
) -> DbResult<()> {
self.store_keyed_encodable($key_prefix, key, val)
}

/// Retrieves a key value pair from the DB
$vis fn [<retrieve_ $name_suffix>] (
&self,
key: &$key_ty,
) -> DbResult<Option<$val_ty>> {
self.retrieve_keyed_decodable($key_prefix, key)
}
}
}
};
}

make_store_and_retrieve!(pub, message_id_by_nonce, MESSAGE_ID, u32, H256);
make_store_and_retrieve!(pub(self), message_by_id, MESSAGE, H256, HyperlaneMessage);
make_store_and_retrieve!(pub(self), dispatched_block_number_by_nonce, MESSAGE_DISPATCHED_BLOCK_NUMBER, u32, u64);
make_store_and_retrieve!(pub, processed_by_nonce, NONCE_PROCESSED, u32, bool);
make_store_and_retrieve!(pub(self), processed_by_gas_payment_meta, GAS_PAYMENT_META_PROCESSED, InterchainGasPaymentMeta, bool);
make_store_and_retrieve!(pub(self), interchain_gas_expenditure_data_by_message_id, GAS_EXPENDITURE_FOR_MESSAGE_ID, H256, InterchainGasExpenditureData);
make_store_and_retrieve!(pub(self), interchain_gas_payment_data_by_message_id, GAS_PAYMENT_FOR_MESSAGE_ID, H256, InterchainGasPaymentData);
8 changes: 1 addition & 7 deletions rust/hyperlane-base/src/db/rocks/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,7 @@ mod test {

db.store_logs(&vec![(m.clone(), meta)]).await.unwrap();

let by_id = db.message_by_id(m.id()).unwrap().unwrap();
assert_eq!(
RawHyperlaneMessage::from(&by_id),
RawHyperlaneMessage::from(&m)
);

let by_nonce = db.message_by_nonce(m.nonce).unwrap().unwrap();
let by_nonce = db.retrieve_message_by_nonce(m.nonce).unwrap().unwrap();
assert_eq!(
RawHyperlaneMessage::from(&by_nonce),
RawHyperlaneMessage::from(&m)
Expand Down

0 comments on commit 73fb967

Please sign in to comment.