@@ -625,6 +625,7 @@ struct ExecutorKit {
625625}
626626
627627struct PaymentEvent {
628+ absolute_time : SystemTime ,
628629 wait_time : Duration ,
629630 destination : NodeInfo ,
630631 current_count : u64 ,
@@ -634,7 +635,7 @@ struct PaymentEvent {
634635
635636impl Ord for PaymentEvent {
636637 fn cmp ( & self , other : & Self ) -> std:: cmp:: Ordering {
637- self . wait_time . cmp ( & other. wait_time )
638+ self . absolute_time . cmp ( & other. absolute_time )
638639 }
639640}
640641
@@ -646,7 +647,7 @@ impl PartialOrd for PaymentEvent {
646647
647648impl PartialEq for PaymentEvent {
648649 fn eq ( & self , other : & Self ) -> bool {
649- self . wait_time == other. wait_time
650+ self . absolute_time == other. absolute_time
650651 }
651652}
652653
@@ -1109,7 +1110,7 @@ impl<C: Clock + 'static> Simulation<C> {
11091110 ) -> Result < ( ) , SimulationError > {
11101111 let mut heap: BinaryHeap < Reverse < PaymentEvent > > = BinaryHeap :: new ( ) ;
11111112 for executor in executors {
1112- generate_payments ( & mut heap, executor, 0 ) ?;
1113+ generate_payment ( & mut heap, executor, 0 ) ?;
11131114 }
11141115
11151116 let listener = self . shutdown_listener . clone ( ) ;
@@ -1127,19 +1128,19 @@ impl<C: Clock + 'static> Simulation<C> {
11271128 _ = async { } => {
11281129 match heap. pop( ) {
11291130 Some ( Reverse ( PaymentEvent {
1131+ absolute_time: _,
11301132 wait_time,
11311133 destination,
11321134 current_count,
11331135 capacity,
11341136 executor,
11351137 } ) ) => {
1136-
11371138 if let Some ( c) = executor. payment_generator. payment_count( ) {
11381139 if c == current_count {
11391140 log:: info!(
11401141 "Payment count has been met for {}: {c} payments. Stopping the activity."
11411142 , executor. source_info) ;
1142- return Ok ( ( ) ) ;
1143+ continue
11431144 }
11441145 }
11451146 let sender = producer_channels. get( & executor. source_info. pubkey) . ok_or(
@@ -1167,17 +1168,17 @@ impl<C: Clock + 'static> Simulation<C> {
11671168 log:: debug!(
11681169 "Skipping zero amount payment for {source} -> {destination}."
11691170 ) ;
1170- generate_payments ( & mut heap, executor, current_count) ?;
1171+ generate_payment ( & mut heap, executor, current_count) ?;
11711172 continue ;
1172- } else {
1173- generate_payments( & mut heap, executor, current_count + 1 ) ?;
1174- }
1173+ }
11751174 amt
11761175 } ,
11771176 Err ( e) => {
11781177 return Err ( SimulationError :: PaymentGenerationError ( e) ) ;
11791178 } ,
11801179 } ;
1180+ generate_payment( & mut heap, executor, current_count + 1 ) ?;
1181+
11811182 // Wait until our time to next payment has elapsed then execute a random amount payment to a random
11821183 // destination.
11831184 pe_clock. sleep( wait_time) . await ;
@@ -1610,21 +1611,25 @@ async fn track_payment_result(
16101611 Ok ( ( ) )
16111612}
16121613
1613- fn generate_payments (
1614+ fn generate_payment (
16141615 heap : & mut BinaryHeap < Reverse < PaymentEvent > > ,
16151616 executor : ExecutorKit ,
16161617 current_count : u64 ,
16171618) -> Result < ( ) , SimulationError > {
1619+ let now = SystemTime :: now ( ) ;
16181620 let wait_time = get_payment_delay (
16191621 current_count,
16201622 & executor. source_info ,
16211623 executor. payment_generator . as_ref ( ) ,
16221624 ) ?;
1625+ let absolute_time = now. checked_add ( wait_time) . ok_or ( "Overflow adding duration" ) . unwrap ( ) ;
1626+
16231627 let ( destination, capacity) = executor
16241628 . network_generator
16251629 . choose_destination ( executor. source_info . pubkey )
16261630 . map_err ( SimulationError :: DestinationGenerationError ) ?;
16271631 let payment_event = PaymentEvent {
1632+ absolute_time,
16281633 wait_time,
16291634 destination,
16301635 current_count,
0 commit comments