-
Notifications
You must be signed in to change notification settings - Fork 21
docs: add example for creating & fetching a collection #176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
8a00f47
fafba99
18c972b
fb4a89e
029ec12
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| //! Example that shows how to create a collection, and transfer it to another | ||
| //! node. It also shows patterns for defining a "Node" struct in higher-level | ||
| //! code that abstracts over these operations with an API that feels closer to | ||
| //! what an application would use. | ||
| //! | ||
| //! Run the entire example in one command: | ||
| //! $ cargo run --example transfer-collection | ||
| use std::collections::HashMap; | ||
|
|
||
| use anyhow::{Context, Result}; | ||
| use iroh::{protocol::Router, Endpoint, NodeAddr, Watcher}; | ||
| use iroh_blobs::{ | ||
| api::{downloader::Shuffled, Store, TempTag}, | ||
| format::collection::Collection, | ||
| store::mem::MemStore, | ||
| BlobsProtocol, Hash, HashAndFormat, | ||
| }; | ||
|
|
||
| /// Node is something you'd define in your application. It can contain whatever | ||
| /// shared state you'd want to couple with network operations. | ||
| struct Node { | ||
| store: Store, | ||
| /// Router with the blobs protocol registered, to accept blobs requests. | ||
| /// We can always get the endpoint with router.endpoint() | ||
| router: Router, | ||
| } | ||
|
|
||
| impl Node { | ||
| async fn new() -> Result<Self> { | ||
| let endpoint = Endpoint::builder().bind().await?; | ||
|
|
||
| let store = MemStore::new(); | ||
|
|
||
| // this BlobsProtocol accepts connections from other nodes and serves blobs from the store | ||
| // we pass None to skip subscribing to request events | ||
| let blobs = BlobsProtocol::new(&store, endpoint.clone(), None); | ||
| // Routers group one or more protocols together to accept connections from other nodes, | ||
| // here we're only using one, but could add more in a real world use case as needed | ||
| let router = Router::builder(endpoint) | ||
| .accept(iroh_blobs::ALPN, blobs) | ||
| .spawn(); | ||
|
|
||
| Ok(Self { | ||
| store: store.into(), | ||
| router, | ||
| }) | ||
| } | ||
|
|
||
| // get address of this node. Has the side effect of waiting for the node | ||
| // to be online & ready to accept connections | ||
| async fn node_addr(&self) -> Result<NodeAddr> { | ||
| let addr = self.router.endpoint().node_addr().initialized().await; | ||
| Ok(addr) | ||
| } | ||
|
|
||
| async fn list_hashes(&self) -> Result<Vec<Hash>> { | ||
| self.store | ||
| .blobs() | ||
| .list() | ||
| .hashes() | ||
| .await | ||
| .context("Failed to list hashes") | ||
| } | ||
|
|
||
| /// creates a collection from a given set of named blobs, adds it to the local store | ||
| /// and returns the hash of the collection. | ||
| async fn create_collection(&self, named_blobs: Vec<(&str, Vec<u8>)>) -> Result<Hash> { | ||
| let mut collection_items: HashMap<&str, TempTag> = HashMap::new(); | ||
|
|
||
| let tx = self.store.batch().await?; | ||
| for (name, data) in named_blobs { | ||
| let tmp_tag = tx.add_slice(data).await?; | ||
rklaehn marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| collection_items.insert(name, tmp_tag); | ||
| } | ||
|
|
||
| let collection_items = collection_items | ||
| .iter() | ||
| .map(|(name, tag)| { | ||
| let hash = tag.hash().clone(); | ||
| (name.to_string(), hash) | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| let collection = Collection::from_iter(collection_items); | ||
|
|
||
| let collection = collection.store(&self.store).await?; | ||
| let hash = collection.hash().clone(); | ||
| self.store.tags().create(collection).await?; | ||
rklaehn marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Ok(hash) | ||
| } | ||
|
|
||
| /// retrive an entire collection from a given hash and provider | ||
| async fn get_collection(&self, hash: Hash, provider: NodeAddr) -> Result<()> { | ||
| self.router.endpoint().add_node_addr(provider.clone())?; | ||
| let req = HashAndFormat::hash_seq(hash); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This magical incantation to feed to |
||
| let addrs = Shuffled::new(vec![provider.node_id]); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was surprised while working with store.downloader().download() Second: would it be possible to set it up so I can pass a
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think it is nicer to let the downloader deal only with node ids and let the node discovery figure out how to dial the nodes. I guess you could use NodeAddr in the public API, and then just call endpoint.add_node_addr before handing it over to the lower levels. But I am very much convinced that the lower levels should not do NodeAddrs. Any large scale content discovery mechanism will only ever work with node ids, since node ids are longer time stable than addrs.
I don't quite get this. You don't always want shuffled. And since ContentDiscovery itself is an infinite stream of node ids you can't make this a combinator - no way to shuffle an infinite stream. I guess we could maybe make a builder for the common case where you have a finite set of node ids, where you would have shuffled as a combinator. |
||
| self.store | ||
| .downloader(self.router.endpoint()) | ||
|
Comment on lines
+97
to
+98
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm super confused about why I have to re-pass the endpoint to the store here to create a downloader. I guess I'm supposed to make a downloader & store it on the It generally feels like a higher-level API is missing that just falls back to a
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The store can be used without an endpoint. There are many very valid use cases for this, e.g. you have a local store and want to do something without ever listening on the network. I guess you could do a thing which combines a store and an endpoint, but then that is what the downloader is. Maybe there needs to be a thing that combines a store, endpoint and downloader. I want to keep the downloader as a separate thing though since it has some internal state and there might be situations where you want to create/destroy them or even have multiple independent of the store. |
||
| .download(req, addrs) | ||
| .await?; | ||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| #[tokio::main] | ||
| async fn main() -> anyhow::Result<()> { | ||
| let send_node = Node::new().await?; | ||
| let send_node_addr = send_node.node_addr().await?; | ||
| let hash = send_node | ||
| .create_collection(vec![ | ||
| ("a.txt", b"this is file a".into()), | ||
| ("b.txt", b"this is file b".into()), | ||
| ("c.txt", b"this is file c".into()), | ||
| ]) | ||
| .await?; | ||
|
|
||
| let recv_node = Node::new().await?; | ||
| recv_node.get_collection(hash, send_node_addr).await?; | ||
|
|
||
| let send_hashes = send_node.list_hashes().await?; | ||
| let recv_hashes = recv_node.list_hashes().await?; | ||
| assert_eq!(send_hashes.len(), recv_hashes.len()); | ||
|
|
||
| println!("Transfer complete!"); | ||
| Ok(()) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm skipping
.discovery_n0()here and usingadd_node_addrdirectly on the endpoint in theNode::get_collectionmethod below. Largely to take a highligher to theadd_node_addrdiscussion we've been having.@rklaehn: the store has a connection pool, but I can't seem to get at it? Seems the connection pool relies on me being able to plumb addresses into the endpoint, is that right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The store doesn't have anything, not even an endpoint. The downloader has a connection pool.