Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generics #136

Merged
merged 17 commits into from
Nov 28, 2024
Prev Previous commit
Next Next commit
UniswapV3Like fix, transact_commit fix, swaprouter::ready added, Loom…
…DB merging added, DatabaseLoomExt::maintain added, four hopes paths removed.
  • Loading branch information
dexloom committed Nov 26, 2024
commit 8afc471e830e0ef326a48c51c8d91a222bafd123
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/loom_backrun/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async fn main() -> Result<()> {
.access(blockchain.nonce_and_balance())
.consume(strategy.swap_compose_channel())
.produce(strategy.swap_compose_channel())
.produce(blockchain.tx_compose_channel())
.start()
{
Ok(r) => {
Expand Down
2 changes: 2 additions & 0 deletions bin/replayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ async fn main() -> Result<()> {

cur_header = header.clone();
if header.number % 10 == 0 {
info!("Composing swap: block_number={}, block_hash={}", header.number, header.hash);

let swap_path = market.read().await.swap_path(vec![TokenAddress::WETH, TokenAddress::USDC], vec![UniswapV3PoolAddress::USDC_WETH_500])?;
let mut swap_line = SwapLine::from(swap_path);
swap_line.amount_in = SwapAmountType::Set( NWETH::from_float(0.1));
Expand Down
20 changes: 9 additions & 11 deletions crates/core/block-history-actor/src/block_history_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,23 +303,21 @@ where
info!("market state updated ok records : update len: {} accounts: {} contracts: {} storage: {}", msg.state_update.len(),
updated_db.accounts_len(), updated_db.contracts_len() , updated_db.storage_len() );

market_state_guard.state_db = updated_db;
market_state_guard.state_db = updated_db.clone();
market_state_guard.block_hash = msg_block_hash;
market_state_guard.block_number = latest_block_number;


run_async!(market_events_tx.send(MarketEvents::BlockStateUpdate{ block_hash : msg_block_hash} ));

// TODO LoomDB Merging
/*

#[cfg(not(debug_assertions))]
{
// Merging DB in background and update market state
let market_state_clone = market_state.clone();

tokio::task::spawn( async move{
let merged_db = new_market_state_db.merge_all();
let merged_db = updated_db.maintain();
let mut market_state_guard = market_state_clone.write().await;
market_state_guard.state_db = merged_db;
debug!("Merged DB stored in MarketState at block {}", msg_block_number)
Expand All @@ -329,18 +327,18 @@ where
#[cfg(debug_assertions)]
{

market_state_guard.state_db = new_market_state_db.merge_all();
market_state_guard.state_db = updated_db.maintain();

let accounts_len = market_state_guard.state_db.accounts_len();
let accounts_db_len = market_state_guard.state_db.ro_accounts_len();
let accounts = market_state_guard.state_db.accounts_len();

let storage_len = market_state_guard.state_db.storage_len();
let storage_db_len = market_state_guard.state_db.ro_storage_len();
let storage = market_state_guard.state_db.storage_len();
let contracts = market_state_guard.state_db.contracts_len();

trace!("Merging finished. Market state len accounts {}/{} storage {}/{}", accounts_len, accounts_db_len, storage_len, storage_db_len);
trace!(accounts, storage, contracts, "Merging finished. Market state len" );

}
*/



}

Expand Down
87 changes: 63 additions & 24 deletions crates/core/router/src/swap_router_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ use loom_core_actors::{Accessor, Actor, ActorResult, Broadcaster, Consumer, Prod
use loom_core_actors_macros::{Accessor, Consumer, Producer};
use loom_core_blockchain::{Blockchain, Strategy};
use loom_types_entities::{AccountNonceAndBalanceState, TxSigners};
use loom_types_events::{MessageSwapCompose, SwapComposeData, SwapComposeMessage, TxComposeData};
use loom_types_events::{MessageSwapCompose, MessageTxCompose, SwapComposeData, SwapComposeMessage, TxComposeData};
use revm::DatabaseRef;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tracing::{debug, error, info};

/// encoder task performs encode for request
async fn router_task<DB: DatabaseRef + Send + Sync + Clone + 'static>(
/// encoder task performs initial routing for swap request
async fn router_task_prepare<DB: DatabaseRef + Send + Sync + Clone + 'static>(
route_request: SwapComposeData<DB>,
compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
signers: SharedState<TxSigners>,
account_monitor: SharedState<AccountNonceAndBalanceState>,
) -> Result<()> {
debug!("Routing started {}", route_request.swap);
debug!("router_task_prepare started {}", route_request.swap);

let signer = match route_request.tx_compose.eoa {
Some(eoa) => signers.read().await.get_signer_by_address(&eoa)?,
Expand Down Expand Up @@ -48,13 +48,29 @@ async fn router_task<DB: DatabaseRef + Send + Sync + Clone + 'static>(
}
}

async fn router_task_broadcast<DB: DatabaseRef + Send + Sync + Clone + 'static>(
route_request: SwapComposeData<DB>,
tx_compose_channel_tx: Broadcaster<MessageTxCompose>,
) -> Result<()> {
debug!("router_task_broadcast started {}", route_request.swap);

match tx_compose_channel_tx.send(MessageTxCompose::sign(route_request.tx_compose)).await {
Err(_) => {
error!("compose_channel_tx.send(estimate_request)");
Err(eyre!("ERROR_SENDING_REQUEST"))
}
Ok(_) => Ok(()),
}
}

async fn swap_router_worker<DB: DatabaseRef + Clone + Send + Sync + 'static>(
signers: SharedState<TxSigners>,
account_monitor: SharedState<AccountNonceAndBalanceState>,
compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
swap_compose_channel_rx: Broadcaster<MessageSwapCompose<DB>>,
swap_compose_channel_tx: Broadcaster<MessageSwapCompose<DB>>,
tx_compose_channel_tx: Broadcaster<MessageTxCompose>,
) -> WorkerResult {
let mut compose_channel_rx: Receiver<MessageSwapCompose<DB>> = compose_channel_rx.subscribe().await;
let mut compose_channel_rx: Receiver<MessageSwapCompose<DB>> = swap_compose_channel_rx.subscribe().await;

info!("swap router worker started");

Expand All @@ -64,16 +80,29 @@ async fn swap_router_worker<DB: DatabaseRef + Clone + Send + Sync + 'static>(
let msg : Result<MessageSwapCompose<DB>, RecvError> = msg;
match msg {
Ok(compose_request) => {
if let SwapComposeMessage::Prepare(encode_request) = compose_request.inner {
debug!("MessageSwapPathEncodeRequest received. stuffing: {:?} swap: {}", encode_request.tx_compose.stuffing_txs_hashes, encode_request.swap);
tokio::task::spawn(
router_task(
encode_request,
compose_channel_tx.clone(),
signers.clone(),
account_monitor.clone(),
)
);
match compose_request.inner {
SwapComposeMessage::Prepare(swap_compose_request)=>{
debug!("MessageSwapComposeRequest::Prepare received. stuffing: {:?} swap: {}", swap_compose_request.tx_compose.stuffing_txs_hashes, swap_compose_request.swap);
tokio::task::spawn(
router_task_prepare(
swap_compose_request,
swap_compose_channel_tx.clone(),
signers.clone(),
account_monitor.clone(),
)
);
}
SwapComposeMessage::Ready(swap_compose_request)=>{
debug!("MessageSwapComposeRequest::Ready received. stuffing: {:?} swap: {}", swap_compose_request.tx_compose.stuffing_txs_hashes, swap_compose_request.swap);
tokio::task::spawn(
router_task_broadcast(
swap_compose_request,
tx_compose_channel_tx.clone(),
)
);
}
_=>{}

}
}
Err(e)=>{error!("compose_channel_rx {}",e)}
Expand All @@ -90,17 +119,25 @@ pub struct SwapRouterActor<DB: Send + Sync + Clone + 'static> {
#[accessor]
account_nonce_balance: Option<SharedState<AccountNonceAndBalanceState>>,
#[consumer]
compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
swap_compose_channel_rx: Option<Broadcaster<MessageSwapCompose<DB>>>,
#[producer]
swap_compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
#[producer]
compose_channel_tx: Option<Broadcaster<MessageSwapCompose<DB>>>,
tx_compose_channel_tx: Option<Broadcaster<MessageTxCompose>>,
}

impl<DB> SwapRouterActor<DB>
where
DB: DatabaseRef + Send + Sync + Clone + Default + 'static,
{
pub fn new() -> SwapRouterActor<DB> {
SwapRouterActor { signers: None, account_nonce_balance: None, compose_channel_rx: None, compose_channel_tx: None }
SwapRouterActor {
signers: None,
account_nonce_balance: None,
swap_compose_channel_rx: None,
swap_compose_channel_tx: None,
tx_compose_channel_tx: None,
}
}

pub fn with_signers(self, signers: SharedState<TxSigners>) -> Self {
Expand All @@ -109,9 +146,10 @@ where

pub fn on_bc(self, bc: &Blockchain, strategy: &Strategy<DB>) -> Self {
Self {
compose_channel_rx: Some(strategy.swap_compose_channel()),
compose_channel_tx: Some(strategy.swap_compose_channel()),
swap_compose_channel_rx: Some(strategy.swap_compose_channel()),
swap_compose_channel_tx: Some(strategy.swap_compose_channel()),
account_nonce_balance: Some(bc.nonce_and_balance()),
tx_compose_channel_tx: Some(bc.tx_compose_channel()),
..self
}
}
Expand All @@ -125,8 +163,9 @@ where
let task = tokio::task::spawn(swap_router_worker(
self.signers.clone().unwrap(),
self.account_nonce_balance.clone().unwrap(),
self.compose_channel_rx.clone().unwrap(),
self.compose_channel_tx.clone().unwrap(),
self.swap_compose_channel_rx.clone().unwrap(),
self.swap_compose_channel_tx.clone().unwrap(),
self.tx_compose_channel_tx.clone().unwrap(),
));
Ok(vec![task])
}
Expand Down
2 changes: 2 additions & 0 deletions crates/evm/db/src/database_loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ pub trait DatabaseLoomExt {
fn insert_account_storage(&mut self, address: Address, slot: U256, value: U256) -> eyre::Result<()>;

fn replace_account_storage(&mut self, address: Address, storage: HashMap<U256, U256>) -> eyre::Result<()>;

fn maintain(self) -> Self;
}
12 changes: 9 additions & 3 deletions crates/evm/db/src/loom_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ impl LoomDB {
}

pub fn is_rw_ro_slot(&self, address: &Address, slot: &U256) -> bool {
if let Some(account) = self.accounts.get(address) {
account.storage.contains_key(slot)
let is_rw_slot = if let Some(account) = self.accounts.get(address) { account.storage.contains_key(slot) } else { false };

if is_rw_slot {
true
} else if let Some(read_only_db) = &self.read_only_db {
if let Some(account) = read_only_db.accounts.get(address) {
account.storage.contains_key(slot)
Expand Down Expand Up @@ -520,6 +522,10 @@ impl DatabaseLoomExt for LoomDB {
fn replace_account_storage(&mut self, address: Address, storage: HashMap<U256, U256>) -> Result<()> {
self.replace_account_storage(address, storage)
}

fn maintain(self) -> Self {
self.merge_all()
}
}

impl DatabaseRef for LoomDB {
Expand Down Expand Up @@ -689,7 +695,7 @@ impl DatabaseCommit for LoomDB {
}

#[cfg(test)]
mod test1 {
mod test {
use super::GethAccountState;
use crate::alloydb::AlloyDB;
use crate::loom_db::LoomDB;
Expand Down
17 changes: 13 additions & 4 deletions crates/evm/utils/src/evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use loom_types_blockchain::GethStateUpdate;
use revm::primitives::{Account, Env, ExecutionResult, HaltReason, Output, ResultAndState, TransactTo, CANCUN};
use revm::{Database, DatabaseCommit, DatabaseRef, Evm};
use std::collections::BTreeMap;
use std::fmt::Debug;
use thiserror::Error;
use tracing::{debug, error};

Expand Down Expand Up @@ -62,8 +63,12 @@ where
pub fn evm_transact<DB>(evm: &mut Evm<(), DB>) -> eyre::Result<(Vec<u8>, u64)>
where
DB: Database + DatabaseCommit,
<DB as Database>::Error: Debug,
{
let execution_result = evm.transact_commit().map_err(|_| EvmError::TransactCommitError("COMMIT_ERROR".to_string()))?;
let execution_result = evm.transact_commit().map_err(|error| {
error!(?error, "evm_transact evm.transact_commit()");
EvmError::TransactCommitError("COMMIT_ERROR".to_string())
})?;
let gas_used = execution_result.gas_used();

parse_execution_result(execution_result, gas_used)
Expand All @@ -72,10 +77,10 @@ where
pub fn evm_access_list<DB: DatabaseRef>(state_db: DB, env: &Env, tx: &TransactionRequest) -> eyre::Result<(u64, AccessList)> {
let mut env = env.clone();

let txto = tx.to.unwrap_or_default().to().map_or(Address::ZERO, |x| *x);
let tx_to = tx.to.unwrap_or_default().to().map_or(Address::ZERO, |x| *x);

env.tx.chain_id = tx.chain_id;
env.tx.transact_to = TransactTo::Call(txto);
env.tx.transact_to = TransactTo::Call(tx_to);
env.tx.nonce = tx.nonce;
env.tx.data = tx.input.clone().input.unwrap();
env.tx.value = tx.value.unwrap_or_default();
Expand Down Expand Up @@ -110,12 +115,16 @@ pub fn evm_access_list<DB: DatabaseRef>(state_db: DB, env: &Env, tx: &Transactio
pub fn evm_call_tx_in_block<DB, T: Into<Transaction>>(tx: T, state_db: DB, header: &Header) -> eyre::Result<ResultAndState>
where
DB: DatabaseRef,
<DB as DatabaseRef>::Error: Debug,
{
let env = evm_env_from_tx(tx, header);

let mut evm = Evm::builder().with_spec_id(CANCUN).with_ref_db(state_db).with_env(Box::new(env)).build();

evm.transact().map_err(|_| eyre!("TRANSACT_ERROR"))
evm.transact().map_err(|error| {
error!(?error, "evm_call_tx_in_block evm.transact");
eyre!("TRANSACT_ERROR")
})
}

pub fn convert_evm_result_to_rpc(
Expand Down
12 changes: 9 additions & 3 deletions crates/evm/utils/src/evm_env.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use alloy::consensus::Transaction as TransactionTrait;
use alloy::primitives::U256;
use alloy::primitives::{Address, U256};
use alloy::rpc::types::{Header, Transaction};
use revm::primitives::{BlockEnv, Env, TransactTo, TxEnv};
use lazy_static::lazy_static;
use revm::primitives::{BlobExcessGasAndPrice, BlockEnv, Env, TransactTo, TxEnv};

lazy_static! {
static ref COINBASE: Address = "0x1f9090aaE28b8a3dCeaDf281B0F12828e676c326".parse().unwrap();
}

pub fn env_for_block(block_id: u64, block_timestamp: u64) -> Env {
let mut env = Env::default();
env.block.timestamp = U256::from(block_timestamp);
env.block.number = U256::from(block_id);
env.block.coinbase = *COINBASE;
env
}

Expand All @@ -23,7 +29,7 @@ pub fn evm_env_from_tx<T: Into<Transaction>>(tx: T, block_header: &Header) -> En
basefee: U256::from(block_header.base_fee_per_gas.unwrap_or_default()),
difficulty: block_header.difficulty,
prevrandao: Some(block_header.parent_hash),
blob_excess_gas_and_price: None,
blob_excess_gas_and_price: Some(BlobExcessGasAndPrice::new(block_header.excess_blob_gas.unwrap())),
},
tx: TxEnv {
caller: tx.from,
Expand Down
Loading
Loading