@@ -12,7 +12,7 @@ use rand_chacha::ChaCha8Rng;
1212use random_activity:: RandomActivityError ;
1313use serde:: { Deserialize , Serialize } ;
1414use std:: cmp:: Reverse ;
15- use std:: collections:: { BinaryHeap , HashSet } ;
15+ use std:: collections:: BinaryHeap ;
1616use std:: fmt:: { Display , Formatter } ;
1717use std:: hash:: { DefaultHasher , Hash , Hasher } ;
1818use std:: marker:: Send ;
@@ -814,20 +814,12 @@ impl<C: Clock + 'static> Simulation<C> {
814814 return Err ( e) ;
815815 } ,
816816 } ;
817- let consumer_channels = self . dispatch_consumers (
818- activities
819- . iter ( )
820- . map ( |generator| generator. source_info . pubkey )
821- . collect ( ) ,
822- event_sender. clone ( ) ,
823- & self . tasks ,
824- ) ;
825817
826818 // Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
827819 // The producers will use their own TaskTracker so that the simulation can be shutdown if they all finish.
828820 let producer_tasks = TaskTracker :: new ( ) ;
829821 match self
830- . dispatch_producers ( activities, consumer_channels , & producer_tasks)
822+ . dispatch_producers ( activities, event_sender . clone ( ) , & producer_tasks)
831823 . await
832824 {
833825 Ok ( _) => { } ,
@@ -1064,54 +1056,12 @@ impl<C: Clock + 'static> Simulation<C> {
10641056 Ok ( generators)
10651057 }
10661058
1067- /// Responsible for spinning up consumer tasks for each node specified in consuming_nodes. Assumes that validation
1068- /// has already ensured that we have execution on every nodes listed in consuming_nodes.
1069- fn dispatch_consumers (
1070- & self ,
1071- consuming_nodes : HashSet < PublicKey > ,
1072- output_sender : Sender < SimulationOutput > ,
1073- tasks : & TaskTracker ,
1074- ) -> HashMap < PublicKey , Sender < SimulationEvent > > {
1075- let mut channels = HashMap :: new ( ) ;
1076-
1077- for ( id, node) in self
1078- . nodes
1079- . iter ( )
1080- . filter ( |( id, _) | consuming_nodes. contains ( id) )
1081- {
1082- // For each node we have execution on, we'll create a sender and receiver channel to produce and consumer
1083- // events and insert producer in our tracking map. We do not buffer channels as we expect events to clear
1084- // quickly.
1085- let ( sender, receiver) = channel ( 1 ) ;
1086- channels. insert ( * id, sender. clone ( ) ) ;
1087-
1088- // Generate a consumer for the receiving end of the channel. It takes the event receiver that it'll pull
1089- // events from and the results sender to report the events it has triggered for further monitoring.
1090- // ce: consume event.
1091- let ce_shutdown = self . shutdown_trigger . clone ( ) ;
1092- let ce_output_sender = output_sender. clone ( ) ;
1093- let ce_node = node. clone ( ) ;
1094- tasks. spawn ( async move {
1095- let node_info = ce_node. lock ( ) . await . get_info ( ) . clone ( ) ;
1096- log:: debug!( "Starting events consumer for {}." , node_info) ;
1097- if let Err ( e) = consume_events ( ce_node, receiver, ce_output_sender) . await {
1098- ce_shutdown. trigger ( ) ;
1099- log:: error!( "Event consumer for node {node_info} exited with error: {e:?}." ) ;
1100- } else {
1101- log:: debug!( "Event consumer for node {node_info} completed successfully." ) ;
1102- }
1103- } ) ;
1104- }
1105-
1106- channels
1107- }
1108-
11091059 /// Responsible for spinning up producers for a set of activities. Requires that a consumer channel is present
11101060 /// for every source node in the set of executors.
11111061 async fn dispatch_producers (
11121062 & self ,
11131063 mut executors : Vec < ExecutorKit > ,
1114- producer_channels : HashMap < PublicKey , Sender < SimulationEvent > > ,
1064+ output_sender : Sender < SimulationOutput > ,
11151065 tasks : & TaskTracker ,
11161066 ) -> Result < ( ) , SimulationError > {
11171067 let mut heap: BinaryHeap < Reverse < PaymentEvent > > = BinaryHeap :: new ( ) ;
@@ -1138,6 +1088,7 @@ impl<C: Clock + 'static> Simulation<C> {
11381088 let shutdown = self . shutdown_trigger . clone ( ) ;
11391089 let clock = self . clock . clone ( ) ;
11401090 let task_clone = tasks. clone ( ) ;
1091+ let nodes = self . nodes . clone ( ) ;
11411092
11421093 tasks. spawn ( async move {
11431094 loop {
@@ -1160,26 +1111,18 @@ impl<C: Clock + 'static> Simulation<C> {
11601111 . clone( ) ;
11611112 if let Some ( c) = executor. payment_generator. payment_count( ) {
11621113 if c == payload. current_count {
1163- log:: info!(
1164- "Payment count has been met for {}: {c} payments. Stopping the activity."
1165- , executor. source_info) ;
1114+ log:: info!( "Payment count has been met for {}: {c} payments. Stopping the activity." , executor. source_info) ;
11661115 continue
11671116 }
11681117 }
1169- let sender = producer_channels. get( & executor. source_info. pubkey) . ok_or(
1170- SimulationError :: RandomActivityError ( RandomActivityError :: ValueError (
1171- format!(
1172- "Activity producer for: {} not found." ,
1173- executor. source_info. pubkey,
1174- ) ,
1175- ) ) ,
1176- ) ?;
1118+ let node = nodes. get( & executor. source_info. pubkey) . ok_or( SimulationError :: MissingNodeError ( format!( "Source node not found, {}" , executor. source_info. pubkey) ) ) ?. clone( ) ;
11771119
11781120 // pe: produce events
11791121 let pe_shutdown = shutdown. clone( ) ;
1180- let pe_sender = sender. clone( ) ;
1122+ // let pe_sender = sender.clone();
11811123 let pe_clock = clock. clone( ) ;
11821124 let source = executor. source_info. clone( ) ;
1125+ let pe_output_sender = output_sender. clone( ) ;
11831126 // Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
11841127 // a payment amount something has gone wrong (because we should have validated that we can always
11851128 // generate amounts), so we exit.
@@ -1209,7 +1152,7 @@ impl<C: Clock + 'static> Simulation<C> {
12091152
12101153 // Send the payment, exiting if we can no longer send to the consumer.
12111154 let event = SimulationEvent :: SendPayment ( payload. destination. clone( ) , amount) ;
1212- if let Err ( e) = pe_sender . send ( event. clone( ) ) . await {
1155+ if let Err ( e) = send_payment ( node , pe_output_sender , event. clone( ) ) . await {
12131156 pe_shutdown. trigger( ) ;
12141157 log:: debug!( "Not able to send event payment for {amount}: {source} -> {}. Exited with error {e}." , payload. destination) ;
12151158 } else {
@@ -1233,71 +1176,74 @@ impl<C: Clock + 'static> Simulation<C> {
12331176
12341177/// events that are crated for a lightning node that we can execute events on. Any output that is generated from the
12351178/// event being executed is piped into a channel to handle the result of the event.
1236- async fn consume_events (
1179+ async fn send_payment (
12371180 node : Arc < Mutex < dyn LightningNode > > ,
1238- mut receiver : Receiver < SimulationEvent > ,
1181+ // mut receiver: Receiver<SimulationEvent>,
12391182 sender : Sender < SimulationOutput > ,
1183+ simulation_event : SimulationEvent ,
12401184) -> Result < ( ) , SimulationError > {
1241- loop {
1242- let simulation_event = receiver. recv ( ) . await ;
1243- if let Some ( event) = simulation_event {
1244- match event {
1245- SimulationEvent :: SendPayment ( dest, amt_msat) => {
1246- let node = node. lock ( ) . await ;
1185+ // loop {
1186+ // let simulation_event = receiver.recv().await;
1187+ // if let Some(event) = simulation_event {
1188+ // } else {
1189+ // return Ok(());
1190+ // }
1191+ // }
1192+
1193+ match simulation_event {
1194+ SimulationEvent :: SendPayment ( dest, amt_msat) => {
1195+ let node = node. lock ( ) . await ;
1196+
1197+ let mut payment = Payment {
1198+ source : node. get_info ( ) . pubkey ,
1199+ hash : None ,
1200+ amount_msat : amt_msat,
1201+ destination : dest. pubkey ,
1202+ dispatch_time : SystemTime :: now ( ) ,
1203+ } ;
12471204
1248- let mut payment = Payment {
1249- source : node. get_info ( ) . pubkey ,
1250- hash : None ,
1251- amount_msat : amt_msat,
1252- destination : dest. pubkey ,
1253- dispatch_time : SystemTime :: now ( ) ,
1254- } ;
1255-
1256- let outcome = match node. send_payment ( dest. pubkey , amt_msat) . await {
1257- Ok ( payment_hash) => {
1258- log:: debug!(
1259- "Send payment: {} -> {}: ({})." ,
1260- node. get_info( ) ,
1261- dest,
1262- hex:: encode( payment_hash. 0 )
1263- ) ;
1264- // We need to track the payment outcome using the payment hash that we have received.
1265- payment. hash = Some ( payment_hash) ;
1266- SimulationOutput :: SendPaymentSuccess ( payment)
1267- } ,
1268- Err ( e) => {
1269- log:: error!(
1270- "Error while sending payment {} -> {}." ,
1271- node. get_info( ) ,
1272- dest
1273- ) ;
1274-
1275- match e {
1276- LightningError :: PermanentError ( s) => {
1277- return Err ( SimulationError :: LightningError (
1278- LightningError :: PermanentError ( s) ,
1279- ) ) ;
1280- } ,
1281- _ => SimulationOutput :: SendPaymentFailure (
1282- payment,
1283- PaymentResult :: not_dispatched ( ) ,
1284- ) ,
1285- }
1286- } ,
1287- } ;
1205+ let outcome = match node. send_payment ( dest. pubkey , amt_msat) . await {
1206+ Ok ( payment_hash) => {
1207+ log:: debug!(
1208+ "Send payment: {} -> {}: ({})." ,
1209+ node. get_info( ) ,
1210+ dest,
1211+ hex:: encode( payment_hash. 0 )
1212+ ) ;
1213+ // We need to track the payment outcome using the payment hash that we have received.
1214+ payment. hash = Some ( payment_hash) ;
1215+ SimulationOutput :: SendPaymentSuccess ( payment)
1216+ } ,
1217+ Err ( e) => {
1218+ log:: error!(
1219+ "Error while sending payment {} -> {}." ,
1220+ node. get_info( ) ,
1221+ dest
1222+ ) ;
12881223
1289- let send_result = sender. send ( outcome. clone ( ) ) . await ;
1290- if send_result. is_err ( ) {
1291- return Err ( SimulationError :: MpscChannelError ( format ! (
1292- "Error sending simulation output {outcome:?}."
1293- ) ) ) ;
1224+ match e {
1225+ LightningError :: PermanentError ( s) => {
1226+ return Err ( SimulationError :: LightningError (
1227+ LightningError :: PermanentError ( s) ,
1228+ ) ) ;
1229+ } ,
1230+ _ => SimulationOutput :: SendPaymentFailure (
1231+ payment,
1232+ PaymentResult :: not_dispatched ( ) ,
1233+ ) ,
12941234 }
12951235 } ,
1236+ } ;
1237+
1238+ let send_result = sender. send ( outcome. clone ( ) ) . await ;
1239+ if send_result. is_err ( ) {
1240+ return Err ( SimulationError :: MpscChannelError ( format ! (
1241+ "Error sending simulation output {outcome:?}."
1242+ ) ) ) ;
12961243 }
1297- } else {
1298- return Ok ( ( ) ) ;
1299- }
1300- }
1244+ } ,
1245+ } ;
1246+ Ok ( ( ) )
13011247}
13021248
13031249/// Gets the wait time for the next payment. If this is the first payment being generated, and a specific start delay
0 commit comments