Skip to content

Commit

Permalink
Return closest nodes for topics
Browse files Browse the repository at this point in the history
  • Loading branch information
emhane committed Jun 16, 2022
1 parent 9d641c3 commit 93cca33
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 25 deletions.
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub struct Discv5Config {
pub topic_radius: u64,

pub topic_query_timeout: Duration,
pub topics_num_results: usize,
pub topic_query_num_results: usize,

/// A custom executor which can spawn the discv5 tasks. This must be a tokio runtime, with
/// timing support. By default, the executor that created the discv5 struct will be used.
Expand Down Expand Up @@ -146,7 +146,7 @@ impl Default for Discv5Config {
ban_duration: Some(Duration::from_secs(3600)), // 1 hour
topic_radius: 256,
topic_query_timeout: Duration::from_secs(60),
topics_num_results: 16,
topic_query_num_results: 16,
ip_mode: IpMode::default(),
executor: None,
}
Expand Down
4 changes: 4 additions & 0 deletions src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,10 @@ impl Handler {
// The request is reinserted for either another nodes response, a ticket or a
// register confirmation response that may come, otherwise the request times out.
RequestBody::RegisterTopic { .. } => remaining_responses >= &mut 0,
RequestBody::TopicQuery { .. } => {
// remove from some map of NODES and AD NODES
remaining_responses >= &mut 0
}
_ => remaining_responses > &mut 0,
};
if reinsert {
Expand Down
76 changes: 54 additions & 22 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ impl Service {
ticket_pools: TicketPools::default(),
active_topic_queries: ActiveTopicQueries::new(
config.topic_query_timeout,
config.topics_num_results,
config.topic_query_num_results,
),
exit,
config: config.clone(),
Expand Down Expand Up @@ -666,12 +666,6 @@ impl Service {
if callback.send(found_enrs).is_err() {
warn!("Callback dropped for query {}. Results dropped", *id);
}
} else {
let QueryType::FindNode(node_id) = result.target.query_type;
let topic = TopicHash::from_raw(node_id.raw());
if self.topics.contains_key(&topic){
// add to topic kbuckets?
}
}
}
}
Expand Down Expand Up @@ -978,18 +972,12 @@ impl Service {
if enr.node_id() == node_address.node_id
&& enr.udp4_socket().map(SocketAddr::V4) == Some(node_address.socket_addr)
{
let local_key: kbucket::Key<NodeId> = self.local_enr.read().node_id().into();
let topic_key: kbucket::Key<NodeId> = NodeId::new(&topic.as_bytes()).into();
let distance_to_topic = local_key.log2_distance(&topic_key);

let mut closest_peers: Vec<Enr> = Vec::new();
if let Some(distance) = distance_to_topic {
self.kbuckets
.write()
.nodes_by_distances(&[distance], self.config.max_nodes_response)
.iter()
.for_each(|entry| closest_peers.push(entry.node.value.clone()));
}
self.send_topic_nodes_response(
topic,
node_address.clone(),
id.clone(),
"REGTOPIC".into(),
);

let wait_time = self
.ads
Expand All @@ -1006,14 +994,12 @@ impl Service {

// According to spec, a ticket should always be issued upon receiving a REGTOPIC request.
self.send_ticket_response(
node_address.clone(),
node_address,
id.clone(),
new_ticket.clone(),
wait_time,
);

self.send_nodes_response(closest_peers, node_address, id.clone(), "REGTOPIC");

// If the wait time has expired, the TICKET is added to the matching ticket pool. If this is
// the first REGTOPIC request from a given node for a given topic, the newly created ticket
// is used to add the registration attempt to to the ticket pool.
Expand Down Expand Up @@ -1075,6 +1061,12 @@ impl Service {
}
}
RequestBody::TopicQuery { topic } => {
self.send_topic_nodes_response(
topic,
node_address.clone(),
id.clone(),
"REGTOPIC".into(),
);
self.send_topic_query_response(node_address, id, topic);
}
}
Expand Down Expand Up @@ -1648,6 +1640,46 @@ impl Service {
self.send_nodes_response(nodes_to_send, node_address, rpc_id, "TOPICQUERY");
}

fn send_topic_nodes_response(
&mut self,
topic: TopicHash,
node_address: NodeAddress,
id: RequestId,
req_type: String,
) {
let local_key: kbucket::Key<NodeId> = self.local_enr.read().node_id().into();
let topic_key: kbucket::Key<NodeId> = NodeId::new(&topic.as_bytes()).into();
let distance_to_topic = local_key.log2_distance(&topic_key);

let mut closest_peers: Vec<Enr> = Vec::new();
let closest_peers_length = closest_peers.len();
if let Some(distance) = distance_to_topic {
self.kbuckets
.write()
.nodes_by_distances(&[distance], self.config.max_nodes_response)
.iter()
.for_each(|entry| closest_peers.push(entry.node.value.clone()));

if closest_peers_length < self.config.max_nodes_response {
for entry in self
.kbuckets
.write()
.nodes_by_distances(
&[distance - 1, distance + 1],
self.config.max_nodes_response - closest_peers_length,
)
.iter()
{
if closest_peers_length > self.config.max_nodes_response {
break;
}
closest_peers.push(entry.node.value.clone());
}
}
}
self.send_nodes_response(closest_peers, node_address, id, &req_type);
}

/// Sends a NODES response, given a list of found ENR's. This function splits the nodes up
/// into multiple responses to ensure the response stays below the maximum packet size.
fn send_find_nodes_response(
Expand Down
2 changes: 1 addition & 1 deletion src/service/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async fn build_service(
ticket_pools: TicketPools::default(),
active_topic_queries: ActiveTopicQueries::new(
config.topic_query_timeout,
config.topics_num_results,
config.topic_query_num_results,
),
exit,
config,
Expand Down

0 comments on commit 93cca33

Please sign in to comment.