Skip to content

Commit 166d779

Browse files
Merge bdc9e23 into 4c1446f
2 parents 4c1446f + bdc9e23 commit 166d779

File tree

8 files changed

+86
-194
lines changed

8 files changed

+86
-194
lines changed

Cargo.lock

Lines changed: 56 additions & 89 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ futures-util = "0.3.30"
9595
testdir = "0.9.1"
9696

9797
[features]
98-
default = ["fs-store", "net_protocol"]
98+
default = ["fs-store", "net_protocol", "rpc"]
9999
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
100100
net_protocol = ["downloader", "dep:futures-util"]
101101
fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"]
@@ -184,6 +184,5 @@ panic = 'abort'
184184
incremental = false
185185

186186
[patch.crates-io]
187-
# iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" }
188-
# iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
189-
# iroh = { git = "https://github.com/n0-computer/iroh", branch = "main" }
187+
iroh-base = { git = "https://github.com/n0-computer/iroh" }
188+
iroh = { git = "https://github.com/n0-computer/iroh" }

deny.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,5 @@ ignore = [
3838

3939
[sources]
4040
allow-git = [
41-
# "https://github.com/n0-computer/iroh.git",
41+
"https://github.com/n0-computer/iroh.git",
4242
]

examples/custom-protocol.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ impl ProtocolHandler for BlobSearch {
144144
///
145145
/// The returned future runs on a newly spawned tokio task, so it can run as long as
146146
/// the connection lasts.
147-
fn accept(self: Arc<Self>, connecting: Connecting) -> BoxedFuture<Result<()>> {
147+
fn accept(&self, connecting: Connecting) -> BoxedFuture<Result<()>> {
148+
let this = self.clone();
148149
// We have to return a boxed future from the handler.
149150
Box::pin(async move {
150151
// Wait for the connection to be fully established.
@@ -162,7 +163,7 @@ impl ProtocolHandler for BlobSearch {
162163

163164
// Now, we can perform the actual query on our local database.
164165
let query = String::from_utf8(query_bytes)?;
165-
let hashes = self.query_local(&query);
166+
let hashes = this.query_local(&query);
166167

167168
// We want to return a list of hashes. We do the simplest thing possible, and just send
168169
// one hash after the other. Because the hashes have a fixed size of 32 bytes, this is

src/net_protocol.rs

Lines changed: 14 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ impl Default for GcState {
4747
}
4848
}
4949

50-
#[derive(Debug)]
50+
#[derive(Debug, Clone)]
5151
pub struct Blobs<S> {
5252
rt: LocalPoolHandle,
5353
pub(crate) store: S,
5454
events: EventSender,
5555
downloader: Downloader,
5656
#[cfg(feature = "rpc")]
57-
batches: tokio::sync::Mutex<BlobBatches>,
57+
batches: Arc<tokio::sync::Mutex<BlobBatches>>,
5858
endpoint: Endpoint,
5959
gc_state: Arc<std::sync::Mutex<GcState>>,
6060
#[cfg(feature = "rpc")]
@@ -131,15 +131,15 @@ impl<S: crate::store::Store> Builder<S> {
131131

132132
/// Build the Blobs protocol handler.
133133
/// You need to provide a local pool handle and an endpoint.
134-
pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Arc<Blobs<S>> {
134+
pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Blobs<S> {
135135
let downloader = Downloader::new(self.store.clone(), endpoint.clone(), rt.clone());
136-
Arc::new(Blobs::new(
136+
Blobs::new(
137137
self.store,
138138
rt.clone(),
139139
self.events.unwrap_or_default(),
140140
downloader,
141141
endpoint.clone(),
142-
))
142+
)
143143
}
144144
}
145145

@@ -391,82 +391,26 @@ impl<S: crate::store::Store> Blobs<S> {
391391
}
392392
}
393393

394-
// trait BlobsInner: Debug + Send + Sync + 'static {
395-
// fn shutdown(self: Arc<Self>) -> BoxedFuture<()>;
396-
// fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>>;
397-
// fn client(self: Arc<Self>) -> MemClient;
398-
// fn local_pool_handle(&self) -> &LocalPoolHandle;
399-
// fn downloader(&self) -> &Downloader;
400-
// }
401-
402-
// #[derive(Debug)]
403-
// struct Blobs2 {
404-
// inner: Arc<dyn BlobsInner>,
405-
// }
406-
407-
// impl Blobs2 {
408-
// fn client(&self) -> MemClient {
409-
// self.inner.clone().client()
410-
// }
411-
412-
// fn local_pool_handle(&self) -> &LocalPoolHandle {
413-
// self.inner.local_pool_handle()
414-
// }
415-
416-
// fn downloader(&self) -> &Downloader {
417-
// self.inner.downloader()
418-
// }
419-
// }
420-
421-
// impl<S: crate::store::Store> BlobsInner for Blobs<S> {
422-
// fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
423-
// ProtocolHandler::shutdown(self)
424-
// }
425-
426-
// fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
427-
// ProtocolHandler::accept(self, conn)
428-
// }
429-
430-
// fn client(self: Arc<Self>) -> MemClient {
431-
// Blobs::client(self)
432-
// }
433-
434-
// fn local_pool_handle(&self) -> &LocalPoolHandle {
435-
// self.rt()
436-
// }
437-
438-
// fn downloader(&self) -> &Downloader {
439-
// self.downloader()
440-
// }
441-
// }
442-
443-
// impl ProtocolHandler for Blobs2 {
444-
// fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
445-
// self.inner.clone().accept(conn)
446-
// }
447-
448-
// fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
449-
// self.inner.clone().shutdown()
450-
// }
451-
// }
452-
453394
impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
454-
fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
395+
fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
396+
let this = self.clone();
397+
455398
Box::pin(async move {
456399
crate::provider::handle_connection(
457400
conn.await?,
458-
self.store.clone(),
459-
self.events.clone(),
460-
self.rt.clone(),
401+
this.store.clone(),
402+
this.events.clone(),
403+
this.rt.clone(),
461404
)
462405
.await;
463406
Ok(())
464407
})
465408
}
466409

467-
fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
410+
fn shutdown(&self) -> BoxedFuture<()> {
411+
let this = self.clone();
468412
Box::pin(async move {
469-
self.store.shutdown().await;
413+
this.store.shutdown().await;
470414
})
471415
}
472416
}

src/rpc.rs

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
33
use std::{
44
io,
5-
ops::Deref,
65
sync::{Arc, Mutex},
76
};
87

@@ -63,48 +62,31 @@ const RPC_BLOB_GET_CHANNEL_CAP: usize = 2;
6362

6463
impl<D: crate::store::Store> Blobs<D> {
6564
/// Get a client for the blobs protocol
66-
pub fn client(self: Arc<Self>) -> blobs::MemClient {
65+
pub fn client(&self) -> blobs::MemClient {
6766
let client = self
6867
.rpc_handler
69-
.get_or_init(|| RpcHandler::new(&self))
68+
.get_or_init(|| RpcHandler::new(self))
7069
.client
7170
.clone();
7271
blobs::Client::new(client)
7372
}
7473

7574
/// Handle an RPC request
7675
pub async fn handle_rpc_request<C>(
77-
self: Arc<Self>,
76+
self,
7877
msg: Request,
7978
chan: RpcChannel<RpcService, C>,
8079
) -> std::result::Result<(), RpcServerError<C>>
8180
where
8281
C: ChannelTypes<RpcService>,
8382
{
8483
use Request::*;
85-
let handler = Handler(self);
84+
let handler = self;
8685
match msg {
8786
Blobs(msg) => handler.handle_blobs_request(msg, chan).await,
8887
Tags(msg) => handler.handle_tags_request(msg, chan).await,
8988
}
9089
}
91-
}
92-
93-
#[derive(Clone)]
94-
struct Handler<S>(Arc<Blobs<S>>);
95-
96-
impl<S> Deref for Handler<S> {
97-
type Target = Blobs<S>;
98-
99-
fn deref(&self) -> &Self::Target {
100-
&self.0
101-
}
102-
}
103-
104-
impl<D: crate::store::Store> Handler<D> {
105-
fn store(&self) -> &D {
106-
&self.0.store
107-
}
10890

10991
/// Handle a tags request
11092
pub async fn handle_tags_request<C>(
@@ -903,7 +885,7 @@ pub(crate) struct RpcHandler {
903885
}
904886

905887
impl RpcHandler {
906-
fn new<D: crate::store::Store>(blobs: &Arc<Blobs<D>>) -> Self {
888+
fn new<D: crate::store::Store>(blobs: &Blobs<D>) -> Self {
907889
let blobs = blobs.clone();
908890
let (listener, connector) = quic_rpc::transport::flume::channel(1);
909891
let listener = RpcServer::new(listener);

src/rpc/client/blobs.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,7 @@ mod tests {
10031003

10041004
mod node {
10051005
//! An iroh node that just has the blobs transport
1006-
use std::{path::Path, sync::Arc};
1006+
use std::path::Path;
10071007

10081008
use iroh::{protocol::Router, Endpoint, NodeAddr, NodeId};
10091009
use tokio_util::task::AbortOnDropHandle;
@@ -1068,13 +1068,13 @@ mod tests {
10681068
// Setup blobs
10691069
let downloader =
10701070
Downloader::new(store.clone(), endpoint.clone(), local_pool.handle().clone());
1071-
let blobs = Arc::new(Blobs::new(
1071+
let blobs = Blobs::new(
10721072
store.clone(),
10731073
local_pool.handle().clone(),
10741074
events,
10751075
downloader,
10761076
endpoint.clone(),
1077-
));
1077+
);
10781078
router = router.accept(crate::ALPN, blobs.clone());
10791079

10801080
// Build the router

tests/gc.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::{
33
io,
44
io::{Cursor, Write},
55
path::PathBuf,
6-
sync::Arc,
76
time::Duration,
87
};
98

@@ -41,7 +40,7 @@ use tokio::io::AsyncReadExt;
4140
#[derive(Debug)]
4241
pub struct Node<S> {
4342
pub router: iroh::protocol::Router,
44-
pub blobs: Arc<Blobs<S>>,
43+
pub blobs: Blobs<S>,
4544
pub store: S,
4645
pub _local_pool: LocalPool,
4746
}

0 commit comments

Comments
 (0)