@@ -10,8 +10,6 @@ use lightning::ln::chan_utils::make_funding_redeemscript;
1010use serde:: { Deserialize , Serialize } ;
1111use std:: collections:: { hash_map:: Entry , HashMap } ;
1212use std:: fmt:: Display ;
13- use std:: future:: Future ;
14- use std:: pin:: Pin ;
1513use std:: sync:: Arc ;
1614use std:: time:: UNIX_EPOCH ;
1715use tokio:: task:: JoinSet ;
@@ -1216,84 +1214,65 @@ impl SimNetwork for SimGraph {
12161214 }
12171215}
12181216
1217+ type AddHtlcsResult = Result < Result < ( ) , ( Option < usize > , ForwardingError ) > , CriticalError > ;
1218+ type RemoveHtlcsResult = Result < ( ) , CriticalError > ;
1219+
1220+ pub struct AddHtlcsInnerRequest {
1221+ pub nodes : Arc < Mutex < HashMap < ShortChannelID , SimulatedChannel > > > ,
1222+ pub source : PublicKey ,
1223+ pub route : Path ,
1224+ pub payment_hash : PaymentHash ,
1225+ pub interceptors : Vec < Arc < dyn Interceptor > > ,
1226+ pub custom_records : CustomRecords ,
1227+ pub shutdown_listener : Listener ,
1228+ }
1229+
1230+ pub struct RemoveHtlcsInnerRequest {
1231+ pub nodes : Arc < Mutex < HashMap < ShortChannelID , SimulatedChannel > > > ,
1232+ pub resolution_idx : usize ,
1233+ pub source : PublicKey ,
1234+ pub route : Path ,
1235+ pub payment_hash : PaymentHash ,
1236+ pub success : bool ,
1237+ pub interceptors : Vec < Arc < dyn Interceptor > > ,
1238+ }
1239+
12191240/// This trait defines the "interface" for payment propagation logic
1241+ #[ async_trait]
12201242pub trait PaymentPropagator : Send + Sync + ' static {
1221- fn add_htlcs (
1222- & self ,
1223- nodes : Arc < Mutex < HashMap < ShortChannelID , SimulatedChannel > > > ,
1224- source : PublicKey ,
1225- route : Path ,
1226- payment_hash : PaymentHash ,
1227- interceptors : Vec < Arc < dyn Interceptor > > ,
1228- custom_records : CustomRecords ,
1229- shutdown_listener : Listener ,
1230- ) -> Pin <
1231- Box <
1232- dyn Future < Output = Result < Result < ( ) , ( Option < usize > , ForwardingError ) > , CriticalError > >
1233- + Send ,
1234- > ,
1235- > ;
1236-
1237- fn remove_htlcs (
1238- & self ,
1239- nodes : Arc < Mutex < HashMap < ShortChannelID , SimulatedChannel > > > ,
1240- resolution_idx : usize ,
1241- source : PublicKey ,
1242- route : Path ,
1243- payment_hash : PaymentHash ,
1244- success : bool ,
1245- interceptors : Vec < Arc < dyn Interceptor > > ,
1246- ) -> Pin < Box < dyn Future < Output = Result < ( ) , CriticalError > > + Send > > ;
1243+ async fn add_htlcs ( & self , request : AddHtlcsInnerRequest ) -> AddHtlcsResult ;
1244+
1245+ async fn remove_htlcs ( & self , request : RemoveHtlcsInnerRequest ) -> RemoveHtlcsResult ;
12471246}
12481247
12491248pub struct DefaultPaymentPropagator ;
12501249
1250+ #[ async_trait]
12511251impl PaymentPropagator for DefaultPaymentPropagator {
1252- fn add_htlcs (
1253- & self ,
1254- nodes : Arc < Mutex < HashMap < ShortChannelID , SimulatedChannel > > > ,
1255- source : PublicKey ,
1256- route : Path ,
1257- payment_hash : PaymentHash ,
1258- interceptors : Vec < Arc < dyn Interceptor > > ,
1259- custom_records : CustomRecords ,
1260- shutdown_listener : Listener ,
1261- ) -> Pin <
1262- Box <
1263- dyn Future < Output = Result < Result < ( ) , ( Option < usize > , ForwardingError ) > , CriticalError > >
1264- + Send ,
1265- > ,
1266- > {
1267- Box :: pin ( add_htlcs (
1268- nodes,
1269- source,
1270- route,
1271- payment_hash,
1272- interceptors,
1273- custom_records,
1274- shutdown_listener,
1275- ) )
1252+ async fn add_htlcs ( & self , request : AddHtlcsInnerRequest ) -> AddHtlcsResult {
1253+ add_htlcs (
1254+ request. nodes ,
1255+ request. source ,
1256+ request. route ,
1257+ request. payment_hash ,
1258+ request. interceptors ,
1259+ request. custom_records ,
1260+ request. shutdown_listener ,
1261+ )
1262+ . await
12761263 }
12771264
1278- fn remove_htlcs (
1279- & self ,
1280- nodes : Arc < Mutex < HashMap < ShortChannelID , SimulatedChannel > > > ,
1281- resolution_idx : usize ,
1282- source : PublicKey ,
1283- route : Path ,
1284- payment_hash : PaymentHash ,
1285- success : bool ,
1286- interceptors : Vec < Arc < dyn Interceptor > > ,
1287- ) -> Pin < Box < dyn Future < Output = Result < ( ) , CriticalError > > + Send > > {
1288- Box :: pin ( remove_htlcs (
1289- nodes,
1290- resolution_idx,
1291- source,
1292- route,
1293- payment_hash,
1294- success,
1295- interceptors,
1296- ) )
1265+ async fn remove_htlcs ( & self , request : RemoveHtlcsInnerRequest ) -> RemoveHtlcsResult {
1266+ remove_htlcs (
1267+ request. nodes ,
1268+ request. resolution_idx ,
1269+ request. source ,
1270+ request. route ,
1271+ request. payment_hash ,
1272+ request. success ,
1273+ request. interceptors ,
1274+ )
1275+ . await
12971276 }
12981277}
12991278
@@ -1530,35 +1509,50 @@ struct PropagatePaymentRequest {
15301509/// ie a breakdown of our state machine, it will still notify the payment outcome and will use the shutdown trigger
15311510/// to signal that we should exit.
15321511async fn propagate_payment ( request : PropagatePaymentRequest ) {
1512+ let nodes_for_add = request. nodes . clone ( ) ;
1513+ let route_for_add = request. route . clone ( ) ;
1514+ let interceptors_for_add = request. interceptors . clone ( ) ;
1515+ let custom_records_for_add = request. custom_records . clone ( ) ;
1516+ let shutdown_listener_for_add = request. shutdown_signal . 1 . clone ( ) ;
1517+ let shutdown_trigger = request. shutdown_signal . 0 ;
1518+
1519+ let sender_for_final_result = request. sender ;
1520+ let source_for_add_and_remove = request. source ;
1521+ let payment_hash_for_add_and_remove = request. payment_hash ;
1522+
15331523 let notify_result = match request
15341524 . propagator
1535- . add_htlcs (
1536- request . nodes . clone ( ) ,
1537- request . source ,
1538- request . route . clone ( ) ,
1539- request . payment_hash ,
1540- request . interceptors . clone ( ) ,
1541- request . custom_records ,
1542- request . shutdown_signal . 1 ,
1543- )
1525+ . add_htlcs ( AddHtlcsInnerRequest {
1526+ nodes : nodes_for_add ,
1527+ source : source_for_add_and_remove ,
1528+ route : route_for_add ,
1529+ payment_hash : payment_hash_for_add_and_remove ,
1530+ interceptors : interceptors_for_add ,
1531+ custom_records : custom_records_for_add ,
1532+ shutdown_listener : shutdown_listener_for_add ,
1533+ } )
15441534 . await
15451535 {
15461536 Ok ( Ok ( _) ) => {
15471537 // If we successfully added the htlc, go ahead and remove all the htlcs in the route with successful resolution.
1538+ let nodes_for_remove = request. nodes . clone ( ) ;
1539+ let route_for_remove = request. route . clone ( ) ;
1540+ let interceptors_for_remove = request. interceptors . clone ( ) ;
1541+
15481542 if let Err ( e) = request
15491543 . propagator
1550- . remove_htlcs (
1551- request . nodes ,
1552- request. route . hops . len ( ) - 1 ,
1553- request . source ,
1554- request . route ,
1555- request . payment_hash ,
1556- true ,
1557- request . interceptors ,
1558- )
1544+ . remove_htlcs ( RemoveHtlcsInnerRequest {
1545+ nodes : nodes_for_remove ,
1546+ resolution_idx : request. route . hops . len ( ) - 1 ,
1547+ source : source_for_add_and_remove ,
1548+ route : route_for_remove ,
1549+ payment_hash : payment_hash_for_add_and_remove ,
1550+ success : true ,
1551+ interceptors : interceptors_for_remove ,
1552+ } )
15591553 . await
15601554 {
1561- request . shutdown_signal . 0 . trigger ( ) ;
1555+ shutdown_trigger . trigger ( ) ;
15621556 log:: error!( "Could not remove htlcs from channel: {e}." ) ;
15631557 }
15641558 PaymentResult {
@@ -1569,22 +1563,26 @@ async fn propagate_payment(request: PropagatePaymentRequest) {
15691563 Ok ( Err ( ( fail_idx, fwd_err) ) ) => {
15701564 // If we partially added HTLCs along the route, we need to fail them back to the source to clean up our partial
15711565 // state. It's possible that we failed with the very first add, and then we don't need to clean anything up.
1566+ let nodes_for_remove = request. nodes . clone ( ) ;
1567+ let route_for_remove = request. route . clone ( ) ;
1568+ let interceptors_for_remove = request. interceptors . clone ( ) ;
1569+
15721570 if let Some ( resolution_idx) = fail_idx {
15731571 if request
15741572 . propagator
1575- . remove_htlcs (
1576- request . nodes ,
1573+ . remove_htlcs ( RemoveHtlcsInnerRequest {
1574+ nodes : nodes_for_remove ,
15771575 resolution_idx,
1578- request . source ,
1579- request . route ,
1580- request . payment_hash ,
1581- false ,
1582- request . interceptors ,
1583- )
1576+ source : source_for_add_and_remove ,
1577+ route : route_for_remove ,
1578+ payment_hash : payment_hash_for_add_and_remove ,
1579+ success : false ,
1580+ interceptors : interceptors_for_remove ,
1581+ } )
15841582 . await
15851583 . is_err ( )
15861584 {
1587- request . shutdown_signal . 0 . trigger ( ) ;
1585+ shutdown_trigger . trigger ( ) ;
15881586 }
15891587 }
15901588
@@ -1598,7 +1596,7 @@ async fn propagate_payment(request: PropagatePaymentRequest) {
15981596 }
15991597 } ,
16001598 Err ( critical_err) => {
1601- request . shutdown_signal . 0 . trigger ( ) ;
1599+ shutdown_trigger . trigger ( ) ;
16021600 log:: debug!(
16031601 "Critical error in simulated payment {}: {critical_err}" ,
16041602 hex:: encode( request. payment_hash. 0 )
@@ -1610,7 +1608,7 @@ async fn propagate_payment(request: PropagatePaymentRequest) {
16101608 } ,
16111609 } ;
16121610
1613- if let Err ( e) = request . sender . send ( Ok ( notify_result) ) {
1611+ if let Err ( e) = sender_for_final_result . send ( Ok ( notify_result) ) {
16141612 log:: error!( "Could not notify payment result: {:?}." , e) ;
16151613 }
16161614}
@@ -2135,28 +2133,17 @@ mod tests {
21352133 mock ! {
21362134 pub TestPaymentPropagator { }
21372135
2136+ #[ async_trait]
21382137 impl PaymentPropagator for TestPaymentPropagator {
2139- fn add_htlcs(
2138+ async fn add_htlcs(
21402139 & self ,
2141- nodes: Arc <tokio:: sync:: Mutex <HashMap <ShortChannelID , SimulatedChannel >>>,
2142- source: PublicKey ,
2143- route: Path ,
2144- payment_hash: PaymentHash ,
2145- interceptors: Vec <Arc <dyn Interceptor >>,
2146- custom_records: CustomRecords ,
2147- shutdown_listener: Listener ,
2148- ) -> Pin <Box <dyn Future <Output = Result <Result <( ) , ( Option <usize >, ForwardingError ) >, CriticalError >> + Send >>;
2140+ request: AddHtlcsInnerRequest
2141+ ) -> AddHtlcsResult ;
21492142
2150- fn remove_htlcs(
2143+ async fn remove_htlcs(
21512144 & self ,
2152- nodes: Arc <tokio:: sync:: Mutex <HashMap <ShortChannelID , SimulatedChannel >>>,
2153- resolution_idx: usize ,
2154- source: PublicKey ,
2155- route: Path ,
2156- payment_hash: PaymentHash ,
2157- success: bool ,
2158- interceptors: Vec <Arc <dyn Interceptor >>,
2159- ) -> Pin <Box <dyn Future <Output = Result <( ) , CriticalError >> + Send >>;
2145+ request: RemoveHtlcsInnerRequest
2146+ ) -> RemoveHtlcsResult ;
21602147 }
21612148 }
21622149
@@ -2167,12 +2154,12 @@ mod tests {
21672154 mock_propagator
21682155 . expect_add_htlcs ( )
21692156 . once ( )
2170- . return_once ( |_, _ , _ , _ , _ , _ , _| Box :: pin ( async { Ok ( Ok ( ( ) ) ) } ) ) ;
2157+ . return_once ( |_| Ok ( Ok ( ( ) ) ) ) ;
21712158
21722159 mock_propagator
21732160 . expect_remove_htlcs ( )
21742161 . once ( )
2175- . return_once ( |_, _ , _ , _ , _ , _ , _| Box :: pin ( async { Ok ( ( ) ) } ) ) ;
2162+ . return_once ( |_| Ok ( ( ) ) ) ;
21762163
21772164 let ( sender, mut receiver) = oneshot:: channel ( ) ;
21782165 let ( trigger, listener) = triggered:: trigger ( ) ;
@@ -2214,25 +2201,13 @@ mod tests {
22142201 mock_propagator
22152202 . expect_add_htlcs ( )
22162203 . once ( )
2217- . returning ( |_, _, _, _, _, _, _| {
2218- Box :: pin (
2219- async move { Ok ( Err ( ( Some ( 0 ) , ForwardingError :: InsufficientBalance ( 0 , 0 ) ) ) ) } ,
2220- )
2221- } ) ;
2204+ . returning ( |_| Ok ( Err ( ( Some ( 0 ) , ForwardingError :: InsufficientBalance ( 0 , 0 ) ) ) ) ) ;
22222205
22232206 mock_propagator
22242207 . expect_remove_htlcs ( )
22252208 . once ( )
2226- . with (
2227- predicate:: always ( ) ,
2228- predicate:: eq ( 0 ) ,
2229- predicate:: always ( ) ,
2230- predicate:: always ( ) ,
2231- predicate:: always ( ) ,
2232- predicate:: eq ( false ) ,
2233- predicate:: always ( ) ,
2234- )
2235- . returning ( |_, _, _, _, _, _, _| Box :: pin ( async move { Ok ( ( ) ) } ) ) ;
2209+ . with ( predicate:: always ( ) )
2210+ . returning ( |_| Ok ( ( ) ) ) ;
22362211
22372212 let ( sender, mut receiver) = oneshot:: channel ( ) ;
22382213
0 commit comments