Skip to content

feat: add mem rpc client #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ tracing = "0.1"

# rpc
nested_enum_utils = { version = "0.1.0", optional = true }
quic-rpc = { version = "0.15", optional = true }
quic-rpc = { version = "0.15.1", optional = true }
quic-rpc-derive = { version = "0.15", optional = true }
serde-error = { version = "0.1.3", optional = true }
portable-atomic = { version = "1.9.0", optional = true }
Expand Down
4 changes: 4 additions & 0 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub struct Engine<D> {
content_status_cb: ContentStatusCallback,
local_pool_handle: LocalPoolHandle,
blob_store: D,
#[cfg(feature = "rpc")]
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
}

impl<D: iroh_blobs::store::Store> Engine<D> {
Expand Down Expand Up @@ -118,6 +120,8 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
default_author: Arc::new(default_author),
local_pool_handle,
blob_store: bao_store,
#[cfg(feature = "rpc")]
rpc_handler: Default::default(),
})
}

Expand Down
45 changes: 38 additions & 7 deletions src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
//! Quic RPC implementation for docs.

use proto::RpcService;
use quic_rpc::server::{ChannelTypes, RpcChannel};
use proto::{Request, RpcService};
use quic_rpc::{
server::{ChannelTypes, RpcChannel},
RpcClient, RpcServer,
};
use tokio_util::task::AbortOnDropHandle;

use crate::engine::Engine;

Expand All @@ -14,15 +18,22 @@ type RpcError = serde_error::Error;
type RpcResult<T> = std::result::Result<T, RpcError>;

impl<D: iroh_blobs::store::Store> Engine<D> {
/// Get an in memory client to interact with the docs engine.
pub fn client(&self) -> &client::docs::MemClient {
&self
.rpc_handler
.get_or_init(|| RpcHandler::new(self))
.client
}

/// Handle a docs request from the RPC server.
pub async fn handle_rpc_request<C: ChannelTypes<RpcService>>(
&self,
msg: crate::rpc::proto::Request,
self,
msg: Request,
chan: RpcChannel<RpcService, C>,
) -> Result<(), quic_rpc::server::RpcServerError<C>> {
use crate::rpc::proto::Request::*;

let this = self.clone();
use Request::*;
let this = self;
match msg {
Open(msg) => chan.rpc(msg, this, Self::doc_open).await,
Close(msg) => chan.rpc(msg, this, Self::doc_close).await,
Expand Down Expand Up @@ -65,3 +76,23 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
}
}
}

#[derive(Debug)]
pub(crate) struct RpcHandler {
/// Client to hand out
client: client::docs::MemClient,
/// Handler task
_handler: AbortOnDropHandle<()>,
}

impl RpcHandler {
fn new<D: iroh_blobs::store::Store>(engine: &Engine<D>) -> Self {
let engine = engine.clone();
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
let client = client::docs::MemClient::new(RpcClient::new(connector));
let _handler = listener
.spawn_accept_loop(move |req, chan| engine.clone().handle_rpc_request(req, chan));
Self { client, _handler }
}
}
1 change: 1 addition & 0 deletions src/rpc/client/authors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{

/// Iroh docs client.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Client<C = BoxedConnector<RpcService>> {
pub(super) rpc: quic_rpc::RpcClient<RpcService, C>,
}
Expand Down
16 changes: 14 additions & 2 deletions src/rpc/client/docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ use iroh_base::node_addr::AddrInfoOptions;
use iroh_blobs::{export::ExportProgress, store::ExportMode, Hash};
use iroh_net::NodeAddr;
use portable_atomic::{AtomicBool, Ordering};
use quic_rpc::{client::BoxedConnector, message::RpcMsg, Connector};
use quic_rpc::{
client::BoxedConnector, message::RpcMsg, transport::flume::FlumeConnector, Connector,
};
use serde::{Deserialize, Serialize};

use super::flatten;
use super::{authors, flatten};
use crate::{
actor::OpenState,
rpc::proto::{
Expand All @@ -38,8 +40,13 @@ pub use crate::{
Entry,
};

/// Type alias for a memory-backed client.
pub type MemClient =
Client<FlumeConnector<crate::rpc::proto::Response, crate::rpc::proto::Request>>;

/// Iroh docs client.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Client<C = BoxedConnector<RpcService>> {
pub(super) rpc: quic_rpc::RpcClient<RpcService, C>,
}
Expand All @@ -50,6 +57,11 @@ impl<C: Connector<RpcService>> Client<C> {
Self { rpc }
}

/// Returns an authors client.
pub fn authors(&self) -> authors::Client<C> {
authors::Client::new(self.rpc.clone())
}

/// Creates a client.
pub async fn create(&self) -> Result<Doc<C>> {
let res = self.rpc.rpc(CreateRequest {}).await??;
Expand Down
Loading