diff --git a/src/libp2p/peers.rs b/src/libp2p/peers.rs index cb0cd8d14b..7312e6874b 100644 --- a/src/libp2p/peers.rs +++ b/src/libp2p/peers.rs @@ -148,6 +148,21 @@ pub struct Peers { /// Keys are combinations of `(peer_index, notifications_protocol_index)`. Values are the /// state of the corresponding outbound notifications substream. peers_notifications_out: BTreeMap<(usize, usize), NotificationsOutState>, + + /// Subset of [`Peers::peers_notifications_out`]. Only contains entries that are desired + /// and not open, and for which there exists a non-shutting down established connection with + /// the peer. + /// Keys are combinations of `(peer_index, notifications_protocol_index)`. Values indicate + /// whether the substream is in the `ClosedByRemote` state. + unfulfilled_desired_outbound_substreams: + hashbrown::HashMap<(usize, usize), bool, fnv::FnvBuildHasher>, + + /// Subset of [`Peers::peers_notifications_out`]. Only contains entries that are not desired + /// and open or pending. + /// Keys are combinations of `(peer_index, notifications_protocol_index)`. Values are the + /// state of the corresponding outbound notifications substream. + fulfilled_undesired_outbound_substreams: + hashbrown::HashMap<(usize, usize), OpenOrPending, fnv::FnvBuildHasher>, } /// See [`Peers::peers_notifications_out`]. @@ -223,6 +238,14 @@ where Default::default(), ), peers_notifications_out: BTreeMap::new(), + unfulfilled_desired_outbound_substreams: hashbrown::HashMap::with_capacity_and_hasher( + config.peers_capacity, + Default::default(), + ), + fulfilled_undesired_outbound_substreams: hashbrown::HashMap::with_capacity_and_hasher( + config.peers_capacity, + Default::default(), + ), peers_notifications_in: BTreeSet::new(), } } @@ -355,6 +378,32 @@ where None }; + for ((_, notifications_protocol_index), state) in self + .peers_notifications_out + .range( + (actual_peer_index, usize::min_value()) + ..=(actual_peer_index, usize::max_value()), + ) + .filter(|(_, state)| { + state.desired + && matches!( + state.open, + NotificationsOutOpenState::NotOpen + | NotificationsOutOpenState::ClosedByRemote + ) + }) + { + let _prev_value = self.unfulfilled_desired_outbound_substreams.insert( + (actual_peer_index, *notifications_protocol_index), + match state.open { + NotificationsOutOpenState::NotOpen => false, + NotificationsOutOpenState::ClosedByRemote => true, + _ => unreachable!(), + }, + ); + debug_assert!(_prev_value.is_none()); + } + let num_healthy_peer_connections = { let num = self .connections_by_peer @@ -443,6 +492,29 @@ where u32::try_from(num).unwrap() }; + if num_healthy_peer_connections == 0 { + for ((_, notifications_protocol_index), _) in self + .peers_notifications_out + .range( + (peer_index, usize::min_value()) + ..=(peer_index, usize::max_value()), + ) + .filter(|(_, state)| { + state.desired + && matches!( + state.open, + NotificationsOutOpenState::NotOpen + | NotificationsOutOpenState::ClosedByRemote + ) + }) + { + let _was_in = self + .unfulfilled_desired_outbound_substreams + .remove(&(peer_index, *notifications_protocol_index)); + debug_assert!(_was_in.is_some()); + } + } + ShutdownPeer::Established { peer_id, num_healthy_peer_connections, @@ -613,17 +685,44 @@ where if result.is_ok() { notification_out.open = NotificationsOutOpenState::Open(substream_id); - // TODO: close if `!desired` } else { notification_out.open = NotificationsOutOpenState::ClosedByRemote; self.inner_notification_substreams .remove(&substream_id) .unwrap(); - // Remove entry from map if it has become useless. + // Update the map entries. if !desired { self.peers_notifications_out .remove(&(peer_index, notifications_protocol_index)); + debug_assert!(!self + .unfulfilled_desired_outbound_substreams + .contains_key(&(peer_index, notifications_protocol_index))); + let _was_in = self + .fulfilled_undesired_outbound_substreams + .remove(&(peer_index, notifications_protocol_index)); + debug_assert!(matches!(_was_in, Some(OpenOrPending::Pending))); + } else { + if self + .connections_by_peer + .range( + (peer_index, ConnectionId::min_value()) + ..=(peer_index, ConnectionId::max_value()), + ) + .any(|(_, connection_id)| { + let state = self.inner.connection_state(*connection_id); + state.established && !state.shutting_down + }) + { + let _prev_value = self + .unfulfilled_desired_outbound_substreams + .insert((peer_index, notifications_protocol_index), true); + debug_assert!(_prev_value.is_none()); + } + + debug_assert!(!self + .fulfilled_undesired_outbound_substreams + .contains_key(&(peer_index, notifications_protocol_index))); } } @@ -659,10 +758,38 @@ where )); notification_out.open = NotificationsOutOpenState::ClosedByRemote; - // Remove entry from map if it has become useless. + // Update the maps. if !notification_out.desired { self.peers_notifications_out .remove(&(peer_index, notifications_protocol_index)); + debug_assert!(!self + .unfulfilled_desired_outbound_substreams + .contains_key(&(peer_index, notifications_protocol_index))); + let _was_in = self + .fulfilled_undesired_outbound_substreams + .remove(&(peer_index, notifications_protocol_index)); + debug_assert!(matches!(_was_in, Some(OpenOrPending::Open))); + } else { + if self + .connections_by_peer + .range( + (peer_index, ConnectionId::min_value()) + ..=(peer_index, ConnectionId::max_value()), + ) + .any(|(_, connection_id)| { + let state = self.inner.connection_state(*connection_id); + state.established && !state.shutting_down + }) + { + let _prev_value = self + .unfulfilled_desired_outbound_substreams + .insert((peer_index, notifications_protocol_index), true); + debug_assert!(_prev_value.is_none()); + } + + debug_assert!(!self + .fulfilled_undesired_outbound_substreams + .contains_key(&(peer_index, notifications_protocol_index))); } return Some(Event::NotificationsOutClose { @@ -1066,6 +1193,46 @@ where current_state.open = NotificationsOutOpenState::NotOpen; } + // Add to `unfulfilled_desired_outbound_substreams` if there exists a connection. + if matches!( + current_state.open, + NotificationsOutOpenState::NotOpen | NotificationsOutOpenState::ClosedByRemote + ) { + if self + .connections_by_peer + .range( + (peer_index, ConnectionId::min_value()) + ..=(peer_index, ConnectionId::max_value()), + ) + .any(|(_, connection_id)| { + let state = self.inner.connection_state(*connection_id); + state.established && !state.shutting_down + }) + { + let _prev_value = self.unfulfilled_desired_outbound_substreams.insert( + (peer_index, notification_protocol), + match current_state.open { + NotificationsOutOpenState::NotOpen => false, + NotificationsOutOpenState::ClosedByRemote => true, + _ => unreachable!(), + }, + ); + debug_assert!(_prev_value.is_none()); + } + } + + // Remove substream from `fulfilled_undesired_outbound_substreams`, as it is + // no longer undesired. + if matches!( + current_state.open, + NotificationsOutOpenState::Open(_) | NotificationsOutOpenState::Opening(_) + ) { + let _was_in = self + .fulfilled_undesired_outbound_substreams + .remove(&(peer_index, notification_protocol)); + debug_assert!(_was_in.is_some()); + } + // Insert in `unfulfilled_desired_peers` if there is no non-shutting-down established // or handshaking connection of that peer. if !self @@ -1091,6 +1258,46 @@ where current_state.get_mut().desired = false; + // Remove substream from `unfulfilled_desired_outbound_substreams`, as it is no longer + // desired. + if matches!( + current_state.get().open, + NotificationsOutOpenState::NotOpen | NotificationsOutOpenState::ClosedByRemote + ) { + let _was_in = self + .unfulfilled_desired_outbound_substreams + .remove(&(peer_index, notification_protocol)); + debug_assert_eq!( + _was_in.is_some(), + self.connections_by_peer + .range( + (peer_index, ConnectionId::min_value()) + ..=(peer_index, ConnectionId::max_value()), + ) + .any(|(_, connection_id)| { + let state = self.inner.connection_state(*connection_id); + state.established && !state.shutting_down + }) + ); + } + + // Insert substream into `fulfilled_undesired_outbound_substreams`, as it is + // now undesired. + if matches!( + current_state.get().open, + NotificationsOutOpenState::Open(_) | NotificationsOutOpenState::Opening(_) + ) { + let _pre_value = self.fulfilled_undesired_outbound_substreams.insert( + (peer_index, notification_protocol), + match current_state.get().open { + NotificationsOutOpenState::Open(_) => OpenOrPending::Open, + NotificationsOutOpenState::Opening(_) => OpenOrPending::Pending, + _ => unreachable!(), + }, + ); + debug_assert!(_pre_value.is_none()); + } + // Clean up the entry altogether if it is no longer needed. if matches!( current_state.get().open, @@ -1165,47 +1372,15 @@ where &'_ self, include_already_tried: bool, ) -> impl Iterator + '_ { - // TODO: this is O(n), maybe add a cache - self.peers_notifications_out.iter().filter_map( - move |((peer_index, notifications_protocol_index), state)| { - if !state.desired { - return None; - } - - if !matches!( - state.open, - NotificationsOutOpenState::NotOpen | NotificationsOutOpenState::ClosedByRemote - ) { - return None; - } - - if !include_already_tried - && matches!(state.open, NotificationsOutOpenState::ClosedByRemote) - { - return None; - } - - if !self - .connections_by_peer - .range( - (*peer_index, collection::ConnectionId::min_value()) - ..=(*peer_index, collection::ConnectionId::max_value()), - ) - .map(|(_, connection_id)| *connection_id) - .any(|connection_id| { - let state = self.inner.connection_state(connection_id); - state.established && !state.shutting_down - }) - { - return None; - } - - Some(( + self.unfulfilled_desired_outbound_substreams + .iter() + .filter(move |((_, _), already_tried)| **already_tried == include_already_tried) + .map(|((peer_index, notifications_protocol_index), _)| { + ( &self.peers[*peer_index].peer_id, *notifications_protocol_index, - )) - }, - ) + ) + }) } /// Open a new outgoing substream to the given peer. The peer-protocol combination must have @@ -1263,6 +1438,11 @@ where .insert(substream_id, (connection_id, notifications_protocol_index)); debug_assert!(_prev_value.is_none()); + let _was_in = self + .unfulfilled_desired_outbound_substreams + .remove(&(peer_index, notifications_protocol_index)); + debug_assert!(_was_in.is_some()); + notif_state.open = NotificationsOutOpenState::Opening(substream_id); } @@ -1273,24 +1453,13 @@ where pub fn fulfilled_undesired_outbound_substreams( &'_ self, ) -> impl Iterator + '_ { - // TODO: this is O(n), maybe add a cache - self.peers_notifications_out.iter().filter_map( - move |((peer_index, notifications_protocol_index), state)| { - if state.desired { - return None; - } - - let open_or_pending = match state.open { - NotificationsOutOpenState::Open(_) => OpenOrPending::Open, - NotificationsOutOpenState::Opening(_) => OpenOrPending::Pending, - _ => return None, - }; - - Some(( + self.fulfilled_undesired_outbound_substreams.iter().map( + |((peer_index, notifications_protocol_index), open_or_pending)| { + ( &self.peers[*peer_index].peer_id, *notifications_protocol_index, - open_or_pending, - )) + *open_or_pending, + ) }, ) } @@ -1346,6 +1515,30 @@ where self.inner.close_out_notifications(substream_id); entry.get_mut().open = NotificationsOutOpenState::NotOpen; + if entry.get().desired { + if self + .connections_by_peer + .range( + (peer_index, ConnectionId::min_value()) + ..=(peer_index, ConnectionId::max_value()), + ) + .any(|(_, connection_id)| { + let state = self.inner.connection_state(*connection_id); + state.established && !state.shutting_down + }) + { + let _prev_value = self + .unfulfilled_desired_outbound_substreams + .insert((peer_index, notifications_protocol_index), false); + debug_assert!(_prev_value.is_none()); + } + } else { + let _was_in = self + .fulfilled_undesired_outbound_substreams + .remove(&(peer_index, notifications_protocol_index)); + debug_assert_eq!(_was_in, Some(open_or_pending)); + } + // Clean up the data structure. if !entry.get_mut().desired { entry.remove();