@@ -514,7 +514,7 @@ impl PeerState {
 			// We abort the flow, and prune any data kept.
 			self.intercept_scid_by_channel_id.retain(|_, iscid| intercept_scid != iscid);
 			self.intercept_scid_by_user_channel_id.retain(|_, iscid| intercept_scid != iscid);
-			// TODO: Remove peer state entry from the KVStore
+			self.needs_persist |= true;
 			return false;
 		}
 		true
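
For context on the change above: rather than removing the peer's entry from the `KVStore` inline, mutating operations now only mark the in-memory state dirty via `needs_persist`, and the periodic `persist` pass (next hunk) writes dirty entries out and removes prunable ones. A minimal sketch of that dirty-flag pattern, assuming a simplified `PeerState`; everything here other than `needs_persist` and the two intercept-SCID maps is an illustrative assumption, not this crate's API:

use std::collections::HashMap;

// Sketch only: a trimmed-down PeerState carrying the dirty flag. The real
// struct tracks considerably more per-flow state.
struct PeerState {
	intercept_scid_by_channel_id: HashMap<[u8; 32], u64>,
	intercept_scid_by_user_channel_id: HashMap<u128, u64>,
	needs_persist: bool,
}

impl PeerState {
	// Hypothetical helper mirroring the abort path in the hunk above.
	fn abort_flow(&mut self, intercept_scid: u64) {
		// Drop any data kept for the aborted flow...
		self.intercept_scid_by_channel_id.retain(|_, iscid| *iscid != intercept_scid);
		self.intercept_scid_by_user_channel_id.retain(|_, iscid| *iscid != intercept_scid);
		// ...and only flag the state as dirty: the periodic `persist` pass is
		// responsible for writing it out, or for removing the KVStore entry
		// once the whole peer state becomes prunable.
		self.needs_persist = true;
	}
}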
@@ -1645,44 +1645,53 @@ where
 		// TODO: We should eventually persist in parallel, however, when we do, we probably want to
 		// introduce some batching to upper-bound the number of requests inflight at any given
 		// time.
-		let need_persist: Vec<PublicKey> = {
-			let outer_state_lock = self.per_peer_state.read().unwrap();
-			outer_state_lock
-				.iter()
-				.filter_map(|(k, v)| if v.lock().unwrap().needs_persist { Some(*k) } else { None })
-				.collect()
-		};
+
+		let mut need_remove = Vec::new();
+		let mut need_persist = Vec::new();
+
+		{
+			let mut outer_state_lock = self.per_peer_state.write().unwrap();
+			outer_state_lock.retain(|counterparty_node_id, inner_state_lock| {
+				let mut peer_state_lock = inner_state_lock.lock().unwrap();
+				peer_state_lock.prune_expired_request_state();
+				let is_prunable = peer_state_lock.is_prunable();
+				if is_prunable {
+					need_remove.push(*counterparty_node_id);
+				} else if peer_state_lock.needs_persist {
+					need_persist.push(*counterparty_node_id);
+				}
+				!is_prunable
+			});
+		}
 
 		for counterparty_node_id in need_persist.into_iter() {
+			debug_assert!(!need_remove.contains(&counterparty_node_id));
 			self.persist_peer_state(counterparty_node_id).await?;
 		}
 
+		for counterparty_node_id in need_remove {
+			let key = counterparty_node_id.to_string();
+			self.kv_store
+				.remove(
+					LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+					LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
+					&key,
+					true,
+				)
+				.await?;
+		}
+
 		Ok(())
 	}
 
 	pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) {
-		let mut outer_state_lock = self.per_peer_state.write().unwrap();
-		let is_prunable =
-			if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) {
-				let mut peer_state_lock = inner_state_lock.lock().unwrap();
-				peer_state_lock.prune_expired_request_state();
-				peer_state_lock.is_prunable()
-			} else {
-				return;
-			};
-		if is_prunable {
-			outer_state_lock.remove(&counterparty_node_id);
-		}
-	}
-
-	#[allow(clippy::bool_comparison)]
-	pub(crate) fn prune_peer_state(&self) {
-		let mut outer_state_lock = self.per_peer_state.write().unwrap();
-		outer_state_lock.retain(|_, inner_state_lock| {
+		let outer_state_lock = self.per_peer_state.write().unwrap();
+		if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) {
 			let mut peer_state_lock = inner_state_lock.lock().unwrap();
+			// We clean up the peer state, but leave removing the peer entry to the prune logic in
+			// `persist` which removes it from the store.
 			peer_state_lock.prune_expired_request_state();
-			peer_state_lock.is_prunable() == false
-		});
+		}
 	}
 }
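
The TODO retained at the top of `persist` points at the follow-up work: persisting peers in parallel while upper-bounding the number of requests in flight at any given time. One possible shape for that batching, assuming the `futures` crate is available; `persist_batched` and `MAX_INFLIGHT_PERSISTS` are illustrative names, not part of this commit:

use futures::stream::{self, TryStreamExt};

// Hypothetical cap on concurrent KVStore requests; tune to the store's limits.
const MAX_INFLIGHT_PERSISTS: usize = 8;

// Runs fallible persist operations with bounded concurrency, short-circuiting
// on the first error, so the caller still gets a `Result<(), E>` as before.
async fn persist_batched<K, Fut, E>(
	keys: Vec<K>, persist_one: impl FnMut(K) -> Fut,
) -> Result<(), E>
where
	Fut: core::future::Future<Output = Result<(), E>>,
{
	stream::iter(keys.into_iter().map(Ok))
		.try_for_each_concurrent(MAX_INFLIGHT_PERSISTS, persist_one)
		.await
}

With such a helper, the sequential loop over `need_persist` could become `persist_batched(need_persist, |id| self.persist_peer_state(id)).await?`, keeping at most `MAX_INFLIGHT_PERSISTS` writes in flight at once.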
16881697