Skip to content

Commit 1b3a21f

Browse files
committed
simln-lib/refactor: fully deterministic produce events
1 parent 27fd6e3 commit 1b3a21f

File tree

4 files changed

+119
-100
lines changed

4 files changed

+119
-100
lines changed

sim-cli/src/parsing.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use simln_lib::{
1414
Simulation, SimulationCfg, WriteResults,
1515
};
1616
use simln_lib::{ShortChannelID, SimulationError};
17-
use std::collections::HashMap;
17+
use std::collections::{BTreeMap, HashMap};
1818
use std::fs;
1919
use std::path::PathBuf;
2020
use std::sync::Arc;
@@ -357,12 +357,12 @@ async fn get_clients(
357357
nodes: Vec<NodeConnection>,
358358
) -> Result<
359359
(
360-
HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
360+
BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
361361
HashMap<PublicKey, NodeInfo>,
362362
),
363363
LightningError,
364364
> {
365-
let mut clients: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = HashMap::new();
365+
let mut clients: BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = BTreeMap::new();
366366
let mut clients_info: HashMap<PublicKey, NodeInfo> = HashMap::new();
367367

368368
for connection in nodes {
@@ -574,7 +574,7 @@ pub async fn parse_sim_params(cli: &Cli) -> anyhow::Result<SimParams> {
574574
}
575575

576576
pub async fn get_validated_activities(
577-
clients: &HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
577+
clients: &BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
578578
nodes_info: HashMap<PublicKey, NodeInfo>,
579579
activity: Vec<ActivityParser>,
580580
) -> Result<Vec<ActivityDefinition>, LightningError> {

simln-lib/src/lib.rs

Lines changed: 106 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ use rand::{Rng, RngCore, SeedableRng};
1111
use rand_chacha::ChaCha8Rng;
1212
use random_activity::RandomActivityError;
1313
use serde::{Deserialize, Serialize};
14-
use std::collections::HashSet;
14+
use std::cmp::Reverse;
15+
use std::collections::{BTreeMap, BinaryHeap, HashSet};
1516
use std::fmt::{Display, Formatter};
1617
use std::hash::{DefaultHasher, Hash, Hasher};
1718
use std::marker::Send;
@@ -591,7 +592,7 @@ pub struct Simulation<C: Clock + 'static> {
591592
/// Config for the simulation itself.
592593
cfg: SimulationCfg,
593594
/// The lightning nodes that are being simulated, keyed by public key.
594-
nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
595+
nodes: BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
595596
/// Results logger that holds the simulation statistics.
596597
results: Arc<Mutex<PaymentResultLogger>>,
597598
/// Track all tasks spawned for use in the simulation. When used in the `run` method, it will wait for
@@ -619,14 +620,14 @@ struct ExecutorKit {
619620
source_info: NodeInfo,
620621
/// We use an arc mutex here because some implementations of the trait will be very expensive to clone.
621622
/// See [NetworkGraphView] for details.
622-
network_generator: Arc<Mutex<dyn DestinationGenerator>>,
623+
network_generator: Arc<dyn DestinationGenerator>,
623624
payment_generator: Box<dyn PaymentGenerator>,
624625
}
625626

626627
impl<C: Clock + 'static> Simulation<C> {
627628
pub fn new(
628629
cfg: SimulationCfg,
629-
nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
630+
nodes: BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
630631
tasks: TaskTracker,
631632
clock: Arc<C>,
632633
shutdown_trigger: Trigger,
@@ -954,7 +955,7 @@ impl<C: Clock + 'static> Simulation<C> {
954955
source_info: description.source.clone(),
955956
// Defined activities have very simple generators, so the traits required are implemented on
956957
// a single struct which we just cheaply clone.
957-
network_generator: Arc::new(Mutex::new(activity_generator.clone())),
958+
network_generator: Arc::new(activity_generator.clone()),
958959
payment_generator: Box::new(activity_generator),
959960
});
960961
}
@@ -971,7 +972,7 @@ impl<C: Clock + 'static> Simulation<C> {
971972
// Collect capacity of each node from its view of its own channels. Total capacity is divided by two to
972973
// avoid double counting capacity (as each node has a counterparty in the channel).
973974
let mut generators = Vec::new();
974-
let mut active_nodes = HashMap::new();
975+
let mut active_nodes = BTreeMap::new();
975976

976977
// Do a first pass to get the capacity of each node which we need to be able to create a network generator.
977978
// While we're at it, we get the node info and store it with capacity to create activity generators in our
@@ -994,18 +995,12 @@ impl<C: Clock + 'static> Simulation<C> {
994995
active_nodes.insert(node_info.pubkey, (node_info, capacity));
995996
}
996997

997-
// Create a network generator with a shared RNG for all nodes.
998-
let network_generator = Arc::new(Mutex::new(
998+
let network_generator = Arc::new(
999999
NetworkGraphView::new(
10001000
active_nodes.values().cloned().collect(),
10011001
MutRng::new(self.cfg.seed.map(|seed| (seed, None))),
10021002
)
10031003
.map_err(SimulationError::RandomActivityError)?,
1004-
));
1005-
1006-
log::info!(
1007-
"Created network generator: {}.",
1008-
network_generator.lock().await
10091004
);
10101005

10111006
for (node_info, capacity) in active_nodes.values() {
@@ -1076,6 +1071,32 @@ impl<C: Clock + 'static> Simulation<C> {
10761071
channels
10771072
}
10781073

1074+
fn generate_payments<A: PaymentGenerator + ?Sized>(
1075+
&self,
1076+
heap: &mut BinaryHeap<Reverse<(Duration, usize)>>,
1077+
source: &NodeInfo,
1078+
node_generator: &A,
1079+
index: usize,
1080+
) -> Result<(), SimulationError> {
1081+
let mut current_count = 0;
1082+
loop {
1083+
let wait = get_payment_delay(current_count, source, node_generator)?;
1084+
heap.push(Reverse((wait, index)));
1085+
if node_generator.payment_count().is_none() {
1086+
return Ok(());
1087+
}
1088+
if let Some(c) = node_generator.payment_count() {
1089+
if c == current_count {
1090+
log::info!(
1091+
"Payment count has been met for {source}: {c} payments. Stopping the activity."
1092+
);
1093+
return Ok(());
1094+
}
1095+
}
1096+
current_count += 1;
1097+
}
1098+
}
1099+
10791100
/// Responsible for spinning up producers for a set of activities. Requires that a consumer channel is present
10801101
/// for every source node in the set of executors.
10811102
async fn dispatch_producers(
@@ -1084,36 +1105,64 @@ impl<C: Clock + 'static> Simulation<C> {
10841105
producer_channels: HashMap<PublicKey, Sender<SimulationEvent>>,
10851106
tasks: &TaskTracker,
10861107
) -> Result<(), SimulationError> {
1087-
for executor in executors {
1108+
let mut heap: BinaryHeap<Reverse<(Duration, usize)>> = BinaryHeap::new();
1109+
for (index, executor) in executors.iter().enumerate() {
1110+
self.generate_payments(
1111+
&mut heap,
1112+
&executor.source_info,
1113+
executor.payment_generator.as_ref(),
1114+
index,
1115+
)?;
1116+
}
1117+
while let Some(Reverse((wait, index))) = heap.pop() {
1118+
let executor = executors
1119+
.get(index)
1120+
.ok_or(SimulationError::MissingNodeError(
1121+
"Missing executor".to_string(),
1122+
))?;
10881123
let sender = producer_channels.get(&executor.source_info.pubkey).ok_or(
10891124
SimulationError::RandomActivityError(RandomActivityError::ValueError(format!(
10901125
"Activity producer for: {} not found.",
10911126
executor.source_info.pubkey,
10921127
))),
10931128
)?;
1129+
let (destination, capacity) = executor
1130+
.network_generator
1131+
.choose_destination(executor.source_info.pubkey)
1132+
.map_err(SimulationError::DestinationGenerationError)?;
10941133

10951134
// pe: produce events
10961135
let pe_shutdown = self.shutdown_trigger.clone();
10971136
let pe_listener = self.shutdown_listener.clone();
10981137
let pe_sender = sender.clone();
10991138
let clock = self.clock.clone();
1100-
1139+
let source = executor.source_info.clone();
1140+
// Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
1141+
// a payment amount something has gone wrong (because we should have validated that we can always
1142+
// generate amounts), so we exit.
1143+
let amount = match executor.payment_generator.payment_amount(capacity) {
1144+
Ok(amt) => {
1145+
if amt == 0 {
1146+
log::debug!("Skipping zero amount payment for {source} -> {destination}.");
1147+
continue;
1148+
}
1149+
amt
1150+
},
1151+
Err(e) => {
1152+
return Err(SimulationError::PaymentGenerationError(e));
1153+
},
1154+
};
11011155
tasks.spawn(async move {
1102-
let source = executor.source_info.clone();
1103-
1104-
log::info!(
1105-
"Starting activity producer for {}: {}.",
1106-
source,
1107-
executor.payment_generator
1108-
);
1156+
log::info!("Starting activity producer for {}.", source);
11091157

11101158
if let Err(e) = produce_events(
1111-
executor.source_info,
1112-
executor.network_generator,
1113-
executor.payment_generator,
1159+
source.clone(),
11141160
clock,
11151161
pe_sender,
11161162
pe_listener,
1163+
wait,
1164+
destination,
1165+
amount,
11171166
)
11181167
.await
11191168
{
@@ -1124,7 +1173,6 @@ impl<C: Clock + 'static> Simulation<C> {
11241173
}
11251174
});
11261175
}
1127-
11281176
Ok(())
11291177
}
11301178
}
@@ -1213,74 +1261,43 @@ async fn consume_events(
12131261

12141262
/// produce events generates events for the activity description provided. It accepts a shutdown listener so it can
12151263
/// exit if other threads signal that they have errored out.
1216-
async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator + ?Sized>(
1264+
async fn produce_events(
12171265
source: NodeInfo,
1218-
network_generator: Arc<Mutex<N>>,
1219-
node_generator: Box<A>,
12201266
clock: Arc<dyn Clock>,
12211267
sender: Sender<SimulationEvent>,
12221268
listener: Listener,
1269+
wait: Duration,
1270+
destination: NodeInfo,
1271+
amount: u64,
12231272
) -> Result<(), SimulationError> {
1224-
let mut current_count = 0;
1225-
loop {
1226-
if let Some(c) = node_generator.payment_count() {
1227-
if c == current_count {
1228-
log::info!(
1229-
"Payment count has been met for {source}: {c} payments. Stopping the activity."
1230-
);
1231-
return Ok(());
1273+
select! {
1274+
biased;
1275+
_ = listener.clone() => {
1276+
return Ok(());
1277+
},
1278+
// Wait until our time to next payment has elapsed then execute a random amount payment to a random
1279+
// destination.
1280+
_ = clock.sleep(wait) => {
1281+
log::debug!("Generated payment: {source} -> {}: {amount} msat.", destination);
1282+
1283+
// Send the payment, exiting if we can no longer send to the consumer.
1284+
let event = SimulationEvent::SendPayment(destination.clone(), amount);
1285+
select!{
1286+
biased;
1287+
_ = listener.clone() => {
1288+
return Ok(());
1289+
},
1290+
send_result = sender.send(event.clone()) => {
1291+
if send_result.is_err(){
1292+
return Err(SimulationError::MpscChannelError(
1293+
format!("Stopped activity producer for {amount}: {source} -> {destination}.")));
1294+
}
1295+
},
12321296
}
1233-
}
1234-
1235-
let wait = get_payment_delay(current_count, &source, node_generator.as_ref())?;
12361297

1237-
select! {
1238-
biased;
1239-
_ = listener.clone() => {
1240-
return Ok(());
1241-
},
1242-
// Wait until our time to next payment has elapsed then execute a random amount payment to a random
1243-
// destination.
1244-
_ = clock.sleep(wait) => {
1245-
let (destination, capacity) = network_generator.lock().await.choose_destination(source.pubkey).map_err(SimulationError::DestinationGenerationError)?;
1246-
1247-
// Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
1248-
// a payment amount something has gone wrong (because we should have validated that we can always
1249-
// generate amounts), so we exit.
1250-
let amount = match node_generator.payment_amount(capacity) {
1251-
Ok(amt) => {
1252-
if amt == 0 {
1253-
log::debug!("Skipping zero amount payment for {source} -> {destination}.");
1254-
continue;
1255-
}
1256-
amt
1257-
},
1258-
Err(e) => {
1259-
return Err(SimulationError::PaymentGenerationError(e));
1260-
},
1261-
};
1262-
1263-
log::debug!("Generated payment: {source} -> {}: {amount} msat.", destination);
1264-
1265-
// Send the payment, exiting if we can no longer send to the consumer.
1266-
let event = SimulationEvent::SendPayment(destination.clone(), amount);
1267-
select!{
1268-
biased;
1269-
_ = listener.clone() => {
1270-
return Ok(());
1271-
},
1272-
send_result = sender.send(event.clone()) => {
1273-
if send_result.is_err(){
1274-
return Err(SimulationError::MpscChannelError(
1275-
format!("Stopped activity producer for {amount}: {source} -> {destination}.")));
1276-
}
1277-
},
1278-
}
1279-
1280-
current_count += 1;
1281-
},
1282-
}
12831298
}
1299+
};
1300+
Ok(())
12841301
}
12851302

12861303
/// Gets the wait time for the next payment. If this is the first payment being generated, and a specific start delay
@@ -1451,7 +1468,7 @@ async fn run_results_logger(
14511468
/// out. In the multiple-producer case, a single producer shutting down does not drop *all* sending channels so the
14521469
/// consumer will not exit and a trigger is required.
14531470
async fn produce_simulation_results(
1454-
nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
1471+
nodes: BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>>,
14551472
mut output_receiver: Receiver<SimulationOutput>,
14561473
results: Sender<(Payment, PaymentResult)>,
14571474
listener: Listener,
@@ -1570,7 +1587,7 @@ mod tests {
15701587
use bitcoin::secp256k1::PublicKey;
15711588
use bitcoin::Network;
15721589
use mockall::mock;
1573-
use std::collections::HashMap;
1590+
use std::collections::BTreeMap;
15741591
use std::fmt;
15751592
use std::sync::Arc;
15761593
use std::time::Duration;
@@ -1817,7 +1834,7 @@ mod tests {
18171834
/// "we don't control any nodes".
18181835
#[tokio::test]
18191836
async fn test_validate_node_network_empty_nodes() {
1820-
let empty_nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = HashMap::new();
1837+
let empty_nodes: BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = BTreeMap::new();
18211838

18221839
let simulation = test_utils::create_simulation(empty_nodes);
18231840
let result = simulation.validate_node_network().await;

simln-lib/src/sim_node.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use bitcoin::secp256k1::PublicKey;
88
use bitcoin::{Network, ScriptBuf, TxOut};
99
use lightning::ln::chan_utils::make_funding_redeemscript;
1010
use serde::{Deserialize, Serialize};
11+
use std::collections::BTreeMap;
1112
use std::collections::{hash_map::Entry, HashMap};
1213
use std::fmt::Display;
1314
use std::sync::Arc;
@@ -1010,8 +1011,8 @@ impl SimGraph {
10101011
pub async fn ln_node_from_graph(
10111012
graph: Arc<Mutex<SimGraph>>,
10121013
routing_graph: Arc<NetworkGraph<&'_ WrappedLog>>,
1013-
) -> HashMap<PublicKey, Arc<Mutex<dyn LightningNode + '_>>> {
1014-
let mut nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = HashMap::new();
1014+
) -> BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode + '_>>> {
1015+
let mut nodes: BTreeMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = BTreeMap::new();
10151016

10161017
for pk in graph.lock().await.nodes.keys() {
10171018
nodes.insert(

0 commit comments

Comments
 (0)