-
Notifications
You must be signed in to change notification settings - Fork 411
Move persist into async part of the sweeper #3819
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -382,7 +382,7 @@ where | |
output_spender: O, change_destination_source: D, kv_store: K, logger: L, | ||
) -> Self { | ||
let outputs = Vec::new(); | ||
let sweeper_state = Mutex::new(SweeperState { outputs, best_block }); | ||
let sweeper_state = Mutex::new(SweeperState { outputs, best_block, dirty: false }); | ||
Self { | ||
sweeper_state, | ||
pending_sweep: AtomicBool::new(false), | ||
|
@@ -444,7 +444,7 @@ where | |
|
||
state_lock.outputs.push(output_info); | ||
} | ||
self.persist_state(&*state_lock).map_err(|e| { | ||
self.persist_state(&mut *state_lock).map_err(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
}) | ||
} | ||
|
@@ -472,7 +472,19 @@ where | |
return Ok(()); | ||
} | ||
|
||
let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await; | ||
let result = { | ||
self.regenerate_and_broadcast_spend_if_necessary_internal().await?; | ||
|
||
// If there is still dirty state, we need to persist it. | ||
let mut sweeper_state = self.sweeper_state.lock().unwrap(); | ||
if sweeper_state.dirty { | ||
self.persist_state(&mut *sweeper_state).map_err(|e| { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would rather take a non-mutable reference here and update the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That works for me! |
||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
}) | ||
} else { | ||
Ok(()) | ||
} | ||
}; | ||
|
||
// Release the pending sweep flag again, regardless of result. | ||
self.pending_sweep.store(false, Ordering::Release); | ||
|
@@ -560,7 +572,7 @@ where | |
output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone()); | ||
} | ||
|
||
self.persist_state(&sweeper_state).map_err(|e| { | ||
self.persist_state(&mut sweeper_state).map_err(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
})?; | ||
|
||
|
@@ -590,7 +602,7 @@ where | |
}); | ||
} | ||
|
||
fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> { | ||
fn persist_state(&self, sweeper_state: &mut SweeperState) -> Result<(), io::Error> { | ||
self.kv_store | ||
.write( | ||
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, | ||
|
@@ -609,6 +621,9 @@ where | |
); | ||
e | ||
}) | ||
.map(|_| { | ||
sweeper_state.dirty = false; | ||
}) | ||
} | ||
|
||
fn spend_outputs( | ||
|
@@ -674,9 +689,7 @@ where | |
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); | ||
self.best_block_updated_internal(&mut *state_lock, header, height); | ||
|
||
let _ = self.persist_state(&*state_lock).map_err(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
}); | ||
state_lock.dirty = true; | ||
} | ||
|
||
fn block_disconnected(&self, header: &Header, height: u32) { | ||
|
@@ -698,9 +711,7 @@ where | |
} | ||
} | ||
|
||
self.persist_state(&*state_lock).unwrap_or_else(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
}); | ||
state_lock.dirty = true; | ||
} | ||
} | ||
|
||
|
@@ -720,9 +731,7 @@ where | |
) { | ||
let mut state_lock = self.sweeper_state.lock().unwrap(); | ||
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); | ||
self.persist_state(&*state_lock).unwrap_or_else(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
}); | ||
state_lock.dirty = true; | ||
} | ||
|
||
fn transaction_unconfirmed(&self, txid: &Txid) { | ||
|
@@ -743,18 +752,14 @@ where | |
.filter(|o| o.status.confirmation_height() >= Some(unconf_height)) | ||
.for_each(|o| o.status.unconfirmed()); | ||
|
||
self.persist_state(&*state_lock).unwrap_or_else(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
}); | ||
state_lock.dirty = true; | ||
} | ||
} | ||
|
||
fn best_block_updated(&self, header: &Header, height: u32) { | ||
let mut state_lock = self.sweeper_state.lock().unwrap(); | ||
self.best_block_updated_internal(&mut *state_lock, header, height); | ||
let _ = self.persist_state(&*state_lock).map_err(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
}); | ||
state_lock.dirty = true; | ||
} | ||
|
||
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> { | ||
|
@@ -783,11 +788,13 @@ where | |
struct SweeperState { | ||
outputs: Vec<TrackedSpendableOutput>, | ||
best_block: BestBlock, | ||
dirty: bool, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We had that discussion before: I'd really prefer it if we don't mix in runtime state with the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Last time, I created that isolation, but then reverted in favor of an atomic boolean. Which direction would you suggest taking it with the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I would much prefer to just have another There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The type of flows that I'd like to avoid is stuff like: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Try out here: main...joostjager:rust-lightning:sweeper-async-persist-atomicbool I definitely feel all those cases pop up within me if I use that atomic bool. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know this area of the code well, but I tend to agree with Joost that if we had put both flags inside the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So there a two aspects here why I dislike the idea of pushing the runtime flags into
It might also be noteworthy that post- TLDR: I'd prefer to continue to have the runtime |
||
} | ||
|
||
impl_writeable_tlv_based!(SweeperState, { | ||
(0, outputs, required_vec), | ||
(2, best_block, required), | ||
(_unused, dirty, (static_value, false)), | ||
}); | ||
|
||
/// A `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a weird pattern. Why not move persistence out of
regenerate_and_broadcast_spend_if_necessary_internal
and just set the dirty flag there?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked at that, but I think we have to persist before we broadcast? Or is that not necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, not sure if necessary, but yes, it's probably cleaner to persist that we broadcasted before we attempt it.
However, I think you can avoid the entire 'if it's still dirty'-pattern if you'd trigger the repersistence via a
Notifier
rather than through the call toregenerate_and_broadcast_if_necessary
, as discussed below.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline. Probably still need a dirty flag to prevent unnecessary persists when only sweeps need to be checked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, this was never the question, the question was around whether we need to run the 'if it's still dirty'-pattern after we may have just persisted. And to avoid that, we should just switch to use the notifier, as we intend to do that anyways.