Skip to content

feat!: add net_protocol feature #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ futures-util = "0.3.30"
testdir = "0.9.1"

[features]
default = ["fs-store", "rpc"]
default = ["fs-store", "rpc", "net_protocol"]
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
net_protocol = ["downloader"]
fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"]
metrics = ["iroh-metrics/metrics"]
redb = ["dep:redb"]
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub mod format;
pub mod get;
pub mod hashseq;
pub mod metrics;
#[cfg(feature = "downloader")]
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "downloader")))]
#[cfg(feature = "net_protocol")]
#[cfg_attr(iroh_docsrs, doc(cfg(feature = "net_protocol")))]
pub mod net_protocol;
pub mod protocol;
pub mod provider;
Expand Down
89 changes: 45 additions & 44 deletions src/net_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,44 +28,6 @@ use crate::{
HashAndFormat, TempTag,
};

#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
pub struct BatchId(u64);

/// A request to the node to download and share the data specified by the hash.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDownloadRequest {
/// This mandatory field contains the hash of the data to download and share.
pub hash: Hash,
/// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as
/// well.
pub format: BlobFormat,
/// This mandatory field specifies the nodes to download the data from.
///
/// If set to more than a single node, they will all be tried. If `mode` is set to
/// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
/// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
/// if the concurrency limits permit.
pub nodes: Vec<NodeAddr>,
/// Optional tag to tag the data with.
pub tag: SetTagOption,
/// Whether to directly start the download or add it to the download queue.
pub mode: DownloadMode,
}

/// Set the mode for whether to directly start the download or add it to the download queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DownloadMode {
/// Start the download right away.
///
/// No concurrency limits or queuing will be applied. It is up to the user to manage download
/// concurrency.
Direct,
/// Queue the download.
///
/// The download queue will be processed in-order, while respecting the downloader concurrency limits.
Queued,
}

#[derive(Debug)]
pub struct Blobs<S> {
rt: LocalPoolHandle,
Expand All @@ -81,7 +43,7 @@ const BLOB_DOWNLOAD_SOURCE_NAME: &str = "blob_download";

/// Keeps track of all the currently active batch operations of the blobs api.
#[derive(Debug, Default)]
pub struct BlobBatches {
pub(crate) struct BlobBatches {
/// Currently active batches
batches: BTreeMap<BatchId, BlobBatch>,
/// Used to generate new batch ids.
Expand Down Expand Up @@ -152,19 +114,19 @@ impl<S: crate::store::Store> Blobs<S> {
&self.store
}

pub(crate) fn rt(&self) -> LocalPoolHandle {
self.rt.clone()
pub fn rt(&self) -> &LocalPoolHandle {
&self.rt
}

pub(crate) fn endpoint(&self) -> &Endpoint {
pub fn endpoint(&self) -> &Endpoint {
&self.endpoint
}

pub async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> {
pub(crate) async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> {
self.batches.lock().await
}

pub async fn download(
pub(crate) async fn download(
&self,
endpoint: Endpoint,
req: BlobDownloadRequest,
Expand Down Expand Up @@ -318,3 +280,42 @@ impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
})
}
}

/// A request to the node to download and share the data specified by the hash.
///
/// Serializable so it can travel over the RPC boundary; all fields are public
/// and set by the caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDownloadRequest {
    /// This mandatory field contains the hash of the data to download and share.
    pub hash: Hash,
    /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as
    /// well.
    pub format: BlobFormat,
    /// This mandatory field specifies the nodes to download the data from.
    ///
    /// If set to more than a single node, they will all be tried. If `mode` is set to
    /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
    /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
    /// if the concurrency limits permit.
    pub nodes: Vec<NodeAddr>,
    /// Optional tag to tag the data with.
    ///
    /// Whether a tag is actually applied is governed by the [`SetTagOption`] variant chosen.
    pub tag: SetTagOption,
    /// Whether to directly start the download or add it to the download queue.
    pub mode: DownloadMode,
}

/// Set the mode for whether to directly start the download or add it to the download queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DownloadMode {
    /// Start the download right away.
    ///
    /// No concurrency limits or queuing will be applied. It is up to the user to manage download
    /// concurrency.
    Direct,
    /// Queue the download.
    ///
    /// The download queue will be processed in-order, while respecting the downloader concurrency limits.
    Queued,
}

/// Newtype for a batch id.
///
/// Wraps a `u64` counter used to identify an active batch operation
/// (see `BlobBatches`, which keys its map by this type). The inner value
/// is public so callers can construct ids across the RPC boundary.
#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
pub struct BatchId(pub u64);
10 changes: 6 additions & 4 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

/// Handle a tags request
pub async fn handle_tags_request<C>(
async fn handle_tags_request<C>(
self: Arc<Self>,
msg: proto::tags::Request,
chan: RpcChannel<proto::RpcService, C>,
Expand All @@ -91,7 +91,7 @@ impl<D: crate::store::Store> Blobs<D> {
}

/// Handle a blobs request
pub async fn handle_blobs_request<C>(
async fn handle_blobs_request<C>(
self: Arc<Self>,
msg: proto::blobs::Request,
chan: RpcChannel<proto::RpcService, C>,
Expand Down Expand Up @@ -308,7 +308,8 @@ impl<D: crate::store::Store> Blobs<D> {
// provide a little buffer so that we don't slow down the sender
let (tx, rx) = async_channel::bounded(32);
let tx2 = tx.clone();
self.rt().spawn_detached(|| async move {
let rt = self.rt().clone();
rt.spawn_detached(|| async move {
if let Err(e) = self.blob_add_from_path0(msg, tx).await {
tx2.send(AddProgress::Abort(RpcError::new(&*e))).await.ok();
}
Expand Down Expand Up @@ -386,7 +387,8 @@ impl<D: crate::store::Store> Blobs<D> {
fn blob_export(self: Arc<Self>, msg: ExportRequest) -> impl Stream<Item = ExportResponse> {
let (tx, rx) = async_channel::bounded(1024);
let progress = AsyncChannelProgressSender::new(tx);
self.rt().spawn_detached(move || async move {
let rt = self.rt().clone();
rt.spawn_detached(move || async move {
let res = crate::export::export(
self.store(),
msg.hash,
Expand Down
1 change: 1 addition & 0 deletions src/rpc/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ use crate::rpc::proto::blobs::{

/// Iroh blobs client.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Client<C = BoxedConnector<RpcService>> {
pub(super) rpc: RpcClient<RpcService, C>,
}
Expand Down
Loading