Skip to content

Move persist into async part of the sweeper #3819

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions lightning/src/util/sweep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ where
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
) -> Self {
let outputs = Vec::new();
let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
let sweeper_state = Mutex::new(SweeperState { outputs, best_block, dirty: false });
Self {
sweeper_state,
pending_sweep: AtomicBool::new(false),
Expand Down Expand Up @@ -444,7 +444,7 @@ where

state_lock.outputs.push(output_info);
}
self.persist_state(&*state_lock).map_err(|e| {
self.persist_state(&mut *state_lock).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
})
}
Expand Down Expand Up @@ -472,7 +472,19 @@ where
return Ok(());
}

let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await;
let result = {
self.regenerate_and_broadcast_spend_if_necessary_internal().await?;

// If there is still dirty state, we need to persist it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a weird pattern. Why not move persistence out of regenerate_and_broadcast_spend_if_necessary_internal and just set the dirty flag there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at that, but I think we have to persist before we broadcast? Or is that not necessary?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at that, but I think we have to persist before we broadcast? Or is that not necessary?

Hmm, not sure if necessary, but yes, it's probably cleaner to persist that we broadcasted before we attempt it.

However, I think you can avoid the entire 'if it's still dirty'-pattern if you'd trigger the repersistence via a Notifier rather than through the call to regenerate_and_broadcast_if_necessary, as discussed below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. Probably still need a dirty flag to prevent unnecessary persists when only sweeps need to be checked.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. Probably still need a dirty flag to prevent unnecessary persists when only sweeps need to be checked.

Well, this was never the question, the question was around whether we need to run the 'if it's still dirty'-pattern after we may have just persisted. And to avoid that, we should just switch to use the notifier, as we intend to do that anyways.

let mut sweeper_state = self.sweeper_state.lock().unwrap();
if sweeper_state.dirty {
self.persist_state(&mut *sweeper_state).map_err(|e| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would rather take a non-mutable reference here and update the dirty flag afterwards, since persist_state seems like it should be "read-only" on the state. Or we could rename the method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps flush_state is an option? Updating the dirty flag afterwards means some code duplication.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That works for me!

log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
})
} else {
Ok(())
}
};

// Release the pending sweep flag again, regardless of result.
self.pending_sweep.store(false, Ordering::Release);
Expand Down Expand Up @@ -560,7 +572,7 @@ where
output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
}

self.persist_state(&sweeper_state).map_err(|e| {
self.persist_state(&mut sweeper_state).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
})?;

Expand Down Expand Up @@ -590,7 +602,7 @@ where
});
}

fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
fn persist_state(&self, sweeper_state: &mut SweeperState) -> Result<(), io::Error> {
self.kv_store
.write(
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
Expand All @@ -609,6 +621,9 @@ where
);
e
})
.map(|_| {
sweeper_state.dirty = false;
})
}

fn spend_outputs(
Expand Down Expand Up @@ -674,9 +689,7 @@ where
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
self.best_block_updated_internal(&mut *state_lock, header, height);

let _ = self.persist_state(&*state_lock).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
state_lock.dirty = true;
}

fn block_disconnected(&self, header: &Header, height: u32) {
Expand All @@ -698,9 +711,7 @@ where
}
}

self.persist_state(&*state_lock).unwrap_or_else(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
state_lock.dirty = true;
}
}

Expand All @@ -720,9 +731,7 @@ where
) {
let mut state_lock = self.sweeper_state.lock().unwrap();
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
self.persist_state(&*state_lock).unwrap_or_else(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
state_lock.dirty = true;
}

fn transaction_unconfirmed(&self, txid: &Txid) {
Expand All @@ -743,18 +752,14 @@ where
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
.for_each(|o| o.status.unconfirmed());

self.persist_state(&*state_lock).unwrap_or_else(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
state_lock.dirty = true;
}
}

fn best_block_updated(&self, header: &Header, height: u32) {
let mut state_lock = self.sweeper_state.lock().unwrap();
self.best_block_updated_internal(&mut *state_lock, header, height);
let _ = self.persist_state(&*state_lock).map_err(|e| {
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
});
state_lock.dirty = true;
}

fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
Expand Down Expand Up @@ -783,11 +788,13 @@ where
struct SweeperState {
outputs: Vec<TrackedSpendableOutput>,
best_block: BestBlock,
dirty: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had that discussion before: I'd really prefer it if we don't mix in runtime state with the SweeperState, which is precisely the object we use to isolated the persisted state from the non-persisted state, which also avoid having to hand a mutable state to persist_state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last time, I created that isolation, but then reverted in favor of an atomic boolean. Which direction would you suggest taking it with the dirty flag? I don't think I'd like another atomic boolean. Already didn't like the first one, but two independent sync primitives is expanding the state space even further.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I would much prefer to just have another needs_persist: AtomicBool on OutputSweeper directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type of flows that I'd like to avoid is stuff like: update state, unlock state, mark dirty and then concurrently a persist is happening in between unlock and mark dirty, ultimately leading to clean state marked as dirty that will be re-persisted without changes. Ofc the re-persist isn't the biggest problem, but I am cautious of requiring devs to reason through scenarios like the one above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try out here: main...joostjager:rust-lightning:sweeper-async-persist-atomicbool

I definitely feel all those cases pop up within me if I use that atomic bool.

Copy link
Contributor

@valentinewallace valentinewallace Jun 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know this area of the code well, but I tend to agree with Joost that if we had put both flags inside the SweeperState then it would be easier to reason about -- everything would have to be changed under the same lock so would definitely be no concerns about concurrency. At face value, having a separate lock seems like it asks for a race condition?

Copy link
Contributor

@tnull tnull Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there a two aspects here why I dislike the idea of pushing the runtime flags into SweeperState:

  1. We intentionally had SweeperState hold the 'actual'/persisted state of the sweeper, not any runtime-specific behavior. The (_unused, dirty, (static_value, false)), in the persistence logic really just shows that you unnecessarily broke the separation of data and logic we had here. If we think that all should be locked under a single Mutex, we'd need to create a wrapper struct holding both the SweeperState and the runtime-specific bool to maintain that.
  2. However, secondly, I don't think we should introduce the lock contention and block the background processor that is woken and processing a 'I need persist' notification just to check if it actually still needs to re-persist. We don't have strong guarantees when the BP responds to a notification, so if it's mid-loop already it might take a while until it gets back to actually process the persist. Also note that what we do in this PR is effectively splitting the persistence in two: sync in-line persistence for stuff that really needs to happen before we return (track_spendable_outputs) and 'lazy'/async persistence that will happen some time after block connection. For the latter we have relaxed consistency guarantees anyways, and we basically increase chances of missing a persistence anyways. So I don't quite understand where the concern for race conditions in this 'lazy' case comes from. I don't see why we favor lock contention over (theoretical) relaxed consistency guarantees for a case where we already opt into the latter knowingly.

It might also be noteworthy that post-async KVStore we might need to rework the current pattern anyways, as we wouldn't be able to hold the MutexGuard across the write().await boundary. We'll figure that out when we get there, but it could mean that we need to clone the to-be persisted state before dropping the lock, and actually making the call, which would be another reason to not include ~unrelated fields in the state object.

TLDR: I'd prefer to continue to have the runtime bools live as AtomicBools on OutputSweeper directly, but if you guys really worry about any races for the already-lazy case, we should at the very least solve it by wrapping the two fields and SweeperState in yet another struct, maintaining the data/logic separation.

}

impl_writeable_tlv_based!(SweeperState, {
(0, outputs, required_vec),
(2, best_block, required),
(_unused, dirty, (static_value, false)),
});

/// A `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a
Expand Down
Loading