@@ -626,15 +626,15 @@ struct ExecutorKit {
626626}
627627
628628struct PaymentEvent {
629+ pubkey : PublicKey ,
629630 absolute_time : SystemTime ,
630- executor : ExecutorKit ,
631- }
632-
633- #[ derive( Clone ) ]
634- struct PaymentEventPayload {
635631 wait_time : Duration ,
636632 destination : NodeInfo ,
637633 capacity : Option < u64 > ,
634+ }
635+
636+ struct PaymentEventPayload {
637+ executor : ExecutorKit ,
638638 current_count : u64 ,
639639}
640640
@@ -1065,23 +1065,19 @@ impl<C: Clock + 'static> Simulation<C> {
10651065 tasks : & TaskTracker ,
10661066 ) -> Result < ( ) , SimulationError > {
10671067 let mut heap: BinaryHeap < Reverse < PaymentEvent > > = BinaryHeap :: new ( ) ;
1068- let payment_event_payloads: Arc < Mutex < HashMap < PublicKey , PaymentEventPayload > > > =
1069- Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
1068+ let mut payment_event_payloads: HashMap < PublicKey , PaymentEventPayload > = HashMap :: new ( ) ;
10701069
10711070 // in order to choose a deterministic destination is necessary to sort the nodes by its public key
10721071 executors. sort_by ( |ex1, ex2| ex1. source_info . pubkey . cmp ( & ex2. source_info . pubkey ) ) ;
1072+
10731073 for executor in executors {
1074+ let pubkey = executor. source_info . pubkey ;
10741075 let payload = PaymentEventPayload {
1075- wait_time : Duration :: from_secs ( 0 ) ,
1076- destination : executor. source_info . clone ( ) ,
1077- capacity : None ,
1076+ executor,
10781077 current_count : 0 ,
10791078 } ;
1080- payment_event_payloads
1081- . lock ( )
1082- . await
1083- . insert ( executor. source_info . pubkey , payload) ;
1084- generate_payment ( & mut heap, executor, 0 , payment_event_payloads. clone ( ) ) . await ?;
1079+ payment_event_payloads. insert ( pubkey, payload) ;
1080+ generate_payment ( & mut heap, pubkey, 0 , & mut payment_event_payloads) . await ?;
10851081 }
10861082
10871083 let listener = self . shutdown_listener . clone ( ) ;
@@ -1100,40 +1096,39 @@ impl<C: Clock + 'static> Simulation<C> {
11001096 _ = async { } => {
11011097 match heap. pop( ) {
11021098 Some ( Reverse ( PaymentEvent {
1099+ pubkey,
11031100 absolute_time: _,
1104- executor,
1101+ wait_time,
1102+ destination,
1103+ capacity
11051104 } ) ) => {
1106- let payload = payment_event_payloads
1107- . lock( )
1108- . await
1109- . get( & executor. source_info. pubkey)
1110- . ok_or( SimulationError :: PaymentGenerationError ( PaymentGenerationError ( format!( "executor {} not found" , executor. source_info) ) ) ) ?
1111- . clone( ) ;
1112- if let Some ( c) = executor. payment_generator. payment_count( ) {
1105+ let payload = payment_event_payloads. get( & pubkey) . ok_or( SimulationError :: PaymentGenerationError (
1106+ PaymentGenerationError ( format!( "executor {} not found" , pubkey) ) ,
1107+ ) ) ?;
1108+ if let Some ( c) = payload. executor. payment_generator. payment_count( ) {
11131109 if c == payload. current_count {
1114- log:: info!( "Payment count has been met for {}: {c} payments. Stopping the activity." , executor. source_info) ;
1110+ log:: info!( "Payment count has been met for {}: {c} payments. Stopping the activity." , payload . executor. source_info) ;
11151111 continue
11161112 }
11171113 }
1118- let node = nodes. get( & executor. source_info. pubkey) . ok_or( SimulationError :: MissingNodeError ( format!( "Source node not found, {}" , executor. source_info. pubkey) ) ) ?. clone( ) ;
1114+ let node = nodes. get( & payload . executor. source_info. pubkey) . ok_or( SimulationError :: MissingNodeError ( format!( "Source node not found, {}" , payload . executor. source_info. pubkey) ) ) ?. clone( ) ;
11191115
11201116 // pe: produce events
11211117 let pe_shutdown = shutdown. clone( ) ;
1122- // let pe_sender = sender.clone();
11231118 let pe_clock = clock. clone( ) ;
1124- let source = executor. source_info. clone( ) ;
1119+ let source = payload . executor. source_info. clone( ) ;
11251120 let pe_output_sender = output_sender. clone( ) ;
11261121 // Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
11271122 // a payment amount something has gone wrong (because we should have validated that we can always
11281123 // generate amounts), so we exit.
1129- let payment_amount = executor. payment_generator. payment_amount( payload . capacity) ;
1124+ let payment_amount = payload . executor. payment_generator. payment_amount( capacity) ;
11301125 let amount = match payment_amount {
11311126 Ok ( amt) => {
11321127 if amt == 0 {
11331128 log:: debug!(
1134- "Skipping zero amount payment for {source} -> {}." , payload . destination
1129+ "Skipping zero amount payment for {source} -> {}." , destination
11351130 ) ;
1136- generate_payment( & mut heap, executor , payload. current_count, payment_event_payloads. clone ( ) ) . await ?;
1131+ generate_payment( & mut heap, pubkey , payload. current_count, & mut payment_event_payloads) . await ?;
11371132 continue ;
11381133 }
11391134 amt
@@ -1142,24 +1137,31 @@ impl<C: Clock + 'static> Simulation<C> {
11421137 return Err ( SimulationError :: PaymentGenerationError ( e) ) ;
11431138 } ,
11441139 } ;
1145- generate_payment( & mut heap, executor , payload. current_count + 1 , payment_event_payloads. clone ( ) ) . await ?;
1140+ generate_payment( & mut heap, pubkey , payload. current_count + 1 , & mut payment_event_payloads) . await ?;
11461141
11471142 // Wait until our time to next payment has elapsed then execute a random amount payment to a random
11481143 // destination.
1149- pe_clock. sleep( payload. wait_time) . await ;
1150- task_clone. spawn( async move {
1151- log:: debug!( "Generated payment: {source} -> {}: {amount} msat." , payload. destination) ;
1152-
1153- // Send the payment, exiting if we can no longer send to the consumer.
1154- let event = SimulationEvent :: SendPayment ( payload. destination. clone( ) , amount) ;
1155- if let Err ( e) = send_payment( node, pe_output_sender, event. clone( ) ) . await {
1156- pe_shutdown. trigger( ) ;
1157- log:: debug!( "Not able to send event payment for {amount}: {source} -> {}. Exited with error {e}." , payload. destination) ;
1158- } else {
1159-
1160- log:: debug!( "Send event payment for {source} completed successfully." ) ;
1144+ select! {
1145+ biased;
1146+ _ = listener. clone( ) => {
1147+ return Ok ( ( ) )
1148+ } ,
1149+ _ = pe_clock. sleep( wait_time) => {
1150+ task_clone. spawn( async move {
1151+ log:: debug!( "Generated payment: {source} -> {}: {amount} msat." , destination) ;
1152+
1153+ // Send the payment, exiting if we can no longer send to the consumer.
1154+ let event = SimulationEvent :: SendPayment ( destination. clone( ) , amount) ;
1155+ if let Err ( e) = send_payment( node, pe_output_sender, event. clone( ) ) . await {
1156+ pe_shutdown. trigger( ) ;
1157+ log:: debug!( "Not able to send event payment for {amount}: {source} -> {}. Exited with error {e}." , destination) ;
1158+ } else {
1159+
1160+ log:: debug!( "Send event payment for {source} completed successfully." ) ;
1161+ }
1162+ } ) ;
11611163 }
1162- } ) ;
1164+ }
11631165 } ,
11641166 None => {
11651167 break
@@ -1178,18 +1180,9 @@ impl<C: Clock + 'static> Simulation<C> {
11781180/// event being executed is piped into a channel to handle the result of the event.
11791181async fn send_payment (
11801182 node : Arc < Mutex < dyn LightningNode > > ,
1181- // mut receiver: Receiver<SimulationEvent>,
11821183 sender : Sender < SimulationOutput > ,
11831184 simulation_event : SimulationEvent ,
11841185) -> Result < ( ) , SimulationError > {
1185- // loop {
1186- // let simulation_event = receiver.recv().await;
1187- // if let Some(event) = simulation_event {
1188- // } else {
1189- // return Ok(());
1190- // }
1191- // }
1192-
11931186 match simulation_event {
11941187 SimulationEvent :: SendPayment ( dest, amt_msat) => {
11951188 let node = node. lock ( ) . await ;
@@ -1514,15 +1507,20 @@ async fn track_payment_result(
15141507
15151508async fn generate_payment (
15161509 heap : & mut BinaryHeap < Reverse < PaymentEvent > > ,
1517- executor : ExecutorKit ,
1510+ pubkey : PublicKey ,
15181511 current_count : u64 ,
1519- counters : Arc < Mutex < HashMap < PublicKey , PaymentEventPayload > > > ,
1512+ payloads : & mut HashMap < PublicKey , PaymentEventPayload > ,
15201513) -> Result < ( ) , SimulationError > {
1514+ let payload = payloads
1515+ . get ( & pubkey)
1516+ . ok_or ( SimulationError :: PaymentGenerationError (
1517+ PaymentGenerationError ( format ! ( "executor {} not found" , pubkey) ) ,
1518+ ) ) ?;
15211519 let now = SystemTime :: now ( ) ;
15221520 let wait_time = get_payment_delay (
15231521 current_count,
1524- & executor. source_info ,
1525- executor. payment_generator . as_ref ( ) ,
1522+ & payload . executor . source_info ,
1523+ payload . executor . payment_generator . as_ref ( ) ,
15261524 ) ?;
15271525 let absolute_time = now
15281526 . checked_add ( wait_time)
@@ -1533,29 +1531,20 @@ async fn generate_payment(
15331531 ) ) )
15341532 } ) ?;
15351533
1536- let ( destination, capacity) = executor
1534+ let ( destination, capacity) = payload
1535+ . executor
15371536 . network_generator
1538- . choose_destination ( executor. source_info . pubkey )
1537+ . choose_destination ( payload . executor . source_info . pubkey )
15391538 . map_err ( SimulationError :: DestinationGenerationError ) ?;
1540- let mut payload = counters
1541- . lock ( )
1542- . await
1543- . get ( & executor. source_info . pubkey )
1544- . ok_or ( SimulationError :: PaymentGenerationError (
1545- PaymentGenerationError ( format ! ( "executor {} not found" , executor. source_info) ) ,
1546- ) ) ?
1547- . clone ( ) ;
1548- payload. wait_time = wait_time;
1549- payload. destination = destination;
1550- payload. capacity = capacity;
1551- payload. current_count = current_count;
1552- counters
1553- . lock ( )
1554- . await
1555- . insert ( executor. source_info . pubkey , payload) ;
1539+ payloads
1540+ . entry ( payload. executor . source_info . pubkey )
1541+ . and_modify ( |p| p. current_count = current_count) ;
15561542 let payment_event = PaymentEvent {
1543+ pubkey,
15571544 absolute_time,
1558- executor,
1545+ wait_time,
1546+ destination,
1547+ capacity,
15591548 } ;
15601549 heap. push ( Reverse ( payment_event) ) ;
15611550 Ok ( ( ) )
@@ -2290,7 +2279,7 @@ mod tests {
22902279 "Simulation should have run at least for 25s, took {:?}" ,
22912280 elapsed
22922281 ) ;
2293- let expected_payment_list = vec ! [ pk1, pk2, pk1, pk1, pk1, pk3, pk3, pk2, pk4, pk1 ] ;
2282+ let expected_payment_list = vec ! [ pk1, pk2, pk1, pk1, pk1, pk3, pk3, pk2, pk4] ;
22942283 assert ! (
22952284 payments_list. lock( ) . unwrap( ) . as_ref( ) == expected_payment_list,
22962285 "The expected order of payments is not correct"
0 commit comments