Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Miscellaneous patches #6

Merged
merged 7 commits into from
Jul 19, 2023
Merged
12 changes: 9 additions & 3 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@ macro_rules! define_run_body {
// falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence.
if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
if $timer_elapsed(&mut last_prune_call, prune_timer) {
// The network graph must not be pruned while rapid sync completion is pending
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
#[cfg(feature = "std")] {
Expand All @@ -352,6 +353,8 @@ macro_rules! define_run_body {
last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
have_pruned = true;
}
let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
last_prune_call = $get_timer(prune_timer);
}

if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
Expand Down Expand Up @@ -451,7 +454,7 @@ where
UMH::Target: 'static + CustomMessageHandler,
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
let mut should_break = true;
let mut should_break = false;
let async_event_handler = |event| {
let network_graph = gossip_sync.network_graph();
let event_handler = &event_handler;
Expand Down Expand Up @@ -788,7 +791,10 @@ mod tests {

if key == "network_graph" {
if let Some(sender) = &self.graph_persistence_notifier {
sender.send(()).unwrap();
match sender.send(()) {
Tibo-lg marked this conversation as resolved.
Show resolved Hide resolved
Ok(()) => {},
Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
}
};

if let Some((error, message)) = self.graph_error {
Expand Down
2 changes: 2 additions & 0 deletions lightning/src/chain/onchaintx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl Writeable for Option<Vec<Option<(usize, Signature)>>> {

#[cfg(anchors)]
/// The claim commonly referred to as the pre-signed second-stage HTLC transaction.
#[derive(PartialEq)]
pub(crate) struct ExternalHTLCClaim {
pub(crate) commitment_txid: Txid,
pub(crate) per_commitment_number: u64,
Expand All @@ -183,6 +184,7 @@ pub(crate) struct ExternalHTLCClaim {
// Represents the different types of claims for which events are yielded externally to satisfy said
// claims.
#[cfg(anchors)]
#[derive(PartialEq)]
pub(crate) enum ClaimEvent {
/// Event yielded to signal that the commitment transaction fee must be bumped to claim any
/// encumbered funds and proceed to HTLC resolution, if any HTLCs exist.
Expand Down
4 changes: 2 additions & 2 deletions lightning/src/ln/chan_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ pub fn build_anchor_input_witness(funding_key: &PublicKey, funding_sig: &Signatu
///
/// Normally, this is converted to the broadcaster/countersignatory-organized DirectedChannelTransactionParameters
/// before use, via the as_holder_broadcastable and as_counterparty_broadcastable functions.
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChannelTransactionParameters {
/// Holder public keys
pub holder_pubkeys: ChannelPublicKeys,
Expand All @@ -837,7 +837,7 @@ pub struct ChannelTransactionParameters {
}

/// Late-bound per-channel counterparty data used to build transactions.
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CounterpartyChannelTransactionParameters {
/// Counter-party public keys
pub pubkeys: ChannelPublicKeys,
Expand Down
63 changes: 42 additions & 21 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use core::{cmp, mem};
use core::cell::RefCell;
use crate::io::Read;
use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
use core::sync::atomic::{AtomicUsize, Ordering};
use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use core::time::Duration;
use core::ops::Deref;

Expand Down Expand Up @@ -863,6 +863,8 @@ where

/// See `ChannelManager` struct-level documentation for lock order requirements.
pending_events: Mutex<Vec<events::Event>>,
/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
pending_events_processor: AtomicBool,
/// See `ChannelManager` struct-level documentation for lock order requirements.
pending_background_events: Mutex<Vec<BackgroundEvent>>,
/// Used when we have to take a BIG lock to make sure everything is self-consistent.
Expand Down Expand Up @@ -1230,7 +1232,7 @@ pub struct ChannelDetails {
///
pub holder_funding_pubkey: PublicKey,
///
pub counter_funding_pubkey: PublicKey,
pub counter_funding_pubkey: Option<PublicKey>,
///
pub original_funding_outpoint: Option<OutPoint>,
}
Expand Down Expand Up @@ -1616,6 +1618,7 @@ where
per_peer_state: FairRwLock::new(HashMap::new()),

pending_events: Mutex::new(Vec::new()),
pending_events_processor: AtomicBool::new(false),
pending_background_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),
persistence_notifier: Notifier::new(),
Expand Down Expand Up @@ -1794,7 +1797,7 @@ where
fee_rate_per_kw: channel.get_feerate(),
funding_redeemscript,
holder_funding_pubkey: channel.channel_transaction_parameters.holder_pubkeys.funding_pubkey,
counter_funding_pubkey: channel.channel_transaction_parameters.counterparty_parameters.as_ref().unwrap().pubkeys.funding_pubkey,
counter_funding_pubkey: channel.channel_transaction_parameters.counterparty_parameters.as_ref().map(|params| params.pubkeys.funding_pubkey),
original_funding_outpoint: channel.channel_transaction_parameters.original_funding_outpoint,
});
}
Expand Down Expand Up @@ -5846,29 +5849,46 @@ where
pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
&self, handler: H
) {
// We'll acquire our total consistency lock until the returned future completes so that
// we can be sure no other persists happen while processing events.
let _read_guard = self.total_consistency_lock.read().unwrap();
let mut processed_all_events = false;
while !processed_all_events {
if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
return;
}

let mut result = NotifyOption::SkipPersist;
let mut result = NotifyOption::SkipPersist;

// TODO: This behavior should be documented. It's unintuitive that we query
// ChannelMonitors when clearing other events.
if self.process_pending_monitor_events() {
result = NotifyOption::DoPersist;
}
{
// We'll acquire our total consistency lock so that we can be sure no other
// persists happen while processing monitor events.
let _read_guard = self.total_consistency_lock.read().unwrap();

// TODO: This behavior should be documented. It's unintuitive that we query
// ChannelMonitors when clearing other events.
if self.process_pending_monitor_events() {
result = NotifyOption::DoPersist;
}
}

let pending_events = mem::replace(&mut *self.pending_events.lock().unwrap(), vec![]);
if !pending_events.is_empty() {
result = NotifyOption::DoPersist;
}
let pending_events = self.pending_events.lock().unwrap().clone();
let num_events = pending_events.len();
if !pending_events.is_empty() {
result = NotifyOption::DoPersist;
}

for event in pending_events {
handler(event).await;
}
for event in pending_events {
handler(event).await;
}

if result == NotifyOption::DoPersist {
self.persistence_notifier.notify();
{
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.drain(..num_events);
processed_all_events = pending_events.is_empty();
self.pending_events_processor.store(false, Ordering::Release);
}

if result == NotifyOption::DoPersist {
self.persistence_notifier.notify();
}
}
}
}
Expand Down Expand Up @@ -8012,6 +8032,7 @@ where
per_peer_state: FairRwLock::new(per_peer_state),

pending_events: Mutex::new(pending_events_read),
pending_events_processor: AtomicBool::new(false),
pending_background_events: Mutex::new(pending_background_events_read),
total_consistency_lock: RwLock::new(()),
persistence_notifier: Notifier::new(),
Expand Down
4 changes: 2 additions & 2 deletions lightning/src/ln/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1942,14 +1942,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM

fn do_disconnect(&self, mut descriptor: Descriptor, peer: &Peer, reason: &'static str) {
if !peer.handshake_complete() {
log_trace!(self.logger, "Disconnecting peer which hasn't completed handshake due to {}", reason);
log_debug!(self.logger, "Disconnecting peer which hasn't completed handshake due to {}", reason);
descriptor.disconnect_socket();
return;
}

debug_assert!(peer.their_node_id.is_some());
if let Some((node_id, _)) = peer.their_node_id {
log_trace!(self.logger, "Disconnecting peer with id {} due to {}", node_id, reason);
log_debug!(self.logger, "Disconnecting peer with id {} due to {}", node_id, reason);
self.message_handler.chan_handler.peer_disconnected(&node_id);
self.message_handler.onion_message_handler.peer_disconnected(&node_id);
}
Expand Down
9 changes: 9 additions & 0 deletions lightning/src/util/enforcing_trait_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ impl EcdsaChannelSigner for EnforcingSigner {
}
}

impl crate::chain::keysinterface::ExtraSign for EnforcingSigner {
fn sign_with_fund_key_callback<F>(&self, _: &mut F) where F: FnMut(&SecretKey) {
todo!()
}
fn set_channel_value_satoshis(&mut self, _: u64) {
todo!()
}
}

impl WriteableEcdsaChannelSigner for EnforcingSigner {}

impl Writeable for EnforcingSigner {
Expand Down
6 changes: 3 additions & 3 deletions lightning/src/util/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl_writeable_tlv_based_enum_upgradable!(HTLCDestination,

#[cfg(anchors)]
/// A descriptor used to sign for a commitment transaction's anchor output.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AnchorDescriptor {
/// A unique identifier used along with `channel_value_satoshis` to re-derive the
/// [`InMemorySigner`] required to sign `input`.
Expand All @@ -276,7 +276,7 @@ pub struct AnchorDescriptor {

#[cfg(anchors)]
/// A descriptor used to sign for a commitment transaction's HTLC output.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HTLCDescriptor {
/// A unique identifier used along with `channel_value_satoshis` to re-derive the
/// [`InMemorySigner`] required to sign `input`.
Expand Down Expand Up @@ -369,7 +369,7 @@ impl HTLCDescriptor {

#[cfg(anchors)]
/// Represents the different types of transactions, originating from LDK, to be bumped.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum BumpTransactionEvent {
/// Indicates that a channel featuring anchor outputs is to be closed by broadcasting the local
/// commitment transaction. Since commitment transactions have a static feerate pre-agreed upon,
Expand Down
4 changes: 4 additions & 0 deletions lightning/src/util/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ impl<'a> chain::Watch<EnforcingSigner> for TestChainMonitor<'a> {
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
return self.chain_monitor.release_pending_monitor_events();
}

fn update_channel_funding_txo(&self, _: OutPoint, new_funding_txo: OutPoint, _: u64) -> chain::ChannelMonitorUpdateStatus {
todo!()
}
}

pub struct TestPersister {
Expand Down
Loading