@@ -626,6 +626,7 @@ struct ExecutorKit {
626626}
627627
628628struct PaymentEvent {
629+ absolute_time : SystemTime ,
629630 wait_time : Duration ,
630631 destination : NodeInfo ,
631632 current_count : u64 ,
@@ -635,7 +636,7 @@ struct PaymentEvent {
635636
636637impl Ord for PaymentEvent {
637638 fn cmp ( & self , other : & Self ) -> std:: cmp:: Ordering {
638- self . wait_time . cmp ( & other. wait_time )
639+ self . absolute_time . cmp ( & other. absolute_time )
639640 }
640641}
641642
@@ -647,7 +648,7 @@ impl PartialOrd for PaymentEvent {
647648
648649impl PartialEq for PaymentEvent {
649650 fn eq ( & self , other : & Self ) -> bool {
650- self . wait_time == other. wait_time
651+ self . absolute_time == other. absolute_time
651652 }
652653}
653654
@@ -1110,7 +1111,7 @@ impl<C: Clock + 'static> Simulation<C> {
11101111 ) -> Result < ( ) , SimulationError > {
11111112 let mut heap: BinaryHeap < Reverse < PaymentEvent > > = BinaryHeap :: new ( ) ;
11121113 for executor in executors {
1113- generate_payments ( & mut heap, executor, 0 ) ?;
1114+ generate_payment ( & mut heap, executor, 0 ) ?;
11141115 }
11151116
11161117 let listener = self . shutdown_listener . clone ( ) ;
@@ -1128,19 +1129,19 @@ impl<C: Clock + 'static> Simulation<C> {
11281129 _ = async { } => {
11291130 match heap. pop( ) {
11301131 Some ( Reverse ( PaymentEvent {
1132+ absolute_time: _,
11311133 wait_time,
11321134 destination,
11331135 current_count,
11341136 capacity,
11351137 executor,
11361138 } ) ) => {
1137-
11381139 if let Some ( c) = executor. payment_generator. payment_count( ) {
11391140 if c == current_count {
11401141 log:: info!(
11411142 "Payment count has been met for {}: {c} payments. Stopping the activity."
11421143 , executor. source_info) ;
1143- return Ok ( ( ) ) ;
1144+ continue
11441145 }
11451146 }
11461147 let sender = producer_channels. get( & executor. source_info. pubkey) . ok_or(
@@ -1168,17 +1169,17 @@ impl<C: Clock + 'static> Simulation<C> {
11681169 log:: debug!(
11691170 "Skipping zero amount payment for {source} -> {destination}."
11701171 ) ;
1171- generate_payments ( & mut heap, executor, current_count) ?;
1172+ generate_payment ( & mut heap, executor, current_count) ?;
11721173 continue ;
1173- } else {
1174- generate_payments( & mut heap, executor, current_count + 1 ) ?;
1175- }
1174+ }
11761175 amt
11771176 } ,
11781177 Err ( e) => {
11791178 return Err ( SimulationError :: PaymentGenerationError ( e) ) ;
11801179 } ,
11811180 } ;
1181+ generate_payment( & mut heap, executor, current_count + 1 ) ?;
1182+
11821183 // Wait until our time to next payment has elapsed then execute a random amount payment to a random
11831184 // destination.
11841185 pe_clock. sleep( wait_time) . await ;
@@ -1599,21 +1600,25 @@ async fn track_payment_result(
15991600 Ok ( ( ) )
16001601}
16011602
1602- fn generate_payments (
1603+ fn generate_payment (
16031604 heap : & mut BinaryHeap < Reverse < PaymentEvent > > ,
16041605 executor : ExecutorKit ,
16051606 current_count : u64 ,
16061607) -> Result < ( ) , SimulationError > {
1608+ let now = SystemTime :: now ( ) ;
16071609 let wait_time = get_payment_delay (
16081610 current_count,
16091611 & executor. source_info ,
16101612 executor. payment_generator . as_ref ( ) ,
16111613 ) ?;
1614+ let absolute_time = now. checked_add ( wait_time) . ok_or ( "Overflow adding duration" ) . unwrap ( ) ;
1615+
16121616 let ( destination, capacity) = executor
16131617 . network_generator
16141618 . choose_destination ( executor. source_info . pubkey )
16151619 . map_err ( SimulationError :: DestinationGenerationError ) ?;
16161620 let payment_event = PaymentEvent {
1621+ absolute_time,
16171622 wait_time,
16181623 destination,
16191624 current_count,
0 commit comments