Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generics #136

Merged
merged 17 commits into from
Nov 28, 2024
Prev Previous commit
Next Next commit
Block history data cleaning fix, best_tx_compose fix, market locks de…
…bug logging, pool loader with semaphore
  • Loading branch information
dexloom committed Nov 27, 2024
commit b26b2106f66898f5d418283c98062df4be87c99d
6 changes: 6 additions & 0 deletions crates/defi/health-monitor/src/pool_health_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ pub async fn pool_health_monitor_worker(
let entry = pool_errors_map.entry(swap_error.pool).or_insert(0);
*entry += 1;
if *entry >= 10 {
let start_time=std::time::Instant::now();
let mut market_guard = market.write().await;
debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write acquired");

market_guard.set_pool_ok(swap_error.pool, false);
match market_guard.get_pool(&swap_error.pool) {
Some(pool)=>{
Expand All @@ -41,6 +44,9 @@ pub async fn pool_health_monitor_worker(
error!("Disabled pool missing in market: address={:?}, msg={} amount={}", swap_error.pool, swap_error.msg, swap_error.amount);
}
}
drop(market_guard);
debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write released");

}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/defi/market/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ futures.workspace = true
tokio.workspace = true
tracing.workspace = true


# alloy
alloy-network.workspace = true
alloy-primitives.workspace = true
Expand Down
28 changes: 19 additions & 9 deletions crates/defi/market/src/pool_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,21 @@ where
let market_state = market_state.clone();

tokio::task::spawn(async move {
let permit = sema_clone.acquire().await;
if let Err(error) =
fetch_and_add_pool_by_address(client_clone, market_clone, market_state, pool_address, pool_class).await
{
error!(%error, "failed fetch_and_add_pool_by_address");
} else {
info!(%pool_address, %pool_class, "Pool loaded successfully");
match sema_clone.acquire().await {
Ok(permit) => {
if let Err(error) =
fetch_and_add_pool_by_address(client_clone, market_clone, market_state, pool_address, pool_class).await
{
error!(%error, "failed fetch_and_add_pool_by_address");
} else {
info!(%pool_address, %pool_class, "Pool loaded successfully");
}
drop(permit);
}
Err(error) => {
error!(%error, "failed acquire semaphore");
}
}
drop(permit);
});
}
}
Expand Down Expand Up @@ -193,14 +199,18 @@ where
directions_tree.insert(pool_wrapped.clone(), directions_vec);

{
let start_time = std::time::Instant::now();
let mut market_write_guard = market.write().await;
debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write acquired");
// Ignore error if pool already exists because it was maybe already added by e.g. db pool loader
let _ = market_write_guard.add_pool(pool_wrapped);

let swap_paths = market_write_guard.build_swap_path_vec(&directions_tree)?;
market_write_guard.add_paths(swap_paths);
debug!(elapsed = start_time.elapsed().as_micros(), market = %market_write_guard, "market_guard path added");

drop(market_write_guard)
drop(market_write_guard);
debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.write releases");
}
}
Err(e) => {
Expand Down
17 changes: 0 additions & 17 deletions crates/evm/db/src/loom_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,6 @@ use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use tracing::{error, trace};
//
// pub trait LoomDatabaseExt {
// fn accounts(&self) -> HashMap<Address, FastDbAccount>;
// fn contracts(&self) -> HashMap<B256, Bytecode, SimpleBuildHasher>;
// fn logs(&self) -> Vec<Log>;
// fn block_hashes(&self) -> HashMap<BlockNumber, B256>;
//
// fn accounts_mut(&mut self) -> &mut HashMap<Address, FastDbAccount>;
// fn contracts_mut(&mut self) -> &mut HashMap<B256, Bytecode, SimpleBuildHasher>;
// fn logs_mut(&self) -> &mut Vec<Log>;
// fn block_hashes_mut(&self) -> &mut HashMap<BlockNumber, B256>;
//
// fn read_only_db(&self) -> Option<Box<dyn LoomDatabase<Error = ErrReport>>>;
// fn ext_db(&self) -> Option<Box<dyn DatabaseRef<Error = TransportError>>>;
// }
//
// pub trait LoomDatabase: DatabaseRef<Error = ErrReport> + LoomDatabaseExt {}

#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone)]
Expand Down
31 changes: 18 additions & 13 deletions crates/strategy/backrun/src/state_change_arb_searcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ async fn state_change_arb_searcher_task<DB: DatabaseRef<Error = ErrReport> + Dat
let mut db = state_update_event.market_state().clone();
DatabaseHelpers::apply_geth_state_update_vec(&mut db, state_update_event.state_update().clone());

let start_time = chrono::Local::now();
let start_time = std::time::Instant::now();
let mut swap_path_vec: Vec<SwapPath> = Vec::new();

let market_guard_read = market.read().await;
debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.read acquired");

for (pool, v) in state_update_event.directions().iter() {
let pool_paths: Vec<SwapPath> = match market_guard_read.get_pool_paths(&pool.get_address()) {
Some(paths) => paths
Expand All @@ -60,16 +62,18 @@ async fn state_change_arb_searcher_task<DB: DatabaseRef<Error = ErrReport> + Dat
swap_path_vec.extend(pool_paths)
}
drop(market_guard_read);
debug!(elapsed = start_time.elapsed().as_micros(), "market_guard market.read released");

if swap_path_vec.is_empty() {
debug!(
"No swap path built for request: {:?} {}",
state_update_event.stuffing_txs_hashes().first().unwrap_or_default(),
chrono::Local::now() - start_time
request=?state_update_event.stuffing_txs_hashes().first().unwrap_or_default(),
elapsed=start_time.elapsed().as_micros(),
"No swap path built",

);
return Err(eyre!("NO_SWAP_PATHS"));
}
info!("Calculation started: swap_path_vec_len={} elapsed={}", swap_path_vec.len(), chrono::Local::now() - start_time);
info!("Calculation started: swap_path_vec_len={} elapsed={}", swap_path_vec.len(), start_time.elapsed().as_micros());

let env = state_update_event.evm_env();

Expand Down Expand Up @@ -125,10 +129,10 @@ async fn state_change_arb_searcher_task<DB: DatabaseRef<Error = ErrReport> + Dat
}
});
});
debug!(elapsed = %(chrono::Local::now() - start_time), "Calculation iteration finished");
debug!(elapsed = start_time.elapsed().as_micros(), "Calculation iteration finished");
});

debug!(elapsed = %(chrono::Local::now() - start_time), "Calculation results receiver started" );
debug!(elapsed = start_time.elapsed().as_micros(), "Calculation results receiver started");

let swap_request_tx_clone = swap_request_tx.clone();
let pool_health_monitor_tx_clone = pool_health_monitor_tx.clone();
Expand All @@ -142,7 +146,7 @@ async fn state_change_arb_searcher_task<DB: DatabaseRef<Error = ErrReport> + Dat
while let Some(swap_line_result) = swap_line_rx.recv().await {
match swap_line_result {
Ok(swap_line) => {
let encode_request = SwapComposeMessage::Prepare(SwapComposeData {
let prepare_request = SwapComposeMessage::Prepare(SwapComposeData {
tx_compose: TxComposeData {
eoa: backrun_config.eoa(),
next_block_number: state_update_event.next_block_number,
Expand All @@ -161,8 +165,8 @@ async fn state_change_arb_searcher_task<DB: DatabaseRef<Error = ErrReport> + Dat
..SwapComposeData::default()
});

if !backrun_config.smart() || best_answers.check(&encode_request) {
if let Err(e) = swap_request_tx_clone.send(Message::new(encode_request)).await {
if !backrun_config.smart() || best_answers.check(&prepare_request) {
if let Err(e) = swap_request_tx_clone.send(Message::new(prepare_request)).await {
error!("swap_request_tx_clone.send {}", e)
}
}
Expand All @@ -182,7 +186,7 @@ async fn state_change_arb_searcher_task<DB: DatabaseRef<Error = ErrReport> + Dat
origin = %state_update_event.origin,
swap_path_vec_len,
answers,
elapsed = %(chrono::Local::now() - start_time),
elapsed = start_time.elapsed().as_micros(),
stuffing_hash = %state_update_event.stuffing_tx_hash(),
"Calculation finished"
);
Expand All @@ -202,8 +206,9 @@ pub async fn state_change_arb_searcher_worker<
subscribe!(search_request_rx);

let cpus = num_cpus::get();
info!("Starting state arb searcher cpus={cpus}, tasks={}", cpus - 2);
let thread_pool = Arc::new(ThreadPoolBuilder::new().num_threads(cpus - 2).build()?);
let tasks = (cpus * 8) / 10;
info!("Starting state arb searcher cpus={cpus}, tasks={tasks}");
let thread_pool = Arc::new(ThreadPoolBuilder::new().num_threads(tasks).build()?);

loop {
tokio::select! {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl<S> BlockHistory<S> {
self.block_numbers.retain(|&key, _| key > (block_number - self.depth as u64));
let actual_hashes: Vec<BlockHash> = self.block_numbers.values().cloned().collect();
self.block_entries.retain(|key, _| actual_hashes.contains(key));
self.block_states.retain(|key, _| actual_hashes.contains(key));
}

self.block_entries.entry(block_hash).or_insert(BlockHistoryEntry::new(header, None, None, None))
Expand Down
68 changes: 49 additions & 19 deletions crates/types/entities/src/market.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(clippy::type_complexity)]
use std::collections::{BTreeMap, HashMap};
use std::fmt::Display;
use std::sync::Arc;

use eyre::{eyre, OptionExt, Result};
Expand Down Expand Up @@ -30,6 +31,31 @@ pub struct Market<LDT: LoomDataTypes = LoomDataTypesEthereum> {
swap_paths: SwapPaths<LDT>,
}

impl<LDT: LoomDataTypes> Display for Market<LDT> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let token_token_len = self.token_tokens.values().map(|inner| inner.len()).sum::<usize>();
let token_token_pools_len = self.token_token_pools.values().map(|inner_map| inner_map.len()).sum::<usize>();
let token_pool_len = self.token_pools.values().map(|inner| inner.len()).sum::<usize>();
let token_pool_len_max = self.token_pools.values().map(|inner| inner.len()).max().unwrap_or_default();
let swap_path_len = self.swap_paths.len();
let swap_path_len_max = self.swap_paths.len_max();

write!(
f,
"Pools: {} Disabled : {} Tokens : {} TT : {} TTP {} TP {}/{} SwapPaths: {}/{}",
self.pools.len(),
self.pools_disabled.len(),
self.tokens.len(),
token_token_len,
token_token_pools_len,
token_pool_len,
token_pool_len_max,
swap_path_len,
swap_path_len_max
)
}
}

impl<LDT: LoomDataTypes> Market<LDT> {
pub fn is_weth(&self, &address: &LDT::Address) -> bool {
address.eq(&LDT::WETH)
Expand Down Expand Up @@ -109,24 +135,28 @@ impl<LDT: LoomDataTypes> Market<LDT> {
None => return,
};

for (token_from_address, token_to_address) in pool_contract.get_swap_directions().into_iter() {
if !ok {
// remove pool from token_token_pools
let _ = self
.token_token_pools
.get_mut(&token_from_address)
.and_then(|token_from_map| token_from_map.get_mut(&token_to_address))
.map(|pool_addresses| pool_addresses.retain(|&x| x != address));
} else if self
.token_token_pools
.get(&token_from_address)
.and_then(|token_from_map| token_from_map.get(&token_to_address))
.map_or(false, |pool_addresses| !pool_addresses.contains(&address))
{
// add pool to token_token_pools if it does not exist
self.token_token_pools.entry(token_from_address).or_default().entry(token_to_address).or_default().push(address);
}
}
self.swap_paths.disable_pool(&address, ok);

/* for (token_from_address, token_to_address) in pool_contract.get_swap_directions().into_iter() {
if !ok {
// remove pool from token_token_pools
let _ = self
.token_token_pools
.get_mut(&token_from_address)
.and_then(|token_from_map| token_from_map.get_mut(&token_to_address))
.map(|pool_addresses| pool_addresses.retain(|&x| x != address));
} else if self
.token_token_pools
.get(&token_from_address)
.and_then(|token_from_map| token_from_map.get(&token_to_address))
.map_or(false, |pool_addresses| !pool_addresses.contains(&address))
{
// add pool to token_token_pools if it does not exist
self.token_token_pools.entry(token_from_address).or_default().entry(token_to_address).or_default().push(address);
}
}

*/
}

/// Check if the pool is ok.
Expand Down Expand Up @@ -220,7 +250,7 @@ impl<LDT: LoomDataTypes> Market<LDT> {
pools.push(self.get_pool(pool_address).cloned().ok_or_eyre("TOKEN_NOT_FOUND")?);
}

Ok(SwapPath { tokens, pools })
Ok(SwapPath { tokens, pools, ..Default::default() })
}
}

Expand Down
Loading
Loading