Skip to content

Commit

Permalink
refactor: wallet updates now atomic with mempool
Browse files Browse the repository at this point in the history
removes the mempool broadcast channel and wallet listener task.

Instead all mempool mutations go through GlobalState methods
which inform wallet of the changes.  This makes changes atomic
over mempool+wallet so they are always in sync.

Changes:
 * remove Mempool::event_channel
 * Mempool &mut methods only callable by super
 * Mempool &mut methods return MempoolEvent(s)
 * add MempoolEvent::UpdateTxMutatorSet.  (unused)
 * add GlobalState methods: mempool_clear, mempool_insert,
    mempool_prune_stale_transactions
 * remove spawn_wallet_task from lib.rs
 * add/improve doc-comments
  • Loading branch information
dan-da committed Sep 26, 2024
1 parent 98f8071 commit da50084
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 120 deletions.
55 changes: 12 additions & 43 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ pub async fn initialize(cli_args: cli_args::Args) -> Result<()> {
DataDirectory::create_dir_if_not_exists(&data_dir.root_dir_path()).await?;
info!("Data directory is {}", data_dir);

// Get wallet object, create various wallet secret files
let wallet_dir = data_dir.wallet_directory_path();
DataDirectory::create_dir_if_not_exists(&wallet_dir).await?;
let (wallet_secret, _) =
WalletSecret::read_from_file_or_create(&data_dir.wallet_directory_path())?;
info!("Now getting wallet state. This may take a while if the database needs pruning.");
let wallet_state =
WalletState::new_from_wallet_secret(&data_dir, wallet_secret, &cli_args).await;
info!("Got wallet state.");

// Connect to or create databases for block index, peers, mutator set, block sync
let block_index_db = ArchivalState::initialize_block_index_database(&data_dir).await?;
info!("Got block index database");
Expand All @@ -101,7 +111,7 @@ pub async fn initialize(cli_args: cli_args::Args) -> Result<()> {
info!("Got archival mutator set");

let archival_state = ArchivalState::new(
data_dir.clone(),
data_dir,
block_index_db,
archival_mutator_set,
cli_args.network,
Expand Down Expand Up @@ -139,17 +149,6 @@ pub async fn initialize(cli_args: cli_args::Args) -> Result<()> {
};
let blockchain_state = BlockchainState::Archival(blockchain_archival_state);
let mempool = Mempool::new(cli_args.max_mempool_size, latest_block.hash());

// Get wallet object, create various wallet secret files
let wallet_dir = data_dir.wallet_directory_path();
DataDirectory::create_dir_if_not_exists(&wallet_dir).await?;
let (wallet_secret, _) =
WalletSecret::read_from_file_or_create(&data_dir.wallet_directory_path())?;
info!("Now getting wallet state. This may take a while if the database needs pruning.");
let wallet_state =
WalletState::new_from_wallet_secret(&data_dir, wallet_secret, &cli_args).await;
info!("Got wallet state.");

let mut global_state_lock = GlobalStateLock::new(
wallet_state,
blockchain_state,
Expand Down Expand Up @@ -177,11 +176,8 @@ pub async fn initialize(cli_args: cli_args::Args) -> Result<()> {
.await?;
info!("UTXO restoration check complete");

let mut task_join_handles = vec![];

task_join_handles.push(spawn_wallet_task(global_state_lock.clone()).await?);

// Connect to peers, and provide each peer task with a thread-safe copy of the state
let mut task_join_handles = vec![];
for peer_address in global_state_lock.cli().peers.clone() {
let peer_state_var = global_state_lock.clone(); // bump arc refcount
let main_to_peer_broadcast_rx_clone: broadcast::Receiver<MainToPeerTask> =
Expand Down Expand Up @@ -288,33 +284,6 @@ pub async fn initialize(cli_args: cli_args::Args) -> Result<()> {
.await
}

pub(crate) async fn spawn_wallet_task(
mut global_state_lock: GlobalStateLock,
) -> Result<tokio::task::JoinHandle<()>> {
let mut mempool_subscriber = global_state_lock.lock_guard().await.mempool.subscribe();

let wallet_join_handle = tokio::task::Builder::new()
.name("wallet_mempool_listener")
.spawn(async move {
let mut events: std::collections::VecDeque<_> = Default::default();

while let Ok(event) = mempool_subscriber.recv().await {
events.push_back(event);

if let Ok(mut gs) = global_state_lock.try_lock_guard_mut() {
while let Some(e) = events.pop_front() {
gs.wallet_state
.handle_mempool_event(e)
.await
.expect("Wallet should handle mempool event without error");
}
}
}
})?;

Ok(wallet_join_handle)
}

/// Time a fn call. Duration is returned as a float in seconds.
pub fn time_fn_call<O>(f: impl FnOnce() -> O) -> (O, f64) {
let start = Instant::now();
Expand Down
3 changes: 3 additions & 0 deletions src/locks/tokio/atomic_rw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ impl<T> AtomicRw<T> {
AtomicRwWriteGuard::new(guard, &self.lock_callback_info)
}

/// Attempt to acquire write lock immediately.
///
/// If the lock cannot be acquired without waiting, an error is returned.
pub fn try_lock_guard_mut(&mut self) -> Result<AtomicRwWriteGuard<T>, TryLockError> {
self.try_acquire_write_cb();
let guard = self.inner.try_write()?;
Expand Down
10 changes: 5 additions & 5 deletions src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,8 +528,8 @@ impl MainLoopHandler {

// Insert into mempool
global_state_mut
.mempool
.insert(pt2m_transaction.transaction.to_owned())?;
.mempool_insert(pt2m_transaction.transaction.to_owned())
.await?;

// send notification to peers
let transaction_notification: TransactionNotification =
Expand Down Expand Up @@ -970,7 +970,7 @@ impl MainLoopHandler {
// Handle mempool cleanup, i.e. removing stale/too old txs from mempool
_ = &mut mempool_cleanup_timer => {
debug!("Timer: mempool-cleaner job");
self.global_state_lock.lock_guard_mut().await.mempool.prune_stale_transactions()?;
self.global_state_lock.lock_guard_mut().await.mempool_prune_stale_transactions().await?;

// Reset the timer to run this branch again in P seconds
mempool_cleanup_timer.as_mut().reset(tokio::time::Instant::now() + mempool_cleanup_timer_interval);
Expand Down Expand Up @@ -1028,8 +1028,8 @@ impl MainLoopHandler {
self.global_state_lock
.lock_guard_mut()
.await
.mempool
.insert(*transaction)?;
.mempool_insert(*transaction)
.await?;

// do not shut down
Ok(false)
Expand Down
4 changes: 2 additions & 2 deletions src/mine_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,8 @@ mod mine_loop_tests {
.await?;

premine_receiver_global_state
.mempool
.insert(tx_by_preminer)?;
.mempool_insert(tx_by_preminer)
.await?;
assert_eq!(1, premine_receiver_global_state.mempool.len());

// Build transaction
Expand Down
Loading

0 comments on commit da50084

Please sign in to comment.