From 81751cff37c45babc3d4483cd5f767ed59598d23 Mon Sep 17 00:00:00 2001
From: Nazar Mokrynskyi
Date: Sat, 12 Oct 2024 11:11:14 +0300
Subject: [PATCH] Add `PieceProvider::get_from_cache()` that supports connected
 peers and retrieving pieces more efficiently in batches

---
 Cargo.lock                            |   1 +
 crates/subspace-networking/Cargo.toml |   1 +
 crates/subspace-networking/src/lib.rs |   1 +
 .../src/utils/piece_provider.rs       | 607 +++++++++++++++++-
 crates/subspace-node/Cargo.toml       |   2 +-
 5 files changed, 605 insertions(+), 7 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 58a1628091..10818cb057 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -12700,6 +12700,7 @@ dependencies = [
  "subspace-metrics",
  "thiserror",
  "tokio",
+ "tokio-stream",
  "tracing",
  "tracing-subscriber",
  "unsigned-varint 0.8.0",
diff --git a/crates/subspace-networking/Cargo.toml b/crates/subspace-networking/Cargo.toml
index 41b06b3c7b..d329bec891 100644
--- a/crates/subspace-networking/Cargo.toml
+++ b/crates/subspace-networking/Cargo.toml
@@ -44,6 +44,7 @@ subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primiti
 subspace-metrics = { version = "0.1.0", path = "../../shared/subspace-metrics" }
 thiserror = "1.0.64"
 tokio = { version = "1.40.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
+tokio-stream = "0.1.16"
 tracing = "0.1.40"
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
 unsigned-varint = { version = "0.8.0", features = ["futures", "asynchronous_codec"] }
diff --git a/crates/subspace-networking/src/lib.rs b/crates/subspace-networking/src/lib.rs
index d11cfa76b6..d99c39d189 100644
--- a/crates/subspace-networking/src/lib.rs
+++ b/crates/subspace-networking/src/lib.rs
@@ -15,6 +15,7 @@
 //! Networking functionality of Subspace Network, primarily used for DSN (Distributed Storage
 //! Network).
+
 #![feature(const_option, impl_trait_in_assoc_type, ip, try_blocks)]
 #![warn(missing_docs)]
diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs
index 8f568acf54..960180c2f6 100644
--- a/crates/subspace-networking/src/utils/piece_provider.rs
+++ b/crates/subspace-networking/src/utils/piece_provider.rs
@@ -1,18 +1,32 @@
 //! Provides methods to retrieve pieces from DSN.
+use crate::protocols::request_response::handlers::cached_piece_by_index::{
+    CachedPieceByIndexRequest, CachedPieceByIndexResponse, PieceResult,
+};
 use crate::protocols::request_response::handlers::piece_by_index::{
     PieceByIndexRequest, PieceByIndexResponse,
 };
 use crate::utils::multihash::ToMultihash;
-use crate::Node;
+use crate::{Multihash, Node};
 use async_trait::async_trait;
-use futures::StreamExt;
-use libp2p::kad::RecordKey;
-use libp2p::PeerId;
-use std::collections::HashSet;
-use std::fmt;
+use futures::channel::mpsc;
+use futures::stream::FuturesUnordered;
+use futures::task::noop_waker_ref;
+use futures::{stream, FutureExt, Stream, StreamExt};
+use libp2p::kad::store::RecordStore;
+use libp2p::kad::{store, Behaviour as Kademlia, KBucketKey, ProviderRecord, Record, RecordKey};
+use libp2p::swarm::NetworkBehaviour;
+use libp2p::{Multiaddr, PeerId};
+use parking_lot::Mutex;
+use rand::prelude::*;
+use std::borrow::Cow;
+use std::collections::{HashMap, HashSet};
+use std::iter::Empty;
 use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{fmt, iter, mem};
 use subspace_core_primitives::pieces::{Piece, PieceIndex};
+use tokio_stream::StreamMap;
 use tracing::{debug, trace, warn};

 /// Validates piece against using its commitment.
@@ -28,6 +42,7 @@ pub trait PieceValidator: Sync + Send {
 }

 /// Stub implementation for piece validation.
+#[derive(Debug, Clone, Copy)]
 pub struct NoPieceValidator;

 #[async_trait]
@@ -63,6 +78,34 @@ where
         }
     }

+    /// Get pieces with the provided indices from cache
+    pub async fn get_from_cache<'a, PieceIndices>(
+        &'a self,
+        piece_indices: PieceIndices,
+    ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + 'a
+    where
+        PieceIndices: IntoIterator<Item = PieceIndex> + 'a,
+    {
+        let (tx, mut rx) = mpsc::unbounded();
+        // `fuse()` makes the future safe to poll again after it has completed, which can happen
+        // below while buffered results are still being drained
+        let mut fut = Box::pin(
+            get_from_cache_inner(piece_indices.into_iter(), &self.node, &self.piece_validator, tx)
+                .fuse(),
+        );
+
+        // Drive the future above and stream back any pieces that have been downloaded so far
+        stream::poll_fn(move |cx| {
+            let end_result = fut.poll_unpin(cx);
+
+            if let Ok(maybe_result) = rx.try_next() {
+                return Poll::Ready(maybe_result);
+            }
+
+            end_result.map(|()| None)
+        })
+    }
+
     /// Returns piece by its index from farmer's piece cache (L2)
     pub async fn get_piece_from_cache(&self, piece_index: PieceIndex) -> Option<Piece> {
         let key = RecordKey::from(piece_index.to_multihash());
@@ -327,3 +370,555 @@ where
         None
     }
 }
+
+struct DummyRecordStore;
+
+impl RecordStore for DummyRecordStore {
+    type RecordsIter<'a>
+        = Empty<Cow<'a, Record>>
+    where
+        Self: 'a;
+    type ProvidedIter<'a>
+        = Empty<Cow<'a, ProviderRecord>>
+    where
+        Self: 'a;
+
+    fn get(&self, _key: &RecordKey) -> Option<Cow<'_, Record>> {
+        // Not supported
+        None
+    }
+
+    fn put(&mut self, _record: Record) -> store::Result<()> {
+        // Not supported
+        Ok(())
+    }
+
+    fn remove(&mut self, _key: &RecordKey) {
+        // Not supported
+    }
+
+    fn records(&self) -> Self::RecordsIter<'_> {
+        // Not supported
+        iter::empty()
+    }
+
+    fn add_provider(&mut self, _record: ProviderRecord) -> store::Result<()> {
+        // Not supported
+        Ok(())
+    }
+
+    fn providers(&self, _key: &RecordKey) -> Vec<ProviderRecord> {
+        // Not supported
+        Vec::new()
+    }
+
+    fn provided(&self) -> Self::ProvidedIter<'_> {
+        // Not supported
+        iter::empty()
+    }
+
+    fn remove_provider(&mut self, _key: &RecordKey, _provider: &PeerId) {
+        // Not supported
+    }
+}
+
+/// Kademlia wrapper to take advantage of its internal logic of selecting closest peers
+struct KademliaWrapper {
+    local_peer_id: PeerId,
+    kademlia: Kademlia<DummyRecordStore>,
+    noop_context: Context<'static>,
+}
+
+impl KademliaWrapper {
+    fn new(local_peer_id: PeerId) -> Self {
+        Self {
+            local_peer_id,
+            kademlia: Kademlia::new(local_peer_id, DummyRecordStore),
+            noop_context: Context::from_waker(noop_waker_ref()),
+        }
+    }
+
+    fn add_peer(&mut self, peer_id: &PeerId, addresses: Vec<Multiaddr>) {
+        for address in addresses {
+            self.kademlia.add_address(peer_id, address);
+        }
+        while self.kademlia.poll(&mut self.noop_context).is_ready() {
+            // Simply drain useless events generated by the calls above
+        }
+    }
+
+    /// Returned peers are already sorted in ascending distance order
+    fn closest_peers(
+        &mut self,
+        key: &KBucketKey<Multihash>,
+    ) -> impl Iterator<Item = (PeerId, Vec<Multiaddr>)> + 'static {
+        let mut closest_peers = self
+            .kademlia
+            .find_closest(key, &self.local_peer_id)
+            .into_iter()
+            .map(|peer| {
+                (
+                    KBucketKey::from(peer.node_id).distance(key),
+                    peer.node_id,
+                    peer.multiaddrs,
+                )
+            })
+            .collect::<Vec<_>>();
+
+        closest_peers.sort_unstable_by(|a, b| a.0.cmp(&b.0));
+        closest_peers
+            .into_iter()
+            .map(|(_distance, peer_id, addresses)| (peer_id, addresses))
+    }
+}
+
+async fn get_from_cache_inner<PV, PieceIndices>(
+    piece_indices: PieceIndices,
+    node: &Node,
+    piece_validator: &PV,
+    results: mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
+) where
+    PV: PieceValidator,
+    PieceIndices: Iterator<Item = PieceIndex>,
+{
+    // Download from connected peers first
+    let pieces_to_download =
+        download_cached_pieces_from_connected_peers(piece_indices, node, piece_validator, &results)
+            .await;
+
+    if pieces_to_download.is_empty() {
+        return;
+    }
+
+    // Download from iteratively closer peers according to Kademlia rules
+    download_cached_pieces_from_closest_peers(pieces_to_download, node, piece_validator, &results)
+        .await;
+}
+
+/// Takes pieces to download as input, sends results for pieces that were downloaded successfully
+/// and returns those that could not be downloaded from connected peers, together with addresses
+/// of potential candidates
+async fn download_cached_pieces_from_connected_peers<PV, PieceIndices>(
+    piece_indices: PieceIndices,
+    node: &Node,
+    piece_validator: &PV,
+    results: &mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
+) -> HashMap<PieceIndex, HashMap<PeerId, Vec<Multiaddr>>>
+where
+    PV: PieceValidator,
+    PieceIndices: Iterator<Item = PieceIndex>,
+{
+    // Make sure every piece index has an entry since this will be the primary container for
+    // tracking pieces to download going forward.
+    //
+    // At the end, pieces that were not downloaded will remain with a collection of the closest
+    // known peers for them.
+    let mut pieces_to_download = piece_indices
+        .map(|piece_index| (piece_index, HashMap::new()))
+        .collect::<HashMap<PieceIndex, HashMap<PeerId, Vec<Multiaddr>>>>();
+    let mut checked_connected_peers = HashSet::new();
+
+    // The loop exists in order to check peers that might have connected after the initial
+    // iteration started.
+    loop {
+        let Ok(connected_peers) = node.connected_peers().await else {
+            break;
+        };
+
+        let num_pieces = pieces_to_download.len();
+        // `.max(1)` guards against division by zero when there are no connected peers
+        let step = num_pieces / connected_peers.len().min(num_pieces).max(1);
+
+        // Dispatch the initial set of requests to peers
+        let mut downloading_stream = connected_peers
+            .into_iter()
+            .take(num_pieces)
+            .enumerate()
+            .filter_map(|(peer_index, peer_id)| {
+                if !checked_connected_peers.insert(peer_id) {
+                    return None;
+                }
+
+                // Take a unique first piece index for each connected peer and the rest just to
+                // check for cached pieces up to the recommended limit
+                let mut peer_piece_indices = pieces_to_download
+                    .keys()
+                    .cycle()
+                    .skip(step * peer_index)
+                    .take(num_pieces.min(CachedPieceByIndexRequest::RECOMMENDED_LIMIT))
+                    .copied()
+                    .collect::<Vec<_>>();
+                // Pick the first piece index as the piece we want to download
+                let piece_index = peer_piece_indices.swap_remove(0);
+
+                let fut = download_cached_piece_from_peer(
+                    node,
+                    piece_validator,
+                    peer_id,
+                    Vec::new(),
+                    Arc::new(peer_piece_indices),
+                    piece_index,
+                    HashSet::new(),
+                    HashSet::new(),
+                );
+
+                Some((piece_index, Box::pin(fut.into_stream())))
+            })
+            .collect::<StreamMap<_, _>>();
+
+        // Process every response and potentially schedule a follow-up request to the same peer
+        while let Some((piece_index, result)) = downloading_stream.next().await {
+            let DownloadedPieceFromPeer {
+                peer_id,
+                result,
+                mut cached_pieces,
+                not_cached_pieces,
+            } = result;
+
+            let Some(result) = result else {
+                // Downloading failed, ignore this peer
+                continue;
+            };
+
+            match result {
+                PieceResult::Piece(piece) => {
+                    // Downloaded successfully
+                    pieces_to_download.remove(&piece_index);
+
+                    results
+                        .unbounded_send((piece_index, Some(piece)))
+                        .expect("This future isn't polled after receiver is dropped; qed");
+
+                    if pieces_to_download.is_empty() {
+                        return HashMap::new();
+                    }
+                }
+                PieceResult::ClosestPeers(closest_peers) => {
+                    // Store closer peers in case the piece index has not been downloaded yet
+                    if let Some(peers) = pieces_to_download.get_mut(&piece_index) {
+                        peers.extend(Vec::from(closest_peers));
+                    }
+                }
+            }
+
+            let mut maybe_piece_index_to_download_next = None;
+            // Clear useless entries in cached pieces and find something to download next
+            cached_pieces.retain(|piece_index| {
+                // Clear downloaded pieces
+                if !pieces_to_download.contains_key(piece_index) {
+                    return false;
+                }
+
+                // Try to pick a piece to download that is not being downloaded already
+                if maybe_piece_index_to_download_next.is_none()
+                    && !downloading_stream.contains_key(piece_index)
+                {
+                    maybe_piece_index_to_download_next.replace(*piece_index);
+                    // We won't need to download it after this attempt
+                    return false;
+                }
+
+                // Retain everything else
+                true
+            });
+
+            let piece_index_to_download_next =
+                if let Some(piece_index) = maybe_piece_index_to_download_next {
+                    piece_index
+                } else {
+                    // Nothing left to do with this peer
+                    continue;
+                };
+
+            let fut = download_cached_piece_from_peer(
+                node,
+                piece_validator,
+                peer_id,
+                Vec::new(),
+                // Sample more random cached piece indices for the connected peer; the algorithm
+                // can be improved, but it has to be something simple, and this should do for now
+                Arc::new(
+                    pieces_to_download
+                        .keys()
+                        // Do a bit of work to filter out piece indices we already know the
+                        // remote peer has or doesn't have, to decrease the burden on it
+                        .filter_map(|piece_index| {
+                            if piece_index == &piece_index_to_download_next
+                                || cached_pieces.contains(piece_index)
+                                || not_cached_pieces.contains(piece_index)
+                            {
+                                None
+                            } else {
+                                Some(*piece_index)
+                            }
+                        })
+                        .choose_multiple(
+                            &mut thread_rng(),
+                            CachedPieceByIndexRequest::RECOMMENDED_LIMIT,
+                        ),
+                ),
+                piece_index_to_download_next,
+                cached_pieces,
+                not_cached_pieces,
+            );
+            downloading_stream.insert(piece_index, Box::pin(fut.into_stream()));
+        }
+
+        if pieces_to_download.len() == num_pieces {
+            // Nothing was downloaded, we're done here
+            break;
+        }
+    }
+
+    pieces_to_download
+}
+
+/// Takes pieces to download with potential peer candidates as input, sends results for pieces
+/// that were downloaded successfully and returns those that were not downloaded
+async fn download_cached_pieces_from_closest_peers<PV>(
+    maybe_pieces_to_download: HashMap<PieceIndex, HashMap<PeerId, Vec<Multiaddr>>>,
+    node: &Node,
+    piece_validator: &PV,
+    results: &mpsc::UnboundedSender<(PieceIndex, Option<Piece>)>,
+) where
+    PV: PieceValidator,
+{
+    let kademlia = &Mutex::new(KademliaWrapper::new(node.id()));
+    // Collection of pieces to download and already connected peers that claim to have them
+    let connected_peers_with_piece = &Mutex::new(
+        maybe_pieces_to_download
+            .keys()
+            .map(|&piece_index| (piece_index, HashSet::<PeerId>::new()))
+            .collect::<HashMap<_, _>>(),
+    );
+
+    let mut downloaded_pieces = maybe_pieces_to_download
+        .into_iter()
+        .map(|(piece_index, collected_peers)| async move {
+            let key = piece_index.to_multihash();
+            let kbucket_key = KBucketKey::from(key);
+            let mut checked_closest_peers = HashSet::<PeerId>::new();
+
+            {
+                let local_closest_peers = node
+                    .get_closest_local_peers(key, None)
+                    .await
+                    .unwrap_or_default();
+                let mut kademlia = kademlia.lock();
+
+                for (peer_id, addresses) in collected_peers {
+                    kademlia.add_peer(&peer_id, addresses);
+                }
+                for (peer_id, addresses) in local_closest_peers {
+                    kademlia.add_peer(&peer_id, addresses);
+                }
+            }
+
+            loop {
+                // Collect pieces that still need to be downloaded and connected peers that
+                // claim to have them
+                let (pieces_to_download, connected_peers) = {
+                    let mut connected_peers_with_piece = connected_peers_with_piece.lock();
+
+                    (
+                        Arc::new(
+                            connected_peers_with_piece
+                                .keys()
+                                .filter(|&candidate| candidate != &piece_index)
+                                .take(CachedPieceByIndexRequest::RECOMMENDED_LIMIT)
+                                .copied()
+                                .collect::<Vec<_>>(),
+                        ),
+                        connected_peers_with_piece
+                            .get_mut(&piece_index)
+                            .map(mem::take)
+                            .unwrap_or_default(),
+                    )
+                };
+
+                // Check connected peers that claim to have the piece index first
+                for peer_id in connected_peers {
+                    let fut = download_cached_piece_from_peer(
+                        node,
+                        piece_validator,
+                        peer_id,
+                        Vec::new(),
+                        Arc::default(),
+                        piece_index,
+                        HashSet::new(),
+                        HashSet::new(),
+                    );
+
+                    match fut.await.result {
+                        Some(PieceResult::Piece(piece)) => {
+                            return (piece_index, Some(piece));
+                        }
+                        Some(PieceResult::ClosestPeers(closest_peers)) => {
+                            let mut kademlia = kademlia.lock();
+
+                            // Store additional closest peers reported by the peer
+                            for (peer_id, addresses) in Vec::from(closest_peers) {
+                                kademlia.add_peer(&peer_id, addresses);
+                            }
+                        }
+                        None => {
+                            checked_closest_peers.insert(peer_id);
+                        }
+                    }
+                }
+
+                // Find the closest peers that were not queried yet
+                let closest_peers_to_check = kademlia.lock().closest_peers(&kbucket_key);
+                let closest_peers_to_check = closest_peers_to_check
+                    .filter(|(peer_id, _addresses)| checked_closest_peers.insert(*peer_id))
+                    .collect::<Vec<_>>();
+
+                if closest_peers_to_check.is_empty() {
+                    // No new closest peers found, nothing left to do here
+                    break;
+                }
+
+                for (peer_id, addresses) in closest_peers_to_check {
+                    let fut = download_cached_piece_from_peer(
+                        node,
+                        piece_validator,
+                        peer_id,
+                        addresses,
+                        Arc::clone(&pieces_to_download),
+                        piece_index,
+                        HashSet::new(),
+                        HashSet::new(),
+                    );
+
+                    let DownloadedPieceFromPeer {
+                        peer_id: _,
+                        result,
+                        cached_pieces,
+                        not_cached_pieces: _,
+                    } = fut.await;
+
+                    if !cached_pieces.is_empty() {
+                        let mut connected_peers_with_piece = connected_peers_with_piece.lock();
+
+                        // Remember that this peer has some pieces that need to be downloaded here
+                        for cached_piece_index in cached_pieces {
+                            if let Some(peers) =
+                                connected_peers_with_piece.get_mut(&cached_piece_index)
+                            {
+                                peers.insert(peer_id);
+                            }
+                        }
+                    }
+
+                    match result {
+                        Some(PieceResult::Piece(piece)) => {
+                            return (piece_index, Some(piece));
+                        }
+                        Some(PieceResult::ClosestPeers(closest_peers)) => {
+                            let mut kademlia = kademlia.lock();
+
+                            // Store additional closest peers
+                            for (peer_id, addresses) in Vec::from(closest_peers) {
+                                kademlia.add_peer(&peer_id, addresses);
+                            }
+                        }
+                        None => {
+                            checked_closest_peers.insert(peer_id);
+                        }
+                    }
+                }
+            }
+
+            (piece_index, None)
+        })
+        .collect::<FuturesUnordered<_>>();
+
+    while let Some((piece_index, maybe_piece)) = downloaded_pieces.next().await {
+        connected_peers_with_piece.lock().remove(&piece_index);
+
+        results
+            .unbounded_send((piece_index, maybe_piece))
+            .expect("This future isn't polled after receiver is dropped; qed");
+    }
+}
+
+struct DownloadedPieceFromPeer {
+    peer_id: PeerId,
+    result: Option<PieceResult>,
+    cached_pieces: HashSet<PieceIndex>,
+    not_cached_pieces: HashSet<PieceIndex>,
+}
+
+#[allow(clippy::too_many_arguments)]
+async fn download_cached_piece_from_peer<PV>(
+    node: &Node,
+    piece_validator: &PV,
+    peer_id: PeerId,
+    addresses: Vec<Multiaddr>,
+    peer_piece_indices: Arc<Vec<PieceIndex>>,
+    piece_index: PieceIndex,
+    mut cached_pieces: HashSet<PieceIndex>,
+    mut not_cached_pieces: HashSet<PieceIndex>,
+) -> DownloadedPieceFromPeer
+where
+    PV: PieceValidator,
+{
+    let result = match node
+        .send_generic_request(
+            peer_id,
+            addresses,
+            CachedPieceByIndexRequest {
+                piece_index,
+                cached_pieces: peer_piece_indices,
+            },
+        )
+        .await
+    {
+        Ok(response) => {
+            let CachedPieceByIndexResponse {
+                result,
+                cached_pieces,
+            } = response;
+
+            match result {
+                PieceResult::Piece(piece) => piece_validator
+                    .validate_piece(peer_id, piece_index, piece)
+                    .await
+                    .map(|piece| CachedPieceByIndexResponse {
+                        result: PieceResult::Piece(piece),
+                        cached_pieces,
+                    }),
+                PieceResult::ClosestPeers(closest_peers) => Some(CachedPieceByIndexResponse {
+                    result: PieceResult::ClosestPeers(closest_peers),
+                    cached_pieces,
+                }),
+            }
+        }
+        Err(error) => {
+            debug!(%error, %peer_id, %piece_index, "Failed to download cached piece from peer");
+
+            None
+        }
+    };
+
+    match result {
+        Some(result) => DownloadedPieceFromPeer {
+            peer_id,
+            result: Some(result.result),
+            cached_pieces: {
+                cached_pieces.extend(result.cached_pieces);
+                cached_pieces
+            },
+            not_cached_pieces,
+        },
+        None => {
+            not_cached_pieces.insert(piece_index);
+
+            DownloadedPieceFromPeer {
+                peer_id,
+                result: None,
+                cached_pieces,
+                not_cached_pieces,
+            }
+        }
+    }
+}
diff --git a/crates/subspace-node/Cargo.toml b/crates/subspace-node/Cargo.toml
index 0248521aaf..c6d223ee88 100644
--- a/crates/subspace-node/Cargo.toml
+++ b/crates/subspace-node/Cargo.toml
@@ -81,7 +81,7 @@ supports-color = "3.0.1"
 tempfile = "3.13.0"
 thiserror = "1.0.64"
 tokio = { version = "1.40.0", features = ["macros"] }
-tokio-stream = { version = "0.1.16" }
+tokio-stream = "0.1.16"
 tracing = "0.1.40"
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
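
For context when reviewing, a minimal sketch of how `get_from_cache()` is intended to be
consumed. This is not part of the patch: the construction of `node` is elided, the
`PieceProvider::new()` call is schematic (check the actual constructor signature), and the piece
indices are made up. Note that results are streamed back as individual downloads finish, so the
output order generally differs from the input order.

use futures::StreamExt;
use subspace_core_primitives::pieces::PieceIndex;
use subspace_networking::utils::piece_provider::{NoPieceValidator, PieceProvider};
use subspace_networking::Node;

async fn print_cached_pieces(node: Node) {
    // NoPieceValidator skips verification; real callers plug in a commitment-checking validator.
    // Constructor arguments are assumed here, not taken from the patch.
    let piece_provider = PieceProvider::new(node, NoPieceValidator);

    // Request a batch of piece indices in one call instead of one request per piece
    let pieces = piece_provider
        .get_from_cache((0..16u64).map(PieceIndex::from))
        .await;
    futures::pin_mut!(pieces);

    while let Some((piece_index, maybe_piece)) = pieces.next().await {
        match maybe_piece {
            Some(_piece) => println!("piece {piece_index}: downloaded"),
            None => println!("piece {piece_index}: not found in caches"),
        }
    }
}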
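
The body of `get_from_cache()` uses a small drive-and-drain pattern: one future does all the work
and reports through an unbounded channel, while a `stream::poll_fn()` wrapper polls that future
for progress and hands buffered results to the consumer. Below is a self-contained sketch of the
same pattern; all names are illustrative, not part of the patch. The `fuse()` call matters: when
the worker finishes while results are still buffered, the stream keeps getting polled, and an
unfused future would panic on poll-after-completion.

use futures::channel::mpsc;
use futures::{stream, FutureExt, Stream, StreamExt};
use std::future::Future;
use std::task::Poll;

fn drive_and_stream<T, Fut>(
    work: impl FnOnce(mpsc::UnboundedSender<T>) -> Fut,
) -> impl Stream<Item = T>
where
    Fut: Future<Output = ()>,
{
    let (tx, mut rx) = mpsc::unbounded();
    // `fuse()` keeps the future safe to poll after completion while the channel drains
    let mut fut = Box::pin(work(tx).fuse());

    stream::poll_fn(move |cx| {
        // Poll the worker first: it makes progress and registers the waker for later
        let end_result = fut.poll_unpin(cx);

        // `try_next()` returns `Ok(Some(_))` for a buffered item and `Ok(None)` only once the
        // sender has been dropped, i.e. once the worker future has finished
        if let Ok(maybe_result) = rx.try_next() {
            return Poll::Ready(maybe_result);
        }

        // Channel is empty but still open: the stream ends only when the worker does
        end_result.map(|()| None)
    })
}

#[tokio::main]
async fn main() {
    let results = drive_and_stream(|results| async move {
        for n in 0..3u32 {
            results.unbounded_send(n).expect("Receiver is alive; qed");
        }
    });

    assert_eq!(results.collect::<Vec<_>>().await, vec![0, 1, 2]);
}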
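
The connected-peer phase spreads the whole batch across peers: with `step = num_pieces /
num_peers`, peer `i` starts at offset `step * i` into the shared piece list and wraps around via
`cycle()`, so every peer leads with a different piece while still advertising up to
`RECOMMENDED_LIMIT` indices it could serve opportunistically (the first entry of each batch is
`swap_remove`d and becomes the piece actually requested). A standalone sketch of just that
arithmetic, using a deterministic slice instead of `HashMap` keys; the function name and
parameters are illustrative:

fn assign_batches(piece_indices: &[u64], num_peers: usize, limit: usize) -> Vec<Vec<u64>> {
    let num_pieces = piece_indices.len();
    // Offset between starting positions of consecutive peers; `.max(1)` avoids division by zero
    let step = num_pieces / num_peers.min(num_pieces).max(1);

    (0..num_peers.min(num_pieces))
        .map(|peer_index| {
            piece_indices
                .iter()
                .cycle()
                .skip(step * peer_index)
                .take(num_pieces.min(limit))
                .copied()
                .collect()
        })
        .collect()
}

fn main() {
    // 6 pieces over 3 peers with a limit of 4 indices per request: starting offsets are 0, 2, 4
    let batches = assign_batches(&[10, 11, 12, 13, 14, 15], 3, 4);
    assert_eq!(batches[0], vec![10, 11, 12, 13]);
    assert_eq!(batches[1], vec![12, 13, 14, 15]);
    assert_eq!(batches[2], vec![14, 15, 10, 11]);
}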