A Rust-based monitoring application that demonstrates concurrent tracking of Hyperlane cross-chain message dispatches across multiple blockchain networks using async Tokio operations.
This application monitors Hyperlane's Mailbox contracts on multiple chains to detect when messages are dispatched (sent) and processed (received). It provides real-time event tracking, historical scanning, and the ability to send test messages to the network.
- Concurrent Multi-Chain Monitoring: Independently monitor multiple blockchain networks in parallel
- Real-Time Event Detection: Track Dispatch and Process events as they occur on-chain
- Historical Block Scanning: Query specific block ranges for past events
- Message Simulation: Send real cross-chain messages to the Hyperlane network
- Thread-Safe State Management: Centralized message tracking with concurrent access support
- Comprehensive Logging: Detailed structured logging using the `tracing` crate
The application is built around three core components, MessageTracker, ChainMonitor, and RelayCoordinator, which are described in that order below:
Purpose: Centralized, thread-safe storage for cross-chain messages across all monitored chains.
Key Characteristics:
- Uses `Arc<RwLock<HashMap<String, MessageState>>>` for concurrent access
- Maintains a global view of all messages in flight across the system
- Thread-safe: Multiple chain monitors can record events simultaneously
- Supports multiple events per message (dispatch + delivery confirmation)
Key Methods:
- `record_message()`: Records new dispatch events, appending to existing entries or creating new ones
- `get_message()`: Retrieves current state of a specific message by ID (test-only)
- `list_pending_messages()`: Returns all unprocessed messages (test-only)
Data Structures:
struct MessageState {
events: Vec<DispatchEvent>, // All related events
processed: bool, // Delivery status
}
struct DispatchEvent {
message_id: String, // Unique identifier (32-byte hash)
destination_domain: u32, // Target chain's Hyperlane domain ID
recipient: String, // Recipient contract address
message: String, // Encoded message payload
block_number: u64, // Block where dispatch occurred
timestamp: u64, // Unix timestamp
}

Purpose: Per-chain monitoring and event detection for a single blockchain.
Key Characteristics:
- Each monitor maintains its own RPC connection via ethers.rs Provider
- Monitors the Hyperlane Mailbox contract for both Dispatch and Process events
- Runs independently in its own async task
- Polls for new events every 12 seconds
- Reports findings to the shared MessageTracker
Configuration per Chain:
- Chain name (e.g., "sepolia", "arbitrum-sepolia")
- RPC endpoint URL
- Mailbox contract address
- Reference to shared MessageTracker
Event Detection:
- Dispatch Events: `Dispatch(address,uint32,bytes32,bytes)` - messages sent FROM this chain
- Process Events: `ProcessId(bytes32)` - messages received ON this chain
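
The `uint32` destination and `bytes32` recipient of a `Dispatch` event each occupy a full 32-byte word, so a monitor has to slice the useful bytes out of them. The following is a minimal, dependency-free sketch of that slicing, plus the reverse step of left-padding a 20-byte address into a `bytes32` recipient. The helper names are hypothetical and do not come from the application's source.

```rust
/// Left-pads a 20-byte EVM address into the 32-byte form used for a
/// `bytes32` recipient (hypothetical helper, not from the application).
fn address_to_bytes32(address: [u8; 20]) -> [u8; 32] {
    let mut out = [0u8; 32];
    out[12..].copy_from_slice(&address);
    out
}

/// Reads a `uint32` from a 32-byte word. ABI encoding is big-endian, so
/// the value sits in the last four bytes.
fn topic_to_domain(topic: &[u8; 32]) -> u32 {
    u32::from_be_bytes([topic[28], topic[29], topic[30], topic[31]])
}

/// Renders the trailing 20 bytes of a 32-byte word as a 0x-prefixed
/// lowercase hex address.
fn topic_to_address_hex(topic: &[u8; 32]) -> String {
    let mut hex = String::from("0x");
    for byte in &topic[12..] {
        hex.push_str(&format!("{:02x}", byte));
    }
    hex
}
```

Working on fixed-size `[u8; 32]` arrays keeps the index arithmetic checked at compile time; with a real log type the same offsets would apply to each topic's byte slice.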
Purpose: Orchestrates concurrent monitoring across multiple chains.
Key Characteristics:
- Spawns independent async tasks for each ChainMonitor
- Enables true parallel monitoring using Tokio's async runtime
- Manages the lifecycle of all monitoring tasks
- Provides unified operations across all chains
Concurrency Model:
// Each chain gets its own async task.
// Iterating by value moves each monitor into its task, because
// tokio::spawn requires futures that do not borrow from the caller.
for monitor in coordinator.monitors {
    tokio::spawn(async move {
        // Independent monitoring loop
        loop {
            // Check for new blocks
            // Query Dispatch events
            // Query Process events
            tokio::time::sleep(Duration::from_secs(12)).await;
        }
    });
}

- Rust 1.70+: Install from rustup.rs
- Git: For cloning the repository
- Testnet ETH: Required for the `simulate` command (Sepolia testnet)
- Clone the repository:
git clone <repository-url>
cd hyperlane-monitor
- Create environment file (optional):
cp .env.example .env
- Configure RPC endpoints (optional):
Edit .env to set custom RPC URLs:
SEPOLIA_RPC_URL=https://your-sepolia-rpc-url
ARBITRUM_SEPOLIA_RPC_URL=https://your-arbitrum-sepolia-rpc-url
Default values (if not set):
  - Sepolia: https://ethereum-sepolia-rpc.publicnode.com
  - Arbitrum Sepolia: https://sepolia-rollup.arbitrum.io/rpc
- Build the project:
cargo build --release

The application provides four main commands:
Continuously monitor all configured chains for real-time events:
cargo run -- monitor
What it does:
- Polls for new blocks every 12 seconds on each chain
- Detects Dispatch events (messages sent FROM the chain)
- Detects Process events (messages received ON the chain)
- Runs indefinitely until stopped with Ctrl+C
- Logs detailed information about all discovered events
Example Output:
sepolia: Checking blocks 9628401 to 9628403 for events
sepolia: Scanned 3 blocks, found 0 Dispatch events
sepolia: No Dispatch events in blocks 9628401 to 9628403
sepolia: No Process events in blocks 9628401 to 9628403
arbitrum-sepolia: Checking blocks 12345678 to 12345680 for events
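
Each poll in the output above covers the blocks that appeared since the previous poll. A minimal sketch of that range computation follows; the function names are hypothetical, and the real monitor may track its position differently.

```rust
/// Returns the inclusive block range to query on this poll, or `None`
/// when the chain head has not advanced past the last processed block
/// (hypothetical helper, not from the application).
fn next_block_range(last_processed: u64, current: u64) -> Option<(u64, u64)> {
    if current > last_processed {
        Some((last_processed + 1, current))
    } else {
        None
    }
}

/// Number of blocks in an inclusive `(from, to)` range.
fn blocks_in_range(range: (u64, u64)) -> u64 {
    range.1 - range.0 + 1
}
```

With a last processed block of 9628400 and a current block of 9628403, this yields the range 9628401 to 9628403, which spans three blocks. Returning `None` when the head has not moved also covers an RPC endpoint that temporarily reports an older block.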
Send a real Hyperlane message dispatch transaction to the network:
export PRIVATE_KEY=0x...
cargo run -- simulate
What it does:
- Connects to Sepolia using your private key
- Queries the required protocol fee from the Mailbox contract
- Checks your wallet balance
- Sends a dispatch transaction with the correct fee
- Waits for confirmation and provides Etherscan links
- Creates a unique message with timestamp to avoid duplicates
Requirements:
- `PRIVATE_KEY` environment variable must be set
- Sufficient Sepolia testnet ETH (for gas + protocol fee, typically ~0.001 ETH)
- No pending transactions on the account
Example Output:
Using wallet address: 0x1234...
Wallet balance: 100000000000000000 wei (0.1 ETH)
Preparing to dispatch message:
Destination domain: 421614
Recipient: 0x74fAe95e4227653588631B55941e84633C181649
Message: Hello from Hyperlane V3! Timestamp: 1234567890
Required protocol fee: 50000000000000 wei (0.00005 ETH)
Transaction sent! Hash: 0xabc123...
View on Etherscan: https://sepolia.etherscan.io/tx/0xabc123...
✓ Transaction confirmed in block: 9628456
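
The output above prints amounts both in wei and in ETH, and the command checks the balance before sending. The sketch below shows one way to do both using only the standard library. It assumes amounts fit in a `u128` (the application itself works with `U256`), and the function names are hypothetical.

```rust
const WEI_PER_ETH: u128 = 1_000_000_000_000_000_000;

/// Formats a wei amount as a decimal ETH string without trailing zeros
/// (hypothetical helper; assumes the amount fits in a u128).
fn format_eth(wei: u128) -> String {
    let whole = wei / WEI_PER_ETH;
    let frac = wei % WEI_PER_ETH;
    if frac == 0 {
        return whole.to_string();
    }
    // Pad the fractional part to 18 digits, then drop trailing zeros.
    let digits = format!("{:018}", frac);
    format!("{}.{}", whole, digits.trim_end_matches('0'))
}

/// Fails with a descriptive message when the wallet cannot cover the
/// required amount (protocol fee plus any gas estimate the caller adds).
fn ensure_sufficient_balance(balance: u128, required: u128) -> Result<(), String> {
    if balance >= required {
        Ok(())
    } else {
        Err(format!(
            "Insufficient balance. Need {} wei but only have {} wei",
            required, balance
        ))
    }
}
```

Integer division and remainder avoid the rounding errors that a floating-point conversion would introduce for 18-decimal amounts.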
Scan a specific historical block range for events:
cargo run -- scan --chain sepolia --from 9628400 --to 9628900
Parameters:
- `--chain`: Chain name (`sepolia` or `arbitrum-sepolia`)
- `--from`: Starting block number
- `--to`: Ending block number
What it does:
- Queries the specified block range for all Dispatch and Process events
- Shows detailed information about each event found
- Validates the block range against the RPC's current block
- Automatically expands single-block queries for RPC compatibility
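
The range validation and single-block expansion described above might look roughly like the following sketch. The README does not say in which direction or by how much a single-block query is widened, so the one-block widening here is an assumption, as are the function name and the error wording.

```rust
/// Validates a scan range against the RPC's current block and widens a
/// single-block query to two blocks.
/// Assumption: the range is widened forward by one block, or backward
/// when `to` is already the current block. The real rule may differ.
fn normalize_scan_range(from: u64, to: u64, current: u64) -> Result<(u64, u64), String> {
    if from > to {
        return Err(format!("invalid range: --from {} is greater than --to {}", from, to));
    }
    if to > current {
        return Err(format!("block {} is beyond the RPC's current block {}", to, current));
    }
    if from == to {
        let widened_to = (to + 1).min(current);
        let widened_from = if widened_to == to {
            from.saturating_sub(1)
        } else {
            from
        };
        return Ok((widened_from, widened_to));
    }
    Ok((from, to))
}
```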
Use Cases:
- Debugging specific transactions
- Verifying event detection logic
- Historical data analysis
- Confirming message delivery
Example Output:
Connecting to sepolia at https://ethereum-sepolia-rpc.publicnode.com
Mailbox address: 0xfFAEF09B3cd11D9b20d1a19bECca54EEC2884766
Current block: 9629000, scanning range: 9628400 to 9628900
Found 5 Dispatch events
Event #1: Block 9628456, TX: 0xabc123...
Topics: 4
Data length: 128 bytes
Run the test suite:
cargo run -- test
Alternatively, run tests directly with cargo:
cargo test
cargo test -- --nocapture # Show output
cargo test test_message_tracker # Run specific test

| Variable | Description | Default |
|---|---|---|
| `SEPOLIA_RPC_URL` | Sepolia RPC endpoint | https://ethereum-sepolia-rpc.publicnode.com |
| `ARBITRUM_SEPOLIA_RPC_URL` | Arbitrum Sepolia RPC endpoint | https://sepolia-rollup.arbitrum.io/rpc |
| `PRIVATE_KEY` | Private key for simulate command | None (required for simulate) |

Hardcoded in get_chain_configs() (src/main.rs:743-764):
Sepolia:
- Mailbox Address: `0xfFAEF09B3cd11D9b20d1a19bECca54EEC2884766`
- Domain ID: 11155111
Arbitrum Sepolia:
- Mailbox Address: `0x598facE78a4302f11E3de0bee1894Da0b2Cb71F8`
- Domain ID: 421614

- Polling Interval: 12 seconds (hardcoded in `run_monitor_continuous()`)
- Transaction Timeout: 60 seconds for simulate command
- Log Level: Configured via `RUST_LOG` environment variable
# Enable debug logging
RUST_LOG=debug cargo run -- monitor
# Enable trace-level logging
RUST_LOG=trace cargo run -- monitor

The application follows this event flow:
1. Initialization:
   - RelayCoordinator creates shared MessageTracker
   - One ChainMonitor is initialized per configured chain
   - Each monitor gets its own RPC provider connection
2. Concurrent Monitoring:
   - Each ChainMonitor spawns as an independent Tokio task
   - Monitors run in parallel, polling their respective chains
   - No blocking between chains - true concurrent operation
3. Event Detection:
   - Monitor queries new blocks for Dispatch/Process events
   - Events are parsed from contract logs using ethers.rs filters
   - Event data is extracted from topics and data fields
4. State Recording:
   - ChainMonitor calls `MessageTracker::record_message()`
   - MessageTracker acquires write lock (thread-safe)
   - Event is appended to message's event list
   - Write lock is released
5. State Access:
   - Other components can query MessageTracker (test functions)
   - Read locks allow concurrent readers
   - Write locks are exclusive but brief
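
The state recording and state access steps can be illustrated with a small standalone sketch. It uses `std::sync::RwLock` and OS threads so that it runs without dependencies; the application itself runs on Tokio tasks and may use an async lock instead. The structs are trimmed versions of the ones shown earlier, and `record_from_threads` is a hypothetical helper that stands in for several chain monitors writing at once.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

#[derive(Clone, Debug)]
struct DispatchEvent {
    message_id: String,
    destination_domain: u32,
    block_number: u64,
}

#[derive(Clone, Debug, Default)]
struct MessageState {
    events: Vec<DispatchEvent>,
    processed: bool,
}

/// Cloning the tracker clones the Arc, so all clones share one map.
#[derive(Clone, Default)]
struct MessageTracker {
    messages: Arc<RwLock<HashMap<String, MessageState>>>,
}

impl MessageTracker {
    /// Takes the write lock, appends the event to the message's entry
    /// (creating it if needed), and releases the lock on return.
    fn record_message(&self, event: DispatchEvent) {
        let mut guard = self.messages.write().unwrap();
        guard.entry(event.message_id.clone()).or_default().events.push(event);
    }

    /// Takes a read lock; any number of readers may hold it at once.
    fn get_message(&self, id: &str) -> Option<MessageState> {
        self.messages.read().unwrap().get(id).cloned()
    }

    /// Returns the IDs of all messages not yet marked as processed.
    fn list_pending_messages(&self) -> Vec<String> {
        let guard = self.messages.read().unwrap();
        guard.iter().filter(|(_, s)| !s.processed).map(|(id, _)| id.clone()).collect()
    }
}

/// Hypothetical stand-in for several monitors recording concurrently.
fn record_from_threads(tracker: &MessageTracker, writers: u64) {
    let handles: Vec<_> = (0..writers)
        .map(|i| {
            let tracker = tracker.clone();
            std::thread::spawn(move || {
                tracker.record_message(DispatchEvent {
                    message_id: format!("0x{:064x}", i),
                    destination_domain: 421614,
                    block_number: 9_628_400 + i,
                });
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```

Each write lock is held only for one map insertion, so writers from different chains contend for it only briefly.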
Dispatch Event Signature:
event Dispatch(
address indexed sender,
uint32 indexed destination,
bytes32 indexed recipient,
bytes message
)

Parsing Logic (src/main.rs:324-375):
let dispatch_filter = Filter::new()
.address(mailbox_address)
.event("Dispatch(address,uint32,bytes32,bytes)")
.from_block(last_block + 1)
.to_block(current_block);
let logs = provider.get_logs(&dispatch_filter).await?;
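for log in logs {
// Message ID from transaction hash (the log's transaction_hash field is optional)
let tx_hash = log.transaction_hash.unwrap_or_default();
let message_id = format!("0x{}", hex::encode(tx_hash));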
// Destination domain from topics[2]
let destination = u32::from_be_bytes([
log.topics[2].as_bytes()[28],
log.topics[2].as_bytes()[29],
log.topics[2].as_bytes()[30],
log.topics[2].as_bytes()[31],
]);
// Recipient from topics[3]
let recipient = &log.topics[3].as_bytes()[12..32];
// Message body from data field
let message = format!("0x{}", hex::encode(&log.data));
}

The simulate command demonstrates proper Hyperlane message dispatch:
- Fee Querying (src/main.rs:499-536):
// Call quoteDispatch(uint32, bytes32, bytes) to get required fee
let quote_calldata = encode_function_call(
"quoteDispatch",
destination_domain,
recipient_bytes32,
message_body
);
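// Wrap the calldata in a call request (construction assumed; it is not shown in this excerpt)
let quote_tx = TransactionRequest::new().to(mailbox_address).data(quote_calldata);
let fee_bytes = provider.call(&quote_tx.into(), None).await?;
let required_fee = U256::from_big_endian(&fee_bytes);
- Transaction Sending (src/main.rs:547-618):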
// Call dispatch(uint32, bytes32, bytes) with fee as value
let calldata = encode_function_call(
"dispatch",
destination_domain,
recipient_bytes32,
message_body
);
let tx = TransactionRequest::new()
.to(mailbox_address)
.data(calldata)
.value(required_fee); // Protocol fee paid in ETH
client.send_transaction(tx, None).await?;

The application includes comprehensive error handling:
- RPC Errors: Logged with retry on next poll cycle
- Transaction Errors: Specific handling for common cases:
- "replacement transaction underpriced" → Wait for pending tx
- "already known" → Duplicate transaction in mempool
- Insufficient balance → Clear error message
- Synchronization Issues: Detects when RPC is behind the chain
- Timeout Handling: 60-second timeout for transaction confirmation
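
The transaction error cases listed above are distinguished by the text of the RPC error. A minimal sketch of such a classifier follows; the enum, function names, and matching rules are hypothetical, and the application's actual handling may be structured differently.

```rust
#[derive(Debug, PartialEq)]
enum TxErrorKind {
    ReplacementUnderpriced,
    AlreadyKnown,
    InsufficientBalance,
    Other,
}

/// Maps an RPC error message to one of the cases listed above by
/// case-insensitive substring matching (hypothetical helper).
fn classify_tx_error(message: &str) -> TxErrorKind {
    let lower = message.to_lowercase();
    if lower.contains("replacement transaction underpriced") {
        TxErrorKind::ReplacementUnderpriced
    } else if lower.contains("already known") {
        TxErrorKind::AlreadyKnown
    } else if lower.contains("insufficient balance") || lower.contains("insufficient funds") {
        // Nodes commonly phrase this as "insufficient funds".
        TxErrorKind::InsufficientBalance
    } else {
        TxErrorKind::Other
    }
}

/// Short guidance to show the user for each error kind.
fn advice(kind: &TxErrorKind) -> &'static str {
    match kind {
        TxErrorKind::ReplacementUnderpriced => "Wait for the pending transaction to confirm",
        TxErrorKind::AlreadyKnown => "The same transaction is already in the mempool",
        TxErrorKind::InsufficientBalance => "Get more testnet ETH from a Sepolia faucet",
        TxErrorKind::Other => "Check the logs for details",
    }
}
```

Matching on substrings is brittle across node implementations, so anything unrecognized falls through to `Other` rather than being guessed at.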
The test suite validates core functionality using Tokio's async test framework.
1. Basic Message Tracking (test_message_tracker):
#[tokio::test]
async fn test_message_tracker() {
// Tests:
// - Message recording
// - Message retrieval by ID
// - Event list maintenance
}

2. Concurrent Recording (test_concurrent_message_recording):
#[tokio::test]
async fn test_concurrent_message_recording() {
// Tests:
// - 10 concurrent tasks recording messages
// - Thread-safe access via RwLock
// - No data races or lost updates
// - All messages correctly recorded
}

# Run all tests
cargo test
# Run with output visible
cargo test -- --nocapture
# Run specific test
cargo test test_message_tracker
# Run tests in module
cargo test tests::

Core dependencies and their purposes:

| Crate | Version | Purpose |
|---|---|---|
| `tokio` | 1.42 | Async runtime with full features |
| `ethers` | 2.0 | Ethereum RPC interaction and contract types |
| `serde` | 1.0 | Message serialization |
| `serde_json` | 1.0 | JSON handling |
| `tracing` | 0.1 | Structured logging |
| `tracing-subscriber` | 0.3 | Log output formatting |
| `anyhow` | 1.0 | Error handling |
| `clap` | 4.5 | Command-line argument parsing |
| `dotenv` | 0.15 | Environment file loading |
| `hex` | 0.4 | Hexadecimal encoding/decoding |

# Development build
cargo build
# Release build (optimized)
cargo build --release
# Check code without building
cargo check
# Format code
cargo fmt
# Lint with Clippy
cargo clippy
# Run application
cargo run -- <command>

To add support for a new chain:
- Update `get_chain_configs()` in `src/main.rs`:
fn get_chain_configs() -> Vec<(String, String, String)> {
vec![
// Existing chains...
(
"new-chain".to_string(),
std::env::var("NEW_CHAIN_RPC_URL").unwrap_or_else(|_| "https://rpc.new-chain.com".to_string()),
"0xMAILBOX_ADDRESS".to_string(),
),
]
}
- Update the scan command to recognize the new chain name
- Add environment variable documentation to README
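
How the scan command resolves a chain name is not shown in this README. One way to avoid a separate update step is to look the name up in the same list that `get_chain_configs()` returns, as in this sketch. The `ChainConfig` alias and both helper functions are hypothetical; the tuple layout (name, RPC URL, Mailbox address) follows the snippet above.

```rust
/// (chain name, RPC URL, Mailbox address), as in get_chain_configs().
type ChainConfig = (String, String, String);

/// Finds a chain's configuration by name (hypothetical helper).
fn find_chain<'a>(configs: &'a [ChainConfig], name: &str) -> Option<&'a ChainConfig> {
    configs.iter().find(|config| config.0 == name)
}

/// Lists the configured chain names, e.g. for an "unknown chain" error.
fn known_chain_names(configs: &[ChainConfig]) -> Vec<&str> {
    configs.iter().map(|config| config.0.as_str()).collect()
}
```

With a lookup like this, any chain added to the configuration list becomes available to `scan --chain` without further changes.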
The application uses structured logging with these levels:
- `error!`: Critical failures (RPC errors, transaction failures)
- `warn!`: Important notices (pending transactions, RPC sync issues)
- `info!`: Standard operation logs (events detected, block scanning)
- `debug!`: Detailed debugging (no new blocks, intermediate steps)
- `trace!`: Extremely verbose (not currently used)
Potential improvements for production use:
- WebSocket Support: Replace polling with real-time event streaming
- Persistent Storage: Database for message history and state
- Message Delivery Tracking: Complete lifecycle from dispatch to delivery
- Status Dashboard: Web UI for monitoring state
- Metrics and Alerting: Prometheus metrics, alert on failures
- Additional Chains: Support mainnet and other testnets
- Retry Logic: Exponential backoff for failed RPC calls
- Message Validation: Verify message format and signatures
- Performance Optimization: Batch RPC calls, connection pooling
- Configuration File: YAML/TOML config instead of environment variables
- GraphQL API: Query message state via GraphQL
- Docker Support: Containerized deployment
- CI/CD Pipeline: Automated testing and releases
- Message Relaying: Automatically relay messages between chains
- Gas Optimization: Dynamic gas pricing for simulate command
1. RPC endpoint is behind the chain:
Error: RPC endpoint is 5 blocks behind
Solution: Wait a few minutes or use a different RPC endpoint
2. Replacement transaction underpriced:
Error: replacement transaction underpriced
Solution: Wait for the pending transaction to confirm (1-2 minutes)
3. Insufficient balance:
Error: Insufficient balance. Need X wei but only have Y wei
Solution: Get more testnet ETH from a Sepolia faucet
4. Private key not set:
Error: PRIVATE_KEY not configured
Solution: Export your private key before running simulate
5. Cannot connect to RPC:
Error: error sending request for url
Solution: Check your internet connection and RPC URL
- Hyperlane Documentation: docs.hyperlane.xyz
- Sepolia Faucet: sepoliafaucet.com
- Ethers.rs Documentation: docs.rs/ethers
- Tokio Documentation: tokio.rs
This project is licensed under the MIT License - see the LICENSE file for details.