Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Split Peerset into reputation store & ProtocolControllers #13611

Merged
merged 106 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
106 commits
Select commit Hold shift + click to select a range
143bc98
WIP: Introduce `ProtocolController`
dmitry-markin Mar 11, 2023
5256f19
Code review suggestions: docs and `trace`->`info`
dmitry-markin Mar 16, 2023
c333aac
Apply suggestions from code review
dmitry-markin Mar 16, 2023
b50f134
Rename `ConnectionState`->`PeerState` and reduce states to `Connected…
dmitry-markin Mar 16, 2023
2670615
Get rid of `Peer` abstraction
dmitry-markin Mar 16, 2023
f6b9bca
Apply suggestions from code review
dmitry-markin Mar 16, 2023
6c6fb75
Rework peer management + implement `alloc_slots`
dmitry-markin Mar 17, 2023
7d7d1d8
minor: naming
dmitry-markin Mar 17, 2023
de8e6d2
Apply suggestions from code review
dmitry-markin Mar 20, 2023
4ab043d
Apply review suggestions
dmitry-markin Mar 20, 2023
770a38a
Refactor `on_peer_dropped()`
dmitry-markin Mar 20, 2023
f4401cf
minor: fix docs
dmitry-markin Mar 20, 2023
91a66ec
Add more docs
dmitry-markin Mar 20, 2023
fd43963
Don't put connected peers into the list of regular nodes
dmitry-markin Mar 20, 2023
63385ff
minor: docs
dmitry-markin Mar 20, 2023
368eaed
WIP: add tests
dmitry-markin Mar 20, 2023
4c95216
minor: take no more connection candidates than slots available
dmitry-markin Mar 20, 2023
d09dc00
Test both connect/accept for reserved nodes
dmitry-markin Mar 21, 2023
7a25e8a
Test banned reserved nodes, fix `alloc_slots`
dmitry-markin Mar 21, 2023
9747eaf
Add more tests
dmitry-markin Mar 21, 2023
0b4a4c2
minor: remove unneded tokio dev dependecy
dmitry-markin Mar 21, 2023
29d6e35
Add test for disconnecting reserved peers
dmitry-markin Mar 22, 2023
d95562f
minor: clean `PeerSet` from `PeerStore` methods
dmitry-markin Mar 22, 2023
7a58b7e
Pass ignored nodes as hash set of references to `PeerStore`
dmitry-markin Mar 22, 2023
c1bb601
minor test fixes
dmitry-markin Mar 22, 2023
013ee1d
Better validation of `PeerStore` output
dmitry-markin Mar 22, 2023
7551cd0
Rename trait `PeerStore`->`PeerReputationProvider`
dmitry-markin Mar 22, 2023
4fb10a1
WIP: introduce `PeerStore` reputation storage
dmitry-markin Mar 22, 2023
67ae9cb
Apply suggestions from code review
dmitry-markin Mar 24, 2023
57d7ebd
minor: rustfmt
dmitry-markin Mar 24, 2023
7ac4180
Remove `DropReason` from peer dropped events
dmitry-markin Mar 24, 2023
4636536
Fix handling of invalid `PeerStore` output in `alloc_slots`
dmitry-markin Mar 24, 2023
e865c19
Implement `PeerStore`
dmitry-markin Mar 24, 2023
2587cdc
minor: docs
dmitry-markin Mar 24, 2023
06f8fdf
minor: TODO
dmitry-markin Mar 24, 2023
56d380b
minor: another TODO
dmitry-markin Mar 24, 2023
85bdadb
Merge remote-tracking branch 'origin/master' into dm-peerset-splitting
dmitry-markin Mar 28, 2023
9c69f2d
Retain peer info for one hour after it was last updated
dmitry-markin Mar 31, 2023
554dbf8
Construct `PeerStore` from the list of bootnodes
dmitry-markin Mar 31, 2023
72ec71f
WIP: plug `PeerStore` and `ProtocolController` into `Peerset`
dmitry-markin Apr 4, 2023
086fb5b
WIP: implement `PeerStore` and `ProtocolController` polling via `Peer…
dmitry-markin Apr 4, 2023
7e370d9
Apply suggestions from code review
dmitry-markin Apr 5, 2023
64ee662
minor: improve error reporting
dmitry-markin Apr 5, 2023
c9109d7
Apply review suggestions
dmitry-markin Apr 5, 2023
8677480
WIP: update `PeerSet` use to match `PeerStore` and `ProtocolControlle…
dmitry-markin Apr 7, 2023
ff43c19
WIP: try to make `sc-network` compile
dmitry-markin Apr 7, 2023
f53e1a1
Restore the original API of `Peerset`
dmitry-markin Apr 10, 2023
4e1b773
docs: apply review suggestions
dmitry-markin Apr 10, 2023
23bf89b
Make substrate compile
dmitry-markin Apr 10, 2023
d74f8b9
Get rid of `PeersState`
dmitry-markin Apr 10, 2023
bfb9c7f
Fix bug with removing reserved node in `ProtocolController`, improve …
dmitry-markin Apr 11, 2023
c7aaf9f
minor: comment
dmitry-markin Apr 11, 2023
14c4f53
Merge remote-tracking branch 'origin/master' into dm-peerset-splitting
dmitry-markin Apr 11, 2023
82c3351
Apply suggestions from code review
dmitry-markin Apr 13, 2023
90b018c
Apply more code review suggestions
dmitry-markin Apr 13, 2023
d839590
minor: use boxing instead of generics for `PeerStoreHandle` reference…
dmitry-markin Apr 14, 2023
7c8ac45
Test `PeerInfo::decay_reputation`
dmitry-markin Apr 17, 2023
06cb192
Make concurrency issues slightly milder
dmitry-markin Apr 20, 2023
bc4f608
Improve situation a little more
dmitry-markin Apr 20, 2023
8ed59d3
Try to make it work using rendezvous channels
dmitry-markin Apr 20, 2023
a2b9ca4
Make fuzz test skip over commands between `incoming` and `accept`/`re…
dmitry-markin Apr 26, 2023
14315a0
Update comment re concurrency issues with `ProtocolController`<->`Not…
dmitry-markin Apr 26, 2023
27178f8
Clean things up and fix warnings
dmitry-markin Apr 27, 2023
1ef145c
Fix error with assigning to local variable instead of reference
dmitry-markin Apr 27, 2023
535410b
Update tests with new incoming request handling logic
dmitry-markin Apr 27, 2023
1065fc6
Make `Notifications` skip over `Peerset` commands between `incoming` …
dmitry-markin Apr 28, 2023
9730a41
Fix `reconnect_after_disconnect` test
dmitry-markin Apr 29, 2023
92001f6
Merge remote-tracking branch 'origin/master' into dm-peerset-splitting
dmitry-markin Apr 30, 2023
42f14b9
Fix rustdoc
dmitry-markin Apr 30, 2023
752d70b
Remove unused `ProtocolHandle::disconnect_peer`
dmitry-markin Apr 30, 2023
2fbfe6c
Add logging of reputation changes
dmitry-markin Apr 30, 2023
dc85f1f
Fix warnings
dmitry-markin Apr 30, 2023
9b33297
minor: change log level
dmitry-markin Apr 30, 2023
198504f
minor: remove unused import
dmitry-markin Apr 30, 2023
9e86b38
Apply suggestions from code review
dmitry-markin May 8, 2023
b3a72ad
minor: add more logging
dmitry-markin May 8, 2023
e03f852
Document event prioritization requirements in `ProtocolController`
dmitry-markin May 9, 2023
9a51e22
Rename `PeerReputationProvider` -> `PeerStoreProvider`
dmitry-markin May 9, 2023
a4e32a6
Add comment re dropped entries and `num_known_peers`
dmitry-markin May 9, 2023
0322d3d
Revert "Remove unused `ProtocolHandle::disconnect_peer`"
dmitry-markin May 9, 2023
0944c8e
Disconnect peers with reputation below `BANNED_THRESHOLD`
dmitry-markin May 9, 2023
fba6f2d
Switch to `parking_lot::Mutex`
dmitry-markin May 9, 2023
c94ea21
WIP: introduce outstanding events/actions handling
dmitry-markin May 10, 2023
34b2d1b
WIP: defer/process outstanding `SetReservedPeers` and `SetReservedOnly`
dmitry-markin May 11, 2023
ace2dd0
WIP: implement ACKing of `ProtocolController` messages
dmitry-markin May 11, 2023
76d4132
WIP: ACKs in `ProtocolController` (mostly done, but fuzz fails)
dmitry-markin May 12, 2023
4066995
minor: remove debugging logging
dmitry-markin May 12, 2023
80d3ff6
Merge remote-tracking branch 'origin/master' into dm-peerset-splitting
dmitry-markin May 12, 2023
1fc523d
Fix failed merge of `master`
dmitry-markin May 12, 2023
ccdde89
minor: revert number of iterations in `fuzz` test
dmitry-markin May 12, 2023
2415e2a
Relax event order requirements in fuzz test
dmitry-markin May 15, 2023
4ee2a32
Relax `ProtocolController` message order requirements in `Notifications`
dmitry-markin May 15, 2023
6fcd68c
Get rid of ACKs
dmitry-markin May 15, 2023
8a268b2
Merge remote-tracking branch 'origin/master' into dm-peerset-splitting
dmitry-markin May 15, 2023
9dfd6fe
Update docs to match the implementation
dmitry-markin May 15, 2023
4a2257f
Merge remote-tracking branch 'origin/master' into dm-peerset-splitting
dmitry-markin May 15, 2023
601e4f6
Fix `Notifications` tests
dmitry-markin May 16, 2023
a99847c
Merge remote-tracking branch 'origin/master' into dm-peerset-splitting
dmitry-markin May 16, 2023
263db27
Delete `Peerset` test duplicating `ProtocolController` tests
dmitry-markin May 16, 2023
47edf31
minor: add issue references in TODOs
dmitry-markin May 18, 2023
b50d998
Apply suggestions from code review
dmitry-markin May 18, 2023
2cd0188
rustfmt
dmitry-markin May 18, 2023
a0dcac4
monor: format after `rustfmt`
dmitry-markin May 18, 2023
6960b8f
Add in/out slot counters logging
dmitry-markin May 22, 2023
b7a5a48
Merge remote-tracking branch 'origin/master' into dm-peerset-splitting
dmitry-markin May 22, 2023
a03b9a0
Make clippy happy
dmitry-markin May 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP: ACKs in ProtocolController (mostly done, but fuzz fails)
  • Loading branch information
dmitry-markin committed May 12, 2023
commit 76d41325c9edcf89ed9f9acdd5b9e413a57eb372
133 changes: 98 additions & 35 deletions client/peerset/src/protocol_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,27 +310,19 @@ impl ProtocolController {
///
/// Intended for tests only. Use `run` for driving [`ProtocolController`].
pub async fn next_action(&mut self) -> bool {
// Perform tasks prioritizing connection events processing
// (see the module documentation for details).
let mut emptied_outstanding = None;
let either = 'outer: loop {
// Receive ACKs from `Notifications` and process outstanding events & actions.
for (peer_id, outstanding) in self.outstanding.iter_mut() {
if outstanding.try_resolve_ack() {
if let Some(event) = outstanding.events.pop_front() {
emptied_outstanding = outstanding.is_empty().then_some(*peer_id);
break 'outer Either::Left(event)
}
if let Some(action) = outstanding.actions.pop_front() {
emptied_outstanding = outstanding.is_empty().then_some(*peer_id);
break 'outer Either::Right(action)
}
let either = loop {
log::debug!(target: LOG_TARGET, "Outstanding: {:?}", self.outstanding);

// Resolve ACKs & process outstanding events/actions. We might want to do this after
// `alloc_slots`
if let Some(either) = self.resolve_acks_once() {
match either {
Either::Left(event) => self.process_event(event),
Either::Right(action) => self.process_action(action),
}
return true
}

// If we have received some ACKs, we might be able to remove obsolete entries.
self.outstanding.retain(|_, outstanding| !outstanding.is_empty());

// Apply postponed `SetReservedOnly` action
// TODO: we might need to do it after all events processing.
if self.outstanding.is_empty() {
Expand Down Expand Up @@ -359,39 +351,82 @@ impl ProtocolController {
}
};

// Remove outstanding entry that became empty.
emptied_outstanding.map(|peer_id| self.outstanding.remove(&peer_id));

match either {
Either::Left(event) => self.process_event(event),
Either::Right(action) => self.process_action(action),
Either::Left(event) => self.push_event(event),
Either::Right(action) => self.push_action(action),
}

true
}

/// Process connection event.
fn process_event(&mut self, event: Event) {
/// Receive ACKs from `Notifications` and process outstanding events & actions.
/// This function must be called repeatedly intil it yields `None`.
fn resolve_acks_once(&mut self) -> Option<Either<Event, Action>> {
let mut result = None;
let mut emptied_entries = Vec::new();

for (peer_id, outstanding) in self.outstanding.iter_mut() {
if outstanding.try_resolve_ack() {
if let Some(event) = outstanding.events.pop_front() {
if outstanding.is_empty() {
emptied_entries.push(*peer_id);
}
result = Some(Either::Left(event));
break
}

if let Some(action) = outstanding.actions.pop_front() {
if outstanding.is_empty() {
emptied_entries.push(*peer_id);
}
result = Some(Either::Right(action));
break
}

emptied_entries.push(*peer_id);
}
}

// Remove outstanding entries that became empty.
emptied_entries.iter().for_each(|peer_id| {
self.outstanding.remove(&peer_id);
});

result
}

/// Push connection event for processing
fn push_event(&mut self, event: Event) {
if let Some(outstanding) = self.outstanding.get_mut(event.peer_id()) {
outstanding.events.push_back(event);
} else {
match event {
Event::IncomingConnection(peer_id, index) =>
self.on_incoming_connection(peer_id, index),
Event::Dropped(peer_id) => self.on_peer_dropped(peer_id),
}
self.process_event(event);
}
}

/// Process action command.
fn process_action(&mut self, action: Action) {
/// Process connection event.
fn process_event(&mut self, event: Event) {
match event {
Event::IncomingConnection(peer_id, index) =>
self.on_incoming_connection(peer_id, index),
Event::Dropped(peer_id) => self.on_peer_dropped(peer_id),
}
}

/// Push action command for processing
fn push_action(&mut self, action: Action) {
if let Some(peer_id) = action.peer_id() {
if let Some(outstanding) = self.outstanding.get_mut(peer_id) {
outstanding.actions.push_back(action);
return
}
}

self.process_action(action);
}

/// Process action command.
fn process_action(&mut self, action: Action) {
match action {
Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id),
Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id),
Expand Down Expand Up @@ -438,9 +473,10 @@ impl ProtocolController {
trace!(target: LOG_TARGET, "Connecting to {peer_id}.");

let tx_ack = self.insert_outstanding_ack(peer_id);
let _ = self
.to_notifications
.unbounded_send(AckedMessage(Message::Connect { set_id: self.set_id, peer_id }, tx_ack));
let _ = self.to_notifications.unbounded_send(AckedMessage(
Message::Connect { set_id: self.set_id, peer_id },
tx_ack,
));
}

/// Send "drop" message to `Notifications`.
Expand Down Expand Up @@ -928,6 +964,9 @@ mod tests {
assert_eq!(controller.num_out, 0);
assert_eq!(controller.num_in, 0);

// Resolve ACKs before manually injecting more events for the same peers.
assert!(controller.resolve_acks_once().is_none());

// Drop connections to be able to accept reserved nodes.
controller.on_peer_dropped(reserved1);
controller.on_peer_dropped(reserved2);
Expand Down Expand Up @@ -1040,6 +1079,9 @@ mod tests {
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved1 }));
assert!(messages.contains(&Message::Connect { set_id: SetId(0), peer_id: reserved2 }));

// Resolve ACKs before manually injecting more events for the same peers.
assert!(controller.resolve_acks_once().is_none());

// Drop both reserved nodes.
controller.on_peer_dropped(reserved1);
controller.on_peer_dropped(reserved2);
Expand Down Expand Up @@ -1393,6 +1435,9 @@ mod tests {
assert_eq!(controller.num_out, 1);
assert_eq!(controller.num_in, 1);

// Resolve ACKs before manually injecting more events for the same peers.
assert!(controller.resolve_acks_once().is_none());

// Switch to reserved-only mode.
controller.on_set_reserved_only(true);

Expand Down Expand Up @@ -1478,6 +1523,9 @@ mod tests {
assert!(controller.reserved_nodes.contains_key(&reserved2));
assert!(controller.nodes.is_empty());

// Resolve ACKs before manually injecting more events for the same peers.
assert!(controller.resolve_acks_once().is_none());

// Remove reserved node
controller.on_remove_reserved_peer(reserved1);
assert_eq!(
Expand Down Expand Up @@ -1621,6 +1669,9 @@ mod tests {
assert_eq!(controller.num_in, 1);
assert_eq!(controller.num_out, 1);

// Resolve ACKs before manually injecting more events for the same peers.
assert!(controller.resolve_acks_once().is_none());

controller.on_disconnect_peer(peer1);
assert_eq!(
rx.try_recv().unwrap().take(),
Expand Down Expand Up @@ -1797,6 +1848,9 @@ mod tests {
Some(PeerState::Connected(Direction::Outbound))
));

// Resolve ACKs before manually injecting more events for the same peers.
assert!(controller.resolve_acks_once().is_none());

// Incoming request for `reserved1`.
controller.on_incoming_connection(reserved1, IncomingIndex(2));
assert_eq!(rx.try_recv().ok().unwrap().take(), Message::Accept(IncomingIndex(2)));
Expand Down Expand Up @@ -1856,6 +1910,9 @@ mod tests {
assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));

// Resolve ACKs before manually injecting more events for the same peers.
assert!(controller.resolve_acks_once().is_none());

// Incoming request for `regular1`.
controller.on_incoming_connection(regular1, IncomingIndex(1));
assert_eq!(rx.try_recv().ok().unwrap().take(), Message::Accept(IncomingIndex(1)));
Expand Down Expand Up @@ -1910,6 +1967,9 @@ mod tests {
assert_eq!(rx.try_recv().unwrap_err(), TryRecvError::Empty);
assert!(matches!(controller.nodes.get(&regular2).unwrap(), Direction::Inbound,));

// Resolve ACKs before manually injecting more events for the same peers.
assert!(controller.resolve_acks_once().is_none());

// Incoming request for `regular1`.
controller.on_incoming_connection(regular1, IncomingIndex(1));
assert_eq!(rx.try_recv().ok().unwrap().take(), Message::Reject(IncomingIndex(1)));
Expand Down Expand Up @@ -1965,6 +2025,9 @@ mod tests {

controller.max_in = 0;

// Resolve ACKs before manually injecting more events for the same peers.
assert!(controller.resolve_acks_once().is_none());

// Incoming request for `regular1`.
controller.on_incoming_connection(regular1, IncomingIndex(1));
assert_eq!(rx.try_recv().ok().unwrap().take(), Message::Reject(IncomingIndex(1)));
Expand Down
44 changes: 21 additions & 23 deletions client/peerset/tests/fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,25 +127,23 @@ fn test_once() {

let (mut peerset, peerset_handle) = Peerset::from_config(PeersetConfig {
sets: vec![SetConfig {
// bootnodes: (0..Uniform::new_inclusive(0, 4).sample(&mut rng))
// .map(|_| {
// let id = PeerId::random();
// known_nodes.insert(id, State::Disconnected);
// id
// })
// .collect(),
// reserved_nodes: {
// (0..Uniform::new_inclusive(0, 2).sample(&mut rng))
// .map(|_| {
// let id = PeerId::random();
// known_nodes.insert(id, State::Disconnected);
// reserved_nodes.insert(id);
// id
// })
// .collect()
// },
bootnodes: Vec::new(),
reserved_nodes: HashSet::new(),
bootnodes: (0..Uniform::new_inclusive(0, 4).sample(&mut rng))
.map(|_| {
let id = PeerId::random();
known_nodes.insert(id, State::Disconnected);
id
})
.collect(),
reserved_nodes: {
(0..Uniform::new_inclusive(0, 2).sample(&mut rng))
.map(|_| {
let id = PeerId::random();
known_nodes.insert(id, State::Disconnected);
reserved_nodes.insert(id);
id
})
.collect()
},
in_peers: Uniform::new_inclusive(0, 25).sample(&mut rng),
out_peers: Uniform::new_inclusive(0, 25).sample(&mut rng),
reserved_only: Uniform::new_inclusive(0, 10).sample(&mut rng) == 0,
Expand All @@ -168,7 +166,7 @@ fn test_once() {

// Perform a certain number of actions while checking that the state is consistent. If we
// reach the end of the loop, the run has succeeded.
for _ in 0..2500 {
for _ in 0..25000 {
// Peer we are working with.
let mut current_peer = None;
// Current event for event bigrams validation.
Expand Down Expand Up @@ -291,9 +289,9 @@ fn test_once() {

// If we generate 1, discover a new node.
1 => {
// let new_id = PeerId::random();
// known_nodes.insert(new_id, State::Disconnected);
// peerset_handle.add_known_peer(new_id);
let new_id = PeerId::random();
known_nodes.insert(new_id, State::Disconnected);
peerset_handle.add_known_peer(new_id);
},

// If we generate 2, adjust a random reputation.
Expand Down