Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor agent event indexing #2246

Merged
merged 51 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
ba41f21
Start preparations for backfill indexing
asaj May 12, 2023
fe97a32
builds
asaj May 13, 2023
1de96b0
Logging, fix bug that caused unnecessary backtracks
asaj May 13, 2023
4b18101
Ready to begin backwards pass
asaj May 13, 2023
fdb53e5
Backwards pass passing e2e tests
asaj May 13, 2023
56dd8e0
cleanup
asaj May 13, 2023
e62f83f
Faster e2e test
asaj May 13, 2023
fc8a8b8
More e2e speedups
asaj May 13, 2023
002a2ce
more speedups
asaj May 13, 2023
a55229c
Add HyperlaneDb trait
asaj May 14, 2023
30a1f6c
HyperlaneDB -> HyperlaneRocksDB
asaj May 14, 2023
2dfbfbd
ContractSync uses HyperlaneDB
asaj May 14, 2023
339454b
HyperlaneDB is async
asaj May 14, 2023
69df330
SqlChainScraper is HyperlaneDB
asaj May 14, 2023
19dc8e1
Dedupe syncing logic in scraper
asaj May 15, 2023
8ef541b
Revert
asaj May 15, 2023
38331ca
cleanup
asaj May 15, 2023
29c918a
Make contract sync generic
asaj May 16, 2023
02899cf
ForwardBackwardMessageSyncCursor
asaj May 16, 2023
1e1b922
fix build
asaj May 16, 2023
0ccccbb
No foundry in e2e
asaj May 16, 2023
e6e5da4
Fix bug in RateLimitingCursor
asaj May 16, 2023
275562c
Index deliveries
asaj May 16, 2023
53975a7
Read high watermarks
asaj May 16, 2023
e2551e6
Update high watermark
asaj May 17, 2023
43af6e2
clippy
asaj May 17, 2023
046abfc
Cleanup
asaj May 17, 2023
88fd97d
Merge branch 'main' into asaj/indexing
asaj May 17, 2023
7f7025b
merge main
asaj May 17, 2023
3644635
Merge branch 'asaj/indexing' of github.com:hyperlane-xyz/hyperlane-mo…
asaj May 17, 2023
09947d8
fmt
asaj May 17, 2023
fd57967
Fix for higher order lifetime issue
mattiekat May 17, 2023
0f9d7c8
Merge remote-tracking branch 'origin/asaj/indexing' into asaj/indexing
mattiekat May 17, 2023
0b0df18
lint
asaj May 17, 2023
08a3f19
fmt
asaj May 17, 2023
fd74c22
some comments
asaj May 18, 2023
2cbf21a
more comments
asaj May 18, 2023
b442787
fix tests
asaj May 18, 2023
743c70f
comments
asaj May 18, 2023
d40081a
fmt
asaj May 18, 2023
1d9fd9f
Merge branch 'main' into asaj/indexing
asaj May 19, 2023
b80b17b
Scraper running in e2e
asaj May 19, 2023
97cf613
Add scraper to e2e test
asaj May 20, 2023
cc5b55a
Merge branch 'asaj/indexing' of github.com:hyperlane-xyz/hyperlane-mo…
asaj May 20, 2023
211b05d
Merge branch 'main' into asaj/indexing
asaj May 20, 2023
fa2533b
lint
asaj May 20, 2023
6cde032
Merge branch 'asaj/indexing' of github.com:hyperlane-xyz/hyperlane-mo…
asaj May 20, 2023
88db437
metrics
asaj May 22, 2023
9afe4eb
Final pass
asaj May 22, 2023
81f3733
update message count
asaj May 22, 2023
e3aca3f
fmt
asaj May 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
HyperlaneDB -> HyperlaneRocksDB
  • Loading branch information
asaj committed May 14, 2023
commit 30a1f6c177f1b8a5b68a0a84d269793994e44a18
6 changes: 3 additions & 3 deletions rust/agents/relayer/src/merkle_tree_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::fmt::Display;
use eyre::Result;
use tracing::{debug, error, instrument};

use hyperlane_base::db::{DbError, HyperlaneDB};
use hyperlane_base::db::{DbError, HyperlaneRocksDB};
use hyperlane_core::{
accumulator::{incremental::IncrementalMerkle, merkle::Proof},
ChainCommunicationError, H256,
Expand All @@ -14,7 +14,7 @@ use crate::prover::{Prover, ProverError};
/// Struct to sync prover.
#[derive(Debug)]
pub struct MerkleTreeBuilder {
db: HyperlaneDB,
db: HyperlaneRocksDB,
prover: Prover,
incremental: IncrementalMerkle,
}
Expand Down Expand Up @@ -68,7 +68,7 @@ pub enum MerkleTreeBuilderError {
}

impl MerkleTreeBuilder {
pub fn new(db: HyperlaneDB) -> Self {
pub fn new(db: HyperlaneRocksDB) -> Self {
let prover = Prover::default();
let incremental = IncrementalMerkle::default();
Self {
Expand Down
14 changes: 7 additions & 7 deletions rust/agents/relayer/src/msg/gas_payment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_trait::async_trait;
use eyre::Result;
use tracing::{debug, error, trace};

use hyperlane_base::db::HyperlaneDB;
use hyperlane_base::db::HyperlaneRocksDB;
use hyperlane_core::{
HyperlaneMessage, InterchainGasExpenditure, InterchainGasPayment, TxCostEstimate, TxOutcome,
U256,
Expand Down Expand Up @@ -40,13 +40,13 @@ pub struct GasPaymentEnforcer {
/// policy or another. If a message matches multiple policies'
/// whitelists, then whichever is first in the list will be used.
policies: Vec<(Box<dyn GasPaymentPolicy>, MatchingList)>,
db: HyperlaneDB,
db: HyperlaneRocksDB,
}

impl GasPaymentEnforcer {
pub fn new(
policy_configs: impl IntoIterator<Item = GasPaymentEnforcementConf>,
db: HyperlaneDB,
db: HyperlaneRocksDB,
) -> Self {
let policies = policy_configs
.into_iter()
Expand Down Expand Up @@ -136,7 +136,7 @@ impl GasPaymentEnforcer {
mod test {
use std::str::FromStr;

use hyperlane_base::db::{test_utils, HyperlaneDB};
use hyperlane_base::db::{test_utils, HyperlaneRocksDB};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage, TxCostEstimate, H160, H256, U256};

use crate::settings::{
Expand All @@ -148,7 +148,7 @@ mod test {
#[tokio::test]
async fn test_empty_whitelist() {
test_utils::run_test_db(|db| async move {
let hyperlane_db = HyperlaneDB::new(
let hyperlane_db = HyperlaneRocksDB::new(
&HyperlaneDomain::new_test_domain("test_empty_whitelist"),
db,
);
Expand Down Expand Up @@ -185,7 +185,7 @@ mod test {
#[allow(unused_must_use)]
test_utils::run_test_db(|db| async move {
let hyperlane_db =
HyperlaneDB::new(&HyperlaneDomain::new_test_domain("test_no_match"), db);
HyperlaneRocksDB::new(&HyperlaneDomain::new_test_domain("test_no_match"), db);
let matching_list = serde_json::from_str(r#"[{"originDomain": 234}]"#).unwrap();
let enforcer = GasPaymentEnforcer::new(
// Require a payment
Expand All @@ -212,7 +212,7 @@ mod test {
#[tokio::test]
async fn test_non_empty_matching_list() {
test_utils::run_test_db(|db| async move {
let hyperlane_db = HyperlaneDB::new(&HyperlaneDomain::new_test_domain("test_non_empty_matching_list"), db);
let hyperlane_db = HyperlaneRocksDB::new(&HyperlaneDomain::new_test_domain("test_non_empty_matching_list"), db);

let sender_address = "0xaa000000000000000000000000000000000000aa";
let recipient_address = "0xbb000000000000000000000000000000000000bb";
Expand Down
6 changes: 3 additions & 3 deletions rust/agents/relayer/src/msg/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::{
};
use tracing::{debug, info_span, instrument, instrument::Instrumented, Instrument};

use hyperlane_base::{db::HyperlaneDB, CoreMetrics};
use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage};

use crate::{
Expand All @@ -19,7 +19,7 @@ use crate::{

#[derive(Debug, new)]
pub(crate) struct MessageProcessor {
db: HyperlaneDB,
db: HyperlaneRocksDB,
whitelist: Arc<MatchingList>,
blacklist: Arc<MatchingList>,
metrics: MessageProcessorMetrics,
Expand All @@ -37,7 +37,7 @@ impl MessageProcessor {

#[instrument(ret, err, skip(self), level = "info")]
async fn main_loop(mut self) -> Result<()> {
// Forever, scan HyperlaneDB looking for new messages to send. When criteria are
// Forever, scan HyperlaneRocksDB looking for new messages to send. When criteria are
// satisfied or the message is disqualified, push the message onto
// self.tx_msg and then continue the scan at the next highest
// nonce.
Expand Down
10 changes: 5 additions & 5 deletions rust/agents/relayer/src/msg/serial_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::{debug, error, info, info_span, instrument, instrument::Instrumented, Instrument};

use hyperlane_base::{db::HyperlaneDB, CachingMailbox, CoreMetrics};
use hyperlane_base::{db::HyperlaneRocksDB, CachingMailbox, CoreMetrics};
use hyperlane_core::{HyperlaneChain, HyperlaneDomain, Mailbox, U256};

use crate::msg::PendingMessage;
Expand Down Expand Up @@ -43,7 +43,7 @@ use super::{
///
/// Finally, the SerialSubmitter ensures that message delivery is robust to
/// destination chain reorgs prior to committing delivery status to
/// HyperlaneDB.
/// HyperlaneRocksDB.
///
///
/// Objectives
Expand Down Expand Up @@ -86,7 +86,7 @@ pub(crate) struct SerialSubmitter {
metadata_builder: BaseMetadataBuilder,
/// Interface to agent rocks DB for e.g. writing delivery status upon
/// completion.
db: HyperlaneDB,
db: HyperlaneRocksDB,
/// Metrics for serial submitter.
metrics: SerialSubmitterMetrics,
/// Used to determine if messages have made sufficient gas payments.
Expand Down Expand Up @@ -310,11 +310,11 @@ impl SerialSubmitter {
}
}

/// Record in HyperlaneDB and various metrics that this process has observed
/// Record in HyperlaneRocksDB and various metrics that this process has observed
/// the successful processing of a message. An `Ok(())` value returned by
/// this function is the 'commit' point in a message's lifetime for
/// final processing -- after this function has been seen to
/// `return Ok(())`, then without a wiped HyperlaneDB, we will never
/// `return Ok(())`, then without a wiped HyperlaneRocksDB, we will never
/// re-attempt processing for this message again, even after the relayer
/// restarts.
fn record_message_process_success(&mut self, msg: &PendingMessage) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion rust/agents/scraper/src/chain_scraper/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Syncer {

// Okay, what do we want to do here?
// For now, let's separate out dispatched message syncing from the rest.
// TODO: Do smart syncing. Tricky bit is that the other cursor relies on a HyperlaneDB, which we don't have here.
// TODO: Do smart syncing. Tricky bit is that the other cursor relies on a HyperlaneRocksDB, which we don't have here.
loop {
let start_block = self.sync_cursor.current_position();
let Ok(Some((from, to, eta))) = self.sync_cursor.next_range().await else { continue };
Expand Down
6 changes: 3 additions & 3 deletions rust/hyperlane-base/src/contract_sync/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use hyperlane_core::{
ChainResult, Indexer, MailboxIndexer, MessageSyncCursor, SyncBlockRangeCursor,
};

use crate::{contract_sync::eta_calculator::SyncerEtaCalculator, db::HyperlaneDB};
use crate::{contract_sync::eta_calculator::SyncerEtaCalculator, db::HyperlaneRocksDB};

/// Time window for the moving average used in the eta calculator in seconds.
const ETA_TIME_WINDOW: f64 = 2. * 60.;
Expand All @@ -24,8 +24,8 @@ const ETA_TIME_WINDOW: f64 = 2. * 60.;
pub struct MessageSyncCursorData<I> {
/// The MailboxIndexer that this cursor is associated with.
indexer: I,
/// The HyperlaneDB that this cursor is associated with.
db: HyperlaneDB,
/// The HyperlaneRocksDB that this cursor is associated with.
db: HyperlaneRocksDB,
/// The size of the largest block range that should be returned by the cursor.
chunk_size: u32,
/// The starting block for the cursor
Expand Down
4 changes: 2 additions & 2 deletions rust/hyperlane-base/src/contract_sync/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ mod test {
use crate::{
contract_sync::{mailbox::MOCK_CURSOR, schema::MailboxContractSyncDB, IndexSettings},
db::test_utils,
db::HyperlaneDB,
db::HyperlaneRocksDB,
ContractSync, ContractSyncMetrics, CoreMetrics,
};

Expand Down Expand Up @@ -278,7 +278,7 @@ mod test {
});
}

let hyperlane_db = HyperlaneDB::new(
let hyperlane_db = HyperlaneRocksDB::new(
&HyperlaneDomain::new_test_domain("handles_missing_rpc_messages"),
db,
);
Expand Down
2 changes: 1 addition & 1 deletion rust/hyperlane-base/src/contract_sync/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct ContractSyncMetrics {
/// - `chain`: Chain the indexer is collecting data from.
pub indexed_height: IntGaugeVec,

/// Events stored into HyperlaneDB (label values differentiate checkpoints vs.
/// Events stored into HyperlaneRocksDB (label values differentiate checkpoints vs.
/// messages)
///
/// Labels:
Expand Down
4 changes: 2 additions & 2 deletions rust/hyperlane-base/src/contract_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub use interchain_gas::*;
pub use mailbox::*;
pub use metrics::ContractSyncMetrics;

use crate::{chains::IndexSettings, db::HyperlaneDB};
use crate::{chains::IndexSettings, db::HyperlaneRocksDB};

mod cursor;
mod eta_calculator;
Expand All @@ -24,7 +24,7 @@ mod schema;
#[derive(Debug, new, Clone)]
pub(crate) struct ContractSync<I> {
domain: HyperlaneDomain,
db: HyperlaneDB,
db: HyperlaneRocksDB,
indexer: I,
index_settings: IndexSettings,
metrics: ContractSyncMetrics,
Expand Down
7 changes: 4 additions & 3 deletions rust/hyperlane-base/src/contract_sync/schema.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use eyre::Result;

use crate::db::{DbError, HyperlaneDB};
use crate::db::{DbError, HyperlaneRocksDB};

/// The start block number of the latest "valid" message block range.
/// This is an interval of block indexes where > 0 messages were indexed,
Expand All @@ -10,12 +10,13 @@ use crate::db::{DbError, HyperlaneDB};
static LATEST_VALID_MESSAGE_RANGE_START_BLOCK: &str = "latest_valid_message_range_start_block";
static LATEST_INDEXED_GAS_PAYMENT_BLOCK: &str = "latest_indexed_gas_payment_block";

// TODO: Can all of these be deleted?
pub(crate) trait MailboxContractSyncDB {
fn store_latest_valid_message_range_start_block(&self, block_num: u32) -> Result<(), DbError>;
fn retrieve_latest_valid_message_range_start_block(&self) -> Option<u32>;
}

impl MailboxContractSyncDB for HyperlaneDB {
impl MailboxContractSyncDB for HyperlaneRocksDB {
fn store_latest_valid_message_range_start_block(&self, block_num: u32) -> Result<(), DbError> {
self.store_encodable("", LATEST_VALID_MESSAGE_RANGE_START_BLOCK, &block_num)
}
Expand All @@ -31,7 +32,7 @@ pub(crate) trait InterchainGasPaymasterContractSyncDB {
fn retrieve_latest_indexed_gas_payment_block(&self) -> Option<u32>;
}

impl InterchainGasPaymasterContractSyncDB for HyperlaneDB {
impl InterchainGasPaymasterContractSyncDB for HyperlaneRocksDB {
fn store_latest_indexed_gas_payment_block(&self, latest_block: u32) -> Result<(), DbError> {
self.store_encodable("", LATEST_INDEXED_GAS_PAYMENT_BLOCK, &latest_block)
}
Expand Down
106 changes: 2 additions & 104 deletions rust/hyperlane-base/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,106 +1,4 @@
use std::path::PathBuf;
use std::{io, path::Path, sync::Arc};

use hyperlane_core::HyperlaneProtocolError;
use rocksdb::{Options, DB as Rocks};
use tracing::info;

pub use hyperlane_db::*;
pub use typed_db::*;

/// Shared functionality surrounding use of rocksdb
pub mod iterator;
pub use rocks::*;

/// DB operations tied to specific Mailbox
mod hyperlane_db;
/// Type-specific db operations
mod typed_db;

/// Internal-use storage types.
mod storage_types;

/// Database test utilities.
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;

#[derive(Debug, Clone)]
/// A KV Store
pub struct DB(Arc<Rocks>);

impl From<Rocks> for DB {
fn from(rocks: Rocks) -> Self {
Self(Arc::new(rocks))
}
}

/// DB Error type
#[derive(thiserror::Error, Debug)]
pub enum DbError {
/// Rocks DB Error
#[error("{0}")]
RockError(#[from] rocksdb::Error),
#[error("Failed to open {path}, canonicalized as {canonicalized}: {source}")]
/// Error opening the database
OpeningError {
/// Rocksdb error during opening
#[source]
source: rocksdb::Error,
/// Raw database path provided
path: PathBuf,
/// Parsed path used
canonicalized: PathBuf,
},
/// Could not parse the provided database path string
#[error("Invalid database path supplied {1:?}; {0}")]
InvalidDbPath(#[source] io::Error, String),
/// Hyperlane Error
#[error("{0}")]
HyperlaneError(#[from] HyperlaneProtocolError),
}

type Result<T> = std::result::Result<T, DbError>;

impl DB {
/// Opens db at `db_path` and creates if missing
#[tracing::instrument(err)]
pub fn from_path(db_path: &Path) -> Result<DB> {
let path = {
let mut path = db_path
.parent()
.unwrap_or(Path::new("."))
.canonicalize()
.map_err(|e| DbError::InvalidDbPath(e, db_path.to_string_lossy().into()))?;
if let Some(file_name) = db_path.file_name() {
path.push(file_name);
}
path
};

if path.is_dir() {
info!(path=%path.to_string_lossy(), "Opening existing db")
} else {
info!(path=%path.to_string_lossy(), "Creating db")
}

let mut opts = Options::default();
opts.create_if_missing(true);

Rocks::open(&opts, &path)
.map_err(|e| DbError::OpeningError {
source: e,
path: db_path.into(),
canonicalized: path,
})
.map(Into::into)
}

/// Store a value in the DB
pub fn store(&self, key: &[u8], value: &[u8]) -> Result<()> {
Ok(self.0.put(key, value)?)
}

/// Retrieve a value from the DB
pub fn retrieve(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.0.get(key)?)
}
}
mod rocks;
asaj marked this conversation as resolved.
Show resolved Hide resolved
Loading