Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lookup disconnect peer #5815

Merged
merged 2 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.iter()
.find(|(_, l)| l.block_root() == block_to_drop)
{
for &peer_id in lookup.all_used_peers() {
for &peer_id in lookup.all_peers() {
cx.report_peer(
peer_id,
PeerAction::LowToleranceError,
Expand Down Expand Up @@ -387,8 +387,15 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
self.single_block_lookups.retain(|_, lookup| {
if lookup.remove_peer(peer_id) {
debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => ?lookup.block_root());
lookup.remove_peer(peer_id);

// Note: this condition should be removed in the future. It's not strictly necessary to drop a
// lookup if there are no peers left. Lookup should only be dropped if it can not make progress
if lookup.has_no_peers() {
debug!(self.log,
"Dropping single lookup after peer disconnection";
"block_root" => ?lookup.block_root()
);
false
} else {
true
Expand Down Expand Up @@ -545,7 +552,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
lookup.continue_requests(cx)
}
Action::ParentUnknown { parent_root } => {
let peers = lookup.all_available_peers().cloned().collect::<Vec<_>>();
let peers = lookup.all_peers().copied().collect::<Vec<_>>();
lookup.set_awaiting_parent(parent_root);
debug!(self.log, "Marking lookup as awaiting parent"; "id" => lookup.id, "block_root" => ?block_root, "parent_root" => ?parent_root);
self.search_parent_of_child(parent_root, block_root, &peers, cx);
Expand Down
105 changes: 29 additions & 76 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::sync::network_context::{
};
use beacon_chain::BeaconChainTypes;
use derivative::Derivative;
use itertools::Itertools;
use rand::seq::IteratorRandom;
use std::collections::HashSet;
use std::fmt::Debug;
Expand Down Expand Up @@ -64,6 +63,9 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
pub id: Id,
pub block_request_state: BlockRequestState<T::EthSpec>,
pub blob_request_state: BlobRequestState<T::EthSpec>,
/// Peers that claim to have imported this set of block components
#[derivative(Debug(format_with = "fmt_peer_set_as_len"))]
peers: HashSet<PeerId>,
block_root: Hash256,
awaiting_parent: Option<Hash256>,
created: Instant,
Expand All @@ -78,8 +80,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
) -> Self {
Self {
id,
block_request_state: BlockRequestState::new(requested_block_root, peers),
blob_request_state: BlobRequestState::new(requested_block_root, peers),
block_request_state: BlockRequestState::new(requested_block_root),
blob_request_state: BlobRequestState::new(requested_block_root),
peers: HashSet::from_iter(peers.iter().copied()),
block_root: requested_block_root,
awaiting_parent,
created: Instant::now(),
Expand Down Expand Up @@ -134,22 +137,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.block_root() == block_root
}

/// Get all unique used peers across block and blob requests.
pub fn all_used_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.block_request_state
.state
.get_used_peers()
.chain(self.blob_request_state.state.get_used_peers())
.unique()
}

/// Get all unique available peers across block and blob requests.
pub fn all_available_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.block_request_state
.state
.get_available_peers()
.chain(self.blob_request_state.state.get_available_peers())
.unique()
/// Get all unique peers that claim to have imported this set of block components
pub fn all_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.peers.iter()
}

/// Makes progress on all requests of this lookup. Any error is not recoverable and must result
Expand Down Expand Up @@ -198,7 +188,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
return Err(LookupRequestError::TooManyAttempts { cannot_process });
}

let Some(peer_id) = request.get_state_mut().use_rand_available_peer() else {
let Some(peer_id) = self.use_rand_available_peer() else {
if awaiting_parent {
// Allow lookups awaiting for a parent to have zero peers. If when the parent
// resolve they still have zero peers the lookup will fail gracefully.
Expand All @@ -208,6 +198,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
}
};

let request = R::request_state_mut(self);
match request.make_request(id, peer_id, downloaded_block_expected_blobs, cx)? {
LookupRequestResult::RequestSent(req_id) => {
request.get_state_mut().on_download_start(req_id)?
Expand Down Expand Up @@ -238,9 +229,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Add peer to all request states. The peer must be able to serve this request.
/// Returns true if the peer was newly inserted into some request state.
pub fn add_peer(&mut self, peer_id: PeerId) -> bool {
let inserted_block = self.block_request_state.state.add_peer(&peer_id);
let inserted_blob = self.blob_request_state.state.add_peer(&peer_id);
inserted_block || inserted_blob
self.peers.insert(peer_id)
}

/// Returns true if the block has already been downloaded.
Expand All @@ -252,8 +241,17 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
/// Remove peer from available peers. Return true if there are no more available peers and all
/// requests are not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
self.block_request_state.state.remove_peer(peer_id)
&& self.blob_request_state.state.remove_peer(peer_id)
self.peers.remove(peer_id)
}

/// Returns true if this lookup has zero peers
pub fn has_no_peers(&self) -> bool {
self.peers.is_empty()
}

/// Selects a random peer from available peers if any
fn use_rand_available_peer(&mut self) -> Option<PeerId> {
self.peers.iter().choose(&mut rand::thread_rng()).copied()
}
}

Expand All @@ -267,10 +265,10 @@ pub struct BlobRequestState<E: EthSpec> {
}

impl<E: EthSpec> BlobRequestState<E> {
pub fn new(block_root: Hash256, peer_source: &[PeerId]) -> Self {
pub fn new(block_root: Hash256) -> Self {
Self {
block_root,
state: SingleLookupRequestState::new(peer_source),
state: SingleLookupRequestState::new(),
}
}
}
Expand All @@ -285,10 +283,10 @@ pub struct BlockRequestState<E: EthSpec> {
}

impl<E: EthSpec> BlockRequestState<E> {
pub fn new(block_root: Hash256, peers: &[PeerId]) -> Self {
pub fn new(block_root: Hash256) -> Self {
Self {
requested_block_root: block_root,
state: SingleLookupRequestState::new(peers),
state: SingleLookupRequestState::new(),
}
}
}
Expand Down Expand Up @@ -318,29 +316,16 @@ pub enum State<T: Clone> {
pub struct SingleLookupRequestState<T: Clone> {
/// State of this request.
state: State<T>,
/// Peers that should have this block or blob.
#[derivative(Debug(format_with = "fmt_peer_set"))]
available_peers: HashSet<PeerId>,
/// Peers from which we have requested this block.
#[derivative(Debug = "ignore")]
used_peers: HashSet<PeerId>,
/// How many times have we attempted to process this block or blob.
failed_processing: u8,
/// How many times have we attempted to download this block or blob.
failed_downloading: u8,
}

impl<T: Clone> SingleLookupRequestState<T> {
pub fn new(peers: &[PeerId]) -> Self {
let mut available_peers = HashSet::default();
for peer in peers.iter().copied() {
available_peers.insert(peer);
}

pub fn new() -> Self {
Self {
state: State::AwaitingDownload,
available_peers,
used_peers: HashSet::default(),
failed_processing: 0,
failed_downloading: 0,
}
Expand Down Expand Up @@ -518,38 +503,6 @@ impl<T: Clone> SingleLookupRequestState<T> {
pub fn more_failed_processing_attempts(&self) -> bool {
self.failed_processing >= self.failed_downloading
}

/// Add peer to this request states. The peer must be able to serve this request.
/// Returns true if the peer is newly inserted.
pub fn add_peer(&mut self, peer_id: &PeerId) -> bool {
self.available_peers.insert(*peer_id)
}

/// Remove peer from available peers. Return true if there are no more available peers and the
/// request is not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) -> bool {
self.available_peers.remove(disconnected_peer_id);
self.available_peers.is_empty() && self.is_awaiting_download()
}

pub fn get_used_peers(&self) -> impl Iterator<Item = &PeerId> {
self.used_peers.iter()
}

pub fn get_available_peers(&self) -> impl Iterator<Item = &PeerId> {
self.available_peers.iter()
}

/// Selects a random peer from available peers if any, inserts it in used peers and returns it.
pub fn use_rand_available_peer(&mut self) -> Option<PeerId> {
let peer_id = self
.available_peers
.iter()
.choose(&mut rand::thread_rng())
.copied()?;
self.used_peers.insert(peer_id);
Some(peer_id)
}
}

// Display is used in the BadState assertions above
Expand All @@ -573,7 +526,7 @@ impl<T: Clone> std::fmt::Debug for State<T> {
}
}

fn fmt_peer_set(
fn fmt_peer_set_as_len(
peer_set: &HashSet<PeerId>,
f: &mut std::fmt::Formatter,
) -> Result<(), std::fmt::Error> {
Expand Down
22 changes: 19 additions & 3 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,10 @@ impl TestRig {

fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id));
}

// Return RPCErrors for all active requests of peer
/// Return RPCErrors for all active requests of peer
fn rpc_error_all_active_requests(&mut self, disconnected_peer_id: PeerId) {
self.drain_network_rx();
while let Ok(request_id) = self.pop_received_network_event(|ev| match ev {
NetworkMessage::SendRequest {
Expand Down Expand Up @@ -1265,27 +1267,41 @@ fn test_parent_lookup_too_deep() {
}

#[test]
fn test_parent_lookup_disconnection_no_peers_left() {
fn test_lookup_peer_disconnected_no_peers_left_while_request() {
let mut rig = TestRig::test_setup();
let peer_id = rig.new_connected_peer();
let trigger_block = rig.rand_block();
rig.trigger_unknown_parent_block(peer_id, trigger_block.into());
rig.peer_disconnected(peer_id);
rig.rpc_error_all_active_requests(peer_id);
rig.expect_no_active_lookups();
}

#[test]
fn test_lookup_peer_disconnected_no_peers_left_not_while_request() {
let mut rig = TestRig::test_setup();
let peer_id = rig.new_connected_peer();
let trigger_block = rig.rand_block();
rig.trigger_unknown_parent_block(peer_id, trigger_block.into());
rig.peer_disconnected(peer_id);
// Note: this test case may be removed in the future. It's not strictly necessary to drop a
// lookup if there are no peers left. Lookup should only be dropped if it can not make progress
rig.expect_no_active_lookups();
}

#[test]
fn test_lookup_disconnection_peer_left() {
let mut rig = TestRig::test_setup();
let peer_ids = (0..2).map(|_| rig.new_connected_peer()).collect::<Vec<_>>();
let disconnecting_peer = *peer_ids.first().unwrap();
let block_root = Hash256::random();
// lookup should have two peers associated with the same block
for peer_id in peer_ids.iter() {
rig.trigger_unknown_block_from_attestation(block_root, *peer_id);
}
// Disconnect the first peer only, which is the one handling the request
rig.peer_disconnected(*peer_ids.first().unwrap());
rig.peer_disconnected(disconnecting_peer);
rig.rpc_error_all_active_requests(disconnecting_peer);
rig.assert_single_lookups_count(1);
}

Expand Down
Loading