Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
ba41f21
Start preparations for backfill indexing
asaj May 12, 2023
fe97a32
builds
asaj May 13, 2023
1de96b0
Logging, fix bug that caused unnecessary backtracks
asaj May 13, 2023
4b18101
Ready to begin backwards pass
asaj May 13, 2023
fdb53e5
Backwards pass passing e2e tests
asaj May 13, 2023
56dd8e0
cleanup
asaj May 13, 2023
e62f83f
Faster e2e test
asaj May 13, 2023
fc8a8b8
More e2e speedups
asaj May 13, 2023
002a2ce
more speedups
asaj May 13, 2023
a55229c
Add HyperlaneDb trait
asaj May 14, 2023
30a1f6c
HyperlaneDB -> HyperlaneRocksDB
asaj May 14, 2023
2dfbfbd
ContractSync uses HyperlaneDB
asaj May 14, 2023
339454b
HyperlaneDB is async
asaj May 14, 2023
69df330
SqlChainScraper is HyperlaneDB
asaj May 14, 2023
19dc8e1
Dedupe syncing logic in scraper
asaj May 15, 2023
8ef541b
Revert
asaj May 15, 2023
38331ca
cleanup
asaj May 15, 2023
29c918a
Make contract sync generic
asaj May 16, 2023
02899cf
ForwardBackwardMessageSyncCursor
asaj May 16, 2023
1e1b922
fix build
asaj May 16, 2023
0ccccbb
No foundry in e2e
asaj May 16, 2023
e6e5da4
Fix bug in RateLimitingCursor
asaj May 16, 2023
275562c
Index deliveries
asaj May 16, 2023
53975a7
Read high watermarks
asaj May 16, 2023
e2551e6
Update high watermark
asaj May 17, 2023
43af6e2
clippy
asaj May 17, 2023
046abfc
Cleanup
asaj May 17, 2023
88fd97d
Merge branch 'main' into asaj/indexing
asaj May 17, 2023
7f7025b
merge main
asaj May 17, 2023
3644635
Merge branch 'asaj/indexing' of github.com:hyperlane-xyz/hyperlane-mo…
asaj May 17, 2023
09947d8
fmt
asaj May 17, 2023
fd57967
Fix for higher order lifetime issue
mattiekat May 17, 2023
0f9d7c8
Merge remote-tracking branch 'origin/asaj/indexing' into asaj/indexing
mattiekat May 17, 2023
0b0df18
lint
asaj May 17, 2023
08a3f19
fmt
asaj May 17, 2023
fd74c22
some comments
asaj May 18, 2023
2cbf21a
more comments
asaj May 18, 2023
b442787
fix tests
asaj May 18, 2023
743c70f
comments
asaj May 18, 2023
d40081a
fmt
asaj May 18, 2023
1d9fd9f
Merge branch 'main' into asaj/indexing
asaj May 19, 2023
b80b17b
Scraper running in e2e
asaj May 19, 2023
97cf613
Add scraper to e2e test
asaj May 20, 2023
cc5b55a
Merge branch 'asaj/indexing' of github.com:hyperlane-xyz/hyperlane-mo…
asaj May 20, 2023
211b05d
Merge branch 'main' into asaj/indexing
asaj May 20, 2023
fa2533b
lint
asaj May 20, 2023
6cde032
Merge branch 'asaj/indexing' of github.com:hyperlane-xyz/hyperlane-mo…
asaj May 20, 2023
88db437
metrics
asaj May 22, 2023
9afe4eb
Final pass
asaj May 22, 2023
81f3733
update message count
asaj May 22, 2023
e3aca3f
fmt
asaj May 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ jobs:
env:
E2E_CI_MODE: 'true'
E2E_CI_TIMEOUT_SEC: '600'
E2E_KATHY_ROUNDS: '2'
E2E_KATHY_MESSAGES: '20'
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"private": true,
"scripts": {
"build": "yarn workspaces foreach --verbose --parallel --topological run build",
"build:e2e": "yarn workspaces foreach --verbose --parallel --exclude @hyperlane-xyz/monorepo --exclude @hyperlane-xyz/hyperlane-token --topological run build",
"clean": "yarn workspaces foreach --verbose --parallel run clean",
"sync-submodules": "./scripts/sync-submodules.sh",
"postinstall": "husky install",
Expand Down
6 changes: 3 additions & 3 deletions rust/agents/relayer/src/merkle_tree_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::fmt::Display;
use eyre::Result;
use tracing::{debug, error, instrument};

use hyperlane_base::db::{DbError, HyperlaneDB};
use hyperlane_base::db::{DbError, HyperlaneRocksDB};
use hyperlane_core::{
accumulator::{incremental::IncrementalMerkle, merkle::Proof},
ChainCommunicationError, H256,
Expand All @@ -14,7 +14,7 @@ use crate::prover::{Prover, ProverError};
/// Struct to sync prover.
#[derive(Debug)]
pub struct MerkleTreeBuilder {
db: HyperlaneDB,
db: HyperlaneRocksDB,
prover: Prover,
incremental: IncrementalMerkle,
}
Expand Down Expand Up @@ -68,7 +68,7 @@ pub enum MerkleTreeBuilderError {
}

impl MerkleTreeBuilder {
pub fn new(db: HyperlaneDB) -> Self {
pub fn new(db: HyperlaneRocksDB) -> Self {
let prover = Prover::default();
let incremental = IncrementalMerkle::default();
Self {
Expand Down
14 changes: 7 additions & 7 deletions rust/agents/relayer/src/msg/gas_payment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_trait::async_trait;
use eyre::Result;
use tracing::{debug, error, trace};

use hyperlane_base::db::HyperlaneDB;
use hyperlane_base::db::HyperlaneRocksDB;
use hyperlane_core::{
HyperlaneMessage, InterchainGasExpenditure, InterchainGasPayment, TxCostEstimate, TxOutcome,
U256,
Expand Down Expand Up @@ -40,13 +40,13 @@ pub struct GasPaymentEnforcer {
/// policy or another. If a message matches multiple policies'
/// whitelists, then whichever is first in the list will be used.
policies: Vec<(Box<dyn GasPaymentPolicy>, MatchingList)>,
db: HyperlaneDB,
db: HyperlaneRocksDB,
}

impl GasPaymentEnforcer {
pub fn new(
policy_configs: impl IntoIterator<Item = GasPaymentEnforcementConf>,
db: HyperlaneDB,
db: HyperlaneRocksDB,
) -> Self {
let policies = policy_configs
.into_iter()
Expand Down Expand Up @@ -136,7 +136,7 @@ impl GasPaymentEnforcer {
mod test {
use std::str::FromStr;

use hyperlane_base::db::{test_utils, HyperlaneDB};
use hyperlane_base::db::{test_utils, HyperlaneRocksDB};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage, TxCostEstimate, H160, H256, U256};

use crate::settings::{
Expand All @@ -148,7 +148,7 @@ mod test {
#[tokio::test]
async fn test_empty_whitelist() {
test_utils::run_test_db(|db| async move {
let hyperlane_db = HyperlaneDB::new(
let hyperlane_db = HyperlaneRocksDB::new(
&HyperlaneDomain::new_test_domain("test_empty_whitelist"),
db,
);
Expand Down Expand Up @@ -185,7 +185,7 @@ mod test {
#[allow(unused_must_use)]
test_utils::run_test_db(|db| async move {
let hyperlane_db =
HyperlaneDB::new(&HyperlaneDomain::new_test_domain("test_no_match"), db);
HyperlaneRocksDB::new(&HyperlaneDomain::new_test_domain("test_no_match"), db);
let matching_list = serde_json::from_str(r#"[{"originDomain": 234}]"#).unwrap();
let enforcer = GasPaymentEnforcer::new(
// Require a payment
Expand All @@ -212,7 +212,7 @@ mod test {
#[tokio::test]
async fn test_non_empty_matching_list() {
test_utils::run_test_db(|db| async move {
let hyperlane_db = HyperlaneDB::new(&HyperlaneDomain::new_test_domain("test_non_empty_matching_list"), db);
let hyperlane_db = HyperlaneRocksDB::new(&HyperlaneDomain::new_test_domain("test_non_empty_matching_list"), db);

let sender_address = "0xaa000000000000000000000000000000000000aa";
let recipient_address = "0xbb000000000000000000000000000000000000bb";
Expand Down
7 changes: 4 additions & 3 deletions rust/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use derive_new::new;
use eyre::{Context, Result};
use hyperlane_base::db::HyperlaneRocksDB;
use prometheus::{IntCounter, IntGauge};
use tracing::{debug, error, info, instrument, trace, warn};

use hyperlane_base::{db::HyperlaneDB, CachingMailbox, CoreMetrics};
use hyperlane_base::CoreMetrics;
use hyperlane_core::{HyperlaneChain, HyperlaneDomain, HyperlaneMessage, Mailbox, U256};

use super::{
Expand All @@ -29,9 +30,9 @@ const CONFIRM_DELAY: Duration = if cfg!(any(test, feature = "test-utils")) {
/// instance is for a unique origin -> destination pairing.
pub struct MessageContext {
/// Mailbox on the destination chain.
pub destination_mailbox: CachingMailbox,
pub destination_mailbox: Arc<dyn Mailbox>,
/// Origin chain database to verify gas payments.
pub origin_db: HyperlaneDB,
pub origin_db: HyperlaneRocksDB,
/// Used to construct the ISM metadata needed to verify a message from the
/// origin.
pub metadata_builder: BaseMetadataBuilder,
Expand Down
6 changes: 3 additions & 3 deletions rust/agents/relayer/src/msg/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::{
};
use tracing::{debug, info_span, instrument, instrument::Instrumented, trace, Instrument};

use hyperlane_base::{db::HyperlaneDB, CoreMetrics};
use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage};

use crate::msg::pending_operation::DynPendingOperation;
Expand All @@ -22,7 +22,7 @@ use super::pending_message::*;
/// for to the appropriate destination.
#[derive(new)]
pub struct MessageProcessor {
db: HyperlaneDB,
db: HyperlaneRocksDB,
whitelist: Arc<MatchingList>,
blacklist: Arc<MatchingList>,
metrics: MessageProcessorMetrics,
Expand Down Expand Up @@ -62,7 +62,7 @@ impl MessageProcessor {

#[instrument(ret, err, skip(self), level = "info")]
async fn main_loop(mut self) -> Result<()> {
// Forever, scan HyperlaneDB looking for new messages to send. When criteria are
// Forever, scan HyperlaneRocksDB looking for new messages to send. When criteria are
// satisfied or the message is disqualified, push the message onto
// self.tx_msg and then continue the scan at the next highest
// nonce.
Expand Down
2 changes: 1 addition & 1 deletion rust/agents/relayer/src/msg/serial_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type OpQueue = Arc<Mutex<BinaryHeap<Reverse<Box<DynPendingOperation>>>>>;
///
/// Finally, the SerialSubmitter ensures that message delivery is robust to
/// destination chain reorgs prior to committing delivery status to
/// HyperlaneDB.
/// HyperlaneRocksDB.
///
///
/// Objectives
Expand Down
113 changes: 68 additions & 45 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use async_trait::async_trait;
use eyre::Result;
use hyperlane_base::{MessageContractSync, WatermarkContractSync};
use tokio::sync::mpsc::UnboundedSender;
use tokio::{
sync::{
Expand All @@ -17,11 +18,10 @@ use tokio::{
use tracing::{info, info_span, instrument::Instrumented, Instrument};

use hyperlane_base::{
db::{HyperlaneDB, DB},
run_all, BaseAgent, CachingInterchainGasPaymaster, CachingMailbox, ContractSyncMetrics,
CoreMetrics, HyperlaneAgentCore,
db::{HyperlaneRocksDB, DB},
run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
};
use hyperlane_core::{HyperlaneDomain, U256};
use hyperlane_core::{HyperlaneDomain, InterchainGasPayment, U256};

use crate::msg::pending_message::MessageSubmissionMetrics;
use crate::{
Expand All @@ -48,15 +48,14 @@ pub struct Relayer {
origin_chains: HashSet<HyperlaneDomain>,
destination_chains: HashSet<HyperlaneDomain>,
core: HyperlaneAgentCore,
message_syncs: HashMap<HyperlaneDomain, Arc<MessageContractSync>>,
interchain_gas_payment_syncs:
HashMap<HyperlaneDomain, Arc<WatermarkContractSync<InterchainGasPayment>>>,
/// Context data for each (origin, destination) chain pair a message can be
/// sent between
msg_ctxs: HashMap<ContextKey, Arc<MessageContext>>,
/// Mailboxes for all chains (technically only need caching mailbox for
/// origin chains)
mailboxes: HashMap<HyperlaneDomain, CachingMailbox>,
/// Interchain gas paymaster for each origin chain
interchain_gas_paymasters: HashMap<HyperlaneDomain, CachingInterchainGasPaymaster>,
prover_syncs: HashMap<HyperlaneDomain, Arc<RwLock<MerkleTreeBuilder>>>,
dbs: HashMap<HyperlaneDomain, HyperlaneRocksDB>,
whitelist: Arc<MatchingList>,
blacklist: Arc<MatchingList>,
transaction_gas_limit: Option<U256>,
Expand Down Expand Up @@ -99,26 +98,40 @@ impl BaseAgent for Relayer {
{
let core = settings.build_hyperlane_core(metrics.clone());
let db = DB::from_path(&settings.db)?;

// Use defined origin chains and remote chains
let domains = settings
let dbs = settings
.origin_chains
.iter()
.chain(&settings.destination_chains)
.collect::<HashSet<_>>();
.map(|origin| (origin.clone(), HyperlaneRocksDB::new(origin, db.clone())))
.collect::<HashMap<_, _>>();

let mailboxes = settings
.build_all_mailboxes(domains.into_iter(), &metrics, db.clone())
.build_mailboxes(settings.destination_chains.iter(), &metrics)
.await?;
let interchain_gas_paymasters = settings
.build_all_interchain_gas_paymasters(
let validator_announces = settings
.build_validator_announces(settings.origin_chains.iter(), &metrics)
.await?;

let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&metrics));

let message_syncs = settings
.build_message_indexers(
settings.origin_chains.iter(),
&metrics,
db.clone(),
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
.collect(),
)
.await?;
let validator_announces = settings
.build_all_validator_announces(settings.origin_chains.iter(), &metrics)
let interchain_gas_payment_syncs = settings
.build_interchain_gas_payment_indexers(
settings.origin_chains.iter(),
&metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
.collect(),
)
.await?;

let whitelist = Arc::new(settings.whitelist);
Expand All @@ -139,7 +152,7 @@ impl BaseAgent for Relayer {
.origin_chains
.iter()
.map(|origin| {
let db = HyperlaneDB::new(origin, db.clone());
let db = dbs.get(origin).unwrap().clone();
(
origin.clone(),
Arc::new(RwLock::new(MerkleTreeBuilder::new(db))),
Expand All @@ -148,6 +161,7 @@ impl BaseAgent for Relayer {
.collect::<HashMap<_, _>>();

info!(gas_enforcement_policies=?settings.gas_payment_enforcement, "Gas enforcement configuration");

// need one of these per origin chain due to the database scoping even though
// the config itself is the same
let gas_payment_enforcers: HashMap<_, _> = settings
Expand All @@ -158,7 +172,7 @@ impl BaseAgent for Relayer {
domain.clone(),
Arc::new(GasPaymentEnforcer::new(
settings.gas_payment_enforcement.clone(),
HyperlaneDB::new(domain, db.clone()),
dbs.get(domain).unwrap().clone(),
)),
)
})
Expand Down Expand Up @@ -192,7 +206,7 @@ impl BaseAgent for Relayer {
},
Arc::new(MessageContext {
destination_mailbox: mailboxes[destination].clone(),
origin_db: HyperlaneDB::new(origin, db.clone()),
origin_db: dbs.get(origin).unwrap().clone(),
metadata_builder,
origin_gas_payment_enforcer: gas_payment_enforcers[origin].clone(),
transaction_gas_limit,
Expand All @@ -203,12 +217,13 @@ impl BaseAgent for Relayer {
}

Ok(Self {
dbs,
origin_chains: settings.origin_chains,
destination_chains: settings.destination_chains,
msg_ctxs,
core,
mailboxes,
interchain_gas_paymasters,
message_syncs,
interchain_gas_payment_syncs,
prover_syncs,
whitelist,
blacklist,
Expand All @@ -232,10 +247,9 @@ impl BaseAgent for Relayer {
tasks.push(self.run_destination_submitter(destination, receive_channel));
}

let sync_metrics = ContractSyncMetrics::new(self.core.metrics.clone());
for origin in &self.origin_chains {
tasks.push(self.run_origin_mailbox_sync(origin, sync_metrics.clone()));
tasks.push(self.run_interchain_gas_paymaster_sync(origin, sync_metrics.clone()));
tasks.push(self.run_message_sync(origin).await);
tasks.push(self.run_interchain_gas_payment_sync(origin).await);
}

// each message process attempts to send messages from a chain
Expand All @@ -248,28 +262,37 @@ impl BaseAgent for Relayer {
}

impl Relayer {
fn run_origin_mailbox_sync(
async fn run_message_sync(
&self,
origin: &HyperlaneDomain,
sync_metrics: ContractSyncMetrics,
) -> Instrumented<JoinHandle<Result<()>>> {
let sync = self.mailboxes[origin].sync(
self.as_ref().settings.chains[origin.name()].index.clone(),
sync_metrics,
);
sync
) -> Instrumented<JoinHandle<eyre::Result<()>>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index.clone();
let contract_sync = self.message_syncs.get(origin).unwrap().clone();
let cursor = contract_sync
.forward_backward_message_sync_cursor(index_settings)
.await;
tokio::spawn(async move {
contract_sync
.clone()
.sync("dispatched_messages", cursor)
.await
})
.instrument(info_span!("ContractSync"))
}

fn run_interchain_gas_paymaster_sync(
async fn run_interchain_gas_payment_sync(
&self,
origin: &HyperlaneDomain,
sync_metrics: ContractSyncMetrics,
) -> Instrumented<JoinHandle<Result<()>>> {
let sync = self.interchain_gas_paymasters[origin].sync(
self.as_ref().settings.chains[origin.name()].index.clone(),
sync_metrics,
);
sync
) -> Instrumented<JoinHandle<eyre::Result<()>>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index.clone();
let contract_sync = self
.interchain_gas_payment_syncs
.get(origin)
.unwrap()
.clone();
let cursor = contract_sync.rate_limited_cursor(index_settings).await;
tokio::spawn(async move { contract_sync.clone().sync("gas_payments", cursor).await })
.instrument(info_span!("ContractSync"))
}

fn run_message_processor(
Expand Down Expand Up @@ -298,7 +321,7 @@ impl Relayer {
})
.collect();
let message_processor = MessageProcessor::new(
self.mailboxes[origin].db().clone(),
self.dbs.get(origin).unwrap().clone(),
self.whitelist.clone(),
self.blacklist.clone(),
metrics,
Expand Down
Loading