126 changes: 126 additions & 0 deletions examples/transfer-collection.rs
@@ -0,0 +1,126 @@
//! Example that shows how to create a collection, and transfer it to another
//! node. It also shows patterns for defining a "Node" struct in higher-level
//! code that abstracts over these operations with an API that feels closer to
//! what an application would use.
//!
//! Run the entire example in one command:
//! $ cargo run --example transfer-collection
use std::collections::HashMap;

use anyhow::{Context, Result};
use iroh::{protocol::Router, Endpoint, NodeAddr, Watcher};
use iroh_blobs::{
    api::{downloader::Shuffled, Store, TempTag},
    format::collection::Collection,
    store::mem::MemStore,
    BlobsProtocol, Hash, HashAndFormat,
};

/// Node is something you'd define in your application. It can contain whatever
/// shared state you'd want to couple with network operations.
struct Node {
    store: Store,
    /// Router with the blobs protocol registered, to accept blobs requests.
    /// We can always get the endpoint with router.endpoint()
    router: Router,
}

impl Node {
    async fn new() -> Result<Self> {
        let endpoint = Endpoint::builder().bind().await?;
Member Author:
I'm skipping `.discovery_n0()` here and using `add_node_addr` directly on the endpoint in the `Node::get_collection` method below, largely to take a highlighter to the `add_node_addr` discussion we've been having.

@rklaehn: the store has a connection pool, but I can't seem to get at it? Seems the connection pool relies on me being able to plumb addresses into the endpoint, is that right?

Collaborator:
The store doesn't have anything, not even an endpoint. The downloader has a connection pool.

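For contrast, here is a hedged sketch of the discovery-based setup the comment says it is skipping — assuming iroh's `Endpoint` builder exposes `discovery_n0()`, as in recent releases — where the endpoint publishes and resolves addresses itself, so the receiver could dial by `NodeId` alone:

```rust
// Sketch only: enable n0 DNS discovery on the endpoint so peers can be
// dialed by NodeId alone, without manually plumbing NodeAddrs.
let endpoint = Endpoint::builder()
    .discovery_n0() // publish + resolve node addresses via n0 discovery
    .bind()
    .await?;
```

With discovery enabled, `get_collection` would not need the explicit `add_node_addr` call discussed below.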

        let store = MemStore::new();

        // this BlobsProtocol accepts connections from other nodes and serves blobs from the store
        // we pass None to skip subscribing to request events
        let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
        // Routers group one or more protocols together to accept connections from other nodes,
        // here we're only using one, but could add more in a real world use case as needed
        let router = Router::builder(endpoint)
            .accept(iroh_blobs::ALPN, blobs)
            .spawn();

        Ok(Self {
            store: store.into(),
            router,
        })
    }

    /// Get the address of this node. Has the side effect of waiting for the
    /// node to be online & ready to accept connections.
    async fn node_addr(&self) -> Result<NodeAddr> {
        let addr = self.router.endpoint().node_addr().initialized().await;
        Ok(addr)
    }

    async fn list_hashes(&self) -> Result<Vec<Hash>> {
        self.store
            .blobs()
            .list()
            .hashes()
            .await
            .context("Failed to list hashes")
    }

    /// Creates a collection from a given set of named blobs, adds it to the
    /// local store and returns the hash of the collection.
    async fn create_collection(&self, named_blobs: Vec<(&str, Vec<u8>)>) -> Result<Hash> {
        let mut collection_items: HashMap<&str, TempTag> = HashMap::new();

        let tx = self.store.batch().await?;
        for (name, data) in named_blobs {
            let tmp_tag = tx.add_slice(data).await?;
            collection_items.insert(name, tmp_tag);
        }

        let collection_items = collection_items
            .iter()
            .map(|(name, tag)| {
                let hash = tag.hash().clone();
                (name.to_string(), hash)
            })
            .collect::<Vec<_>>();

        let collection = Collection::from_iter(collection_items);

        let collection = collection.store(&self.store).await?;
        let hash = collection.hash().clone();
        self.store.tags().create(collection).await?;
        Ok(hash)
    }

    /// Retrieve an entire collection from a given hash and provider.
    async fn get_collection(&self, hash: Hash, provider: NodeAddr) -> Result<()> {
        self.router.endpoint().add_node_addr(provider.clone())?;
        let req = HashAndFormat::hash_seq(hash);
Member Author:
This magical incantation to feed to `download` as a req param was super tough to figure out. Maybe we just handle this with examples.

        let addrs = Shuffled::new(vec![provider.node_id]);
Member Author:
I was surprised while working with `store.downloader().download()` that the `ContentDiscovery` trait worked in `NodeId` and not `NodeAddr`, is there a reason why?

Second: would it be possible to set it up so I can pass a `Vec<NodeId>` / `Vec<NodeAddr>`, and call `.into()` on it to get a `Shuffled` content discovery? That might help remove the concept of Content Discovery entirely from what we'd pass to download.

Collaborator:
> I was surprised while working with `store.downloader().download()` that the `ContentDiscovery` trait worked in `NodeId` and not `NodeAddr`, is there a reason why?

I think it is nicer to let the downloader deal only with node ids and let the node discovery figure out how to dial the nodes.

I guess you could use NodeAddr in the public API, and then just call endpoint.add_node_addr before handing it over to the lower levels. But I am very much convinced that the lower levels should not do NodeAddrs.

Any large scale content discovery mechanism will only ever work with node ids, since node ids are longer time stable than addrs.

> Second: would it be possible to set it up so I can pass a `Vec<NodeId>` / `Vec<NodeAddr>`, and call `.into()` on it to get a `Shuffled` content discovery? That might help remove the concept of Content Discovery entirely from what we'd pass to download.

I don't quite get this. You don't always want shuffled. And since ContentDiscovery itself is an infinite stream of node ids you can't make this a combinator - no way to shuffle an infinite stream.

I guess we could maybe make a builder for the common case where you have a finite set of node ids, where you would have shuffled as a combinator.
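To make that builder idea concrete, here is a hypothetical extension-trait sketch — nothing like this exists in iroh-blobs today, and a direct `From<Vec<NodeId>>` impl isn't possible downstream because of Rust's orphan rule, but a local trait gets similar ergonomics for the finite-set case:

```rust
use iroh::NodeId;
use iroh_blobs::api::downloader::Shuffled;

/// Hypothetical helper trait: lets application code turn a finite list of
/// candidate providers into a content-discovery source without naming the
/// ContentDiscovery concept at the call site.
trait IntoDiscovery {
    fn into_discovery(self) -> Shuffled;
}

impl IntoDiscovery for Vec<NodeId> {
    fn into_discovery(self) -> Shuffled {
        // Shuffled randomizes the order in which candidates are tried.
        Shuffled::new(self)
    }
}

// usage sketch:
// downloader.download(req, providers.into_discovery()).await?;
```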

        self.store
            .downloader(self.router.endpoint())
Member Author (on lines +97 to +98):
I'm super confused about why I have to re-pass the endpoint to the store here to create a downloader. I guess I'm supposed to make a downloader & store it on the Node struct I've defined here?

It generally feels like a higher-level API is missing that just falls back to a `DefaultDownloader`.

Collaborator:
The store can be used without an endpoint. There are many very valid use cases for this, e.g. you have a local store and want to do something without ever listening on the network.

I guess you could do a thing which combines a store and an endpoint, but then that is what the downloader is. Maybe there needs to be a thing that combines a store, endpoint and downloader.

I want to keep the downloader as a separate thing though since it has some internal state and there might be situations where you want to create/destroy them or even have multiple independent of the store.
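One way to act on that suggestion, sketched under the assumption that the `Downloader` returned by `store.downloader(...)` (in `iroh_blobs::api::downloader`) can simply be stored for the node's lifetime: build it once in `new()` and reuse it, instead of re-passing the endpoint on every call.

```rust
use iroh_blobs::api::downloader::Downloader;

/// Hypothetical variant of this example's Node struct: the downloader
/// (and its connection pool) is created once and reused, so
/// get_collection never has to re-plumb the endpoint.
struct PooledNode {
    store: Store,
    router: Router,
    downloader: Downloader,
}

impl PooledNode {
    async fn new() -> Result<Self> {
        let endpoint = Endpoint::builder().bind().await?;
        let store = MemStore::new();
        let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
        let router = Router::builder(endpoint.clone())
            .accept(iroh_blobs::ALPN, blobs)
            .spawn();
        // one downloader for the node's lifetime
        let downloader = store.downloader(&endpoint);
        Ok(Self {
            store: store.into(),
            router,
            downloader,
        })
    }
}
```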

            .download(req, addrs)
            .await?;
        Ok(())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let send_node = Node::new().await?;
    let send_node_addr = send_node.node_addr().await?;
    let hash = send_node
        .create_collection(vec![
            ("a.txt", b"this is file a".into()),
            ("b.txt", b"this is file b".into()),
            ("c.txt", b"this is file c".into()),
        ])
        .await?;

    let recv_node = Node::new().await?;
    recv_node.get_collection(hash, send_node_addr).await?;

    let send_hashes = send_node.list_hashes().await?;
    let recv_hashes = recv_node.list_hashes().await?;
    assert_eq!(send_hashes.len(), recv_hashes.len());

    println!("Transfer complete!");
    Ok(())
}
2 changes: 1 addition & 1 deletion src/api/blobs.rs
@@ -620,7 +620,7 @@ pub struct AddPathOptions
/// stream directly can be inconvenient, so this struct provides some convenience
/// methods to work with the result.
///
-/// It also implements [`IntoFuture`], so you can await it to get the [`TempTag`] that
+/// It also implements [`IntoFuture`], so you can await it to get the [`TagInfo`] that
/// contains the hash of the added content and also protects the content.
///
/// If you want access to the stream, you can use the [`AddProgress::stream`] method.