Commit

Generics (#136)
* Loom Data Types trait

* LDB: LoomDataTypes implemented

* More refactoring

* More refactoring

* pre-release, not verified

* taplo fix

* taplo fix

* UniswapV3Like fix

* UniswapV3Like fix, transact_commit fix, swaprouter::ready added, LoomDB merging added, DatabaseLoomExt::maintain added, four-hop paths removed.

* Block history data cleaning fix, best_tx_compose fix, market locks debug logging, pool loader with semaphore

* build_swap_paths optimization, inlining, SwapPaths refactoring, disabling pools with paths

* pool_health_monitor fix, pool_ok -> pool_disabled

* pre-release. tests failing

* pre-release. tests. HashMap/Set are alloy, "map-fxhash" used

* taplo

* CallSequence merge

---------

Co-authored-by: dexloom <xdexloom@gmail.com>
dexloom and dexloom authored Nov 28, 2024
1 parent e39ec24 commit 077cfd9
Showing 121 changed files with 2,705 additions and 1,863 deletions.
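The commit title and the "Loom Data Types trait" bullet describe the core of this change: chain-specific types are hidden behind a trait so that containers such as Mempool can be written generically (the diffs below construct Mempool::<LoomDataTypesEthereum>::new()). The following Rust sketch only illustrates that shape; apart from the names LoomDataTypesEthereum and Mempool, which appear in the diff, the trait items and bounds are assumptions rather than the repository's actual definitions.

use std::fmt::Debug;
use std::hash::Hash;

// Hypothetical chain-abstraction trait in the spirit of `LoomDataTypes`;
// the real trait in the loom repository may differ in items and bounds.
pub trait LoomDataTypes: Clone + Debug + Send + Sync + 'static {
    /// Address type used for tokens, pools and accounts on this chain.
    type Address: Copy + Eq + Hash + Debug + Send + Sync;
    /// Transaction type carried through mempool and compose channels.
    type Transaction: Clone + Debug + Send + Sync;
}

/// Ethereum flavour, mirroring the `LoomDataTypesEthereum` name used in the diff.
#[derive(Clone, Debug, Default)]
pub struct LoomDataTypesEthereum;

impl LoomDataTypes for LoomDataTypesEthereum {
    type Address = [u8; 20]; // stand-in for alloy_primitives::Address
    type Transaction = Vec<u8>; // stand-in for an RLP-encoded transaction
}

/// Generic container, analogous to `Mempool::<LoomDataTypesEthereum>::new()` below.
pub struct Mempool<D: LoomDataTypes = LoomDataTypesEthereum> {
    txs: Vec<D::Transaction>,
}

impl<D: LoomDataTypes> Mempool<D> {
    pub fn new() -> Self {
        Self { txs: Vec::new() }
    }
    pub fn len(&self) -> usize {
        self.txs.len()
    }
}

fn main() {
    let mempool = Mempool::<LoomDataTypesEthereum>::new();
    println!("mempool size: {}", mempool.len());
}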
108 changes: 58 additions & 50 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
@@ -68,7 +68,7 @@ homepage = "https://github.com/dexloom/loom"
license = "MIT OR Apache-2.0"
repository = "https://github.com/dexloom/loom"
rust-version = "1.82"
-version = "0.3.0"
+version = "0.4.0"

[workspace.dependencies]
# broadcast
@@ -205,14 +205,15 @@ alloy = { version = "0.6.4", features = [
"consensus",
"full",
"json-rpc",
+"map-fxhash",
"node-bindings",
"rand",
"rlp",
"rpc-types-anvil",
"rpc-types-json",
"rpc-types-trace",
"serde",
-] }
+], default-features = false }
alloy-consensus = "0.6.4"
alloy-contract = "0.6.4"
alloy-eips = "0.6.4"
@@ -238,7 +239,7 @@ alloy-transport-ws = "0.6.4"

#alloy-core
alloy-dyn-abi = { version = "0.8.12", features = ["json", "std"] }
-alloy-primitives = { version = "0.8.12", features = ["rand"] }
+alloy-primitives = { version = "0.8.12", features = ["map-fxhash", "rand"], default-features = false }
alloy-sol-macro = { version = "0.8.12", features = ["json"] }
alloy-sol-types = { version = "0.8.12", features = ["json"] }

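The Cargo.toml hunks above enable the alloy "map-fxhash" feature and switch alloy/alloy-primitives to default-features = false, matching the "HashMap/Set are alloy, map-fxhash used" bullet in the commit message. A minimal sketch of what that affects, assuming a dependency on alloy-primitives 0.8 with the map-fxhash feature enabled: the map::HashMap/HashSet aliases are then backed by a fast non-cryptographic hasher, which matters for hot address and pool lookups.

use alloy_primitives::{
    map::{HashMap, HashSet}, // aliases whose hasher is selected by the enabled `map-*` feature
    Address, U256,
};

fn main() {
    // With `map-fxhash` these maps use FxHash instead of the std SipHash default.
    let mut balances: HashMap<Address, U256> = HashMap::default();
    let mut seen_pools: HashSet<Address> = HashSet::default();

    let pool = Address::repeat_byte(0x11);
    balances.insert(pool, U256::from(1_000_000u64));
    seen_pools.insert(pool);

    println!("tracked pools: {}", seen_pools.len());
}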
18 changes: 9 additions & 9 deletions bin/gasbench/src/preloader.rs
@@ -3,7 +3,7 @@ use alloy_provider::Provider;
use alloy_transport::Transport;
use loom_core_actors::SharedState;
use loom_defi_address_book::{
-CurveMetapoolAddress, CurvePoolAddress, PancakeV2PoolAddress, PancakeV3PoolAddress, TokenAddress, UniswapV2PoolAddress,
+CurveMetapoolAddress, CurvePoolAddress, PancakeV2PoolAddress, PancakeV3PoolAddress, TokenAddressEth, UniswapV2PoolAddress,
UniswapV3PoolAddress,
};
use loom_defi_market::fetch_and_add_pool_by_address;
@@ -24,14 +24,14 @@ where
{
let mut market_instance = market.write().await;

-market_instance.add_token(Token::new_with_data(TokenAddress::WETH, Some("WETH".to_string()), None, Some(18), true, false))?;
-market_instance.add_token(Token::new_with_data(TokenAddress::USDC, Some("USDC".to_string()), None, Some(6), true, false))?;
-market_instance.add_token(Token::new_with_data(TokenAddress::USDT, Some("USDT".to_string()), None, Some(6), true, false))?;
-market_instance.add_token(Token::new_with_data(TokenAddress::DAI, Some("DAI".to_string()), None, Some(18), true, false))?;
-market_instance.add_token(Token::new_with_data(TokenAddress::WBTC, Some("WBTC".to_string()), None, Some(8), true, false))?;
-market_instance.add_token(Token::new_with_data(TokenAddress::THREECRV, Some("3Crv".to_string()), None, Some(18), false, true))?;
-market_instance.add_token(Token::new_with_data(TokenAddress::CRV, Some("Crv".to_string()), None, Some(18), false, false))?;
-market_instance.add_token(Token::new_with_data(TokenAddress::LUSD, Some("LUSD".to_string()), None, Some(18), false, false))?;
+market_instance.add_token(Token::new_with_data(TokenAddressEth::WETH, Some("WETH".to_string()), None, Some(18), true, false))?;
+market_instance.add_token(Token::new_with_data(TokenAddressEth::USDC, Some("USDC".to_string()), None, Some(6), true, false))?;
+market_instance.add_token(Token::new_with_data(TokenAddressEth::USDT, Some("USDT".to_string()), None, Some(6), true, false))?;
+market_instance.add_token(Token::new_with_data(TokenAddressEth::DAI, Some("DAI".to_string()), None, Some(18), true, false))?;
+market_instance.add_token(Token::new_with_data(TokenAddressEth::WBTC, Some("WBTC".to_string()), None, Some(8), true, false))?;
+market_instance.add_token(Token::new_with_data(TokenAddressEth::THREECRV, Some("3Crv".to_string()), None, Some(18), false, true))?;
+market_instance.add_token(Token::new_with_data(TokenAddressEth::CRV, Some("Crv".to_string()), None, Some(18), false, false))?;
+market_instance.add_token(Token::new_with_data(TokenAddressEth::LUSD, Some("LUSD".to_string()), None, Some(18), false, false))?;

drop(market_instance);

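The preloader now takes token constants from TokenAddressEth instead of TokenAddress, i.e. the address book appears to be namespaced per chain. A hedged sketch of that layout follows, with two well-known mainnet addresses filled in for illustration; the real constants live in loom_defi_address_book and may be organised differently.

use alloy_primitives::{address, Address};

// Hypothetical per-chain address book in the spirit of the `TokenAddressEth` rename.
pub struct TokenAddressEth;

impl TokenAddressEth {
    // Well-known Ethereum mainnet token addresses.
    pub const WETH: Address = address!("c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2");
    pub const USDC: Address = address!("a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48");
}

fn main() {
    // Grouping constants per chain keeps call sites explicit about which network
    // an address belongs to, e.g. TokenAddressEth::WETH vs. a future TokenAddressBsc::WETH.
    println!("WETH (mainnet): {}", TokenAddressEth::WETH);
    println!("USDC (mainnet): {}", TokenAddressEth::USDC);
}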
54 changes: 28 additions & 26 deletions bin/loom_anvil/src/main.rs
@@ -23,7 +23,7 @@ use loom::broadcast::flashbots::Flashbots;
use loom::core::actors::{Accessor, Actor, Broadcaster, Consumer, Producer, SharedState};
use loom::core::block_history::BlockHistoryActor;
use loom::core::router::SwapRouterActor;
-use loom::defi::address_book::TokenAddress;
+use loom::defi::address_book::TokenAddressEth;
use loom::defi::market::{fetch_and_add_pool_by_address, fetch_state_and_add_pool};
use loom::defi::pools::protocols::CurveProtocol;
use loom::defi::pools::CurvePool;
@@ -38,13 +38,13 @@ use loom::node::actor_config::NodeBlockActorConfig;
use loom::node::json_rpc::NodeBlockActor;
use loom::strategy::backrun::{BackrunConfig, StateChangeArbActor};
use loom::strategy::merger::{ArbSwapPathMergerActor, DiffPathMergerActor, SamePathMergerActor};
-use loom::types::blockchain::{debug_trace_block, ChainParameters, Mempool};
+use loom::types::blockchain::{debug_trace_block, ChainParameters, LoomDataTypesEthereum, Mempool};
use loom::types::entities::{
AccountNonceAndBalanceState, BlockHistory, LatestBlock, Market, MarketState, PoolClass, Swap, Token, TxSigners,
};
use loom::types::events::{
MarketEvents, MempoolEvents, MessageBlock, MessageBlockHeader, MessageBlockLogs, MessageBlockStateUpdate, MessageHealthEvent,
-MessageTxCompose, TxCompose,
+MessageSwapCompose, MessageTxCompose, SwapComposeMessage,
};
use revm::db::EmptyDBTyped;
use tracing::{debug, error, info};
@@ -164,16 +164,16 @@ async fn main() -> Result<()> {
let market_state_instance = MarketState::new(cache_db.clone());

// Add default tokens for price actor
-let usdc_token = Token::new_with_data(TokenAddress::USDC, Some("USDC".to_string()), None, Some(6), true, false);
-let usdt_token = Token::new_with_data(TokenAddress::USDT, Some("USDT".to_string()), None, Some(6), true, false);
-let wbtc_token = Token::new_with_data(TokenAddress::WBTC, Some("WBTC".to_string()), None, Some(8), true, false);
-let dai_token = Token::new_with_data(TokenAddress::DAI, Some("DAI".to_string()), None, Some(18), true, false);
+let usdc_token = Token::new_with_data(TokenAddressEth::USDC, Some("USDC".to_string()), None, Some(6), true, false);
+let usdt_token = Token::new_with_data(TokenAddressEth::USDT, Some("USDT".to_string()), None, Some(6), true, false);
+let wbtc_token = Token::new_with_data(TokenAddressEth::WBTC, Some("WBTC".to_string()), None, Some(8), true, false);
+let dai_token = Token::new_with_data(TokenAddressEth::DAI, Some("DAI".to_string()), None, Some(18), true, false);
market_instance.add_token(usdc_token)?;
market_instance.add_token(usdt_token)?;
market_instance.add_token(wbtc_token)?;
market_instance.add_token(dai_token)?;

-let mempool_instance = Mempool::new();
+let mempool_instance = Mempool::<LoomDataTypesEthereum>::new();

info!("Creating channels");
let new_block_headers_channel: Broadcaster<MessageBlockHeader> = Broadcaster::new(10);
@@ -351,7 +351,8 @@ async fn main() -> Result<()> {
}
}

-let tx_compose_channel: Broadcaster<MessageTxCompose<LoomDBType>> = Broadcaster::new(100);
+let swap_compose_channel: Broadcaster<MessageSwapCompose<LoomDBType>> = Broadcaster::new(100);
+let tx_compose_channel: Broadcaster<MessageTxCompose> = Broadcaster::new(100);

let mut broadcast_actor = AnvilBroadcastActor::new(client.clone());
match broadcast_actor.consume(tx_compose_channel.clone()).start() {
@@ -363,7 +364,7 @@

// Start estimator actor
let mut estimator_actor = EvmEstimatorActor::new_with_provider(encoder.clone(), Some(client.clone()));
-match estimator_actor.consume(tx_compose_channel.clone()).produce(tx_compose_channel.clone()).start() {
+match estimator_actor.consume(swap_compose_channel.clone()).produce(swap_compose_channel.clone()).start() {
Err(e) => error!("{e}"),
_ => {
info!("Estimate actor started successfully")
@@ -379,7 +380,8 @@
match swap_router_actor
.access(tx_signers.clone())
.access(accounts_state.clone())
-.consume(tx_compose_channel.clone())
+.consume(swap_compose_channel.clone())
+.produce(swap_compose_channel.clone())
.produce(tx_compose_channel.clone())
.start()
{
@@ -422,7 +424,7 @@
.access(block_history_state.clone())
.consume(market_events_channel.clone())
.consume(mempool_events_channel.clone())
-.produce(tx_compose_channel.clone())
+.produce(swap_compose_channel.clone())
.produce(pool_health_monitor_channel.clone())
.start()
{
@@ -441,9 +443,9 @@
let mut swap_path_merger_actor = ArbSwapPathMergerActor::new(multicaller_address);
match swap_path_merger_actor
.access(latest_block.clone())
-.consume(tx_compose_channel.clone())
+.consume(swap_compose_channel.clone())
.consume(market_events_channel.clone())
-.produce(tx_compose_channel.clone())
+.produce(swap_compose_channel.clone())
.start()
{
Err(e) => {
@@ -461,9 +463,9 @@
match same_path_merger_actor
.access(market_state.clone())
.access(latest_block.clone())
-.consume(tx_compose_channel.clone())
+.consume(swap_compose_channel.clone())
.consume(market_events_channel.clone())
-.produce(tx_compose_channel.clone())
+.produce(swap_compose_channel.clone())
.start()
{
Err(e) => {
@@ -477,7 +479,7 @@
if test_config.modules.flashbots {
let relays = vec![RelayConfig { id: 1, url: mock_server.as_ref().unwrap().uri(), name: "relay".to_string(), no_sign: Some(false) }];
let flashbots = Flashbots::new(client.clone(), "https://unused", None).with_relays(relays);
-let mut flashbots_broadcast_actor = FlashbotsBroadcastActor::new(flashbots, false, true);
+let mut flashbots_broadcast_actor = FlashbotsBroadcastActor::new(flashbots, true);
match flashbots_broadcast_actor.consume(tx_compose_channel.clone()).start() {
Err(e) => {
error!("{}", e)
@@ -491,9 +493,9 @@
// Diff path merger tries to merge all found swaplines into one transaction s
let mut diff_path_merger_actor = DiffPathMergerActor::new();
match diff_path_merger_actor
-.consume(tx_compose_channel.clone())
+.consume(swap_compose_channel.clone())
.consume(market_events_channel.clone())
-.produce(tx_compose_channel.clone())
+.produce(swap_compose_channel.clone())
.start()
{
Err(e) => {
@@ -577,7 +579,7 @@

println!("Test '{}' is started!", args.config);

-let mut tx_compose_sub = tx_compose_channel.subscribe().await;
+let mut tx_compose_sub = swap_compose_channel.subscribe().await;

let mut stat = Stat::default();
let timeout_duration = Duration::from_secs(args.timeout);
@@ -587,13 +589,13 @@
msg = tx_compose_sub.recv() => {
match msg {
Ok(msg) => match msg.inner {
-TxCompose::Sign(sign_message) => {
-debug!(swap=%sign_message.swap, "Sign message");
+SwapComposeMessage::Ready(ready_message) => {
+debug!(swap=%ready_message.swap, "Sign message");
stat.sign_counter += 1;

-if stat.best_profit_eth < sign_message.swap.abs_profit_eth() {
-stat.best_profit_eth = sign_message.swap.abs_profit_eth();
-stat.best_swap = Some(sign_message.swap.clone());
+if stat.best_profit_eth < ready_message.swap.abs_profit_eth() {
+stat.best_profit_eth = ready_message.swap.abs_profit_eth();
+stat.best_swap = Some(ready_message.swap.clone());
}

if let Some(swaps_ok) = test_config.assertions.swaps_ok {
@@ -602,7 +604,7 @@
}
}
}
-TxCompose::Route(encode_message) => {
+SwapComposeMessage::Prepare(encode_message) => {
debug!(swap=%encode_message.swap, "Route message");
stat.found_counter += 1;
}
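The wiring above splits the old single compose channel in two: searchers, mergers, the router and the estimator exchange MessageSwapCompose (matched as SwapComposeMessage::Prepare / SwapComposeMessage::Ready, replacing TxCompose::Route / TxCompose::Sign), while finished transactions travel on MessageTxCompose to the broadcaster. Below is a rough tokio sketch of that two-channel pattern; only the Prepare/Ready names come from the diff, the payloads and the tx-side message are invented for the example.

use tokio::sync::broadcast;

#[derive(Clone, Debug)]
enum SwapComposeMessage {
    /// A swap path was found and still needs encoding/estimation (previously `Route`).
    Prepare { swap: String },
    /// The swap is encoded and estimated, ready to become a transaction (previously `Sign`).
    Ready { swap: String, profit_eth: u128 },
}

#[derive(Clone, Debug)]
enum TxComposeMessage {
    /// A fully built transaction handed to signer/broadcaster actors.
    Broadcast { raw_tx: Vec<u8> },
}

#[tokio::main]
async fn main() {
    let (swap_tx, mut swap_rx) = broadcast::channel::<SwapComposeMessage>(100);
    let (tx_tx, _tx_rx) = broadcast::channel::<TxComposeMessage>(100);

    // A searcher-like producer publishes on the swap-compose channel...
    swap_tx.send(SwapComposeMessage::Prepare { swap: "WETH -> USDC -> WETH".into() }).unwrap();
    swap_tx.send(SwapComposeMessage::Ready { swap: "WETH -> USDC -> WETH".into(), profit_eth: 1 }).unwrap();

    // ...and router/estimator-like consumers react, eventually emitting a built
    // transaction on the separate tx-compose channel, mirroring the actor wiring above.
    for _ in 0..2 {
        match swap_rx.recv().await.unwrap() {
            SwapComposeMessage::Prepare { swap } => println!("prepare swap {swap}"),
            SwapComposeMessage::Ready { swap, profit_eth } => {
                println!("ready swap {swap}, profit {profit_eth}");
                tx_tx.send(TxComposeMessage::Broadcast { raw_tx: vec![0u8; 4] }).ok();
            }
        }
    }
}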
34 changes: 19 additions & 15 deletions bin/loom_backrun/src/main.rs
@@ -27,6 +27,9 @@ async fn main() -> Result<()> {

let client = topology.get_client(Some("local".to_string()).as_ref())?;
let blockchain = topology.get_blockchain(Some("mainnet".to_string()).as_ref())?;
+let blockchain_state = topology.get_blockchain_state(Some("mainnet".to_string()).as_ref())?;
+let strategy = topology.get_strategy(Some("mainnet".to_string()).as_ref())?;
+
let tx_signers = topology.get_signers(Some("env_signer".to_string()).as_ref())?;

let backrun_config: BackrunConfigSection = load_from_file("./config.toml".to_string().into()).await?;
@@ -43,11 +46,11 @@
.access(blockchain.mempool())
.access(blockchain.latest_block())
.access(blockchain.market())
-.access(blockchain.market_state())
-.access(blockchain.block_history())
+.access(blockchain_state.market_state())
+.access(blockchain_state.block_history())
.consume(blockchain.market_events_channel())
.consume(blockchain.mempool_events_channel())
-.produce(blockchain.compose_channel())
+.produce(strategy.swap_compose_channel())
.produce(blockchain.pool_health_monitor_channel())
.start()
{
@@ -68,8 +71,9 @@
match swap_path_encoder_actor
.access(tx_signers.clone())
.access(blockchain.nonce_and_balance())
-.consume(blockchain.compose_channel())
-.produce(blockchain.compose_channel())
+.consume(strategy.swap_compose_channel())
+.produce(strategy.swap_compose_channel())
+.produce(blockchain.tx_compose_channel())
.start()
{
Ok(r) => {
@@ -87,8 +91,8 @@
match swap_path_merger_actor
.access(blockchain.latest_block())
.consume(blockchain.market_events_channel())
-.consume(blockchain.compose_channel())
-.produce(blockchain.compose_channel())
+.consume(strategy.swap_compose_channel())
+.produce(strategy.swap_compose_channel())
.start()
{
Ok(r) => {
@@ -103,11 +107,11 @@
let mut same_path_merger_actor = SamePathMergerActor::new(client.clone());

match same_path_merger_actor
-.access(blockchain.market_state())
+.access(blockchain_state.market_state())
.access(blockchain.latest_block())
.consume(blockchain.market_events_channel())
-.consume(blockchain.compose_channel())
-.produce(blockchain.compose_channel())
+.consume(strategy.swap_compose_channel())
+.produce(strategy.swap_compose_channel())
.start()
{
Ok(r) => {
@@ -124,8 +128,8 @@

match diff_path_merger_actor
.consume(blockchain.market_events_channel())
-.consume(blockchain.compose_channel())
-.produce(blockchain.compose_channel())
+.consume(strategy.swap_compose_channel())
+.produce(strategy.swap_compose_channel())
.start()
{
Ok(r) => {
@@ -141,8 +145,8 @@
let mut state_health_monitor_actor = StateHealthMonitorActor::new(client.clone());

match state_health_monitor_actor
-.access(blockchain.market_state())
-.consume(blockchain.compose_channel())
+.access(blockchain_state.market_state())
+.consume(blockchain.tx_compose_channel())
.consume(blockchain.market_events_channel())
.start()
{
@@ -159,7 +163,7 @@
let mut stuffing_txs_monitor_actor = StuffingTxMonitorActor::new(client.clone());
match stuffing_txs_monitor_actor
.access(blockchain.latest_block())
-.consume(blockchain.compose_channel())
+.consume(blockchain.tx_compose_channel())
.consume(blockchain.market_events_channel())
.start()
{
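Throughout this file actors are wired with the access/consume/produce/start builder calls. The toy sketch below shows how such a builder can be typed over tokio broadcast channels; it is not the loom_core_actors implementation, just the general pattern, and the access(...) shared-state half is omitted for brevity.

use tokio::sync::broadcast;
use tokio::task::JoinHandle;

/// A message channel an actor can consume from or produce into.
type Channel<T> = broadcast::Sender<T>;

struct EchoActor {
    input: Option<Channel<String>>,
    output: Option<Channel<String>>,
}

impl EchoActor {
    fn new() -> Self {
        Self { input: None, output: None }
    }
    /// Subscribe this actor to an upstream channel (like `.consume(...)`).
    fn consume(mut self, ch: Channel<String>) -> Self {
        self.input = Some(ch);
        self
    }
    /// Register a downstream channel this actor publishes to (like `.produce(...)`).
    fn produce(mut self, ch: Channel<String>) -> Self {
        self.output = Some(ch);
        self
    }
    /// Spawn the actor loop, mirroring `.start()` above.
    fn start(self) -> Result<JoinHandle<()>, String> {
        let input = self.input.ok_or("no input channel")?;
        let output = self.output.ok_or("no output channel")?;
        let mut rx = input.subscribe();
        Ok(tokio::spawn(async move {
            while let Ok(msg) = rx.recv().await {
                let _ = output.send(format!("echo: {msg}"));
            }
        }))
    }
}

#[tokio::main]
async fn main() {
    let (upstream, _keep_upstream) = broadcast::channel::<String>(16);
    let (downstream, mut downstream_rx) = broadcast::channel::<String>(16);

    EchoActor::new()
        .consume(upstream.clone())
        .produce(downstream.clone())
        .start()
        .expect("actor wiring");

    upstream.send("swap found".to_string()).unwrap();
    println!("{}", downstream_rx.recv().await.unwrap());
}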
26 changes: 8 additions & 18 deletions bin/loom_exex/src/loom_runtime.rs
@@ -4,7 +4,7 @@ use alloy::providers::Provider;
use alloy::transports::Transport;
use axum::Router;
use eyre::{ErrReport, OptionExt};
-use loom::core::blockchain::Blockchain;
+use loom::core::blockchain::{Blockchain, BlockchainState, Strategy};
use loom::core::blockchain_actors::BlockchainActors;
use loom::core::topology::{BroadcasterConfig, EncoderConfig, TopologyConfig};
use loom::defi::pools::PoolsConfig;
@@ -23,29 +23,19 @@ use std::env;
use std::future::Future;
use tracing::info;

-pub async fn init<
-Node: FullNodeComponents,
-DB: Database<Error = ErrReport>
-+ DatabaseRef<Error = ErrReport>
-+ DatabaseCommit
-+ DatabaseLoomExt
-+ BlockHistoryState
-+ Send
-+ Sync
-+ Clone
-+ Default
-+ 'static,
->(
+pub async fn init<Node: FullNodeComponents>(
ctx: ExExContext<Node>,
-bc: Blockchain<DB>,
+bc: Blockchain,
config: NodeBlockActorConfig,
) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
Ok(loom_exex(ctx, bc, config.clone()))
}

pub async fn start_loom<P, T, DB>(
provider: P,
-bc: Blockchain<DB>,
+bc: Blockchain,
+bc_state: BlockchainState<DB>,
+strategy: Strategy<DB>,
topology_config: TopologyConfig,
loom_config_filepath: String,
is_exex: bool,
@@ -97,7 +87,7 @@
let backrun_config: BackrunConfigSection = load_from_file::<BackrunConfigSection>(loom_config_filepath.into()).await?;
let backrun_config: BackrunConfig = backrun_config.backrun_strategy;

-let mut bc_actors = BlockchainActors::new(provider.clone(), bc.clone(), relays);
+let mut bc_actors = BlockchainActors::new(provider.clone(), bc.clone(), bc_state, strategy, relays);
bc_actors
.mempool()?
.with_wait_for_node_sync()? // wait for node to sync before
@@ -110,7 +100,7 @@
.with_swap_encoder(Some(multicaller_address))? // convert swaps to opcodes and passes to estimator
.with_evm_estimator()? // estimate gas, add tips
.with_signers()? // start signer actor that signs transactions before broadcasting
-.with_flashbots_broadcaster(false, true)? // broadcast signed txes to flashbots
+.with_flashbots_broadcaster( true)? // broadcast signed txes to flashbots
.with_market_state_preloader()? // preload contracts to market state
.with_nonce_and_balance_monitor()? // start monitoring balances of
.with_pool_history_loader(pools_config.clone())? // load pools used in latest 10000 blocks
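The signature change above is the visible end of the generics work: Blockchain no longer carries a DB parameter, and the DB-bound pieces move into BlockchainState<DB> and Strategy<DB>, which start_loom and BlockchainActors::new now receive separately. A compilable sketch of that split is below; the three struct names and the shape of start_loom come from the diff, while every field and the in-memory DB are invented for illustration.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Chain-level data that does not depend on the state DB implementation.
#[derive(Clone, Default)]
pub struct Blockchain {
    chain_id: u64,
}

/// DB-backed pieces, generic over the state database.
#[derive(Clone, Default)]
pub struct BlockchainState<DB: Clone + Default> {
    market_state: Arc<RwLock<DB>>,
}

/// Strategy-scoped channels/state, also generic over the DB used for simulation.
#[derive(Clone, Default)]
pub struct Strategy<DB: Clone + Default> {
    // In the real code this would hold the swap-compose broadcaster and friends.
    _db: std::marker::PhantomData<DB>,
}

/// Toy in-memory DB standing in for LoomDBType.
#[derive(Clone, Default)]
pub struct InMemoryDb {
    accounts: HashMap<[u8; 20], u128>,
}

// The entry point now takes the three pieces separately, so only the parts that
// genuinely need DB bounds carry the generic parameter.
pub fn start_loom<DB: Clone + Default>(
    bc: Blockchain,
    _bc_state: BlockchainState<DB>,
    _strategy: Strategy<DB>,
) {
    println!("starting loom on chain {}", bc.chain_id);
}

fn main() {
    start_loom::<InMemoryDb>(Blockchain { chain_id: 1 }, BlockchainState::default(), Strategy::default());
}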