feat(portalnet): handle content that doesn't depend on distance to content #1859

Merged · 2 commits · May 28, 2025

90 changes: 90 additions & 0 deletions crates/portalnet/src/gossip/mod.rs
@@ -0,0 +1,90 @@
use std::collections::HashMap;

use discv5::Enr;
use ethportal_api::{
types::{distance::Metric, portal::MAX_CONTENT_KEYS_PER_OFFER},
utils::bytes::hex_encode_compact,
OverlayContentKey, RawContentKey, RawContentValue,
};
use itertools::{chain, Itertools};
use tracing::{debug, warn};

use crate::types::kbucket::SharedKBucketsTable;

mod neighborhood;
mod random;
Comment on lines +14 to +15
Member

shouldn't we put these above imports?

Collaborator Author

I remember seeing them go under imports, but I can't find where. All that I found now is that there is no strict rule or even a strong recommendation.

Then I checked some of the popular crates, and it's not easy to find mod.rs files with a similar structure, but here are a few that do it this way:

I also checked our code base and it seems that we use both. So I'm just going to leave it as is. If we agree to use one style over the other, we should go and clean up the entire codebase (but considering that the Rust core does imports first, I would prefer that one).
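For reference, a minimal sketch of the layout this PR uses (imports first, `mod` declarations after); the alternative raised above would just move the two `mod` lines to the top of the file. The inline module bodies are only there so the snippet compiles on its own and are not part of the PR:

```rust
// Imports first...
use std::collections::HashMap;

// ...then module declarations (in the real mod.rs these are `mod neighborhood;` and `mod random;`).
mod neighborhood {}
mod random {}

fn main() {
    // Placeholder use of the import so the snippet compiles without warnings.
    let _recipients: HashMap<String, Vec<u8>> = HashMap::new();
}
```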


/// Selects peers and content that should be gossiped to them.
///
/// Every peer will have at least 1 and at most `MAX_CONTENT_KEYS_PER_OFFER` content items
/// assigned to them. If more than `MAX_CONTENT_KEYS_PER_OFFER` items are passed as an argument,
/// it's possible that no peers will be selected to propagate some of that content (even if such
/// peers exist). The order of content in the returned collection will be the same as the order
/// in which it is passed (this might be important for some content types).
///
/// This function is designed for the case where either all content is affected by radius (in
/// which case we use the "neighborhood gossip" strategy) or none of it is (in which case we use
/// the "random gossip" strategy). If the content contains a mix of the two types, it is split and
/// each gossip strategy is applied to its respective part. This can lead to a suboptimal result
/// when we select more peers than needed.
pub fn gossip_recipients<TContentKey: OverlayContentKey, TMetric: Metric>(
content: Vec<(TContentKey, RawContentValue)>,
kbuckets: &SharedKBucketsTable,
) -> HashMap<Enr, Vec<(RawContentKey, RawContentValue)>> {
// Precalculate "content_id" and "raw_content_key" and use references going forward
let content = content
.into_iter()
.map(|(key, raw_value)| {
let id = key.content_id();
let raw_key = key.to_bytes();
(id, key, raw_key, raw_value)
})
.collect::<Vec<_>>();

debug!(
ids = ?content.iter().map(|(id, _, _, _)| hex_encode_compact(id)),
"selecing gossip recipients"
);

// Split content id+key depending on whether they are affected by radius
let (content_for_neighborhood_gossip, content_for_random_gossip) = content
.iter()
.map(|(id, key, _raw_key, _raw_value)| (id, key))
.partition::<Vec<_>, _>(|(_content_id, content_key)| content_key.affected_by_radius());
if !content_for_neighborhood_gossip.is_empty() && !content_for_random_gossip.is_empty() {
warn!("Expected to gossip content with both neighborhood_gossip and random_gossip");
}

// Combine results of "neighborhood gossip" and "random gossip".
let peer_to_content_ids = chain!(
neighborhood::gossip_recipients::<_, TMetric>(content_for_neighborhood_gossip, kbuckets),
random::gossip_recipients(content_for_random_gossip, kbuckets)
)
.into_grouping_map()
.reduce(|mut all_content_to_gossip, _enr, content_ids| {
all_content_to_gossip.extend(content_ids);
all_content_to_gossip
});

// Extract raw content key/value into a hash map for easier lookup.
let raw_content = content
.iter()
.map(|(id, _key, raw_key, raw_value)| (id, (raw_key, raw_value)))
.collect::<HashMap<_, _>>();

peer_to_content_ids
.into_iter()
.map(|(enr, content_ids)| {
let raw_content_key_value = content_ids
.into_iter()
// Select at most `MAX_CONTENT_KEYS_PER_OFFER`
.take(MAX_CONTENT_KEYS_PER_OFFER)
.map(|content_id| {
let (raw_content_key, raw_content_value) = raw_content[content_id];
(raw_content_key.clone(), raw_content_value.clone())
})
.collect();
(enr, raw_content_key_value)
})
.collect()
}
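To make the merging step in `gossip_recipients` easier to follow, here is a small self-contained sketch of the `chain!` + `into_grouping_map().reduce()` pattern it uses. The peer names and numeric content ids are placeholders standing in for `Enr`s and content ids; this is not code from the PR:

```rust
use std::collections::HashMap;

use itertools::{chain, Itertools};

fn main() {
    // Hypothetical per-strategy results: peer -> content ids selected by that strategy.
    let neighborhood = vec![("peer_a", vec![1u32, 2]), ("peer_b", vec![3])];
    let random = vec![("peer_a", vec![4]), ("peer_c", vec![5])];

    // Chain both result sets and merge the content lists of peers that appear in both,
    // the same way the `into_grouping_map().reduce(...)` call in `gossip_recipients` does.
    let merged: HashMap<&str, Vec<u32>> = chain!(neighborhood, random)
        .into_grouping_map()
        .reduce(|mut all_content, _peer, content_ids| {
            all_content.extend(content_ids);
            all_content
        });

    assert_eq!(merged["peer_a"], vec![1, 2, 4]);
    assert_eq!(merged["peer_b"], vec![3]);
    assert_eq!(merged["peer_c"], vec![5]);
}
```

In the function above, the same pattern merges the outputs of the neighborhood and random strategies, so a peer selected by both ends up with the combined list of content ids (which is then capped at `MAX_CONTENT_KEYS_PER_OFFER`).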
222 changes: 222 additions & 0 deletions crates/portalnet/src/gossip/neighborhood.rs
@@ -0,0 +1,222 @@
use std::collections::HashMap;

use alloy::hex::ToHexExt;
use discv5::Enr;
use ethportal_api::{types::distance::Metric, OverlayContentKey};
use rand::{rng, seq::IteratorRandom};
use tracing::{debug, error};

use crate::types::kbucket::SharedKBucketsTable;

pub fn gossip_recipients<'c, TContentKey: OverlayContentKey, TMetric: Metric>(
content: Vec<(&'c [u8; 32], &TContentKey)>,
kbuckets: &SharedKBucketsTable,
) -> HashMap<Enr, Vec<&'c [u8; 32]>> {
if content.is_empty() {
return HashMap::new();
}

let content_ids = content
.iter()
.map(|(content_id, _content_key)| *content_id)
.collect::<Vec<_>>();

// Map from content_ids to interested ENRs
let mut content_id_to_interested_enrs = kbuckets.batch_interested_enrs::<TMetric>(&content_ids);

// Map from ENRs to the content ids they will receive
let mut enrs_and_content: HashMap<Enr, Vec<&'c [u8; 32]>> = HashMap::new();
for (content_id, content_key) in content {
let interested_enrs = content_id_to_interested_enrs.remove(content_id).unwrap_or_else(|| {
error!("interested_enrs should contain all content ids, even if there are no interested ENRs");
vec![]
});
if interested_enrs.is_empty() {
debug!(
content.id = content_id.encode_hex_with_prefix(),
content.key = %content_key.to_bytes(),
"No peers eligible for neighborhood gossip"
);
continue;
};

// Select content recipients
for enr in select_content_recipients::<TMetric>(content_id, interested_enrs) {
enrs_and_content.entry(enr).or_default().push(content_id);
}
}
enrs_and_content
}

const NUM_CLOSEST_PEERS: usize = 4;
const NUM_FARTHER_PEERS: usize = 4;

/// Selects put content recipients from a vec of interested peers.
///
/// If the number of peers is at most `NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS`, then all are returned.
/// Otherwise, peers are sorted by distance from `content_id` and then:
///
/// 1. Closest `NUM_CLOSEST_PEERS` ENRs are selected
/// 2. Random `NUM_FARTHER_PEERS` ENRs are selected from the rest
fn select_content_recipients<TMetric: Metric>(
content_id: &[u8; 32],
mut peers: Vec<Enr>,
) -> Vec<Enr> {
// Check if we need to do any selection
if peers.len() <= NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS {
return peers;
}

// Sort peers by distance
peers.sort_by_cached_key(|peer| TMetric::distance(content_id, &peer.node_id().raw()));

// Split off at NUM_CLOSEST_PEERS
let farther_peers = peers.split_off(NUM_CLOSEST_PEERS);

// Select random NUM_FARTHER_PEERS
peers.extend(
farther_peers
.into_iter()
.choose_multiple(&mut rng(), NUM_FARTHER_PEERS),
);

peers
}

#[cfg(test)]
mod tests {
use std::iter;

use discv5::enr::NodeId;
use ethportal_api::{
types::{
distance::{Distance, XorMetric},
enr::generate_random_remote_enr,
},
IdentityContentKey,
};
use rand::random;
use rstest::rstest;

use super::*;

#[test]
fn empty() {
let kbuckets = SharedKBucketsTable::new_for_tests(NodeId::random());

for _ in 0..NUM_CLOSEST_PEERS {
let (_, peer) = generate_random_remote_enr();
let _ = kbuckets.insert_or_update_disconnected(&peer, Distance::MAX);
}

assert!(gossip_recipients::<IdentityContentKey, XorMetric>(vec![], &kbuckets).is_empty());
}

mod select_content_recipients {
use std::ops::RangeBounds;

use itertools::chain;

use super::*;

fn create_peers_with_distance(
count: usize,
content_id: &[u8; 32],
log2_distances: impl RangeBounds<usize>,
) -> Vec<Enr> {
iter::repeat_with(|| generate_random_remote_enr().1)
.filter(|peer| {
log2_distances.contains(
&XorMetric::distance(content_id, &peer.node_id().raw())
.log2()
.unwrap(),
)
})
.take(count)
.collect()
}

#[rstest]
#[case(0, 0)]
#[case(NUM_CLOSEST_PEERS - 1, NUM_CLOSEST_PEERS - 1)]
#[case(NUM_CLOSEST_PEERS, NUM_CLOSEST_PEERS)]
#[case(NUM_CLOSEST_PEERS + 1, NUM_CLOSEST_PEERS + 1)]
#[case(NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS, NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS)]
#[case(NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS + 1, NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS)]
#[case(256, NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS)]
fn count(#[case] peers_count: usize, #[case] expected_content_recipients_count: usize) {
let content_id = random();
let peers = create_peers_with_distance(peers_count, &content_id, ..);
assert_eq!(
select_content_recipients::<XorMetric>(&content_id, peers).len(),
expected_content_recipients_count
);
}

#[test]
fn closest() {
let content_id = random();

const CLOSE_PEER_LOG2_DISTANCE: usize = 253;

// Create NUM_CLOSEST_PEERS peers with log2 distance less than CLOSE_PEER_LOG2_DISTANCE
let close_peers = create_peers_with_distance(
NUM_CLOSEST_PEERS,
&content_id,
..CLOSE_PEER_LOG2_DISTANCE,
);

// Create 1000 peers with log2 distance at least CLOSE_PEER_LOG2_DISTANCE
let far_peers =
create_peers_with_distance(1000, &content_id, CLOSE_PEER_LOG2_DISTANCE..);

let recipients = select_content_recipients::<XorMetric>(
&content_id,
chain!(close_peers.clone(), far_peers).collect(),
);

// Verify that `NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS` peers are selected
assert_eq!(recipients.len(), NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS);

// Verify that all close peers are selected
for close_peer in close_peers {
assert!(recipients.contains(&close_peer));
}
}

#[test]
fn closest_far_peer_is_not_selected() {
let content_id = random();
const TARGET_PEER_DISTANCE: usize = 253;

// Create NUM_CLOSEST_PEERS peers with log2 distance less than TARGET_PEER_DISTANCE
let close_peers =
create_peers_with_distance(NUM_CLOSEST_PEERS, &content_id, ..TARGET_PEER_DISTANCE);

// Create 1 peer with log2 distance exactly TARGET_PEER_DISTANCE
let target_peer = create_peers_with_distance(
1,
&content_id,
TARGET_PEER_DISTANCE..=TARGET_PEER_DISTANCE,
)
.remove(0);

// Create 1000 peers with log2 distance more than TARGET_PEER_DISTANCE
let far_peers =
create_peers_with_distance(1000, &content_id, TARGET_PEER_DISTANCE + 1..);

let all_peers =
chain!(close_peers, [target_peer.clone()], far_peers).collect::<Vec<_>>();

// We want to test that "target_peer" isn't selected.
// However, because far peers are selected randomly, there is a small chance (0.4%) that it is
// selected anyway. So we repeat the test up to 10 times, as it is extremely unlikely for it to
// be selected all 10 times.
let target_peer_is_not_selected = || {
!select_content_recipients::<XorMetric>(&content_id, all_peers.clone())
.contains(&target_peer)
};
assert!((0..10).any(|_| target_peer_is_not_selected()));
}
}
}
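To illustrate the selection rule documented on `select_content_recipients` (keep everyone when there are few peers, otherwise take the closest `NUM_CLOSEST_PEERS` plus `NUM_FARTHER_PEERS` random farther ones), here is a simplified, self-contained sketch in which peers are plain numbers and the number itself stands in for the distance to the content. It mirrors the structure of the real function but is not the PR's code:

```rust
use rand::{rng, seq::IteratorRandom};

const NUM_CLOSEST_PEERS: usize = 4;
const NUM_FARTHER_PEERS: usize = 4;

/// Simplified stand-in for `select_content_recipients`: a "peer" is a number and that
/// number is also its distance to the content.
fn select_recipients(mut peers: Vec<u64>) -> Vec<u64> {
    // If there are few enough peers, no selection is needed.
    if peers.len() <= NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS {
        return peers;
    }
    // Sort by distance and keep the closest NUM_CLOSEST_PEERS...
    peers.sort_unstable();
    let farther_peers = peers.split_off(NUM_CLOSEST_PEERS);
    // ...then add NUM_FARTHER_PEERS picked at random from the rest.
    peers.extend(farther_peers.into_iter().choose_multiple(&mut rng(), NUM_FARTHER_PEERS));
    peers
}

fn main() {
    let selected = select_recipients((0u64..100).rev().collect());
    assert_eq!(selected.len(), NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS);
    // The closest "peers" are always included.
    assert_eq!(&selected[..NUM_CLOSEST_PEERS], &[0, 1, 2, 3]);
}
```

The tests in this file check the same two properties on the real implementation: the result size is capped at `NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS`, and the closest peers are always selected.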