Skip to content

fix: fix silent failure to add data of more than ~16MB via add_bytes or add_bytes_named #36

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/target
iroh.config.toml
.vscode/*
99 changes: 99 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ postcard = { version = "1", default-features = false, features = [
"use-std",
"experimental-derive",
] }
quic-rpc = { version = "0.17", optional = true }
quic-rpc = { version = "0.17.1", optional = true }
quic-rpc-derive = { version = "0.17", optional = true }
quinn = { package = "iroh-quinn", version = "0.12", features = ["ring"] }
rand = "0.8"
Expand Down Expand Up @@ -119,6 +119,7 @@ example-iroh = [
"dep:console",
"iroh/discovery-local-network"
]
test = ["quic-rpc/quinn-transport"]

[package.metadata.docs.rs]
all-features = true
Expand Down
16 changes: 11 additions & 5 deletions src/rpc/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,10 @@ where
}
});
tokio::spawn(async move {
// TODO: Is it important to catch this error? It should also result in an error on the
// response stream. If we deem it important, we could one-shot send it into the
// BlobAddProgress and return from there. Not sure.
if let Err(err) = sink.send_all(&mut input).await {
// if we get an error in send_all due to the connection being closed, this will just fail again.
// if we get an error due to something else (serialization or size limit), tell the remote to abort.
sink.send(AddStreamUpdate::Abort).await.ok();
warn!("Failed to send input stream to remote: {err:?}");
}
});
Expand All @@ -281,7 +281,7 @@ where

/// Write a blob by passing bytes.
pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> anyhow::Result<AddOutcome> {
let input = futures_lite::stream::once(Ok(bytes.into()));
let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
self.add_stream(input, SetTagOption::Auto).await?.await
}

Expand All @@ -291,7 +291,7 @@ where
bytes: impl Into<Bytes>,
name: impl Into<Tag>,
) -> anyhow::Result<AddOutcome> {
let input = futures_lite::stream::once(Ok(bytes.into()));
let input = chunked_bytes_stream(bytes.into(), 1024 * 64).map(Ok);
self.add_stream(input, SetTagOption::Named(name.into()))
.await?
.await
Expand Down Expand Up @@ -987,6 +987,12 @@ pub struct DownloadOptions {
pub mode: DownloadMode,
}

fn chunked_bytes_stream(mut b: Bytes, c: usize) -> impl Stream<Item = Bytes> {
futures_lite::stream::iter(std::iter::from_fn(move || {
Some(b.split_to(b.len().min(c))).filter(|x| !x.is_empty())
}))
}

#[cfg(test)]
mod tests {
use std::{path::Path, time::Duration};
Expand Down
153 changes: 153 additions & 0 deletions tests/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#![cfg(feature = "test")]
use std::{net::SocketAddr, path::PathBuf, sync::Arc};

use iroh_blobs::{net_protocol::Blobs, util::local_pool::LocalPool};
use quic_rpc::transport::quinn::QuinnConnector;
use quinn::{
crypto::rustls::{QuicClientConfig, QuicServerConfig},
rustls, ClientConfig, Endpoint, ServerConfig,
};
use rcgen::CertifiedKey;
use tempfile::TempDir;
use testresult::TestResult;
use tokio_util::task::AbortOnDropHandle;

type QC = QuinnConnector<iroh_blobs::rpc::proto::Response, iroh_blobs::rpc::proto::Request>;
type BlobsClient = iroh_blobs::rpc::client::blobs::Client<QC>;

/// Builds default quinn client config and trusts given certificates.
///
/// ## Args
///
/// - server_certs: a list of trusted certificates in DER format.
fn configure_client(server_certs: &[CertifiedKey]) -> anyhow::Result<ClientConfig> {
let mut certs = rustls::RootCertStore::empty();
for cert in server_certs {
let cert = cert.cert.der().clone();
certs.add(cert)?;
}

let crypto_client_config = rustls::ClientConfig::builder_with_provider(Arc::new(
rustls::crypto::ring::default_provider(),
))
.with_protocol_versions(&[&rustls::version::TLS13])
.expect("valid versions")
.with_root_certificates(certs)
.with_no_client_auth();
let quic_client_config = QuicClientConfig::try_from(crypto_client_config)?;

Ok(ClientConfig::new(Arc::new(quic_client_config)))
}

/// Returns default server configuration along with its certificate.
#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
fn configure_server() -> anyhow::Result<(ServerConfig, CertifiedKey)> {
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])?;
let cert_der = cert.cert.der();
let priv_key = rustls::pki_types::PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der());
let cert_chain = vec![cert_der.clone()];

let crypto_server_config = rustls::ServerConfig::builder_with_provider(Arc::new(
rustls::crypto::ring::default_provider(),
))
.with_protocol_versions(&[&rustls::version::TLS13])
.expect("valid versions")
.with_no_client_auth()
.with_single_cert(cert_chain, priv_key.into())?;
let quic_server_config = QuicServerConfig::try_from(crypto_server_config)?;
let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));

Arc::get_mut(&mut server_config.transport)
.unwrap()
.max_concurrent_uni_streams(0_u8.into());

Ok((server_config, cert))
}

pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Endpoint, CertifiedKey)> {
let (server_config, server_cert) = configure_server()?;
let endpoint = Endpoint::server(server_config, bind_addr)?;
Ok((endpoint, server_cert))
}

pub fn make_client_endpoint(
bind_addr: SocketAddr,
server_certs: &[CertifiedKey],
) -> anyhow::Result<Endpoint> {
let client_cfg = configure_client(server_certs)?;
let mut endpoint = Endpoint::client(bind_addr)?;
endpoint.set_default_client_config(client_cfg);
Ok(endpoint)
}

/// An iroh node that just has the blobs transport
#[derive(Debug)]
pub struct Node {
pub router: iroh::protocol::Router,
pub blobs: Blobs<iroh_blobs::store::fs::Store>,
pub local_pool: LocalPool,
pub rpc_task: AbortOnDropHandle<()>,
}

impl Node {
pub async fn new(path: PathBuf) -> anyhow::Result<(Self, SocketAddr, CertifiedKey)> {
let store = iroh_blobs::store::fs::Store::load(path).await?;
let local_pool = LocalPool::default();
let endpoint = iroh::Endpoint::builder().bind().await?;
let blobs = Blobs::builder(store).build(local_pool.handle(), &endpoint);
let router = iroh::protocol::Router::builder(endpoint)
.accept(iroh_blobs::ALPN, blobs.clone())
.spawn()
.await?;
let (config, key) = configure_server()?;
let endpoint = quinn::Endpoint::server(config, "127.0.0.1:0".parse().unwrap())?;
let local_addr = endpoint.local_addr()?;
let rpc_server = quic_rpc::transport::quinn::QuinnListener::new(endpoint)?;
let rpc_server =
quic_rpc::RpcServer::<iroh_blobs::rpc::proto::RpcService, _>::new(rpc_server);
let blobs2 = blobs.clone();
let rpc_task = rpc_server
.spawn_accept_loop(move |msg, chan| blobs2.clone().handle_rpc_request(msg, chan));
let node = Self {
router,
blobs,
local_pool,
rpc_task,
};
Ok((node, local_addr, key))
}
}

async fn node_and_client() -> TestResult<(Node, BlobsClient, TempDir)> {
let testdir = tempfile::tempdir()?;
let (node, addr, key) = Node::new(testdir.path().join("blobs")).await?;
let client = make_client_endpoint("127.0.0.1:0".parse().unwrap(), &[key])?;
let client = QuinnConnector::new(client, addr, "localhost".to_string());
let client = quic_rpc::RpcClient::<iroh_blobs::rpc::proto::RpcService, _>::new(client);
let client = iroh_blobs::rpc::client::blobs::Client::new(client);
Ok((node, client, testdir))
}

#[tokio::test]
async fn quinn_rpc_smoke() -> TestResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let (_node, client, _testdir) = node_and_client().await?;
let data = b"hello";
let hash = client.add_bytes(data.to_vec()).await?.hash;
assert_eq!(hash, iroh_blobs::Hash::new(data));
let data2 = client.read_to_bytes(hash).await?;
assert_eq!(data, &data2[..]);
Ok(())
}

#[tokio::test]
async fn quinn_rpc_large() -> TestResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let (_node, client, _testdir) = node_and_client().await?;
let data = vec![0; 1024 * 1024 * 16];
let hash = client.add_bytes(data.clone()).await?.hash;
assert_eq!(hash, iroh_blobs::Hash::new(&data));
let data2 = client.read_to_bytes(hash).await?;
assert_eq!(data, &data2[..]);
Ok(())
}
Loading