Skip to content

Commit f0a1632

Browse files
committed
Add mini node for testing
1 parent 1ca13d0 commit f0a1632

File tree

4 files changed

+225
-88
lines changed

4 files changed

+225
-88
lines changed

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ pub mod rpc;
4646
pub mod store;
4747
pub mod util;
4848

49+
pub mod node;
50+
4951
use bao_tree::BlockSize;
5052
pub use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
5153

src/node.rs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
//! An iroh node that just has the blobs transport
2+
use std::{path::Path, sync::Arc};
3+
4+
use iroh_net::{NodeAddr, NodeId};
5+
use quic_rpc::client::BoxedServiceConnection;
6+
use tokio_util::task::AbortOnDropHandle;
7+
8+
use crate::{
9+
provider::{CustomEventSender, EventSender},
10+
rpc::client::{blobs, tags},
11+
util::local_pool::LocalPool,
12+
};
13+
14+
type RpcClient = quic_rpc::RpcClient<
15+
crate::rpc::proto::RpcService,
16+
BoxedServiceConnection<crate::rpc::proto::RpcService>,
17+
crate::rpc::proto::RpcService,
18+
>;
19+
20+
/// An iroh node that just has the blobs transport
21+
#[derive(Debug)]
22+
pub struct Node {
23+
router: iroh_router::Router,
24+
client: RpcClient,
25+
_local_pool: LocalPool,
26+
_rpc_task: AbortOnDropHandle<()>,
27+
}
28+
29+
/// An iroh node builder
30+
#[derive(Debug)]
31+
pub struct Builder<S> {
32+
store: S,
33+
events: EventSender,
34+
}
35+
36+
impl<S: crate::store::Store> Builder<S> {
37+
/// Sets the event sender
38+
pub fn blobs_events(self, events: impl CustomEventSender) -> Self {
39+
Builder {
40+
store: self.store,
41+
events: events.into(),
42+
}
43+
}
44+
45+
/// Spawns the node
46+
pub async fn spawn(self) -> anyhow::Result<Node> {
47+
let (client, router, rpc_task, _local_pool) = setup_router(self.store, self.events).await?;
48+
Ok(Node {
49+
router,
50+
client,
51+
_rpc_task: AbortOnDropHandle::new(rpc_task),
52+
_local_pool,
53+
})
54+
}
55+
}
56+
57+
impl Node {
58+
/// Creates a new node with memory storage
59+
pub fn memory() -> Builder<crate::store::mem::Store> {
60+
Builder {
61+
store: crate::store::mem::Store::new(),
62+
events: Default::default(),
63+
}
64+
}
65+
66+
/// Creates a new node with persistent storage
67+
pub async fn persistent(
68+
path: impl AsRef<Path>,
69+
) -> anyhow::Result<Builder<crate::store::fs::Store>> {
70+
Ok(Builder {
71+
store: crate::store::fs::Store::load(path).await?,
72+
events: Default::default(),
73+
})
74+
}
75+
76+
/// Returns the node id
77+
pub fn node_id(&self) -> NodeId {
78+
self.router.endpoint().node_id()
79+
}
80+
81+
/// Returns the node address
82+
pub async fn node_addr(&self) -> anyhow::Result<NodeAddr> {
83+
self.router.endpoint().node_addr().await
84+
}
85+
86+
/// Shuts down the node
87+
pub async fn shutdown(self) -> anyhow::Result<()> {
88+
self.router.shutdown().await
89+
}
90+
91+
/// Returns an in-memory blobs client
92+
pub fn blobs(&self) -> blobs::Client {
93+
blobs::Client::new(self.client.clone())
94+
}
95+
96+
/// Returns an in-memory tags client
97+
pub fn tags(&self) -> tags::Client {
98+
tags::Client::new(self.client.clone())
99+
}
100+
}
101+
102+
async fn setup_router<S: crate::store::Store>(
103+
store: S,
104+
events: EventSender,
105+
) -> anyhow::Result<(
106+
RpcClient,
107+
iroh_router::Router,
108+
tokio::task::JoinHandle<()>,
109+
LocalPool,
110+
)> {
111+
let endpoint = iroh_net::Endpoint::builder().bind().await?;
112+
let local_pool = LocalPool::single();
113+
let mut router = iroh_router::Router::builder(endpoint.clone());
114+
115+
// Setup blobs
116+
let downloader = crate::downloader::Downloader::new(
117+
store.clone(),
118+
endpoint.clone(),
119+
local_pool.handle().clone(),
120+
);
121+
let blobs = Arc::new(crate::net_protocol::Blobs::new_with_events(
122+
store.clone(),
123+
local_pool.handle().clone(),
124+
events,
125+
downloader,
126+
endpoint.clone(),
127+
));
128+
router = router.accept(crate::protocol::ALPN.to_vec(), blobs.clone());
129+
130+
// Build the router
131+
let router = router.spawn().await?;
132+
133+
// Setup RPC
134+
let (internal_rpc, controller) =
135+
quic_rpc::transport::flume::service_connection::<crate::rpc::proto::RpcService>(32);
136+
let controller = quic_rpc::transport::boxed::Connection::new(controller);
137+
let internal_rpc = quic_rpc::transport::boxed::ServerEndpoint::new(internal_rpc);
138+
let internal_rpc = quic_rpc::RpcServer::new(internal_rpc);
139+
140+
let rpc_server_task: tokio::task::JoinHandle<()> = tokio::task::spawn(async move {
141+
loop {
142+
let request = internal_rpc.accept().await;
143+
match request {
144+
Ok(accepting) => {
145+
let blobs = blobs.clone();
146+
tokio::task::spawn(async move {
147+
let (msg, chan) = accepting.read_first().await.unwrap();
148+
blobs.handle_rpc_request(msg, chan).await.unwrap();
149+
});
150+
}
151+
Err(err) => {
152+
tracing::warn!("rpc error: {:?}", err);
153+
}
154+
}
155+
}
156+
});
157+
158+
let client = quic_rpc::RpcClient::new(controller);
159+
160+
Ok((client, router, rpc_server_task, local_pool))
161+
}

0 commit comments

Comments
 (0)