A Rust-based Hyperlane cross-chain message monitor that demonstrates concurrent monitoring of message dispatches across multiple blockchain networks using async Tokio operations.

mstampfer/hyperlane-monitor

Hyperlane Cross-Chain Message Monitor

A Rust-based monitoring application that demonstrates concurrent tracking of Hyperlane cross-chain message dispatches across multiple blockchain networks using async Tokio operations.

Overview

This application monitors Hyperlane's Mailbox contracts on multiple chains to detect when messages are dispatched (sent) and processed (received). It provides real-time event tracking, historical scanning, and the ability to send test messages to the network.

Key Features

  • Concurrent Multi-Chain Monitoring: Independently monitor multiple blockchain networks in parallel
  • Real-Time Event Detection: Track Dispatch and Process events as they occur on-chain
  • Historical Block Scanning: Query specific block ranges for past events
  • Message Simulation: Send real cross-chain messages to the Hyperlane network
  • Thread-Safe State Management: Centralized message tracking with concurrent access support
  • Comprehensive Logging: Detailed structured logging using the tracing crate

Architecture

The application is built around three core components that work together to provide comprehensive cross-chain message monitoring:

1. MessageTracker (src/main.rs:91-160)

Purpose: Centralized, thread-safe storage for cross-chain messages across all monitored chains.

Key Characteristics:

  • Uses Arc<RwLock<HashMap<String, MessageState>>> for concurrent access
  • Maintains a global view of all messages in flight across the system
  • Thread-safe: Multiple chain monitors can record events simultaneously
  • Supports multiple events per message (dispatch + delivery confirmation)

Key Methods:

  • record_message(): Records new dispatch events, appending to existing entries or creating new ones
  • get_message(): Retrieves current state of a specific message by ID (test-only)
  • list_pending_messages(): Returns all unprocessed messages (test-only)

Data Structures:

struct MessageState {
    events: Vec<DispatchEvent>,  // All related events
    processed: bool,             // Delivery status
}

struct DispatchEvent {
    message_id: String,          // Unique identifier (32-byte hash)
    destination_domain: u32,     // Target chain's Hyperlane domain ID
    recipient: String,           // Recipient contract address
    message: String,             // Encoded message payload
    block_number: u64,           // Block where dispatch occurred
    timestamp: u64,              // Unix timestamp
}
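
The tracker described above can be sketched roughly as follows. This is a minimal illustration, not the code in src/main.rs: it assumes `std::sync::RwLock` (an async lock would look similar, with `.await` on lock acquisition), and the method bodies and return types are guesses based only on the method names listed above.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

#[derive(Clone, Debug, PartialEq)]
pub struct DispatchEvent {
    pub message_id: String,
    pub destination_domain: u32,
    pub recipient: String,
    pub message: String,
    pub block_number: u64,
    pub timestamp: u64,
}

#[derive(Clone, Debug, Default)]
pub struct MessageState {
    pub events: Vec<DispatchEvent>,
    pub processed: bool,
}

/// Cloning the tracker clones the Arc, so every clone shares one map.
#[derive(Clone, Default)]
pub struct MessageTracker {
    messages: Arc<RwLock<HashMap<String, MessageState>>>,
}

impl MessageTracker {
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends the event to an existing entry or creates a new one.
    pub fn record_message(&self, event: DispatchEvent) {
        // Exclusive write lock, held only for the duration of the insert.
        let mut map = self.messages.write().expect("tracker lock poisoned");
        map.entry(event.message_id.clone())
            .or_default()
            .events
            .push(event);
    }

    /// Returns a snapshot of one message's state (assumed return type).
    pub fn get_message(&self, message_id: &str) -> Option<MessageState> {
        let map = self.messages.read().expect("tracker lock poisoned");
        map.get(message_id).cloned()
    }

    /// Returns the IDs of all messages not yet marked processed.
    pub fn list_pending_messages(&self) -> Vec<String> {
        let map = self.messages.read().expect("tracker lock poisoned");
        map.iter()
            .filter(|(_, state)| !state.processed)
            .map(|(id, _)| id.clone())
            .collect()
    }
}
```

Each chain monitor would hold its own clone of the tracker. Readers take the shared lock and can run concurrently, while `record_message` briefly takes the exclusive lock.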

2. ChainMonitor (src/main.rs:162-213)

Purpose: Per-chain monitoring and event detection for a single blockchain.

Key Characteristics:

  • Each monitor maintains its own RPC connection via ethers.rs Provider
  • Monitors the Hyperlane Mailbox contract for both Dispatch and Process events
  • Runs independently in its own async task
  • Polls for new events every 12 seconds
  • Reports findings to the shared MessageTracker

Configuration per Chain:

  • Chain name (e.g., "sepolia", "arbitrum-sepolia")
  • RPC endpoint URL
  • Mailbox contract address
  • Reference to shared MessageTracker

Event Detection:

  • Dispatch Events: Dispatch(address,uint32,bytes32,bytes) - messages sent FROM this chain
  • Process Events: ProcessId(bytes32) - messages received ON this chain

3. RelayCoordinator (src/main.rs:215-276)

Purpose: Orchestrates concurrent monitoring across multiple chains.

Key Characteristics:

  • Spawns independent async tasks for each ChainMonitor
  • Enables true parallel monitoring using Tokio's async runtime
  • Manages the lifecycle of all monitoring tasks
  • Provides unified operations across all chains

Concurrency Model:

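// Each chain gets its own async task
// (requires `use std::time::Duration;`)
for monitor in coordinator.monitors {
    // `monitor` is moved into the task: a spawned task must own its data,
    // so iterating by reference (`&coordinator.monitors`) would not compile
    tokio::spawn(async move {
        // Independent monitoring loop
        loop {
            // Check for new blocks
            // Query Dispatch events
            // Query Process events
            tokio::time::sleep(Duration::from_secs(12)).await;
        }
    });
}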

Installation

Prerequisites

  • Rust 1.70+: Install from rustup.rs
  • Git: For cloning the repository
  • Testnet ETH: Required for the simulate command (Sepolia testnet)

Setup

  1. Clone the repository:
git clone <repository-url>
cd hyperlane-monitor
  2. Create environment file (optional):
cp .env.example .env
  3. Configure RPC endpoints (optional):

Edit .env to set custom RPC URLs:

SEPOLIA_RPC_URL=https://your-sepolia-rpc-url
ARBITRUM_SEPOLIA_RPC_URL=https://your-arbitrum-sepolia-rpc-url

Default values (if not set):

  • Sepolia: https://ethereum-sepolia-rpc.publicnode.com
  • Arbitrum Sepolia: https://sepolia-rollup.arbitrum.io/rpc
  4. Build the project:
cargo build --release

Usage

The application provides four main commands:

1. Monitor Mode

Continuously monitor all configured chains for real-time events:

cargo run -- monitor

What it does:

  • Polls for new blocks every 12 seconds on each chain
  • Detects Dispatch events (messages sent FROM the chain)
  • Detects Process events (messages received ON the chain)
  • Runs indefinitely until stopped with Ctrl+C
  • Logs detailed information about all discovered events

Example Output:

sepolia: Checking blocks 9628401 to 9628403 for events
sepolia: Scanned 3 blocks, found 0 Dispatch events
sepolia: No Dispatch events in blocks 9628401 to 9628403
sepolia: No Process events in blocks 9628401 to 9628403
arbitrum-sepolia: Checking blocks 12345678 to 12345680 for events
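
The block window in the output above ("Checking blocks 9628401 to 9628403", "Scanned 3 blocks") follows from the last scanned block and the chain head. A minimal sketch of that arithmetic, with hypothetical function names that are not taken from src/main.rs:

```rust
/// Returns the inclusive block range to scan on this poll cycle,
/// or None when the chain head has not advanced past the last scanned block
/// (including the case where the RPC reports an older head).
pub fn next_scan_range(last_scanned: u64, current_block: u64) -> Option<(u64, u64)> {
    if current_block > last_scanned {
        Some((last_scanned + 1, current_block))
    } else {
        None
    }
}

/// Number of blocks covered by an inclusive (from, to) range.
pub fn blocks_in_range(range: (u64, u64)) -> u64 {
    range.1 - range.0 + 1
}
```

With a last scanned block of 9628400 and a head of 9628403, this yields the range 9628401 to 9628403, which covers 3 blocks.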

2. Simulate Mode

Send a real Hyperlane message dispatch transaction to the network:

export PRIVATE_KEY=0x...
cargo run -- simulate

What it does:

  • Connects to Sepolia using your private key
  • Queries the required protocol fee from the Mailbox contract
  • Checks your wallet balance
  • Sends a dispatch transaction with the correct fee
  • Waits for confirmation and provides Etherscan links
  • Creates a unique message with timestamp to avoid duplicates

Requirements:

  • PRIVATE_KEY environment variable must be set
  • Sufficient Sepolia testnet ETH (for gas + protocol fee, typically ~0.001 ETH)
  • No pending transactions on the account

Example Output:

Using wallet address: 0x1234...
Wallet balance: 100000000000000000 wei (0.1 ETH)
Preparing to dispatch message:
  Destination domain: 421614
  Recipient: 0x74fAe95e4227653588631B55941e84633C181649
  Message: Hello from Hyperlane V3! Timestamp: 1234567890
Required protocol fee: 50000000000000 wei (0.00005 ETH)
Transaction sent! Hash: 0xabc123...
View on Etherscan: https://sepolia.etherscan.io/tx/0xabc123...
✓ Transaction confirmed in block: 9628456
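
The wei and ETH figures in the output above differ by a factor of 10^18. The conversion can be sketched as below, assuming the amount fits in a `u128` (the application itself works with `U256` values; `format_wei_as_eth` is a hypothetical helper name):

```rust
const WEI_PER_ETH: u128 = 1_000_000_000_000_000_000;

/// Formats a wei amount as a decimal ETH string without float rounding.
pub fn format_wei_as_eth(wei: u128) -> String {
    let whole = wei / WEI_PER_ETH;
    let frac = wei % WEI_PER_ETH;
    if frac == 0 {
        return whole.to_string();
    }
    // Pad the fractional part to 18 digits, then drop trailing zeros.
    let frac_digits = format!("{:018}", frac);
    format!("{}.{}", whole, frac_digits.trim_end_matches('0'))
}
```

Integer division keeps the result exact, which matters for small fees such as the 50000000000000 wei protocol fee shown above.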

3. Scan Mode

Scan a specific historical block range for events:

cargo run -- scan --chain sepolia --from 9628400 --to 9628900

Parameters:

  • --chain: Chain name (sepolia or arbitrum-sepolia)
  • --from: Starting block number
  • --to: Ending block number

What it does:

  • Queries the specified block range for all Dispatch and Process events
  • Shows detailed information about each event found
  • Validates the block range against the RPC's current block
  • Automatically expands single-block queries for RPC compatibility
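
The range checks listed above might look like the following sketch. The function name and error strings are hypothetical, and the way a single-block query is widened (one block backwards) is an assumption; this README does not say how the tool expands the range.

```rust
/// Validates a requested scan range against the RPC's current block and
/// returns the inclusive range that would actually be queried.
pub fn resolve_scan_range(from: u64, to: u64, current_block: u64) -> Result<(u64, u64), String> {
    if from > to {
        return Err(format!("--from {} is greater than --to {}", from, to));
    }
    if to > current_block {
        return Err(format!(
            "--to {} is beyond the RPC's current block {}",
            to, current_block
        ));
    }
    if from == to {
        // Assumption: widen a single-block query by one block backwards
        // so the range never exceeds the current block.
        return Ok((from.saturating_sub(1), to));
    }
    Ok((from, to))
}
```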

Use Cases:

  • Debugging specific transactions
  • Verifying event detection logic
  • Historical data analysis
  • Confirming message delivery

Example Output:

Connecting to sepolia at https://ethereum-sepolia-rpc.publicnode.com
Mailbox address: 0xfFAEF09B3cd11D9b20d1a19bECca54EEC2884766
Current block: 9629000, scanning range: 9628400 to 9628900
Found 5 Dispatch events
Event #1: Block 9628456, TX: 0xabc123...
  Topics: 4
  Data length: 128 bytes

4. Test Mode

Run the test suite:

cargo run -- test

Alternatively, run tests directly with cargo:

cargo test
cargo test -- --nocapture  # Show output
cargo test test_message_tracker  # Run specific test

Configuration

Environment Variables

| Variable | Description | Default |
| --- | --- | --- |
| SEPOLIA_RPC_URL | Sepolia RPC endpoint | https://ethereum-sepolia-rpc.publicnode.com |
| ARBITRUM_SEPOLIA_RPC_URL | Arbitrum Sepolia RPC endpoint | https://sepolia-rollup.arbitrum.io/rpc |
| PRIVATE_KEY | Private key for simulate command | None (required for simulate) |

Chain Configuration

Hardcoded in get_chain_configs() (src/main.rs:743-764):

Sepolia:

  • Mailbox Address: 0xfFAEF09B3cd11D9b20d1a19bECca54EEC2884766
  • Domain ID: 11155111

Arbitrum Sepolia:

  • Mailbox Address: 0x598facE78a4302f11E3de0bee1894Da0b2Cb71F8
  • Domain ID: 421614
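
Because a Dispatch event names its destination by domain ID rather than by chain name, a lookup in both directions is handy. The sketch below uses the addresses and domain IDs listed above; the `ChainInfo` type and both lookup functions are hypothetical and differ from the tuple-based `get_chain_configs()` in the source.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ChainInfo {
    pub name: &'static str,
    pub mailbox: &'static str,
    pub domain_id: u32,
}

/// Chain table using the values documented in this README.
pub static CHAINS: [ChainInfo; 2] = [
    ChainInfo {
        name: "sepolia",
        mailbox: "0xfFAEF09B3cd11D9b20d1a19bECca54EEC2884766",
        domain_id: 11155111,
    },
    ChainInfo {
        name: "arbitrum-sepolia",
        mailbox: "0x598facE78a4302f11E3de0bee1894Da0b2Cb71F8",
        domain_id: 421614,
    },
];

/// Looks up a chain by the name accepted on the command line.
pub fn chain_by_name(name: &str) -> Option<&'static ChainInfo> {
    CHAINS.iter().find(|chain| chain.name == name)
}

/// Looks up a chain by the Hyperlane domain ID found in a Dispatch event.
pub fn chain_by_domain(domain_id: u32) -> Option<&'static ChainInfo> {
    CHAINS.iter().find(|chain| chain.domain_id == domain_id)
}
```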

Monitoring Configuration

  • Polling Interval: 12 seconds (hardcoded in run_monitor_continuous())
  • Transaction Timeout: 60 seconds for simulate command
  • Log Level: Configured via RUST_LOG environment variable
# Enable debug logging
RUST_LOG=debug cargo run -- monitor

# Enable trace-level logging
RUST_LOG=trace cargo run -- monitor

Data Flow

The application follows this event flow:

  1. Initialization:

    • RelayCoordinator creates shared MessageTracker
    • One ChainMonitor is initialized per configured chain
    • Each monitor gets its own RPC provider connection
  2. Concurrent Monitoring:

    • Each ChainMonitor spawns as an independent Tokio task
    • Monitors run in parallel, polling their respective chains
    • No blocking between chains - true concurrent operation
  3. Event Detection:

    • Monitor queries new blocks for Dispatch/Process events
    • Events are parsed from contract logs using ethers.rs filters
    • Event data is extracted from topics and data fields
  4. State Recording:

    • ChainMonitor calls MessageTracker::record_message()
    • MessageTracker acquires write lock (thread-safe)
    • Event is appended to message's event list
    • Write lock is released
  5. State Access:

    • Other components can query MessageTracker (test functions)
    • Read locks allow concurrent readers
    • Write locks are exclusive but brief

Implementation Details

Event Parsing

Dispatch Event Signature:

event Dispatch(
    address indexed sender,
    uint32 indexed destination,
    bytes32 indexed recipient,
    bytes message
)

Parsing Logic (src/main.rs:324-375):

use ethers::providers::Middleware;
use ethers::types::Filter;

let dispatch_filter = Filter::new()
    .address(mailbox_address)
    .event("Dispatch(address,uint32,bytes32,bytes)")
    .from_block(last_block + 1)
    .to_block(current_block);

let logs = provider.get_logs(&dispatch_filter).await?;

for log in logs {
    // Dispatch carries the signature topic plus three indexed parameters
    if log.topics.len() < 4 {
        continue;
    }

    // Message ID derived from the hash of the transaction that emitted the log
    let tx_hash = log.transaction_hash.unwrap_or_default();
    let message_id = format!("0x{}", hex::encode(tx_hash));

    // Destination domain: last 4 bytes of topics[2], big-endian
    let destination = u32::from_be_bytes([
        log.topics[2].as_bytes()[28],
        log.topics[2].as_bytes()[29],
        log.topics[2].as_bytes()[30],
        log.topics[2].as_bytes()[31],
    ]);

    // Recipient: last 20 bytes of the bytes32 value in topics[3]
    let recipient = &log.topics[3].as_bytes()[12..32];

    // Message body from the data field (still ABI-encoded)
    let message = format!("0x{}", hex::encode(&log.data));
}
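
The topic layout used above can be exercised without an RPC connection. The following standalone sketch decodes the three indexed parameters from raw 32-byte topics; `DecodedDispatch` and `decode_dispatch_topics` are hypothetical names, and treating the recipient as a 20-byte EVM address mirrors the slicing in the snippet above rather than a general rule for `bytes32` recipients.

```rust
#[derive(Debug, PartialEq)]
pub struct DecodedDispatch {
    pub sender: [u8; 20],
    pub destination_domain: u32,
    pub recipient: [u8; 20],
}

/// Decodes the indexed parameters of a Dispatch log.
/// topics[0] is the event signature hash; topics[1..=3] are the
/// indexed sender, destination, and recipient, each padded to 32 bytes.
pub fn decode_dispatch_topics(topics: &[[u8; 32]]) -> Option<DecodedDispatch> {
    if topics.len() != 4 {
        return None;
    }
    // An address occupies the last 20 bytes of its 32-byte topic.
    let mut sender = [0u8; 20];
    sender.copy_from_slice(&topics[1][12..32]);
    // A uint32 occupies the last 4 bytes, big-endian.
    let mut domain = [0u8; 4];
    domain.copy_from_slice(&topics[2][28..32]);
    let mut recipient = [0u8; 20];
    recipient.copy_from_slice(&topics[3][12..32]);
    Some(DecodedDispatch {
        sender,
        destination_domain: u32::from_be_bytes(domain),
        recipient,
    })
}
```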

Transaction Construction

The simulate command demonstrates proper Hyperlane message dispatch:

  1. Fee Querying (src/main.rs:499-536):
// Call quoteDispatch(uint32, bytes32, bytes) to get required fee
let quote_calldata = encode_function_call(
    "quoteDispatch",
    destination_domain,
    recipient_bytes32,
    message_body
);

// Wrap the calldata in a read-only call to the Mailbox
let quote_tx = TransactionRequest::new()
    .to(mailbox_address)
    .data(quote_calldata);

let fee_bytes = provider.call(&quote_tx.into(), None).await?;
let required_fee = U256::from_big_endian(&fee_bytes);
  2. Transaction Sending (src/main.rs:547-618):
// Call dispatch(uint32, bytes32, bytes) with fee as value
let calldata = encode_function_call(
    "dispatch",
    destination_domain,
    recipient_bytes32,
    message_body
);

let tx = TransactionRequest::new()
    .to(mailbox_address)
    .data(calldata)
    .value(required_fee);  // Protocol fee paid in ETH

client.send_transaction(tx, None).await?;
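
Both calls above take the recipient as a `bytes32`, so a 20-byte EVM address has to be left-padded with zeros first. A minimal sketch of that step, using hypothetical helper names and only the standard library (the project itself may rely on ethers types for this):

```rust
/// Parses a 40-digit hex address, with or without a 0x prefix.
pub fn parse_hex_address(s: &str) -> Option<[u8; 20]> {
    let hex = s.strip_prefix("0x").unwrap_or(s);
    if hex.len() != 40 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    let mut out = [0u8; 20];
    for (i, byte) in out.iter_mut().enumerate() {
        *byte = u8::from_str_radix(&hex[2 * i..2 * i + 2], 16).ok()?;
    }
    Some(out)
}

/// Left-pads a 20-byte address to the 32-byte form used for recipients.
pub fn address_to_bytes32(address: [u8; 20]) -> [u8; 32] {
    let mut out = [0u8; 32];
    out[12..].copy_from_slice(&address);
    out
}
```

This is the inverse of the `[12..32]` slice used when reading the recipient back out of a Dispatch log.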

Error Handling

The application includes comprehensive error handling:

  • RPC Errors: Logged with retry on next poll cycle
  • Transaction Errors: Specific handling for common cases:
    • "replacement transaction underpriced" → Wait for pending tx
    • "already known" → Duplicate transaction in mempool
    • Insufficient balance → Clear error message
  • Synchronization Issues: Detects when RPC is behind the chain
  • Timeout Handling: 60-second timeout for transaction confirmation
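
The transaction error cases listed above amount to matching on the error text returned by the node. A rough sketch of such a classifier follows; the `TxErrorKind` enum and the substring matching are illustrative assumptions, not the application's actual error types.

```rust
#[derive(Debug, PartialEq)]
pub enum TxErrorKind {
    /// An earlier transaction from this account is still pending.
    PendingTransaction,
    /// The same transaction is already in the mempool.
    AlreadyInMempool,
    /// The wallet cannot cover gas plus the protocol fee.
    InsufficientBalance,
    /// Anything else; log it and surface the raw message.
    Other,
}

/// Maps a raw error message to one of the documented cases.
pub fn classify_tx_error(message: &str) -> TxErrorKind {
    let lower = message.to_lowercase();
    if lower.contains("replacement transaction underpriced") {
        TxErrorKind::PendingTransaction
    } else if lower.contains("already known") {
        TxErrorKind::AlreadyInMempool
    } else if lower.contains("insufficient") {
        TxErrorKind::InsufficientBalance
    } else {
        TxErrorKind::Other
    }
}
```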

Testing

The test suite validates core functionality using the #[tokio::test] attribute, which runs each async test on a Tokio runtime.

Test Cases

1. Basic Message Tracking (test_message_tracker):

#[tokio::test]
async fn test_message_tracker() {
    // Tests:
    // - Message recording
    // - Message retrieval by ID
    // - Event list maintenance
}

2. Concurrent Recording (test_concurrent_message_recording):

#[tokio::test]
async fn test_concurrent_message_recording() {
    // Tests:
    // - 10 concurrent tasks recording messages
    // - Thread-safe access via RwLock
    // - No data races or lost updates
    // - All messages correctly recorded
}

Running Tests

# Run all tests
cargo test

# Run with output visible
cargo test -- --nocapture

# Run specific test
cargo test test_message_tracker

# Run tests in module
cargo test tests::

Dependencies

Core dependencies and their purposes:

| Crate | Version | Purpose |
| --- | --- | --- |
| tokio | 1.42 | Async runtime with full features |
| ethers | 2.0 | Ethereum RPC interaction and contract types |
| serde | 1.0 | Message serialization |
| serde_json | 1.0 | JSON handling |
| tracing | 0.1 | Structured logging |
| tracing-subscriber | 0.3 | Log output formatting |
| anyhow | 1.0 | Error handling |
| clap | 4.5 | Command-line argument parsing |
| dotenv | 0.15 | Environment file loading |
| hex | 0.4 | Hexadecimal encoding/decoding |

Development

Build Commands

# Development build
cargo build

# Release build (optimized)
cargo build --release

# Check code without building
cargo check

# Format code
cargo fmt

# Lint with Clippy
cargo clippy

# Run application
cargo run -- <command>

Adding New Chains

To add support for a new chain:

  1. Update get_chain_configs() in src/main.rs:
fn get_chain_configs() -> Vec<(String, String, String)> {
    vec![
        // Existing chains...
        (
            "new-chain".to_string(),
            std::env::var("NEW_CHAIN_RPC_URL").unwrap_or_else(|_| "https://rpc.new-chain.com".to_string()),
            "0xMAILBOX_ADDRESS".to_string(),
        ),
    ]
}
  2. Update the scan command to recognize the new chain name

  3. Add environment variable documentation to README

Logging Levels

The application uses structured logging with these levels:

  • error!: Critical failures (RPC errors, transaction failures)
  • warn!: Important notices (pending transactions, RPC sync issues)
  • info!: Standard operation logs (events detected, block scanning)
  • debug!: Detailed debugging (no new blocks, intermediate steps)
  • trace!: Extremely verbose (not currently used)

Future Enhancements

Potential improvements for production use:

High Priority

  • WebSocket Support: Replace polling with real-time event streaming
  • Persistent Storage: Database for message history and state
  • Message Delivery Tracking: Complete lifecycle from dispatch to delivery
  • Status Dashboard: Web UI for monitoring state
  • Metrics and Alerting: Prometheus metrics, alert on failures

Medium Priority

  • Additional Chains: Support mainnet and other testnets
  • Retry Logic: Exponential backoff for failed RPC calls
  • Message Validation: Verify message format and signatures
  • Performance Optimization: Batch RPC calls, connection pooling
  • Configuration File: YAML/TOML config instead of environment variables

Low Priority

  • GraphQL API: Query message state via GraphQL
  • Docker Support: Containerized deployment
  • CI/CD Pipeline: Automated testing and releases
  • Message Relaying: Automatically relay messages between chains
  • Gas Optimization: Dynamic gas pricing for simulate command

Troubleshooting

Common Issues

1. RPC endpoint is behind the chain:

Error: RPC endpoint is 5 blocks behind

Solution: Wait a few minutes or use a different RPC endpoint

2. Replacement transaction underpriced:

Error: replacement transaction underpriced

Solution: Wait for the pending transaction to confirm (1-2 minutes)

3. Insufficient balance:

Error: Insufficient balance. Need X wei but only have Y wei

Solution: Get more testnet ETH from a Sepolia faucet

4. Private key not set:

Error: PRIVATE_KEY not configured

Solution: Export your private key before running simulate

5. Cannot connect to RPC:

Error: error sending request for url

Solution: Check your internet connection and RPC URL
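
The insufficient-balance error in issue 3 comes from a pre-flight comparison of the wallet balance against the required amount. A sketch of that check is below. It assumes the amounts fit in `u128` and that the required amount is the protocol fee plus a gas allowance; whether the application includes gas in "Need X" is not stated here, and `check_balance` is a hypothetical name.

```rust
/// Returns Ok if the balance covers the protocol fee plus a gas allowance,
/// otherwise an error in the format shown in issue 3.
pub fn check_balance(
    balance_wei: u128,
    fee_wei: u128,
    max_gas_cost_wei: u128,
) -> Result<(), String> {
    let needed = fee_wei
        .checked_add(max_gas_cost_wei)
        .ok_or_else(|| "required amount overflows u128".to_string())?;
    if balance_wei < needed {
        Err(format!(
            "Insufficient balance. Need {} wei but only have {} wei",
            needed, balance_wei
        ))
    } else {
        Ok(())
    }
}
```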

License

This project is licensed under the MIT License - see the LICENSE file for details.
