feat(portalnet): handle content that doesn't depend on distance to content #1859
Merged
+612
−314
Changes from all commits (2 commits)
@@ -0,0 +1,90 @@
use std::collections::HashMap;

use discv5::Enr;
use ethportal_api::{
    types::{distance::Metric, portal::MAX_CONTENT_KEYS_PER_OFFER},
    utils::bytes::hex_encode_compact,
    OverlayContentKey, RawContentKey, RawContentValue,
};
use itertools::{chain, Itertools};
use tracing::{debug, warn};

use crate::types::kbucket::SharedKBucketsTable;

mod neighborhood;
mod random;

/// Selects peers and the content that should be gossiped to them.
///
/// Every peer will have at least 1 and at most `MAX_CONTENT_KEYS_PER_OFFER` content items assigned
/// to them. If more than `MAX_CONTENT_KEYS_PER_OFFER` items are passed as an argument, it's
/// possible that no peers will be selected to propagate some of that content (even if they exist).
/// The order of content in the returned collection is the same as the order in which it is passed
/// (this might be important for some content types).
///
/// This function is designed such that either all content is affected by radius (in which case we
/// use the "neighborhood gossip" strategy) or none of it is (in which case we use the "random
/// gossip" strategy). If the content contains a mix of the two types, it is split and each gossip
/// strategy is applied to its respective part. This can lead to a suboptimal result, where we
/// select more peers than needed.
pub fn gossip_recipients<TContentKey: OverlayContentKey, TMetric: Metric>(
    content: Vec<(TContentKey, RawContentValue)>,
    kbuckets: &SharedKBucketsTable,
) -> HashMap<Enr, Vec<(RawContentKey, RawContentValue)>> {
    // Precalculate "content_id" and "raw_content_key" and use references going forward
    let content = content
        .into_iter()
        .map(|(key, raw_value)| {
            let id = key.content_id();
            let raw_key = key.to_bytes();
            (id, key, raw_key, raw_value)
        })
        .collect::<Vec<_>>();

    debug!(
        ids = ?content.iter().map(|(id, _, _, _)| hex_encode_compact(id)),
        "selecting gossip recipients"
    );

    // Split content id+key depending on whether they are affected by radius
    let (content_for_neighborhood_gossip, content_for_random_gossip) = content
        .iter()
        .map(|(id, key, _raw_key, _raw_value)| (id, key))
        .partition::<Vec<_>, _>(|(_content_id, content_key)| content_key.affected_by_radius());
    if !content_for_neighborhood_gossip.is_empty() && !content_for_random_gossip.is_empty() {
        warn!("Didn't expect to gossip a mix of neighborhood_gossip and random_gossip content");
    }

    // Combine the results of "neighborhood gossip" and "random gossip".
    let peer_to_content_ids = chain!(
        neighborhood::gossip_recipients::<_, TMetric>(content_for_neighborhood_gossip, kbuckets),
        random::gossip_recipients(content_for_random_gossip, kbuckets)
    )
    .into_grouping_map()
    .reduce(|mut all_content_to_gossip, _enr, content_ids| {
        all_content_to_gossip.extend(content_ids);
        all_content_to_gossip
    });

    // Collect raw content key/value into a hash map for easier lookup.
    let raw_content = content
        .iter()
        .map(|(id, _key, raw_key, raw_value)| (id, (raw_key, raw_value)))
        .collect::<HashMap<_, _>>();

    peer_to_content_ids
        .into_iter()
        .map(|(enr, content_ids)| {
            let raw_content_key_value = content_ids
                .into_iter()
                // Select at most `MAX_CONTENT_KEYS_PER_OFFER`
                .take(MAX_CONTENT_KEYS_PER_OFFER)
                .map(|content_id| {
                    let (raw_content_key, raw_content_value) = raw_content[content_id];
                    (raw_content_key.clone(), raw_content_value.clone())
                })
                .collect();
            (enr, raw_content_key_value)
        })
        .collect()
}
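The combine-and-truncate step at the end of this function can be sketched with std-only types. This is a minimal illustration, not the actual implementation: `String` stands in for `Enr`, `u32` for content ids, and `MAX_KEYS` for `MAX_CONTENT_KEYS_PER_OFFER`; the `entry(..).or_default().extend(..)` loop plays the role of `into_grouping_map().reduce(..)`, since a peer can appear in both strategies' outputs.

```rust
use std::collections::HashMap;

const MAX_KEYS: usize = 2; // stand-in for MAX_CONTENT_KEYS_PER_OFFER

/// Merge per-strategy recipient maps, then cap each peer's list.
fn combine(
    neighborhood: Vec<(String, Vec<u32>)>,
    random: Vec<(String, Vec<u32>)>,
) -> HashMap<String, Vec<u32>> {
    let mut merged: HashMap<String, Vec<u32>> = HashMap::new();
    // A peer can appear in both strategies; extend its list instead of overwriting.
    for (peer, ids) in neighborhood.into_iter().chain(random) {
        merged.entry(peer).or_default().extend(ids);
    }
    // Each peer receives at most MAX_KEYS content items.
    for ids in merged.values_mut() {
        ids.truncate(MAX_KEYS);
    }
    merged
}

fn main() {
    let merged = combine(
        vec![("peer-a".to_string(), vec![1]), ("peer-b".to_string(), vec![2])],
        vec![("peer-a".to_string(), vec![3, 4])],
    );
    println!("{:?}", merged["peer-a"]); // [1, 3] — third item dropped by the cap
    println!("{:?}", merged["peer-b"]); // [2]
}
```

Note that `truncate` keeps the first `MAX_KEYS` items, which is why the input order of content matters for which keys survive the cap.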
@@ -0,0 +1,222 @@
use std::collections::HashMap;

use alloy::hex::ToHexExt;
use discv5::Enr;
use ethportal_api::{types::distance::Metric, OverlayContentKey};
use rand::{rng, seq::IteratorRandom};
use tracing::{debug, error};

use crate::types::kbucket::SharedKBucketsTable;

pub fn gossip_recipients<'c, TContentKey: OverlayContentKey, TMetric: Metric>(
    content: Vec<(&'c [u8; 32], &TContentKey)>,
    kbuckets: &SharedKBucketsTable,
) -> HashMap<Enr, Vec<&'c [u8; 32]>> {
    if content.is_empty() {
        return HashMap::new();
    }

    let content_ids = content
        .iter()
        .map(|(content_id, _content_key)| *content_id)
        .collect::<Vec<_>>();

    // Map from content ids to interested ENRs
    let mut content_id_to_interested_enrs = kbuckets.batch_interested_enrs::<TMetric>(&content_ids);

    // Map from ENRs to the content that will be offered to them
    let mut enrs_and_content: HashMap<Enr, Vec<&'c [u8; 32]>> = HashMap::new();
    for (content_id, content_key) in content {
        let interested_enrs = content_id_to_interested_enrs.remove(content_id).unwrap_or_else(|| {
            error!("interested_enrs should contain all content ids, even if there are no interested ENRs");
            vec![]
        });
        if interested_enrs.is_empty() {
            debug!(
                content.id = content_id.encode_hex_with_prefix(),
                content.key = %content_key.to_bytes(),
                "No peers eligible for neighborhood gossip"
            );
            continue;
        };

        // Select content recipients
        for enr in select_content_recipients::<TMetric>(content_id, interested_enrs) {
            enrs_and_content.entry(enr).or_default().push(content_id);
        }
    }
    enrs_and_content
}

const NUM_CLOSEST_PEERS: usize = 4;
const NUM_FARTHER_PEERS: usize = 4;

/// Selects content recipients from a vec of interested peers.
///
/// If the number of peers is at most `NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS`, then all of them
/// are returned. Otherwise, peers are sorted by distance from `content_id` and then:
///
/// 1. The closest `NUM_CLOSEST_PEERS` ENRs are selected
/// 2. A random `NUM_FARTHER_PEERS` ENRs are selected from the rest
fn select_content_recipients<TMetric: Metric>(
    content_id: &[u8; 32],
    mut peers: Vec<Enr>,
) -> Vec<Enr> {
    // Check if we need to do any selection
    if peers.len() <= NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS {
        return peers;
    }

    // Sort peers by distance
    peers.sort_by_cached_key(|peer| TMetric::distance(content_id, &peer.node_id().raw()));

    // Split off at NUM_CLOSEST_PEERS
    let farther_peers = peers.split_off(NUM_CLOSEST_PEERS);

    // Select a random NUM_FARTHER_PEERS from the rest
    peers.extend(
        farther_peers
            .into_iter()
            .choose_multiple(&mut rng(), NUM_FARTHER_PEERS),
    );

    peers
}
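The selection policy above can be sketched with std-only code. Everything here is an illustrative stand-in: a toy XOR metric over 4-byte ids instead of the real 32-byte `TMetric::distance`, and a small linear congruential generator in place of `rand`'s `choose_multiple` (a partial Fisher-Yates pass gives the same "sample without replacement" effect).

```rust
const NUM_CLOSEST: usize = 4;
const NUM_FARTHER: usize = 4;

/// Toy XOR metric over 4-byte ids, compared as big-endian integers.
fn distance(a: &[u8; 4], b: &[u8; 4]) -> u32 {
    u32::from_be_bytes([a[0] ^ b[0], a[1] ^ b[1], a[2] ^ b[2], a[3] ^ b[3]])
}

/// Keep the NUM_CLOSEST closest peers, then randomly sample NUM_FARTHER of the rest.
fn select(content_id: &[u8; 4], mut peers: Vec<[u8; 4]>, seed: &mut u64) -> Vec<[u8; 4]> {
    // No selection needed if everyone fits.
    if peers.len() <= NUM_CLOSEST + NUM_FARTHER {
        return peers;
    }
    // Sort by distance, then split off everything past the closest NUM_CLOSEST.
    peers.sort_by_cached_key(|p| distance(content_id, p));
    let mut farther = peers.split_off(NUM_CLOSEST);
    // Stand-in for `choose_multiple`: partial Fisher-Yates shuffle driven by an LCG.
    for i in 0..NUM_FARTHER {
        *seed = seed.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407);
        let j = i + (*seed >> 33) as usize % (farther.len() - i);
        farther.swap(i, j);
    }
    peers.extend(farther.into_iter().take(NUM_FARTHER));
    peers
}

fn main() {
    let content_id = [0u8; 4];
    // 16 peers with ids 1..=16; against content id 0, ids 1..=4 are the closest.
    let peers: Vec<[u8; 4]> = (1u32..=16).map(|n| n.to_be_bytes()).collect();
    let selected = select(&content_id, peers, &mut 42);
    // 4 closest + 4 random farther peers are selected.
    assert_eq!(selected.len(), NUM_CLOSEST + NUM_FARTHER);
    for n in 1u32..=4 {
        assert!(selected.contains(&n.to_be_bytes()));
    }
    println!("selected {} peers", selected.len());
}
```

The closest peers are always included deterministically; only the "farther" half of the result varies between runs.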

#[cfg(test)]
mod tests {
    use std::iter;

    use discv5::enr::NodeId;
    use ethportal_api::{
        types::{
            distance::{Distance, XorMetric},
            enr::generate_random_remote_enr,
        },
        IdentityContentKey,
    };
    use rand::random;
    use rstest::rstest;

    use super::*;

    #[test]
    fn empty() {
        let kbuckets = SharedKBucketsTable::new_for_tests(NodeId::random());

        for _ in 0..NUM_CLOSEST_PEERS {
            let (_, peer) = generate_random_remote_enr();
            let _ = kbuckets.insert_or_update_disconnected(&peer, Distance::MAX);
        }

        assert!(gossip_recipients::<IdentityContentKey, XorMetric>(vec![], &kbuckets).is_empty());
    }

    mod select_content_recipients {
        use std::ops::RangeBounds;

        use itertools::chain;

        use super::*;

        fn create_peers_with_distance(
            count: usize,
            content_id: &[u8; 32],
            log2_distances: impl RangeBounds<usize>,
        ) -> Vec<Enr> {
            iter::repeat_with(|| generate_random_remote_enr().1)
                .filter(|peer| {
                    log2_distances.contains(
                        &XorMetric::distance(content_id, &peer.node_id().raw())
                            .log2()
                            .unwrap(),
                    )
                })
                .take(count)
                .collect()
        }

        #[rstest]
        #[case(0, 0)]
        #[case(NUM_CLOSEST_PEERS - 1, NUM_CLOSEST_PEERS - 1)]
        #[case(NUM_CLOSEST_PEERS, NUM_CLOSEST_PEERS)]
        #[case(NUM_CLOSEST_PEERS + 1, NUM_CLOSEST_PEERS + 1)]
        #[case(NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS, NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS)]
        #[case(NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS + 1, NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS)]
        #[case(256, NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS)]
        fn count(#[case] peers_count: usize, #[case] expected_content_recipients_count: usize) {
            let content_id = random();
            let peers = create_peers_with_distance(peers_count, &content_id, ..);
            assert_eq!(
                select_content_recipients::<XorMetric>(&content_id, peers).len(),
                expected_content_recipients_count
            );
        }

        #[test]
        fn closest() {
            let content_id = random();

            const CLOSE_PEER_LOG2_DISTANCE: usize = 253;

            // Create NUM_CLOSEST_PEERS peers with log2 distance less than CLOSE_PEER_LOG2_DISTANCE
            let close_peers = create_peers_with_distance(
                NUM_CLOSEST_PEERS,
                &content_id,
                ..CLOSE_PEER_LOG2_DISTANCE,
            );

            // Create 1000 peers with log2 distance at least CLOSE_PEER_LOG2_DISTANCE
            let far_peers =
                create_peers_with_distance(1000, &content_id, CLOSE_PEER_LOG2_DISTANCE..);

            let recipients = select_content_recipients::<XorMetric>(
                &content_id,
                chain!(close_peers.clone(), far_peers).collect(),
            );

            // Verify that `NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS` peers are selected
            assert_eq!(recipients.len(), NUM_CLOSEST_PEERS + NUM_FARTHER_PEERS);

            // Verify that all close peers are selected
            for close_peer in close_peers {
                assert!(recipients.contains(&close_peer));
            }
        }

        #[test]
        fn closest_far_peer_is_not_selected() {
            let content_id = random();
            const TARGET_PEER_DISTANCE: usize = 253;

            // Create NUM_CLOSEST_PEERS peers with log2 distance less than TARGET_PEER_DISTANCE
            let close_peers =
                create_peers_with_distance(NUM_CLOSEST_PEERS, &content_id, ..TARGET_PEER_DISTANCE);

            // Create 1 peer with log2 distance exactly TARGET_PEER_DISTANCE
            let target_peer = create_peers_with_distance(
                1,
                &content_id,
                TARGET_PEER_DISTANCE..=TARGET_PEER_DISTANCE,
            )
            .remove(0);

            // Create 1000 peers with log2 distance more than TARGET_PEER_DISTANCE
            let far_peers =
                create_peers_with_distance(1000, &content_id, TARGET_PEER_DISTANCE + 1..);

            let all_peers =
                chain!(close_peers, [target_peer.clone()], far_peers).collect::<Vec<_>>();

            // We want to test that "target_peer" isn't selected. However, because far peers are
            // selected randomly, there is a small chance (~0.4%) that it is selected anyway.
            // So we repeat the test up to 10 times, as it is extremely unlikely for the target
            // peer to be selected in all 10 runs.
            let target_peer_is_not_selected = || {
                !select_content_recipients::<XorMetric>(&content_id, all_peers.clone())
                    .contains(&target_peer)
            };
            assert!((0..10).any(|_| target_peer_is_not_selected()));
        }
    }
}
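The ~0.4% figure in the last test can be sanity-checked. After the 4 closest peers are split off, the "farther" pool holds the target peer plus the 1000 far peers (1001 candidates), from which `NUM_FARTHER_PEERS = 4` are sampled. A back-of-the-envelope check, using the numbers from the test setup:

```rust
fn main() {
    // Pool after removing the 4 closest peers: 1 target + 1000 far peers.
    let pool = 1001.0_f64;
    let sampled = 4.0_f64; // NUM_FARTHER_PEERS

    // Chance the target peer is picked in a single run: 4/1001 ≈ 0.4%.
    let p_once = sampled / pool;
    // Chance it is picked in all 10 retries (runs are independent).
    let p_all_ten = p_once.powi(10);

    println!("{:.4}", p_once); // prints "0.0040"
    assert!(p_once < 0.005); // matches the ~0.4% in the test comment
    assert!(p_all_ten < 1e-23); // vanishingly small, so 10 retries suffice
}
```

This is why repeating the assertion up to 10 times makes the test effectively deterministic despite the random sampling.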
shouldn't we put these above imports?
I remember seeing them go under imports, but I can't find where. All that I found now is that there is no strict rule, or even a strong recommendation.
Then I checked some of the popular crates; it's not easy to find mod.rs files with a similar structure, but here are a few in core (and elsewhere) that do it this way:
- core::array
- core::future
- core::num
- alloy-primitives::bytes
I also checked our code base and it seems that we use both. So I'm just going to leave it as is. If we agree on one style over the other, we should go and clean up the entire codebase (but considering that rust core puts imports first, I would prefer that one).