Skip to content

feat: Add a lazily initialized in mem client for ffi etc. #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
use spawn_accept_loop
  • Loading branch information
rklaehn committed Nov 14, 2024
commit 80327e48c73f5b9bb97587c42e7106ff8c3bc2bd
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,4 @@ iroh-router = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-net = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
quic-rpc = { git = "https://github.com/n0-computer/quic-rpc", branch = "main" }
1 change: 0 additions & 1 deletion src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,6 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
}

/// Handle receiving a [`Message`].
///
// This is called in the actor loop, and only async because subscribing to an existing transfer
// sends the initial state.
async fn handle_message(&mut self, msg: Message) {
Expand Down
13 changes: 7 additions & 6 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@
//! # use bao_tree::{ChunkNum, ChunkRanges};
//! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq};
//! # let hash: iroh_blobs::Hash = [0; 32].into();
//! let ranges = &ChunkRanges::from(..ChunkNum(10)) | &ChunkRanges::from(ChunkNum(100)..ChunkNum(110));
//! let ranges =
//! &ChunkRanges::from(..ChunkNum(10)) | &ChunkRanges::from(ChunkNum(100)..ChunkNum(110));
//! let spec = RangeSpecSeq::from_ranges([ranges]);
//! let request = GetRequest::new(hash, spec);
//! ```
Expand Down Expand Up @@ -236,8 +237,8 @@
//! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq};
//! # let hash: iroh_blobs::Hash = [0; 32].into();
//! let spec = RangeSpecSeq::from_ranges_infinite([
//! ChunkRanges::all(), // the collection itself
//! ChunkRanges::from(..ChunkNum(1)), // the first chunk of each child
//! ChunkRanges::all(), // the collection itself
//! ChunkRanges::from(..ChunkNum(1)), // the first chunk of each child
//! ]);
//! let request = GetRequest::new(hash, spec);
//! ```
Expand All @@ -252,9 +253,9 @@
//! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq};
//! # let hash: iroh_blobs::Hash = [0; 32].into();
//! let spec = RangeSpecSeq::from_ranges([
//! ChunkRanges::empty(), // we don't need the collection itself
//! ChunkRanges::empty(), // we don't need the first child either
//! ChunkRanges::all(), // we need the second child completely
//! ChunkRanges::empty(), // we don't need the collection itself
//! ChunkRanges::empty(), // we don't need the first child either
//! ChunkRanges::all(), // we need the second child completely
//! ]);
//! let request = GetRequest::new(hash, spec);
//! ```
Expand Down
47 changes: 4 additions & 43 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ use quic_rpc::{
server::{ChannelTypes, RpcChannel, RpcServerError},
RpcClient, RpcServer,
};
use tokio::task::JoinSet;
use tokio_util::task::AbortOnDropHandle;
use tracing::{error, warn};

use crate::{
export::ExportProgress,
Expand Down Expand Up @@ -892,7 +890,7 @@ impl<D: crate::store::Store> Blobs<D> {
#[derive(Debug)]
pub(crate) struct RpcHandler {
/// Client to hand out
client: RpcClient<crate::rpc::proto::RpcService, MemConnector>,
client: RpcClient<RpcService, MemConnector>,
/// Handler task
_handler: AbortOnDropHandle<()>,
}
Expand All @@ -903,45 +901,8 @@ impl RpcHandler {
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
let client = RpcClient::new(connector);
let task = tokio::spawn(async move {
let mut tasks = JoinSet::new();
loop {
tokio::select! {
Some(res) = tasks.join_next(), if !tasks.is_empty() => {
if let Err(e) = res {
if e.is_panic() {
error!("Panic handling RPC request: {e}");
}
}
}
req = listener.accept() => {
let req = match req {
Ok(req) => req,
Err(e) => {
warn!("Error accepting RPC request: {e}");
continue;
}
};
let blobs = blobs.clone();
tasks.spawn(async move {
let (req, client) = match req.read_first().await {
Ok((req, client)) => (req, client),
Err(e) => {
warn!("Error reading first message: {e}");
return;
}
};
if let Err(cause) = blobs.handle_rpc_request(req, client).await {
warn!("Error handling RPC request: {:?}", cause);
}
});
}
}
}
});
Self {
client,
_handler: AbortOnDropHandle::new(task),
}
let _handler = listener
.spawn_accept_loop(move |req, chan| blobs.clone().handle_rpc_request(req, chan));
Self { client, _handler }
}
}
1 change: 0 additions & 1 deletion src/util/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ pub struct PathContent {
}

/// Walks the directory to get the total size and number of files in directory or file
///
// TODO: possible combine with `scan_dir`
pub fn path_content_info(path: impl AsRef<Path>) -> anyhow::Result<PathContent> {
path_content_info0(path)
Expand Down
Loading