Skip to content

Commit 10a19ec

Browse files
authored
Merge of #6975
2 parents 4b9c16f + e21590a commit 10a19ec

File tree

5 files changed

+106
-8
lines changed

5 files changed

+106
-8
lines changed

beacon_node/lighthouse_network/src/peer_manager/mod.rs

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -712,8 +712,9 @@ impl<E: EthSpec> PeerManager<E> {
712712
}
713713

714714
/// Received a metadata response from a peer.
715-
pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<E>) {
715+
pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<E>) -> bool {
716716
let mut invalid_meta_data = false;
717+
let mut updated_cgc = false;
717718

718719
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
719720
if let Some(known_meta_data) = &peer_info.meta_data() {
@@ -729,12 +730,16 @@ impl<E: EthSpec> PeerManager<E> {
729730
debug!(%peer_id, new_seq_no = meta_data.seq_number(), "Obtained peer's metadata");
730731
}
731732

733+
let known_custody_group_count = peer_info
734+
.meta_data()
735+
.and_then(|meta_data| meta_data.custody_group_count().copied().ok());
736+
732737
let custody_group_count_opt = meta_data.custody_group_count().copied().ok();
733738
peer_info.set_meta_data(meta_data);
734739

735740
if self.network_globals.spec.is_peer_das_scheduled() {
736-
// Gracefully ignore metadata/v2 peers. Potentially downscore after PeerDAS to
737-
// prioritize PeerDAS peers.
741+
// Gracefully ignore metadata/v2 peers.
742+
// We only send metadata v3 requests when PeerDAS is scheduled
738743
if let Some(custody_group_count) = custody_group_count_opt {
739744
match self.compute_peer_custody_groups(peer_id, custody_group_count) {
740745
Ok(custody_groups) => {
@@ -755,6 +760,8 @@ impl<E: EthSpec> PeerManager<E> {
755760
})
756761
.collect();
757762
peer_info.set_custody_subnets(custody_subnets);
763+
764+
updated_cgc = Some(custody_group_count) != known_custody_group_count;
758765
}
759766
Err(err) => {
760767
debug!(
@@ -777,6 +784,8 @@ impl<E: EthSpec> PeerManager<E> {
777784
if invalid_meta_data {
778785
self.goodbye_peer(peer_id, GoodbyeReason::Fault, ReportSource::PeerManager)
779786
}
787+
788+
updated_cgc
780789
}
781790

782791
/// Updates the gossipsub scores for all known peers in gossipsub.
@@ -1487,6 +1496,15 @@ impl<E: EthSpec> PeerManager<E> {
14871496
pub fn remove_trusted_peer(&mut self, enr: Enr) {
14881497
self.trusted_peers.remove(&enr);
14891498
}
1499+
1500+
#[cfg(test)]
1501+
fn custody_subnet_count_for_peer(&self, peer_id: &PeerId) -> Option<usize> {
1502+
self.network_globals
1503+
.peers
1504+
.read()
1505+
.peer_info(peer_id)
1506+
.map(|peer_info| peer_info.custody_subnets_iter().count())
1507+
}
14901508
}
14911509

14921510
enum ConnectingType {
@@ -1507,8 +1525,9 @@ enum ConnectingType {
15071525
#[cfg(test)]
15081526
mod tests {
15091527
use super::*;
1528+
use crate::rpc::MetaDataV3;
15101529
use crate::NetworkConfig;
1511-
use types::MainnetEthSpec as E;
1530+
use types::{ChainSpec, ForkName, MainnetEthSpec as E};
15121531

15131532
async fn build_peer_manager(target_peer_count: usize) -> PeerManager<E> {
15141533
build_peer_manager_with_trusted_peers(vec![], target_peer_count).await
@@ -1517,6 +1536,15 @@ mod tests {
15171536
async fn build_peer_manager_with_trusted_peers(
15181537
trusted_peers: Vec<PeerId>,
15191538
target_peer_count: usize,
1539+
) -> PeerManager<E> {
1540+
let spec = Arc::new(E::default_spec());
1541+
build_peer_manager_with_opts(trusted_peers, target_peer_count, spec).await
1542+
}
1543+
1544+
async fn build_peer_manager_with_opts(
1545+
trusted_peers: Vec<PeerId>,
1546+
target_peer_count: usize,
1547+
spec: Arc<ChainSpec>,
15201548
) -> PeerManager<E> {
15211549
let config = config::Config {
15221550
target_peer_count,
@@ -1527,7 +1555,6 @@ mod tests {
15271555
target_peers: target_peer_count,
15281556
..Default::default()
15291557
});
1530-
let spec = Arc::new(E::default_spec());
15311558
let globals = NetworkGlobals::new_test_globals(trusted_peers, network_config, spec);
15321559
PeerManager::new(config, Arc::new(globals)).unwrap()
15331560
}
@@ -1878,6 +1905,44 @@ mod tests {
18781905
assert!(peers_should_have_removed.is_empty());
18791906
}
18801907

1908+
#[tokio::test]
1909+
/// Test a metadata response should update custody subnets
1910+
async fn test_peer_manager_update_custody_subnets() {
1911+
// PeerDAS is enabled from Fulu.
1912+
let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
1913+
let mut peer_manager = build_peer_manager_with_opts(vec![], 1, spec).await;
1914+
let pubkey = Keypair::generate_secp256k1().public();
1915+
let peer_id = PeerId::from_public_key(&pubkey);
1916+
peer_manager.inject_connect_ingoing(
1917+
&peer_id,
1918+
Multiaddr::empty().with_p2p(peer_id).unwrap(),
1919+
None,
1920+
);
1921+
1922+
// A newly connected peer should have no custody subnets before metadata is received.
1923+
let custody_subnet_count = peer_manager.custody_subnet_count_for_peer(&peer_id);
1924+
assert_eq!(custody_subnet_count, Some(0));
1925+
1926+
// Metadata should update the custody subnets.
1927+
let peer_cgc = 4;
1928+
let meta_data = MetaData::V3(MetaDataV3 {
1929+
seq_number: 0,
1930+
attnets: Default::default(),
1931+
syncnets: Default::default(),
1932+
custody_group_count: peer_cgc,
1933+
});
1934+
let cgc_updated = peer_manager.meta_data_response(&peer_id, meta_data.clone());
1935+
assert!(cgc_updated);
1936+
let custody_subnet_count = peer_manager.custody_subnet_count_for_peer(&peer_id);
1937+
assert_eq!(custody_subnet_count, Some(peer_cgc as usize));
1938+
1939+
// Make another update and assert that CGC is not updated.
1940+
let cgc_updated = peer_manager.meta_data_response(&peer_id, meta_data);
1941+
assert!(!cgc_updated);
1942+
let custody_subnet_count = peer_manager.custody_subnet_count_for_peer(&peer_id);
1943+
assert_eq!(custody_subnet_count, Some(peer_cgc as usize));
1944+
}
1945+
18811946
#[tokio::test]
18821947
/// Test the pruning logic to remove grouped subnet peers
18831948
async fn test_peer_manager_prune_grouped_subnet_peers() {

beacon_node/lighthouse_network/src/service/mod.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ pub enum NetworkEvent<E: EthSpec> {
103103
StatusPeer(PeerId),
104104
NewListenAddr(Multiaddr),
105105
ZeroListeners,
106+
/// A peer has an updated custody group count from MetaData.
107+
PeerUpdatedCustodyGroupCount(PeerId),
106108
}
107109

108110
pub type Gossipsub = gossipsub::Behaviour<SnappyTransform, SubscriptionFilter>;
@@ -1655,7 +1657,7 @@ impl<E: EthSpec> Network<E> {
16551657
return None;
16561658
}
16571659

1658-
// The METADATA and PING RPC responses are handled within the behaviour and not propagated
1660+
// The PING RPC responses are handled within the behaviour and not propagated
16591661
match event.message {
16601662
Err(handler_err) => {
16611663
match handler_err {
@@ -1858,9 +1860,11 @@ impl<E: EthSpec> Network<E> {
18581860
None
18591861
}
18601862
RpcSuccessResponse::MetaData(meta_data) => {
1861-
self.peer_manager_mut()
1863+
let updated_cgc = self
1864+
.peer_manager_mut()
18621865
.meta_data_response(&peer_id, meta_data.as_ref().clone());
1863-
None
1866+
// Send event after calling into peer_manager so the PeerDB is updated.
1867+
updated_cgc.then(|| NetworkEvent::PeerUpdatedCustodyGroupCount(peer_id))
18641868
}
18651869
/* Network propagated protocols */
18661870
RpcSuccessResponse::Status(msg) => {

beacon_node/network/src/router.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ pub enum RouterMessage<E: EthSpec> {
7373
PubsubMessage(MessageId, PeerId, PubsubMessage<E>, bool),
7474
/// The peer manager has requested we re-status a peer.
7575
StatusPeer(PeerId),
76+
/// The peer has an updated custody group count from METADATA.
77+
PeerUpdatedCustodyGroupCount(PeerId),
7678
}
7779

7880
impl<T: BeaconChainTypes> Router<T> {
@@ -155,6 +157,10 @@ impl<T: BeaconChainTypes> Router<T> {
155157
RouterMessage::PeerDisconnected(peer_id) => {
156158
self.send_to_sync(SyncMessage::Disconnect(peer_id));
157159
}
160+
// A peer has updated CGC
161+
RouterMessage::PeerUpdatedCustodyGroupCount(peer_id) => {
162+
self.send_to_sync(SyncMessage::UpdatedPeerCgc(peer_id));
163+
}
158164
RouterMessage::RPCRequestReceived {
159165
peer_id,
160166
inbound_request_id,

beacon_node/network/src/service.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,9 @@ impl<T: BeaconChainTypes> NetworkService<T> {
485485
NetworkEvent::PeerDisconnected(peer_id) => {
486486
self.send_to_router(RouterMessage::PeerDisconnected(peer_id));
487487
}
488+
NetworkEvent::PeerUpdatedCustodyGroupCount(peer_id) => {
489+
self.send_to_router(RouterMessage::PeerUpdatedCustodyGroupCount(peer_id));
490+
}
488491
NetworkEvent::RequestReceived {
489492
peer_id,
490493
inbound_request_id,

beacon_node/network/src/sync/manager.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ pub enum SyncMessage<E: EthSpec> {
106106
head_slot: Option<Slot>,
107107
},
108108

109+
/// Peer manager has received a MetaData of a peer with a new or updated CGC value.
110+
UpdatedPeerCgc(PeerId),
111+
109112
/// A block has been received from the RPC.
110113
RpcBlock {
111114
sync_request_id: SyncRequestId,
@@ -476,6 +479,16 @@ impl<T: BeaconChainTypes> SyncManager<T> {
476479
}
477480
}
478481

482+
fn updated_peer_cgc(&mut self, _peer_id: PeerId) {
483+
// Try to make progress on custody requests that are waiting for peers
484+
for (id, result) in self.network.continue_custody_by_root_requests() {
485+
self.on_custody_by_root_result(id, result);
486+
}
487+
488+
// Attempt to resume range sync too
489+
self.range_sync.resume(&mut self.network);
490+
}
491+
479492
/// Handles RPC errors related to requests that were emitted from the sync manager.
480493
fn inject_error(&mut self, peer_id: PeerId, sync_request_id: SyncRequestId, error: RPCError) {
481494
trace!("Sync manager received a failed RPC");
@@ -748,6 +761,13 @@ impl<T: BeaconChainTypes> SyncManager<T> {
748761
} => {
749762
self.add_peers_force_range_sync(&peers, head_root, head_slot);
750763
}
764+
SyncMessage::UpdatedPeerCgc(peer_id) => {
765+
debug!(
766+
peer_id = ?peer_id,
767+
"Received updated peer CGC message"
768+
);
769+
self.updated_peer_cgc(peer_id);
770+
}
751771
SyncMessage::RpcBlock {
752772
sync_request_id,
753773
peer_id,

0 commit comments

Comments
 (0)