Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kad: Expose all peer records of GET_VALUE query #96

Merged
merged 10 commits into from
May 16, 2024
16 changes: 14 additions & 2 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ pub enum KademliaEvent {
/// Query ID.
query_id: QueryId,

/// Found record.
record: PeerRecord,
/// Found records.
records: RecordsType,
},

/// `PUT_VALUE` query succeeded.
Expand All @@ -173,6 +173,18 @@ pub enum KademliaEvent {
},
}

/// The type of the DHT records.
#[derive(Debug, Clone)]
pub enum RecordsType {
/// Record was found in the local store.
///
/// This contains only a single result.
LocalStore(Record),

/// Records found in the network.
Network(Vec<PeerRecord>),
}

/// Handle for communicating with the Kademlia protocol.
pub struct KademliaHandle {
/// TX channel for sending commands to `Kademlia`.
Expand Down
33 changes: 28 additions & 5 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode};
pub use query::QueryId;
pub use record::{Key as RecordKey, PeerRecord, Record};

pub use self::handle::RecordsType;

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia";

Expand Down Expand Up @@ -636,11 +638,32 @@ impl Kademlia {

Ok(())
}
QueryAction::GetRecordQueryDone { query_id, record } => {
self.store.put(record.record.clone());
QueryAction::GetRecordQueryDone { query_id, records } => {
// Considering this gives a view of all peers and their records, some peers may have
// outdated records. Store only the record which is backed by most
// peers.
let rec = records
.iter()
.map(|peer_record| &peer_record.record)
.fold(HashMap::new(), |mut acc, rec| {
*acc.entry(rec).or_insert(0) += 1;
acc
})
.into_iter()
.max_by_key(|(_, v)| *v)
.map(|(k, _)| k);
Copy link
Collaborator

@dmitry-markin dmitry-markin May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have tests for correctly updating the local store and also discarding the expired records. At least as a follow-up planned / TODO issue if it's too much work to implement straight away.


if let Some(record) = rec {
self.store.put(record.clone());
}

let _ =
self.event_tx.send(KademliaEvent::GetRecordSuccess { query_id, record }).await;
let _ = self
.event_tx
.send(KademliaEvent::GetRecordSuccess {
query_id,
records: RecordsType::Network(records),
})
.await;
Ok(())
}
QueryAction::QueryFailed { query } => {
Expand Down Expand Up @@ -782,7 +805,7 @@ impl Kademlia {
(Some(record), Quorum::One) => {
let _ = self
.event_tx
.send(KademliaEvent::GetRecordSuccess { query_id, record: PeerRecord { record: record.clone(), peer: None } })
.send(KademliaEvent::GetRecordSuccess { query_id, records: RecordsType::LocalStore(record.clone()) })
.await;
}
(record, _) => {
Expand Down
17 changes: 9 additions & 8 deletions src/protocol/libp2p/kademlia/query/get_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ impl GetRecordContext {
}
}

/// Get the found record.
pub fn found_record(mut self) -> PeerRecord {
self.found_records.pop().expect("record to exist since query succeeded")
/// Get the found records.
pub fn found_records(mut self) -> Vec<PeerRecord> {
self.found_records
}

/// Register response failure for `peer`.
Expand All @@ -136,12 +136,13 @@ impl GetRecordContext {
return;
};

// TODO: validate record
if let Some(record) = record {
self.found_records.push(PeerRecord {
record,
peer: Some(peer.peer),
});
if !record.is_expired(std::time::Instant::now()) {
self.found_records.push(PeerRecord {
peer: peer.peer,
record,
});
}
}

// add the queried peer to `queried` and all new peers which haven't been
Expand Down
29 changes: 20 additions & 9 deletions src/protocol/libp2p/kademlia/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use crate::{
protocol::libp2p::kademlia::{
message::KademliaMessage,
query::{find_node::FindNodeContext, get_record::GetRecordContext},
record::{Key as RecordKey, PeerRecord, Record},
record::{Key as RecordKey, Record},
types::{KademliaPeer, Key},
Quorum,
PeerRecord, Quorum,
},
PeerId,
};
Expand Down Expand Up @@ -124,8 +124,8 @@ pub enum QueryAction {
/// Query ID.
query_id: QueryId,

/// Found record.
record: PeerRecord,
/// Found records.
records: Vec<PeerRecord>,
},

// TODO: remove
Expand Down Expand Up @@ -396,7 +396,7 @@ impl QueryEngine {
},
QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone {
query_id: context.query,
record: context.found_record(),
records: context.found_records(),
},
}
}
Expand Down Expand Up @@ -748,10 +748,21 @@ mod tests {

let peers: std::collections::HashSet<_> = peers.into_iter().map(|p| p.peer).collect();
match engine.next_action() {
Some(QueryAction::GetRecordQueryDone { record, .. }) => {
assert!(peers.contains(&record.peer.expect("Peer Id must be provided")));
assert_eq!(record.record.key, original_record.key);
assert_eq!(record.record.value, original_record.value);
Some(QueryAction::GetRecordQueryDone { records, .. }) => {
let query_peers = records
.iter()
.map(|peer_record| peer_record.peer)
.collect::<std::collections::HashSet<_>>();
assert_eq!(peers, query_peers);

let records: std::collections::HashSet<_> =
records.into_iter().map(|peer_record| peer_record.record).collect();
// One single record found across peers.
assert_eq!(records.len(), 1);
let record = records.into_iter().next().unwrap();

assert_eq!(record.key, original_record.key);
assert_eq!(record.value, original_record.value);
}
_ => panic!("invalid event received"),
}
Expand Down
11 changes: 5 additions & 6 deletions src/protocol/libp2p/kademlia/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl From<Multihash> for Key {
}

/// A record stored in the DHT.
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Record {
/// Key of the record.
pub key: Key,
Expand Down Expand Up @@ -109,13 +109,12 @@ impl Record {
}
}

/// A record either received by the given peer or retrieved from the local
/// record store.
/// A record received by the given peer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
/// The peer from whom the record was received. `None` if the record was
/// retrieved from local storage.
pub peer: Option<PeerId>,
/// The peer from whom the record was received
pub peer: PeerId,

/// The provided record.
pub record: Record,
}