Add peer_id and channel_id explicitly to log records #2314

Merged · 9 commits · Dec 2, 2023
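The change has two parts, repeated across every file below: Logger::log now takes its Record by value rather than by reference, and Record gains optional peer_id and channel_id fields that wrapping loggers can fill in before delegating. A minimal sketch of the resulting shape, using simplified stand-in types (an illustration inferred from the diff, not the exact LDK definitions):

use secp256k1::PublicKey;

/// Stand-in for LDK's 32-byte channel identifier, used here only for illustration.
pub struct ChannelId(pub [u8; 32]);

pub struct Record {
    // ...existing fields (level, args, module_path, file, line) elided...
    /// Node id of the peer this record concerns, if known.
    pub peer_id: Option<PublicKey>,
    /// Channel this record concerns, if known.
    pub channel_id: Option<ChannelId>,
}

pub trait Logger {
    /// Takes the record by value so wrappers can mutate it before forwarding.
    fn log(&self, record: Record);
}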
2 changes: 1 addition & 1 deletion fuzz/src/full_stack.rs
@@ -728,7 +728,7 @@ mod tests {
pub lines: Mutex<HashMap<(String, String), usize>>,
}
impl Logger for TrackingLogger {
fn log(&self, record: &Record) {
fn log(&self, record: Record) {
*self.lines.lock().unwrap().entry((record.module_path.to_string(), format!("{}", record.args))).or_insert(0) += 1;
println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
}
2 changes: 1 addition & 1 deletion fuzz/src/onion_message.rs
@@ -218,7 +218,7 @@ mod tests {
pub lines: Mutex<HashMap<(String, String), usize>>,
}
impl Logger for TrackingLogger {
fn log(&self, record: &Record) {
fn log(&self, record: Record) {
*self.lines.lock().unwrap().entry((record.module_path.to_string(), format!("{}", record.args))).or_insert(0) += 1;
println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
}
2 changes: 1 addition & 1 deletion fuzz/src/utils/test_logger.rs
@@ -56,7 +56,7 @@ impl<'a, Out: Output> Write for LockedWriteAdapter<'a, Out> {
}

impl<Out: Output> Logger for TestLogger<Out> {
fn log(&self, record: &Record) {
fn log(&self, record: Record) {
write!(LockedWriteAdapter(&self.out),
"{:<5} {} [{} : {}] {}\n", record.level.to_string(), self.id, record.module_path, record.line, record.args)
.unwrap();
26 changes: 25 additions & 1 deletion lightning-invoice/src/utils.rs
@@ -14,7 +14,7 @@ use lightning::ln::channelmanager::{PhantomRouteHints, MIN_CLTV_EXPIRY_DELTA};
use lightning::ln::inbound_payment::{create, create_from_hash, ExpandedKey};
use lightning::routing::gossip::RoutingFees;
use lightning::routing::router::{RouteHint, RouteHintHop, Router};
use lightning::util::logger::Logger;
use lightning::util::logger::{Logger, Record};
use secp256k1::PublicKey;
use core::ops::Deref;
use core::time::Duration;
@@ -626,6 +626,7 @@ where

log_trace!(logger, "Considering {} channels for invoice route hints", channels.len());
for channel in channels.into_iter().filter(|chan| chan.is_channel_ready) {
let logger = WithChannelDetails::from(logger, &channel);
if channel.get_inbound_payment_scid().is_none() || channel.counterparty.forwarding_info.is_none() {
log_trace!(logger, "Ignoring channel {} for invoice route hints", &channel.channel_id);
continue;
@@ -710,6 +711,7 @@ where
.into_iter()
.map(|(_, channel)| channel)
.filter(|channel| {
let logger = WithChannelDetails::from(logger, &channel);
let has_enough_capacity = channel.inbound_capacity_msat >= min_inbound_capacity;
let include_channel = if has_pub_unconf_chan {
// If we have a public channel, but it doesn't have enough confirmations to (yet)
@@ -790,6 +792,28 @@ fn prefer_current_channel(min_inbound_capacity_msat: Option<u64>, current_channe
current_channel > candidate_channel
}

/// Adds relevant context to a [`Record`] before passing it to the wrapped [`Logger`].
struct WithChannelDetails<'a, 'b, L: Deref> where L::Target: Logger {
/// The logger to delegate to after adding context to the record.
logger: &'a L,
/// The [`ChannelDetails`] for adding relevant context to the logged record.
details: &'b ChannelDetails
}

impl<'a, 'b, L: Deref> Logger for WithChannelDetails<'a, 'b, L> where L::Target: Logger {
fn log(&self, mut record: Record) {
record.peer_id = Some(self.details.counterparty.node_id);
record.channel_id = Some(self.details.channel_id);
self.logger.log(record)
}
}

impl<'a, 'b, L: Deref> WithChannelDetails<'a, 'b, L> where L::Target: Logger {
fn from(logger: &'a L, details: &'b ChannelDetails) -> Self {
Self { logger, details }
}
}
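The two hunks above use this decorator by shadowing logger inside the per-channel loop, so every log_trace! in scope picks up the channel context without changing any call sites. A condensed excerpt of that usage (assembled from the hunks above, not standalone code):

for channel in channels.into_iter().filter(|chan| chan.is_channel_ready) {
    // Shadow the outer logger with the decorated one for this iteration.
    let logger = WithChannelDetails::from(logger, &channel);
    // Records logged here reach the inner Logger with peer_id and channel_id set.
    log_trace!(logger, "Ignoring channel {} for invoice route hints", &channel.channel_id);
}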

#[cfg(test)]
mod test {
use core::cell::RefCell;
2 changes: 1 addition & 1 deletion lightning-net-tokio/src/lib.rs
@@ -571,7 +571,7 @@ mod tests {

pub struct TestLogger();
impl lightning::util::logger::Logger for TestLogger {
fn log(&self, record: &lightning::util::logger::Record) {
fn log(&self, record: lightning::util::logger::Record) {
println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
}
}
2 changes: 1 addition & 1 deletion lightning-rapid-gossip-sync/src/lib.rs
@@ -49,7 +49,7 @@
//! # use lightning::util::logger::{Logger, Record};
//! # struct FakeLogger {}
//! # impl Logger for FakeLogger {
//! # fn log(&self, record: &Record) { }
//! # fn log(&self, record: Record) { }
//! # }
//! # let logger = FakeLogger {};
//!
79 changes: 44 additions & 35 deletions lightning/src/chain/chainmonitor.rs
@@ -29,7 +29,7 @@ use bitcoin::hash_types::{Txid, BlockHash};
use crate::chain;
use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS};
use crate::chain::transaction::{OutPoint, TransactionData};
use crate::sign::ecdsa::WriteableEcdsaChannelSigner;
use crate::events;
@@ -359,6 +359,7 @@ where C::Target: chain::Filter,
process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>
) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
let monitor = &monitor_state.monitor;
let logger = WithChannelMonitor::from(&self.logger, &monitor);
let mut txn_outputs;
{
txn_outputs = process(monitor, txdata);
@@ -375,12 +376,12 @@
}
}

log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
log_trace!(logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
ChannelMonitorUpdateStatus::Completed =>
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
log_trace!(logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
ChannelMonitorUpdateStatus::InProgress => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
log_debug!(logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
},
ChannelMonitorUpdateStatus::UnrecoverableError => {
@@ -619,8 +620,9 @@ where C::Target: chain::Filter,
pub fn rebroadcast_pending_claims(&self) {
let monitors = self.monitors.read().unwrap();
for (_, monitor_holder) in &*monitors {
let logger = WithChannelMonitor::from(&self.logger, &monitor_holder.monitor);
monitor_holder.monitor.rebroadcast_pending_claims(
&*self.broadcaster, &*self.fee_estimator, &*self.logger
&*self.broadcaster, &*self.fee_estimator, &logger
)
}
}
@@ -638,17 +640,19 @@
fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) {
log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
let logger = WithChannelMonitor::from(&self.logger, &monitor);
monitor.block_connected(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &logger)
});
}

fn block_disconnected(&self, header: &Header, height: u32) {
let monitor_states = self.monitors.read().unwrap();
log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height);
for monitor_state in monitor_states.values() {
let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor);
monitor_state.monitor.block_disconnected(
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
header, height, &*self.broadcaster, &*self.fee_estimator, &logger);
}
}
}
@@ -665,27 +669,30 @@
fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) {
log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
self.process_chain_data(header, None, txdata, |monitor, txdata| {
let logger = WithChannelMonitor::from(&self.logger, &monitor);
monitor.transactions_confirmed(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &logger)
});
}

fn transaction_unconfirmed(&self, txid: &Txid) {
log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
let monitor_states = self.monitors.read().unwrap();
for monitor_state in monitor_states.values() {
monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor);
monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &logger);
}
}

fn best_block_updated(&self, header: &Header, height: u32) {
log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
let logger = WithChannelMonitor::from(&self.logger, &monitor);
// While in practice there shouldn't be any recursive calls when given empty txdata,
// it's still possible if a chain::Filter implementation returns a transaction.
debug_assert!(txdata.is_empty());
monitor.best_block_updated(
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
header, height, &*self.broadcaster, &*self.fee_estimator, &logger)
});
}

@@ -711,29 +718,30 @@ where C::Target: chain::Filter,
P::Target: Persist<ChannelSigner>,
{
fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<ChannelMonitorUpdateStatus, ()> {
let logger = WithChannelMonitor::from(&self.logger, &monitor);
let mut monitors = self.monitors.write().unwrap();
let entry = match monitors.entry(funding_outpoint) {
hash_map::Entry::Occupied(_) => {
log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
log_error!(logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
return Err(());
},
hash_map::Entry::Vacant(e) => e,
};
log_trace!(self.logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
log_trace!(logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
let update_id = MonitorUpdateId::from_new_monitor(&monitor);
let mut pending_monitor_updates = Vec::new();
let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id);
match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
},
ChannelMonitorUpdateStatus::Completed => {
log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
log_info!(logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
},
ChannelMonitorUpdateStatus::UnrecoverableError => {
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
log_error!(self.logger, "{}", err_str);
log_error!(logger, "{}", err_str);
panic!("{}", err_str);
},
}
@@ -750,8 +758,9 @@ where C::Target: chain::Filter,

fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
// Update the monitor that watches the channel referred to by the given outpoint.
let monitors = self.monitors.read().unwrap();
let ret = match monitors.get(&funding_txo) {
let monitors_lock = self.monitors.read().unwrap();
let monitors = monitors_lock.deref();
match monitors.get(&funding_txo) {
None => {
log_error!(self.logger, "Failed to update channel monitor: no such monitor registered");

@@ -765,7 +774,8 @@
},
Some(monitor_state) => {
let monitor = &monitor_state.monitor;
log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
let logger = WithChannelMonitor::from(&self.logger, &monitor);
log_trace!(logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
let update_res = monitor.update_monitor(update, &self.broadcaster, &self.fee_estimator, &self.logger);

let update_id = MonitorUpdateId::from_monitor_update(update);
@@ -776,49 +786,48 @@ where C::Target: chain::Filter,
// We don't want to persist a `monitor_update` which results in a failure to apply later
// while reading `channel_monitor` with updates from storage. Instead, we should persist
// the entire `channel_monitor` here.
log_warn!(self.logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
log_warn!(logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
self.persister.update_persisted_channel(funding_txo, None, monitor, update_id)
} else {
self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id)
};
match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
pending_monitor_updates.push(update_id);
log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor));
log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor));
},
ChannelMonitorUpdateStatus::Completed => {
log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
},
ChannelMonitorUpdateStatus::UnrecoverableError => {
// Take the monitors lock for writing so that we poison it and any future
// operations going forward fail immediately.
core::mem::drop(monitors);
Review comment (Collaborator):
Because this no longer drops the lock itself, we'll just hang on the next line rather than actually panicking.
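A minimal sketch of the hazard being described, using toy types rather than LDK code: monitors is now a plain reference obtained through deref(), so core::mem::drop(monitors) releases only that reference while the read guard (monitors_lock) stays alive, and the write() that follows blocks on the same RwLock instead of reaching the panic.

use std::sync::RwLock;

fn poison_then_panic(lock: &RwLock<u32>) {
    let guard = lock.read().unwrap();
    let inner: &u32 = &*guard;           // plays the role of `monitors`
    core::mem::drop(inner);              // drops only the &u32; `guard` is still held
    let _poison = lock.write().unwrap(); // hangs here: the read lock was never released
    panic!("unreachable while the read guard lives");
}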

let _poison = self.monitors.write().unwrap();
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
log_error!(logger, "{}", err_str);
panic!("{}", err_str);
},
ChannelMonitorUpdateStatus::UnrecoverableError => { /* we'll panic in a moment */ },
}
if update_res.is_err() {
ChannelMonitorUpdateStatus::InProgress
} else {
persist_res
}
}
};
if let ChannelMonitorUpdateStatus::UnrecoverableError = ret {
// Take the monitors lock for writing so that we poison it and any future
// operations going forward fail immediately.
core::mem::drop(monitors);
let _poison = self.monitors.write().unwrap();
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
log_error!(self.logger, "{}", err_str);
panic!("{}", err_str);
}
ret
}

fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor);
let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
if !is_pending_monitor_update || monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize <= self.highest_chain_height.load(Ordering::Acquire) {
if is_pending_monitor_update {
log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
log_error!(self.logger, " This may cause duplicate payment events to be generated.");
log_error!(logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
log_error!(logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
log_error!(logger, " This may cause duplicate payment events to be generated.");
}
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
if monitor_events.len() > 0 {