Skip to content

Commit a99bbff

Browse files
committed
simln-lib/refactor: fully deterministic produce events
1 parent 1b3a21f commit a99bbff

File tree

1 file changed

+154
-97
lines changed

1 file changed

+154
-97
lines changed

simln-lib/src/lib.rs

Lines changed: 154 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ pub trait LightningNode: Send {
352352
pub struct DestinationGenerationError(String);
353353

354354
/// A trait for selecting destination nodes for payments in the Lightning Network.
355-
pub trait DestinationGenerator: Send {
355+
pub trait DestinationGenerator: Send + Sync {
356356
/// choose_destination picks a destination node within the network, returning the node's information and its
357357
/// capacity (if available).
358358
fn choose_destination(
@@ -624,6 +624,34 @@ struct ExecutorKit {
624624
payment_generator: Box<dyn PaymentGenerator>,
625625
}
626626

627+
struct PaymentEvent {
628+
wait_time: Duration,
629+
destination: NodeInfo,
630+
current_count: u64,
631+
capacity: Option<u64>,
632+
executor: ExecutorKit,
633+
}
634+
635+
impl Ord for PaymentEvent {
636+
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
637+
self.wait_time.cmp(&other.wait_time)
638+
}
639+
}
640+
641+
impl PartialOrd for PaymentEvent {
642+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
643+
Some(self.cmp(other))
644+
}
645+
}
646+
647+
impl PartialEq for PaymentEvent {
648+
fn eq(&self, other: &Self) -> bool {
649+
self.wait_time == other.wait_time
650+
}
651+
}
652+
653+
impl Eq for PaymentEvent {}
654+
627655
impl<C: Clock + 'static> Simulation<C> {
628656
pub fn new(
629657
cfg: SimulationCfg,
@@ -1071,32 +1099,6 @@ impl<C: Clock + 'static> Simulation<C> {
10711099
channels
10721100
}
10731101

1074-
fn generate_payments<A: PaymentGenerator + ?Sized>(
1075-
&self,
1076-
heap: &mut BinaryHeap<Reverse<(Duration, usize)>>,
1077-
source: &NodeInfo,
1078-
node_generator: &A,
1079-
index: usize,
1080-
) -> Result<(), SimulationError> {
1081-
let mut current_count = 0;
1082-
loop {
1083-
let wait = get_payment_delay(current_count, source, node_generator)?;
1084-
heap.push(Reverse((wait, index)));
1085-
if node_generator.payment_count().is_none() {
1086-
return Ok(());
1087-
}
1088-
if let Some(c) = node_generator.payment_count() {
1089-
if c == current_count {
1090-
log::info!(
1091-
"Payment count has been met for {source}: {c} payments. Stopping the activity."
1092-
);
1093-
return Ok(());
1094-
}
1095-
}
1096-
current_count += 1;
1097-
}
1098-
}
1099-
11001102
/// Responsible for spinning up producers for a set of activities. Requires that a consumer channel is present
11011103
/// for every source node in the set of executors.
11021104
async fn dispatch_producers(
@@ -1105,74 +1107,108 @@ impl<C: Clock + 'static> Simulation<C> {
11051107
producer_channels: HashMap<PublicKey, Sender<SimulationEvent>>,
11061108
tasks: &TaskTracker,
11071109
) -> Result<(), SimulationError> {
1108-
let mut heap: BinaryHeap<Reverse<(Duration, usize)>> = BinaryHeap::new();
1109-
for (index, executor) in executors.iter().enumerate() {
1110-
self.generate_payments(
1111-
&mut heap,
1112-
&executor.source_info,
1113-
executor.payment_generator.as_ref(),
1114-
index,
1115-
)?;
1110+
let mut heap: BinaryHeap<Reverse<PaymentEvent>> = BinaryHeap::new();
1111+
for executor in executors {
1112+
generate_payments(&mut heap, executor, 0)?;
11161113
}
1117-
while let Some(Reverse((wait, index))) = heap.pop() {
1118-
let executor = executors
1119-
.get(index)
1120-
.ok_or(SimulationError::MissingNodeError(
1121-
"Missing executor".to_string(),
1122-
))?;
1123-
let sender = producer_channels.get(&executor.source_info.pubkey).ok_or(
1124-
SimulationError::RandomActivityError(RandomActivityError::ValueError(format!(
1125-
"Activity producer for: {} not found.",
1126-
executor.source_info.pubkey,
1127-
))),
1128-
)?;
1129-
let (destination, capacity) = executor
1130-
.network_generator
1131-
.choose_destination(executor.source_info.pubkey)
1132-
.map_err(SimulationError::DestinationGenerationError)?;
1133-
1134-
// pe: produce events
1135-
let pe_shutdown = self.shutdown_trigger.clone();
1136-
let pe_listener = self.shutdown_listener.clone();
1137-
let pe_sender = sender.clone();
1138-
let clock = self.clock.clone();
1139-
let source = executor.source_info.clone();
1140-
// Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
1141-
// a payment amount something has gone wrong (because we should have validated that we can always
1142-
// generate amounts), so we exit.
1143-
let amount = match executor.payment_generator.payment_amount(capacity) {
1144-
Ok(amt) => {
1145-
if amt == 0 {
1146-
log::debug!("Skipping zero amount payment for {source} -> {destination}.");
1147-
continue;
1148-
}
1149-
amt
1150-
},
1151-
Err(e) => {
1152-
return Err(SimulationError::PaymentGenerationError(e));
1114+
1115+
let listener = self.shutdown_listener.clone();
1116+
let shutdown = self.shutdown_trigger.clone();
1117+
let clock = self.clock.clone();
1118+
let t = tasks.clone();
1119+
1120+
tasks.spawn(async move {
1121+
loop {
1122+
select! {
1123+
biased;
1124+
_ = listener.clone() => {
1125+
return Ok(())
11531126
},
1154-
};
1155-
tasks.spawn(async move {
1156-
log::info!("Starting activity producer for {}.", source);
1157-
1158-
if let Err(e) = produce_events(
1159-
source.clone(),
1160-
clock,
1161-
pe_sender,
1162-
pe_listener,
1163-
wait,
1164-
destination,
1165-
amount,
1166-
)
1167-
.await
1168-
{
1169-
pe_shutdown.trigger();
1170-
log::debug!("Activity producer for {source} exited with error {e}.");
1171-
} else {
1172-
log::debug!("Activity producer for {source} completed successfully.");
1127+
_ = async {} => {
1128+
match heap.pop() {
1129+
Some(Reverse(PaymentEvent {
1130+
wait_time,
1131+
destination,
1132+
current_count,
1133+
capacity,
1134+
executor,
1135+
})) => {
1136+
1137+
if let Some(c) = executor.payment_generator.payment_count() {
1138+
if c == current_count {
1139+
log::info!(
1140+
"Payment count has been met for {}: {c} payments. Stopping the activity."
1141+
, executor.source_info);
1142+
return Ok(());
1143+
}
1144+
}
1145+
let sender = producer_channels.get(&executor.source_info.pubkey).ok_or(
1146+
SimulationError::RandomActivityError(RandomActivityError::ValueError(
1147+
format!(
1148+
"Activity producer for: {} not found.",
1149+
executor.source_info.pubkey,
1150+
),
1151+
)),
1152+
)?;
1153+
1154+
// pe: produce events
1155+
let pe_shutdown = shutdown.clone();
1156+
let pe_listener = listener.clone();
1157+
let pe_sender = sender.clone();
1158+
let pe_clock = clock.clone();
1159+
let source = executor.source_info.clone();
1160+
// Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
1161+
// a payment amount something has gone wrong (because we should have validated that we can always
1162+
// generate amounts), so we exit.
1163+
let payment_amount = executor.payment_generator.payment_amount(capacity);
1164+
let amount = match payment_amount {
1165+
Ok(amt) => {
1166+
if amt == 0 {
1167+
log::debug!(
1168+
"Skipping zero amount payment for {source} -> {destination}."
1169+
);
1170+
generate_payments(&mut heap, executor, current_count)?;
1171+
continue;
1172+
} else {
1173+
generate_payments(&mut heap, executor, current_count + 1)?;
1174+
}
1175+
amt
1176+
},
1177+
Err(e) => {
1178+
return Err(SimulationError::PaymentGenerationError(e));
1179+
},
1180+
};
1181+
// Wait until our time to next payment has elapsed then execute a random amount payment to a random
1182+
// destination.
1183+
pe_clock.sleep(wait_time).await;
1184+
t.spawn(async move {
1185+
log::info!("Starting activity producer for {}.", source);
1186+
1187+
if let Err(e) = produce_events(
1188+
source.clone(),
1189+
pe_sender,
1190+
pe_listener,
1191+
destination,
1192+
amount,
1193+
)
1194+
.await
1195+
{
1196+
pe_shutdown.trigger();
1197+
log::debug!("Activity producer for {source} exited with error {e}.");
1198+
} else {
1199+
log::debug!("Activity producer for {source} completed successfully.");
1200+
}
1201+
});
1202+
},
1203+
None => {
1204+
break
1205+
},
1206+
}
11731207
}
1174-
});
1175-
}
1208+
}
1209+
};
1210+
Ok(())
1211+
});
11761212
Ok(())
11771213
}
11781214
}
@@ -1263,10 +1299,8 @@ async fn consume_events(
12631299
/// exit if other threads signal that they have errored out.
12641300
async fn produce_events(
12651301
source: NodeInfo,
1266-
clock: Arc<dyn Clock>,
12671302
sender: Sender<SimulationEvent>,
12681303
listener: Listener,
1269-
wait: Duration,
12701304
destination: NodeInfo,
12711305
amount: u64,
12721306
) -> Result<(), SimulationError> {
@@ -1275,9 +1309,7 @@ async fn produce_events(
12751309
_ = listener.clone() => {
12761310
return Ok(());
12771311
},
1278-
// Wait until our time to next payment has elapsed then execute a random amount payment to a random
1279-
// destination.
1280-
_ = clock.sleep(wait) => {
1312+
_ = async {} => {
12811313
log::debug!("Generated payment: {source} -> {}: {amount} msat.", destination);
12821314

12831315
// Send the payment, exiting if we can no longer send to the consumer.
@@ -1578,6 +1610,31 @@ async fn track_payment_result(
15781610
Ok(())
15791611
}
15801612

1613+
fn generate_payments(
1614+
heap: &mut BinaryHeap<Reverse<PaymentEvent>>,
1615+
executor: ExecutorKit,
1616+
current_count: u64,
1617+
) -> Result<(), SimulationError> {
1618+
let wait_time = get_payment_delay(
1619+
current_count,
1620+
&executor.source_info,
1621+
executor.payment_generator.as_ref(),
1622+
)?;
1623+
let (destination, capacity) = executor
1624+
.network_generator
1625+
.choose_destination(executor.source_info.pubkey)
1626+
.map_err(SimulationError::DestinationGenerationError)?;
1627+
let payment_event = PaymentEvent {
1628+
wait_time,
1629+
destination,
1630+
current_count,
1631+
capacity,
1632+
executor,
1633+
};
1634+
heap.push(Reverse(payment_event));
1635+
Ok(())
1636+
}
1637+
15811638
#[cfg(test)]
15821639
mod tests {
15831640
use crate::{

0 commit comments

Comments
 (0)