From a6988d755dc5846418a0291a51faa8ea466749db Mon Sep 17 00:00:00 2001
From: Tjemmmic
Date: Fri, 23 Feb 2024 14:27:56 -0600
Subject: [PATCH 01/17] Debugging loopback issue

---
 citadel-internal-service/src/kernel/mod.rs |  4 +--
 service/Cargo.toml                         |  3 ++-
 service/src/main.rs                        | 29 +++++++++++++++++++++-
 3 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/citadel-internal-service/src/kernel/mod.rs b/citadel-internal-service/src/kernel/mod.rs
index 3660823..615642a 100644
--- a/citadel-internal-service/src/kernel/mod.rs
+++ b/citadel-internal-service/src/kernel/mod.rs
@@ -591,8 +591,8 @@ fn handle_connection(
                     }
                 }
             }
-            Err(_) => {
-                warn!(target: "citadel", "Bad message from client");
+            Err(err) => {
+                warn!(target: "citadel", "Bad message from client: {err:?}");
             }
         }
     }
diff --git a/service/Cargo.toml b/service/Cargo.toml
index 991ecd7..7e969e3 100644
--- a/service/Cargo.toml
+++ b/service/Cargo.toml
@@ -22,4 +22,5 @@ structopt = { workspace = true }
 tokio = { workspace = true, features = ["macros", "rt"] }
 citadel-internal-service = { workspace = true }
 citadel_sdk = { workspace = true }
-citadel_logging = { workspace = true }
\ No newline at end of file
+citadel_logging = { workspace = true }
+parking_lot = { version = "0.12.1", features = ["deadlock_detection"] }
\ No newline at end of file
diff --git a/service/src/main.rs b/service/src/main.rs
index 37ea230..cea36b7 100644
--- a/service/src/main.rs
+++ b/service/src/main.rs
@@ -2,11 +2,38 @@ use citadel_internal_service::kernel::CitadelWorkspaceService;
 use citadel_sdk::prelude::{BackendType, NodeBuilder, NodeType};
 use std::error::Error;
 use std::net::SocketAddr;
-use structopt::StructOpt;
+use structopt::{lazy_static, StructOpt};
+use citadel_logging::{info, error};
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
     citadel_logging::setup_log();
+    lazy_static::lazy_static! {
+        static ref DEADLOCK_INIT: () = {
+            let _ = std::thread::spawn(move || {
+                info!(target: "gadget", "Executing deadlock detector ...");
+                use std::thread;
+                use std::time::Duration;
+                use parking_lot::deadlock;
+                loop {
+                    std::thread::sleep(Duration::from_secs(5));
+                    let deadlocks = deadlock::check_deadlock();
+                    if deadlocks.is_empty() {
+                        continue;
+                    }
+
+                    error!(target: "citadel", "{} deadlocks detected", deadlocks.len());
+                    for (i, threads) in deadlocks.iter().enumerate() {
+                        error!(target: "citadel", "Deadlock #{}", i);
+                        for t in threads {
+                            info!(target: "citadel", "Thread Id {:#?}", t.thread_id());
+                            error!(target: "citadel", "{:#?}", t.backtrace());
+                        }
+                    }
+                }
+            });
+        };
+    }
 
     let opts: Options = Options::from_args();
     let service = CitadelWorkspaceService::new(opts.bind);

From 74e8fc7c411dd364d45612246030131673bfe300 Mon Sep 17 00:00:00 2001
From: Thomas Braun
Date: Fri, 23 Feb 2024 18:08:20 -0500
Subject: [PATCH 02/17] Add loopback test (incomplete)

---
 Cargo.toml                                 |  3 +-
 citadel-internal-service/tests/loopback.rs | 56 +++++++++++++++++++
 service/Cargo.toml                         |  6 +-
 service/src/main.rs                        | 65 ++++++++++++----------
 4 files changed, 97 insertions(+), 33 deletions(-)
 create mode 100644 citadel-internal-service/tests/loopback.rs

diff --git a/Cargo.toml b/Cargo.toml
index c118bee..d620740 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -41,4 +41,5 @@ uuid = { version="1.3.3", features = [
 anyhow = "1.0.71"
 async-recursion = { version = "1.0.4" }
 parking_lot = { version = "0.12.1" }
-structopt = { version = "0.3.26" }
\ No newline at end of file
+structopt = { version = "0.3.26" }
+lazy_static = "1.4.0"
\ No newline at end of file
diff --git a/citadel-internal-service/tests/loopback.rs b/citadel-internal-service/tests/loopback.rs
new file mode 100644
index 0000000..d9c60ba
--- /dev/null
+++ b/citadel-internal-service/tests/loopback.rs
@@ -0,0 +1,56 @@
+mod common;
+
+#[cfg(test)]
+mod tests {
+    use crate::common::{
+        register_and_connect_to_server, server_info_skip_cert_verification, RegisterAndConnectItems,
+    };
+    use citadel_internal_service::kernel::CitadelWorkspaceService;
+    use citadel_logging::setup_log;
+    use citadel_sdk::prelude::*;
+
+    #[tokio::test]
+    async fn test_2_peers_1_service() -> Result<(), Box<dyn std::error::Error>> {
+        setup_log();
+
+        let (server, server_bind_address) = server_info_skip_cert_verification();
+        tokio::task::spawn(server);
+
+        let service_addr = "127.0.0.1:55778".parse().unwrap();
+        let service = CitadelWorkspaceService::new(service_addr);
+
+        let internal_service = NodeBuilder::default()
+            .with_backend(BackendType::InMemory)
+            .with_node_type(NodeType::Peer)
+            .with_insecure_skip_cert_verification()
+            .build(service)?;
+
+        tokio::task::spawn(internal_service);
+        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
+
+        // Now with both the server and the IS running, we can test both peers trying to connect, then to each other
+        // via p2p
+        let mut to_spawn = vec![];
+        to_spawn.push(RegisterAndConnectItems {
+            internal_service_addr: service_addr,
+            server_addr: server_bind_address,
+            full_name: format!("Peer 0"),
+            username: format!("peer.0"),
+            password: format!("secret_0").into_bytes().to_owned(),
+        });
+
+        to_spawn.push(RegisterAndConnectItems {
+            internal_service_addr: service_addr,
+            server_addr: server_bind_address,
+            full_name: format!("Peer 1"),
+            username: format!("peer.1"),
+            password: format!("secret_1").into_bytes().to_owned(),
+        });
+
+        let mut returned_service_info = register_and_connect_to_server(to_spawn).await.unwrap();
+        let (peer_0_tx, peer_0_rx, peer_0_cid) = returned_service_info.remove(0);
+        let (peer_1_tx, peer_1_rx, peer_1_cid) = returned_service_info.remove(0);
+
+        Ok(())
+    }
+}
diff --git a/service/Cargo.toml b/service/Cargo.toml
index 7e969e3..b335bac 100644
--- a/service/Cargo.toml
+++ b/service/Cargo.toml
@@ -8,9 +8,10 @@ edition = "2021"
 [features]
 localhost-testing = ["citadel_sdk/localhost-testing"]
 vendored = ["citadel-internal-service/vendored"]
+deadlock-detection = ["parking_lot/deadlock_detection", "parking_lot", "lazy_static"]
 
 [[bin]]
-name = "citadel_service_bin"
+name = "internal-service"
 path = "src/main.rs"
 
 [[bin]]
@@ -23,4 +24,5 @@ tokio = { workspace = true, features = ["macros", "rt"] }
 citadel-internal-service = { workspace = true }
 citadel_sdk = { workspace = true }
 citadel_logging = { workspace = true }
-parking_lot = { version = "0.12.1", features = ["deadlock_detection"] }
\ No newline at end of file
+parking_lot = { workspace = true, optional = true }
+lazy_static = { workspace = true, optional = true }
\ No newline at end of file
diff --git a/service/src/main.rs b/service/src/main.rs
index cea36b7..47b9056 100644
--- a/service/src/main.rs
+++ b/service/src/main.rs
@@ -2,38 +2,12 @@ use citadel_internal_service::kernel::CitadelWorkspaceService;
 use citadel_sdk::prelude::{BackendType, NodeBuilder, NodeType};
 use std::error::Error;
 use std::net::SocketAddr;
-use structopt::{lazy_static, StructOpt};
-use citadel_logging::{info, error};
+use std::time::Duration;
+use structopt::StructOpt;
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
     citadel_logging::setup_log();
-    lazy_static::lazy_static! {
-        static ref DEADLOCK_INIT: () = {
-            let _ = std::thread::spawn(move || {
-                info!(target: "gadget", "Executing deadlock detector ...");
-                use std::thread;
-                use std::time::Duration;
-                use parking_lot::deadlock;
-                loop {
-                    std::thread::sleep(Duration::from_secs(5));
-                    let deadlocks = deadlock::check_deadlock();
-                    if deadlocks.is_empty() {
-                        continue;
-                    }
-
-                    error!(target: "citadel", "{} deadlocks detected", deadlocks.len());
-                    for (i, threads) in deadlocks.iter().enumerate() {
-                        error!(target: "citadel", "Deadlock #{}", i);
-                        for t in threads {
-                            info!(target: "citadel", "Thread Id {:#?}", t.thread_id());
-                            error!(target: "citadel", "{:#?}", t.backtrace());
-                        }
-                    }
-                }
-            });
-        };
-    }
 
     let opts: Options = Options::from_args();
     let service = CitadelWorkspaceService::new(opts.bind);
@@ -46,14 +20,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
         builder = builder.with_insecure_skip_cert_verification()
     }
 
-    builder.build(service)?.await?;
+    let internal_service = builder.build(service)?;
+    tokio::spawn(internal_service);
+    tokio::time::sleep(Duration::from_millis(1000)).await;
 
     Ok(())
 }
 
 #[derive(Debug, StructOpt)]
 #[structopt(
-    name = "citadel-service-bin",
+    name = "internal-service",
     about = "Used for running a local service for citadel applications"
 )]
 struct Options {
@@ -62,3 +38,32 @@ struct Options {
     #[structopt(short, long)]
     dangerous: Option,
 }
+
+#[cfg(feature = "deadlock-detection")]
+lazy_static::lazy_static! {
+    static ref DEADLOCK_INIT: () = {
+        let _ = std::thread::spawn(move || {
+            info!(target: "gadget", "Executing deadlock detector ...");
+            use std::thread;
+            use std::time::Duration;
+            use parking_lot::deadlock;
+            use citadel_logging::*;
+            loop {
+                std::thread::sleep(Duration::from_secs(5));
+                let deadlocks = deadlock::check_deadlock();
+                if deadlocks.is_empty() {
+                    continue;
+                }
+
+                error!(target: "citadel", "{} deadlocks detected", deadlocks.len());
+                for (i, threads) in deadlocks.iter().enumerate() {
+                    error!(target: "citadel", "Deadlock #{}", i);
+                    for t in threads {
+                        error!(target: "citadel", "Thread Id {:#?}", t.thread_id());
+                        error!(target: "citadel", "{:#?}", t.backtrace());
+                    }
+                }
+            }
+        });
+    };
+}

From 20ecc0a9f5857d8b155ab673303d1c9b1a552eb8 Mon Sep 17 00:00:00 2001
From: Thomas Braun
Date: Fri, 23 Feb 2024 18:26:15 -0500
Subject: [PATCH 03/17] Refactor and add failing loopback test

---
 citadel-internal-service/tests/common/mod.rs | 114 ++++++++++++++++++-
 citadel-internal-service/tests/loopback.rs   |  57 +++++++---
 citadel-internal-service/tests/service.rs    | 112 ++++--------------
 3 files changed, 172 insertions(+), 111 deletions(-)

diff --git a/citadel-internal-service/tests/common/mod.rs b/citadel-internal-service/tests/common/mod.rs
index fbb8700..1505e28 100644
--- a/citadel-internal-service/tests/common/mod.rs
+++ b/citadel-internal-service/tests/common/mod.rs
@@ -2,7 +2,8 @@ use citadel_internal_service::kernel::CitadelWorkspaceService;
 use citadel_internal_service_connector::util::{InternalServiceConnector, WrappedSink};
 use citadel_internal_service_types::{
-    InternalServiceRequest, InternalServiceResponse, PeerConnectSuccess, PeerRegisterSuccess,
+    InternalServiceRequest, InternalServiceResponse, PeerConnectNotification, PeerConnectSuccess,
+    PeerRegisterNotification, PeerRegisterSuccess,
 };
 use citadel_logging::info;
 use citadel_sdk::prefabs::server::client_connect_listener::ClientConnectListenerKernel;
@@ -397,6 +398,117 @@ pub async fn register_and_connect_to_server_then_peers(
     Ok(returned_service_info)
 }
 
+pub async fn register_p2p(
+    to_service_a: &mut UnboundedSender<InternalServiceRequest>,
+    from_service_a: &mut UnboundedReceiver<InternalServiceResponse>,
+    cid_a: u64,
+    to_service_b: &mut UnboundedSender<InternalServiceRequest>,
+    from_service_b: &mut UnboundedReceiver<InternalServiceResponse>,
+    cid_b: u64,
+    session_security_settings: SessionSecuritySettings,
+) -> Result<(), Box<dyn Error>> {
+    // Service A Requests to Register with Service B
+    to_service_a
+        .send(InternalServiceRequest::PeerRegister {
+            request_id: Uuid::new_v4(),
+            cid: cid_a,
+            peer_cid: cid_b,
+            session_security_settings,
+            connect_after_register: false,
+        })
+        .unwrap();
+
+    // Service B receives Register Request from Service A
+    let inbound_response = from_service_b.recv().await.unwrap();
+    match inbound_response {
+        InternalServiceResponse::PeerRegisterNotification(PeerRegisterNotification {
+            cid,
+            peer_cid,
+            peer_username: _,
+            request_id: _,
+        }) => {
+            assert_eq!(cid, cid_b);
+            assert_eq!(peer_cid, cid_a);
+        }
+        _ => {
+            panic!(
+                "Peer B didn't get the PeerRegisterNotification, instead got {inbound_response:?}"
+            );
+        }
+    }
+
+    // Service B Sends Register Request to Accept
+    to_service_b
+        .send(InternalServiceRequest::PeerRegister {
+            request_id: Uuid::new_v4(),
+            cid: cid_b,
+            peer_cid: cid_a,
+            session_security_settings,
+            connect_after_register: false,
+        })
+        .unwrap();
+
+    // Receive Register Success Responses
+    let _ = from_service_a.recv().await.unwrap();
+    let _ = from_service_b.recv().await.unwrap();
+    Ok(())
+}
+
+pub async fn connect_p2p(
+    to_service_a: &mut UnboundedSender<InternalServiceRequest>,
+    from_service_a: &mut UnboundedReceiver<InternalServiceResponse>,
+    cid_a: u64,
+    to_service_b: &mut UnboundedSender<InternalServiceRequest>,
+    from_service_b: &mut UnboundedReceiver<InternalServiceResponse>,
+    cid_b: u64,
+    session_security_settings: SessionSecuritySettings,
+) -> Result<(), Box<dyn Error>> {
+    // Service A Requests To Connect
+    to_service_a
+        .send(InternalServiceRequest::PeerConnect {
+            request_id: Uuid::new_v4(),
+            cid: cid_a,
+            peer_cid: cid_b,
+            udp_mode: Default::default(),
+            session_security_settings,
+        })
+        .unwrap();
+
+    // Service B Receives Connect Request from Service A
+    let inbound_response = from_service_b.recv().await.unwrap();
+    match inbound_response {
+        InternalServiceResponse::PeerConnectNotification(PeerConnectNotification {
+            cid,
+            peer_cid,
+            session_security_settings: _,
+            udp_mode: _,
+            request_id: _,
+        }) => {
+            assert_eq!(cid, cid_b);
+            assert_eq!(peer_cid, cid_a);
+        }
+        _ => {
+            panic!("Peer B didn't get the PeerConnectNotification");
+        }
+    }
+
+    // Service B Sends Connect Request to Accept
+    to_service_b
+        .send(InternalServiceRequest::PeerConnect {
+            request_id: Uuid::new_v4(),
+            cid: cid_b,
+            peer_cid: cid_a,
+            udp_mode: Default::default(),
+            session_security_settings,
+        })
+        .unwrap();
+
+    // Receive Connect Success Responses
+    let _ = from_service_a.recv().await.unwrap();
+    let _ = from_service_b.recv().await.unwrap();
+    Ok(())
+}
+
 pub fn spawn_services(futures_to_spawn: Vec) {
     let services_to_spawn = async move {
         let (returned_future, _, _) = futures::future::select_all(futures_to_spawn).await;
diff --git a/citadel-internal-service/tests/loopback.rs b/citadel-internal-service/tests/loopback.rs
index d9c60ba..1f6dc72 100644
--- a/citadel-internal-service/tests/loopback.rs
+++ b/citadel-internal-service/tests/loopback.rs
@@ -30,27 +30,48 @@ mod tests {
 
         // Now with both the server and the IS running, we can test both peers trying to connect, then to each other
         // via p2p
-        let mut to_spawn = vec![];
-        to_spawn.push(RegisterAndConnectItems {
-            internal_service_addr: service_addr,
-            server_addr: server_bind_address,
-            full_name: format!("Peer 0"),
-            username: format!("peer.0"),
-            password: format!("secret_0").into_bytes().to_owned(),
-        });
-
-        to_spawn.push(RegisterAndConnectItems {
-            internal_service_addr: service_addr,
-            server_addr: server_bind_address,
-            full_name: format!("Peer 1"),
-            username: format!("peer.1"),
-            password: format!("secret_1").into_bytes().to_owned(),
-        });
+        let to_spawn = vec![
+            RegisterAndConnectItems {
+                internal_service_addr: service_addr,
+                server_addr: server_bind_address,
+                full_name: "Peer 0".to_string(),
+                username: "peer.0".to_string(),
+                password: "secret_0".to_string().into_bytes().to_owned(),
+            },
+            RegisterAndConnectItems {
+                internal_service_addr: service_addr,
+                server_addr: server_bind_address,
+                full_name: "Peer 1".to_string(),
+                username: "peer.1".to_string(),
+                password: "secret_1".to_string().into_bytes().to_owned(),
+            },
+        ];
 
         let mut returned_service_info = register_and_connect_to_server(to_spawn).await.unwrap();
-        let (peer_0_tx, peer_0_rx, peer_0_cid) = returned_service_info.remove(0);
-        let (peer_1_tx, peer_1_rx, peer_1_cid) = returned_service_info.remove(0);
+        let (mut peer_0_tx, mut peer_0_rx, peer_0_cid) = returned_service_info.remove(0);
+        let (mut peer_1_tx, mut peer_1_rx, peer_1_cid) = returned_service_info.remove(0);
 
+        crate::common::register_p2p(
+            &mut peer_0_tx,
+            &mut peer_0_rx,
+            peer_0_cid,
+            &mut peer_1_tx,
+            &mut peer_1_rx,
+            peer_1_cid,
+            SessionSecuritySettings::default(),
+        )
+        .await?;
+        citadel_logging::info!(target: "citadel", "P2P Register complete");
Register complete"); + crate::common::connect_p2p( + &mut peer_0_tx, + &mut peer_0_rx, + peer_0_cid, + &mut peer_1_tx, + &mut peer_1_rx, + peer_1_cid, + SessionSecuritySettings::default(), + ) + .await?; Ok(()) } } diff --git a/citadel-internal-service/tests/service.rs b/citadel-internal-service/tests/service.rs index 1fe1f7b..63e28ad 100644 --- a/citadel-internal-service/tests/service.rs +++ b/citadel-internal-service/tests/service.rs @@ -11,7 +11,6 @@ mod tests { use citadel_internal_service_connector::util::InternalServiceConnector; use citadel_internal_service_types::{ InternalServiceRequest, InternalServiceResponse, MessageNotification, MessageSendSuccess, - PeerConnectNotification, PeerRegisterNotification, }; use citadel_logging::info; use citadel_sdk::prelude::*; @@ -502,97 +501,26 @@ mod tests { let (ref mut to_service_a, ref mut from_service_a, cid_a) = item; for neighbor in neighbor_items { let (ref mut to_service_b, ref mut from_service_b, cid_b) = neighbor; - let session_security_settings = - SessionSecuritySettingsBuilder::default().build().unwrap(); - - // Service A Requests to Register with Service B - to_service_a - .send(InternalServiceRequest::PeerRegister { - request_id: Uuid::new_v4(), - cid: *cid_a, - peer_cid: (*cid_b), - session_security_settings, - connect_after_register: false, - }) - .unwrap(); - - // Service B receives Register Request from Service A - let inbound_response = from_service_b.recv().await.unwrap(); - match inbound_response { - InternalServiceResponse::PeerRegisterNotification( - PeerRegisterNotification { - cid, - peer_cid, - peer_username: _, - request_id: _, - }, - ) => { - assert_eq!(cid, *cid_b); - assert_eq!(peer_cid, *cid_a); - } - _ => { - panic!("Peer B didn't get the PeerRegisterNotification, instead got {inbound_response:?}"); - } - } - - // Service B Sends Register Request to Accept - to_service_b - .send(InternalServiceRequest::PeerRegister { - request_id: Uuid::new_v4(), - cid: *cid_b, - peer_cid: (*cid_a), - session_security_settings, - connect_after_register: false, - }) - .unwrap(); - - // Receive Register Success Responses - let _ = from_service_a.recv().await.unwrap(); - let _ = from_service_b.recv().await.unwrap(); - - // Service A Requests To Connect - to_service_a - .send(InternalServiceRequest::PeerConnect { - request_id: Uuid::new_v4(), - cid: *cid_a, - peer_cid: *cid_b, - udp_mode: Default::default(), - session_security_settings, - }) - .unwrap(); - - // Service B Receives Connect Request from Service A - let inbound_response = from_service_b.recv().await.unwrap(); - match inbound_response { - InternalServiceResponse::PeerConnectNotification(PeerConnectNotification { - cid, - peer_cid, - session_security_settings: _, - udp_mode: _, - request_id: _, - }) => { - assert_eq!(cid, *cid_b); - assert_eq!(peer_cid, *cid_a); - } - _ => { - panic!("Peer B didn't get the PeerConnectNotification"); - } - } - - // Service B Sends Connect Request to Accept - to_service_b - .send(InternalServiceRequest::PeerConnect { - request_id: Uuid::new_v4(), - cid: *cid_b, - peer_cid: *cid_a, - udp_mode: Default::default(), - session_security_settings, - }) - .unwrap(); - - // Receive Connect Success Responses - let _ = from_service_a.recv().await.unwrap(); - let _ = from_service_b.recv().await.unwrap(); + crate::common::register_p2p( + to_service_a, + from_service_a, + *cid_a, + to_service_b, + from_service_b, + *cid_b, + SessionSecuritySettings::default(), + ) + .await?; + crate::common::connect_p2p( + to_service_a, + from_service_a, + *cid_a, + 
+                to_service_b,
+                from_service_b,
+                *cid_b,
+                SessionSecuritySettings::default(),
+            )
+            .await?;
         }
     }
     Ok(())

From 54d0da2328da3f37e9419b6977cb792d7fcdd6b3 Mon Sep 17 00:00:00 2001
From: Thomas Braun
Date: Sun, 25 Feb 2024 12:44:19 -0500
Subject: [PATCH 04/17] Get intra-kernel peer_register and peer_connect
 working

---
 Cargo.toml                                    |   6 +-
 citadel-internal-service-types/src/lib.rs     |   1 +
 citadel-internal-service/src/kernel/mod.rs    |  25 ++-
 .../src/kernel/request_handler.rs             | 205 +++++++++++++++--
 citadel-internal-service/tests/common/mod.rs  | 208 +++++-------------
 citadel-internal-service/tests/loopback.rs    |   3 +-
 6 files changed, 258 insertions(+), 190 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index d620740..7106e2f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,9 +22,9 @@ citadel-internal-service-connector = { path = "./citadel-internal-service-connec
 citadel-internal-service-macros = { path = "./citadel-internal-service-macros", default-features = false, version = "0.1.0" }
 
 # Avarok deps
-citadel_sdk = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol" }
-citadel_types = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol" }
-citadel_logging = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol", default-features = false }
+citadel_sdk = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/", branch = "intra-kernel-compat" }
+citadel_types = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/", branch = "intra-kernel-compat" }
+citadel_logging = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/", branch = "intra-kernel-compat", default-features = false }
 
 # Standard deps
 serde = { version = "1.0.104", features = ["derive"] }
diff --git a/citadel-internal-service-types/src/lib.rs b/citadel-internal-service-types/src/lib.rs
index 29579f9..65f5e33 100644
--- a/citadel-internal-service-types/src/lib.rs
+++ b/citadel-internal-service-types/src/lib.rs
@@ -115,6 +115,7 @@ pub struct DeleteVirtualFileFailure {
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct PeerConnectSuccess {
     pub cid: u64,
+    pub peer_cid: u64,
     pub request_id: Option<Uuid>,
 }
diff --git a/citadel-internal-service/src/kernel/mod.rs b/citadel-internal-service/src/kernel/mod.rs
index 615642a..a2fbab4 100644
--- a/citadel-internal-service/src/kernel/mod.rs
+++ b/citadel-internal-service/src/kernel/mod.rs
@@ -167,7 +167,7 @@ impl NetKernel for CitadelWorkspaceService {
     }
 
     async fn on_start(&self) -> Result<(), NetworkError> {
-        let mut remote = self.remote.clone().unwrap();
+        let remote = self.remote.clone().unwrap();
         let remote_for_closure = remote.clone();
         let listener = tokio::net::TcpListener::bind(self.bind_address).await?;
 
@@ -198,14 +198,20 @@ impl NetKernel for CitadelWorkspaceService {
         let inbound_command_task = async move {
             while let Some((command, conn_id)) = rx.recv().await {
                 // TODO: handle error once payload_handler is fallible
-                handle_request(
-                    command,
-                    conn_id,
-                    server_connection_map,
-                    &mut remote,
-                    tcp_connection_map,
-                )
-                .await;
+                let mut remote = remote.clone();
+                let server_connection_map = server_connection_map.clone();
+                let tcp_connection_map = tcp_connection_map.clone();
+                // Spawn the task, that way, we can handle multiple requests in parallel
+                tokio::task::spawn(async move {
+                    handle_request(
+                        command,
+                        conn_id,
+                        &server_connection_map,
+                        &mut remote,
+                        &tcp_connection_map,
+                    )
+                    .await;
+                });
             }
             Ok(())
         };
@@ -221,7 +227,6 @@ impl NetKernel for CitadelWorkspaceService {
     }
 
     async fn on_node_event_received(&self, message: NodeResult) -> Result<(), NetworkError> {
-        info!(target: "citadel", "NODE EVENT RECEIVED WITH MESSAGE: {message:?}");
         match message {
             NodeResult::Disconnect(disconnect) => {
                 if let Some(conn) = disconnect.v_conn_type {
diff --git a/citadel-internal-service/src/kernel/request_handler.rs b/citadel-internal-service/src/kernel/request_handler.rs
index 89f61bb..e2aede8 100644
--- a/citadel-internal-service/src/kernel/request_handler.rs
+++ b/citadel-internal-service/src/kernel/request_handler.rs
@@ -7,6 +7,8 @@ use citadel_internal_service_types::*;
 use citadel_logging::tracing::log;
 use citadel_logging::{error, info};
 use citadel_sdk::prefabs::ClientServerRemote;
+use citadel_sdk::prelude::remote_specialization::PeerRemote;
+use citadel_sdk::prelude::results::PeerRegisterStatus;
 use citadel_sdk::prelude::*;
 use futures::StreamExt;
 use std::collections::HashMap;
@@ -299,7 +301,6 @@ pub async fn handle_request(
             session_security_settings,
             request_id,
         } => {
-            info!(target: "citadel", "About to connect to server {server_addr:?} for user {username}");
             match remote
                 .register(
                     server_addr,
@@ -330,17 +331,23 @@ pub async fn handle_request(
                         request_id,
                     };
 
-                    handle_request(
-                        connect_command,
-                        uuid,
-                        server_connection_map,
-                        remote,
-                        tcp_connection_map,
-                    )
-                    .await
+                    let server_connection_map = server_connection_map.clone();
+                    let mut remote = remote.clone();
+                    let tcp_connection_map = tcp_connection_map.clone();
+                    tokio::task::spawn(async move {
+                        handle_request(
+                            connect_command,
+                            uuid,
+                            &server_connection_map,
+                            &mut remote,
+                            &tcp_connection_map,
+                        )
+                        .await
+                    });
                 }
             },
             Err(err) => {
+                citadel_logging::error!(target: "citadel", "Failure on register: {err:?}");
                 let response = InternalServiceResponse::RegisterFailure(
                     citadel_internal_service_types::RegisterFailure {
                         message: err.into_string(),
@@ -777,9 +784,68 @@ pub async fn handle_request(
                 session_security_settings,
             );
 
+            let remote_for_scanner = &remote;
+            let scan_for_status = |status| async move {
+                if let NodeResult::PeerEvent(PeerEvent {
+                    event:
+                        PeerSignal::PostRegister {
+                            peer_conn_type,
+                            inviter_username: _,
+                            invitee_username: _,
+                            ticket_opt: _,
+                            invitee_response: resp,
+                        },
+                    ticket: _,
+                }) = status
+                {
+                    match resp {
+                        Some(PeerResponse::Accept(..)) => Ok(Some(PeerRegisterStatus::Accepted)),
+                        Some(PeerResponse::Decline) => Ok(Some(PeerRegisterStatus::Declined)),
+                        Some(PeerResponse::Timeout) => Ok(Some(PeerRegisterStatus::Failed { reason: Some("Timeout on register request. Peer did not accept in time. Try again later".to_string()) })),
+                        _ => {
+                            // This may be a signal needing to be routed
+                            if peer_conn_type.get_original_target_cid() == peer_cid {
+                                // If the target peer is the same as the peer_cid, that means that
+                                // the peer is in the same kernel space. Route the signal through the TCP client
+                                let lock = server_connection_map.lock().await;
+                                if let Some(peer) = lock.get(&peer_cid) {
+                                    let username_of_sender = remote_for_scanner.account_manager().get_username_by_cid(cid).await
+                                        .map_err(|err| NetworkError::msg(format!("Unable to get username for cid: {cid}. Error: {err:?}")))?
+                                        .ok_or_else(|| NetworkError::msg(format!("Unable to get username for cid: {cid}")))?;
+
+                                    let peer_uuid = peer.associated_tcp_connection;
+                                    drop(lock);
+
+                                    log::debug!(target: "citadel", "Routing signal meant for other peer in intra-kernel space");
+                                    send_response_to_tcp_client(
+                                        tcp_connection_map,
+                                        InternalServiceResponse::PeerRegisterNotification(
+                                            PeerRegisterNotification {
+                                                cid: peer_cid,
+                                                peer_cid: cid,
+                                                peer_username: username_of_sender,
+                                                request_id: Some(request_id),
+                                            },
+                                        ),
+                                        peer_uuid
+                                    ).await
+                                }
+                            }
+
+                            Ok(None)
+                        }
+                    }
+                } else {
+                    Ok(None)
+                }
+            };
+
             match client_to_server_remote.propose_target(cid, peer_cid).await {
                 Ok(symmetric_identifier_handle_ref) => {
-                    match symmetric_identifier_handle_ref.register_to_peer().await {
+                    match symmetric_identifier_handle_ref
+                        .register_to_peer_with_fn(scan_for_status)
+                        .await
+                    {
                         Ok(_peer_register_success) => {
                             let account_manager = symmetric_identifier_handle_ref.account_manager();
                             if let Ok(target_information) =
@@ -796,14 +862,19 @@ pub async fn handle_request(
                                         request_id,
                                     };
 
-                                    handle_request(
-                                        connect_command,
-                                        uuid,
-                                        server_connection_map,
-                                        remote,
-                                        tcp_connection_map,
-                                    )
-                                    .await;
+                                    let server_connection_map = server_connection_map.clone();
+                                    let mut remote = remote.clone();
+                                    let tcp_connection_map = tcp_connection_map.clone();
+                                    tokio::task::spawn(async move {
+                                        handle_request(
+                                            connect_command,
+                                            uuid,
+                                            &server_connection_map,
+                                            &mut remote,
+                                            &tcp_connection_map,
+                                        )
+                                        .await;
+                                    });
                                 }
                                 false => {
                                     send_response_to_tcp_client(
@@ -864,7 +935,6 @@ pub async fn handle_request(
             session_security_settings,
             request_id,
         } => {
-            // TODO: check to see if peer is already in the hashmap
             let client_to_server_remote = ClientServerRemote::new(
                 VirtualTargetType::LocalGroupPeer {
                     implicated_cid: cid,
@@ -873,10 +943,98 @@ pub async fn handle_request(
                 remote.clone(),
                 session_security_settings,
             );
+
             match client_to_server_remote.find_target(cid, peer_cid).await {
-                Ok(symmetric_identifier_handle_ref) => {
-                    match symmetric_identifier_handle_ref
-                        .connect_to_peer_custom(session_security_settings, udp_mode)
+                Ok(symmetric_identifier_handle) => {
+                    let symmetric_identifier_handle = &symmetric_identifier_handle.into_owned();
+
+                    let remote_for_scanner = &remote;
+                    let signal_scanner = |status| async move {
+                        match status {
+                            NodeResult::PeerChannelCreated(PeerChannelCreated {
+                                ticket: _,
+                                channel,
+                                udp_rx_opt,
+                            }) => {
+                                let username =
+                                    symmetric_identifier_handle.target_username().cloned();
+
+                                let remote = PeerRemote {
+                                    inner: (*remote_for_scanner).clone(),
+                                    peer: symmetric_identifier_handle
+                                        .try_as_peer_connection()
+                                        .await?
+                                        .as_virtual_connection(),
+                                    username,
+                                    session_security_settings,
+                                };
+
+                                Ok(Some(results::PeerConnectSuccess {
+                                    remote,
+                                    channel,
+                                    udp_channel_rx: udp_rx_opt,
+                                    incoming_object_transfer_handles: None,
+                                }))
+                            }
+
+                            NodeResult::PeerEvent(PeerEvent {
+                                event:
+                                    PeerSignal::PostConnect {
+                                        peer_conn_type: _,
+                                        ticket_opt: _,
+                                        invitee_response: Some(PeerResponse::Decline),
+                                        ..
+                                    },
+                                ..
+                            }) => Err(NetworkError::msg(format!(
+                                "Peer {peer_cid} declined connection"
+                            ))),
+
+                            NodeResult::PeerEvent(PeerEvent {
+                                event:
+                                    PeerSignal::PostConnect {
+                                        peer_conn_type,
+                                        ticket_opt: _,
+                                        invitee_response: None,
+                                        ..
+                                    },
+                                ..
+                            }) => {
+                                // Route the signal to the intra-kernel user
+                                if peer_conn_type.get_original_target_cid() == peer_cid {
+                                    let lock = server_connection_map.lock().await;
+                                    if let Some(conn) = lock.get(&peer_cid) {
+                                        let peer_uuid = conn.associated_tcp_connection;
+                                        send_response_to_tcp_client(
+                                            tcp_connection_map,
+                                            InternalServiceResponse::PeerConnectNotification(
+                                                PeerConnectNotification {
+                                                    cid: peer_cid,
+                                                    peer_cid: cid,
+                                                    session_security_settings,
+                                                    udp_mode,
+                                                    request_id: Some(request_id),
+                                                },
+                                            ),
+                                            peer_uuid,
+                                        )
+                                        .await
+                                    }
+                                }
+
+                                Ok(None)
+                            }
+
+                            _ => Ok(None),
+                        }
+                    };
+
+                    match symmetric_identifier_handle
+                        .connect_to_peer_with_fn(
+                            session_security_settings,
+                            udp_mode,
+                            signal_scanner,
+                        )
                         .await
                     {
                         Ok(peer_connect_success) => {
                             let (sink, mut stream) = peer_connect_success.channel.split();
                             server_connection_map
                                 .lock()
                                 .await
@@ -890,7 +1048,7 @@ pub async fn handle_request(
                                 .add_peer_connection(
                                     peer_cid,
                                     sink,
-                                    symmetric_identifier_handle_ref.into_owned(),
+                                    symmetric_identifier_handle.clone(),
                                 );
 
                             let hm_for_conn = tcp_connection_map.clone();
@@ -899,6 +1057,7 @@ pub async fn handle_request(
                                 tcp_connection_map,
                                 InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess {
                                     cid,
+                                    peer_cid,
                                     request_id: Some(request_id),
                                 }),
                                 uuid,
diff --git a/citadel-internal-service/tests/common/mod.rs b/citadel-internal-service/tests/common/mod.rs
index 1505e28..99f9c45 100644
--- a/citadel-internal-service/tests/common/mod.rs
+++ b/citadel-internal-service/tests/common/mod.rs
@@ -245,154 +245,27 @@ pub async fn register_and_connect_to_server_then_peers(
             let (ref mut to_service_b, ref mut from_service_b, cid_b) = neighbor;
             let session_security_settings =
                 SessionSecuritySettingsBuilder::default().build().unwrap();
-
-            // now, both peers are connected and registered to the central server. Now, we
-            // need to have them peer-register to each other
-            info!(
-                target = "citadel",
-                "Peer {cid_a:?} Sending PeerRegister Request to {cid_b:?}"
-            );
-            to_service_a
-                .send(InternalServiceRequest::PeerRegister {
-                    request_id: Uuid::new_v4(),
-                    cid: *cid_a,
-                    peer_cid: (*cid_b),
-                    session_security_settings,
-                    connect_after_register: false,
-                })
-                .unwrap();
-
-            // Receive Notification of Register Request
-            let peer_register_notification = from_service_b.recv().await.unwrap();
-            assert!(matches!(
-                peer_register_notification,
-                InternalServiceResponse::PeerRegisterNotification(..)
-            ));
-
-            info!(
-                target = "citadel",
-                "Peer {cid_b:?} Accepting PeerRegister Request From {cid_a:?}"
-            );
-            to_service_b
-                .send(InternalServiceRequest::PeerRegister {
-                    request_id: Uuid::new_v4(),
-                    cid: *cid_b,
-                    peer_cid: (*cid_a),
-                    session_security_settings,
-                    connect_after_register: false,
-                })
-                .unwrap();
-
-            let item = from_service_b.recv().await.unwrap();
-            match item {
-                InternalServiceResponse::PeerRegisterSuccess(PeerRegisterSuccess {
-                    cid,
-                    peer_cid,
-                    peer_username: _,
-                    request_id: _,
-                }) => {
-                    info!(
-                        target = "citadel",
-                        "Peer {cid_b:?} Received PeerRegisterSuccess Signal"
-                    );
-                    assert_eq!(cid, *cid_b);
-                    assert_eq!(peer_cid, *cid_a);
-                }
-                _ => {
-                    panic!("Didn't get the PeerRegisterSuccess");
-                }
-            }
-
-            let item = from_service_a.recv().await.unwrap();
-            match item {
-                InternalServiceResponse::PeerRegisterSuccess(PeerRegisterSuccess {
-                    cid,
-                    peer_cid,
-                    peer_username: _,
-                    request_id: _,
-                }) => {
-                    info!(
-                        target = "citadel",
-                        "Peer {cid_a:?} Received PeerRegisterSuccess Signal"
-                    );
-                    assert_eq!(cid, *cid_a);
-                    assert_eq!(peer_cid, *cid_b);
-                }
-                _ => {
-                    panic!("Didn't get the PeerRegisterSuccess");
-                }
-            }
-
-            info!(
-                target = "citadel",
-                "Peer {cid_a:?} Sending PeerConnect Request to {cid_b:?}"
-            );
-            to_service_a
-                .send(InternalServiceRequest::PeerConnect {
-                    request_id: Uuid::new_v4(),
-                    cid: *cid_a,
-                    peer_cid: *cid_b,
-                    udp_mode: Default::default(),
-                    session_security_settings,
-                })
-                .unwrap();
-
-            // Receive Notification of Connect Request
-            let peer_connect_notification = from_service_b.recv().await.unwrap();
-            assert!(matches!(
-                peer_connect_notification,
-                InternalServiceResponse::PeerConnectNotification(..)
-            ));
-
-            info!(
-                target = "citadel",
-                "Peer {cid_b:?} Accepting PeerConnect Request From {cid_a:?}"
-            );
-            to_service_b
-                .send(InternalServiceRequest::PeerConnect {
-                    request_id: Uuid::new_v4(),
-                    cid: *cid_b,
-                    peer_cid: *cid_a,
-                    udp_mode: Default::default(),
-                    session_security_settings,
-                })
-                .unwrap();
-
-            let item = from_service_b.recv().await.unwrap();
-            match item {
-                InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess {
-                    cid,
-                    request_id: _,
-                }) => {
-                    info!(
-                        target = "citadel",
-                        "Peer {cid_b:?} Received PeerConnectSuccess Signal"
-                    );
-                    assert_eq!(cid, *cid_b);
-                }
-                _ => {
-                    info!(target = "citadel", "{:?}", item);
-                    panic!("Didn't get the PeerConnectSuccess");
-                }
-            }
-
-            let item = from_service_a.recv().await.unwrap();
-            match item {
-                InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess {
-                    cid,
-                    request_id: _,
-                }) => {
-                    info!(
-                        target = "citadel",
-                        "Peer {cid_a:?} Received PeerConnectSuccess Signal"
-                    );
-                    assert_eq!(cid, *cid_a);
-                }
-                _ => {
-                    info!(target = "citadel", "{:?}", item);
-                    panic!("Didn't get the PeerConnectSuccess");
-                }
-            }
+            register_p2p(
+                to_service_a,
+                from_service_a,
+                *cid_a,
+                to_service_b,
+                from_service_b,
+                *cid_b,
+                session_security_settings,
+            )
+            .await?;
+
+            connect_p2p(
+                to_service_a,
+                from_service_a,
+                *cid_a,
+                to_service_b,
+                from_service_b,
+                *cid_b,
+                session_security_settings,
+            )
+            .await?;
         }
     }
     Ok(returned_service_info)
@@ -449,8 +322,24 @@ pub async fn register_p2p(
         .unwrap();
 
     // Receive Register Success Responses
-    let _ = from_service_a.recv().await.unwrap();
-    let _ = from_service_b.recv().await.unwrap();
+    let resp = from_service_a.recv().await.unwrap();
+    let InternalServiceResponse::PeerRegisterSuccess(PeerRegisterSuccess { cid, peer_cid, .. }) =
+        resp
+    else {
+        panic!("Invalid signal")
+    };
+    assert_eq!(cid, cid_a);
+    assert_eq!(peer_cid, cid_b);
+
+    let resp = from_service_b.recv().await.unwrap();
+    let InternalServiceResponse::PeerRegisterSuccess(PeerRegisterSuccess { cid, peer_cid, .. }) =
+        resp
+    else {
+        panic!("Invalid signal")
+    };
+    assert_eq!(cid, cid_b);
+    assert_eq!(peer_cid, cid_a);
+
     Ok(())
 }
 
@@ -504,8 +393,23 @@ pub async fn connect_p2p(
         .unwrap();
 
     // Receive Connect Success Responses
-    let _ = from_service_a.recv().await.unwrap();
-    let _ = from_service_b.recv().await.unwrap();
+    let signal = from_service_a.recv().await.unwrap();
+    let InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { cid, peer_cid, .. }) =
+        signal
+    else {
+        panic!("Invalid signal")
+    };
+    assert_eq!(cid, cid_a);
+    assert_eq!(peer_cid, cid_b);
 
+    let signal = from_service_b.recv().await.unwrap();
+    let InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { cid, peer_cid, .. }) =
+        signal
+    else {
+        panic!("Invalid signal")
+    };
+    assert_eq!(cid, cid_b);
+    assert_eq!(peer_cid, cid_a);
+
     Ok(())
 }
 
diff --git a/citadel-internal-service/tests/loopback.rs b/citadel-internal-service/tests/loopback.rs
index 1f6dc72..c1275d6 100644
--- a/citadel-internal-service/tests/loopback.rs
+++ b/citadel-internal-service/tests/loopback.rs
@@ -6,12 +6,11 @@ mod tests {
         register_and_connect_to_server, server_info_skip_cert_verification, RegisterAndConnectItems,
     };
     use citadel_internal_service::kernel::CitadelWorkspaceService;
-    use citadel_logging::setup_log;
     use citadel_sdk::prelude::*;
 
     #[tokio::test]
     async fn test_2_peers_1_service() -> Result<(), Box<dyn std::error::Error>> {
-        setup_log();
+        crate::common::setup_log();
 
         let (server, server_bind_address) = server_info_skip_cert_verification();
         tokio::task::spawn(server);

From 45fa75ff6bee19055053d87b81ea5db1722218f2 Mon Sep 17 00:00:00 2001
From: Thomas Braun
Date: Sun, 25 Feb 2024 15:17:12 -0500
Subject: [PATCH 05/17] fix

---
 service/src/main.rs | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/service/src/main.rs b/service/src/main.rs
index 47b9056..d17c43e 100644
--- a/service/src/main.rs
+++ b/service/src/main.rs
@@ -2,7 +2,6 @@ use citadel_internal_service::kernel::CitadelWorkspaceService;
 use citadel_sdk::prelude::{BackendType, NodeBuilder, NodeType};
 use std::error::Error;
 use std::net::SocketAddr;
-use std::time::Duration;
 use structopt::StructOpt;
 
 #[tokio::main]
@@ -20,9 +19,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
         builder = builder.with_insecure_skip_cert_verification()
     }
 
-    let internal_service = builder.build(service)?;
-    tokio::spawn(internal_service);
-    tokio::time::sleep(Duration::from_millis(1000)).await;
+    builder.build(service)?.await?;
 
     Ok(())
 }

From 9047ed97d70525a5022995a90f28892d707cbed1 Mon Sep 17 00:00:00 2001
From: Tjemmmic
Date: Thu, 29 Feb 2024 20:37:02 -0600
Subject: [PATCH 06/17] Intra-kernel support for SendFile

---
 .../src/kernel/request_handler.rs | 81 ++++++++++++-----
 1 file changed, 61 insertions(+), 20 deletions(-)

diff --git a/citadel-internal-service/src/kernel/request_handler.rs b/citadel-internal-service/src/kernel/request_handler.rs
index e2aede8..c249d90 100644
--- a/citadel-internal-service/src/kernel/request_handler.rs
+++ b/citadel-internal-service/src/kernel/request_handler.rs
@@ -464,30 +464,72 @@ pub async fn handle_request(
         Some(conn) => {
             let result = if let Some(peer_cid) = peer_cid {
                 if let Some(peer_remote) = conn.peers.get(&peer_cid) {
-                    let request = NodeRequest::SendObject(SendObject {
-                        source: Box::new(source),
-                        chunk_size,
-                        implicated_cid: cid,
-                        v_conn_type: *peer_remote.remote.user(),
-                        transfer_type,
-                    });
+                    let scan_for_status = |status| async move {
+                        match status {
+                            NodeResult::ObjectTransferHandle(ObjectTransferHandle {
+                                ticket: _ticket,
+                                mut handle,
+                            }) => {
+                                let target_cid = handle.receiver;
+                                if target_cid != cid {
+                                    // Route the signal to the intra-kernel user
+                                    if target_cid == peer_cid {
+                                        let lock = server_connection_map.lock().await;
+                                        if let Some(conn) = lock.get(&peer_cid) {
+                                            let peer_uuid = conn.associated_tcp_connection;
+                                            send_response_to_tcp_client(
+                                                tcp_connection_map,
+                                                InternalServiceResponse::FileTransferRequestNotification(
+                                                    FileTransferRequestNotification {
+                                                        cid,
+                                                        peer_cid,
+                                                        metadata: handle.metadata,
+                                                    },
+                                                ),
+                                                peer_uuid,
+                                            )
+                                            .await
+                                        }
+                                    }
+                                    return Ok(None);
+                                } else {
+                                    while let Some(res) = handle.next().await {
+                                        log::trace!(target: "citadel", "Client received RES {:?}", res);
+                                        if let ObjectTransferStatus::TransferComplete = res
+                                        {
+                                            return Ok(Some(()));
+                                        }
+                                    }
+                                }
+                                Err(NetworkError::msg("Failed To Receive TransferComplete Response During FileTransfer"))
+                            }
 
+                            res => {
+                                log::error!(target: "citadel", "Invalid NodeResult for FileTransfer request received: {:?}", res);
+                                Err(NetworkError::msg(
+                                    "Invalid Response Received During FileTransfer",
+                                ))
+                            }
+                        }
+                    };
+                    peer_remote
+                        .remote
+                        .send_file_with_custom_opts_and_with_fn(
+                            source,
+                            chunk_size.unwrap_or_default(),
+                            transfer_type,
+                            scan_for_status,
+                        )
+                        .await
                 } else {
                     Err(NetworkError::msg("Peer Connection Not Found"))
                 }
             } else {
-                let request = NodeRequest::SendObject(SendObject {
-                    source: Box::new(source),
-                    chunk_size,
-                    implicated_cid: cid,
-                    v_conn_type: VirtualTargetType::LocalGroupServer {
-                        implicated_cid: cid,
-                    },
                     transfer_type,
-                });
-
-                remote.send(request).await
+                conn.client_server_remote.send_file_with_custom_opts(
+                    source,
+                    chunk_size.unwrap_or_default(),
+                    transfer_type,
+                ).await
             };
 
             match result {
@@ -1046,7 +1088,6 @@ pub async fn handle_request(
                         .await
                     {
                         Ok(peer_connect_success) => {
-                            let connection_cid = peer_connect_success.channel.get_peer_cid();
                             let (sink, mut stream) = peer_connect_success.channel.split();
                             server_connection_map
                                 .lock()
@@ -1066,7 +1107,7 @@ pub async fn handle_request(
                                 let message = InternalServiceResponse::MessageNotification(
                                     MessageNotification {
                                         message: message.into_buffer(),
-                                        cid: connection_cid,
+                                        cid,
                                         peer_cid,
                                         request_id: Some(request_id),
                                     },

From af0acfa7349f522b937a90b3b71b779c651ead7c Mon Sep 17 00:00:00 2001
From: Tjemmmic
Date: Sun, 3 Mar 2024 00:18:25 -0600
Subject: [PATCH 07/17] File Transfer Intra-Kernel WIP

---
 .../src/kernel/request_handler.rs             |  57 ++++---
 citadel-internal-service/tests/common/mod.rs  |  78 +++++++++-
 .../tests/file_transfer.rs                    |  79 +---------
 citadel-internal-service/tests/loopback.rs    | 146 +++++++++++++++++-
 4 files changed, 261 insertions(+), 99 deletions(-)

diff --git a/citadel-internal-service/src/kernel/request_handler.rs b/citadel-internal-service/src/kernel/request_handler.rs
index c249d90..49a1b6a 100644
--- a/citadel-internal-service/src/kernel/request_handler.rs
+++ b/citadel-internal-service/src/kernel/request_handler.rs
@@ -464,31 +464,47 @@ pub async fn handle_request(
         Some(conn) => {
             let result = if let Some(peer_cid) = peer_cid {
                 if let Some(peer_remote) = conn.peers.get(&peer_cid) {
+                    let peer_remote = &peer_remote.remote.clone();
+                    drop(lock);
                     let scan_for_status = |status| async move {
                         match status {
                             NodeResult::ObjectTransferHandle(ObjectTransferHandle {
-                                ticket: _ticket,
-                                mut handle,
-                            }) => {
-                                let target_cid = handle.receiver;
-                                if target_cid != cid {
-                                    // Route the signal to the intra-kernel user
-                                    if target_cid == peer_cid {
-                                        let lock = server_connection_map.lock().await;
-                                        if let Some(conn) = lock.get(&peer_cid) {
-                                            let peer_uuid = conn.associated_tcp_connection;
+                                ticket: _ticket,
+                                mut handle,
+                            }) => {
+                                let original_target_cid = handle.receiver;
+                                let original_source_cid = handle.source;
+                                let handle_metadata = handle.metadata.clone();
+
+                                if let ObjectTransferOrientation::Receiver {
+                                    is_revfs_pull: _,
+                                } = handle.orientation
+                                {
+                                    if original_target_cid == peer_cid {
+                                        let mut lock = server_connection_map.lock().await;
+                                        // Route the signal to the intra-kernel user if possible
+                                        if let Some(peer) =
+                                            lock.get_mut(&original_target_cid)
+                                        {
+                                            let peer_uuid = peer.associated_tcp_connection;
                                             send_response_to_tcp_client(
                                                 tcp_connection_map,
                                                 InternalServiceResponse::FileTransferRequestNotification(
                                                     FileTransferRequestNotification {
-                                                        cid,
-                                                        peer_cid,
-                                                        metadata: handle.metadata,
+                                                        cid: original_target_cid,
+                                                        peer_cid: original_source_cid,
+                                                        metadata: handle_metadata.clone(),
                                                     },
                                                 ),
                                                 peer_uuid,
                                             )
-                                            .await
+                                            .await;
+                                            peer.add_object_transfer_handler(
+                                                cid,
+                                                handle_metadata.object_id,
+                                                Some(handle),
+                                            );
+                                            return Ok(Some(()));
                                         }
                                     }
                                     return Ok(None);
                                 } else {
                                     while let Some(res) = handle.next().await {
                                         log::trace!(target: "citadel", "Client received RES {:?}", res);
                                         if let ObjectTransferStatus::TransferComplete = res
                                         {
                                             return Ok(Some(()));
                                         }
                                     }
                                 }
                                 Err(NetworkError::msg("Failed To Receive TransferComplete Response During FileTransfer"))
                             }
 
                             res => {
                                 log::error!(target: "citadel", "Invalid NodeResult for FileTransfer request received: {:?}", res);
                                 Err(NetworkError::msg(
                                     "Invalid Response Received During FileTransfer",
                                 ))
                             }
                         }
                     };
                     peer_remote
-                        .remote
                         .send_file_with_custom_opts_and_with_fn(
                             source,
                             chunk_size.unwrap_or_default(),
                             transfer_type,
                             scan_for_status,
                         )
                         .await
                 } else {
                     Err(NetworkError::msg("Peer Connection Not Found"))
                 }
             } else {
-                conn.client_server_remote.send_file_with_custom_opts(
-                    source,
-                    chunk_size.unwrap_or_default(),
-                    transfer_type,
-                ).await
+                conn.client_server_remote
+                    .send_file_with_custom_opts(
+                        source,
+                        chunk_size.unwrap_or_default(),
+                        transfer_type,
+                    )
+                    .await
             };
 
             match result {
diff --git a/citadel-internal-service/tests/common/mod.rs b/citadel-internal-service/tests/common/mod.rs
index 99f9c45..46b8748 100644
--- a/citadel-internal-service/tests/common/mod.rs
+++ b/citadel-internal-service/tests/common/mod.rs
@@ -2,8 +2,8 @@ use citadel_internal_service::kernel::CitadelWorkspaceService;
 use citadel_internal_service_connector::util::{InternalServiceConnector, WrappedSink};
 use citadel_internal_service_types::{
-    InternalServiceRequest, InternalServiceResponse, PeerConnectNotification, PeerConnectSuccess,
-    PeerRegisterNotification, PeerRegisterSuccess,
+    FileTransferTickNotification, InternalServiceRequest, InternalServiceResponse,
+    PeerConnectNotification, PeerConnectSuccess, PeerRegisterNotification, PeerRegisterSuccess,
 };
 use citadel_logging::info;
 use citadel_sdk::prefabs::server::client_connect_listener::ClientConnectListenerKernel;
@@ -549,6 +549,80 @@ pub fn server_info_file_transfer<'a>(
     (server, bind_addr)
 }
 
+pub async fn exhaust_stream_to_file_completion(
+    cmp_path: PathBuf,
+    svc: &mut UnboundedReceiver<InternalServiceResponse>,
+) {
+    // Exhaust the stream for the receiver
+    let mut path = None;
+    let mut is_revfs = false;
+    loop {
+        let tick_response = svc.recv().await.unwrap();
+        match tick_response {
+            InternalServiceResponse::FileTransferTickNotification(
+                FileTransferTickNotification {
+                    cid: _,
+                    peer_cid: _,
+                    status,
+                },
+            ) => match status {
+                ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => {
+                    path = Some(file_path);
+                    is_revfs = matches!(
+                        vfm.transfer_type,
+                        TransferType::RemoteEncryptedVirtualFilesystem { .. }
+                    );
+                    info!(target: "citadel", "File Transfer (Receiving) Beginning");
+                    assert_eq!(vfm.name, "test.txt")
+                }
+                ObjectTransferStatus::ReceptionTick(..) => {
+                    info!(target: "citadel", "File Transfer (Receiving) Tick");
+                }
+                ObjectTransferStatus::ReceptionComplete => {
+                    info!(target: "citadel", "File Transfer (Receiving) Completed");
+                    let cmp_data = tokio::fs::read(cmp_path.clone()).await.unwrap();
+                    let streamed_data = tokio::fs::read(
+                        path.clone()
+                            .expect("Never received the ReceptionBeginning tick!"),
+                    )
+                    .await
+                    .unwrap();
+                    if is_revfs {
+                        // The locally stored contents should NEVER be the same as the plaintext for REVFS
+                        assert_ne!(
+                            cmp_data.as_slice(),
+                            streamed_data.as_slice(),
+                            "Original data and streamed data does not match"
+                        );
+                    } else {
+                        assert_eq!(
+                            cmp_data.as_slice(),
+                            streamed_data.as_slice(),
+                            "Original data and streamed data does not match"
+                        );
+                    }
+
+                    return;
+                }
+                ObjectTransferStatus::TransferComplete => {
+                    info!(target: "citadel", "File Transfer (Sending) Completed");
+                    return;
+                }
+                ObjectTransferStatus::TransferBeginning => {
+                    info!(target: "citadel", "File Transfer (Sending) Beginning");
+                }
+                ObjectTransferStatus::TransferTick(..) => {}
+                _ => {
+                    panic!("File Send Reception Status Yielded Unexpected Response")
+                }
+            },
+            unexpected_response => {
+                citadel_logging::warn!(target: "citadel", "Unexpected signal {unexpected_response:?}")
+            }
+        }
+    }
+}
+
 pub async fn test_kv_for_service(
     to_service: &UnboundedSender<InternalServiceRequest>,
     from_service: &mut UnboundedReceiver<InternalServiceResponse>,
diff --git a/citadel-internal-service/tests/file_transfer.rs b/citadel-internal-service/tests/file_transfer.rs
index aaf74b7..5d1c19b 100644
--- a/citadel-internal-service/tests/file_transfer.rs
+++ b/citadel-internal-service/tests/file_transfer.rs
@@ -3,8 +3,9 @@ mod common;
 #[cfg(test)]
 mod tests {
     use crate::common::{
-        register_and_connect_to_server, register_and_connect_to_server_then_peers,
-        server_info_file_transfer, RegisterAndConnectItems,
+        exhaust_stream_to_file_completion, register_and_connect_to_server,
+        register_and_connect_to_server_then_peers, server_info_file_transfer,
+        RegisterAndConnectItems,
     };
     use citadel_internal_service::kernel::CitadelWorkspaceService;
     use citadel_internal_service_types::{
         InternalServiceRequest, InternalServiceResponse, MessageNotification, MessageSendSuccess,
@@ -415,78 +416,4 @@ mod tests {
 
         Ok(())
     }
-
-    async fn exhaust_stream_to_file_completion(
-        cmp_path: PathBuf,
-        svc: &mut UnboundedReceiver<InternalServiceResponse>,
-    ) {
-        // Exhaust the stream for the receiver
-        let mut path = None;
-        let mut is_revfs = false;
-        loop {
-            let tick_response = svc.recv().await.unwrap();
-            match tick_response {
-                InternalServiceResponse::FileTransferTickNotification(
-                    FileTransferTickNotification {
-                        cid: _,
-                        peer_cid: _,
-                        status,
-                    },
-                ) => match status {
-                    ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => {
-                        path = Some(file_path);
-                        is_revfs = matches!(
-                            vfm.transfer_type,
-                            TransferType::RemoteEncryptedVirtualFilesystem { .. }
-                        );
-                        info!(target: "citadel", "File Transfer (Receiving) Beginning");
-                        assert_eq!(vfm.name, "test.txt")
-                    }
-                    ObjectTransferStatus::ReceptionTick(..) => {
-                        info!(target: "citadel", "File Transfer (Receiving) Tick");
-                    }
-                    ObjectTransferStatus::ReceptionComplete => {
-                        info!(target: "citadel", "File Transfer (Receiving) Completed");
-                        let cmp_data = tokio::fs::read(cmp_path.clone()).await.unwrap();
-                        let streamed_data = tokio::fs::read(
-                            path.clone()
-                                .expect("Never received the ReceptionBeginning tick!"),
-                        )
-                        .await
-                        .unwrap();
-                        if is_revfs {
-                            // The locally stored contents should NEVER be the same as the plaintext for REVFS
-                            assert_ne!(
-                                cmp_data.as_slice(),
-                                streamed_data.as_slice(),
-                                "Original data and streamed data does not match"
-                            );
-                        } else {
-                            assert_eq!(
-                                cmp_data.as_slice(),
-                                streamed_data.as_slice(),
-                                "Original data and streamed data does not match"
-                            );
-                        }
-
-                        return;
-                    }
-                    ObjectTransferStatus::TransferComplete => {
-                        info!(target: "citadel", "File Transfer (Sending) Completed");
-                        return;
-                    }
-                    ObjectTransferStatus::TransferBeginning => {
-                        info!(target: "citadel", "File Transfer (Sending) Beginning");
-                    }
-                    ObjectTransferStatus::TransferTick(..) => {}
-                    _ => {
-                        panic!("File Send Reception Status Yielded Unexpected Response")
-                    }
-                },
-                unexpected_response => {
-                    citadel_logging::warn!(target: "citadel", "Unexpected signal {unexpected_response:?}")
-                }
-            }
-        }
-    }
 }
diff --git a/citadel-internal-service/tests/loopback.rs b/citadel-internal-service/tests/loopback.rs
index c1275d6..3517323 100644
--- a/citadel-internal-service/tests/loopback.rs
+++ b/citadel-internal-service/tests/loopback.rs
@@ -3,10 +3,17 @@ mod common;
 #[cfg(test)]
 mod tests {
     use crate::common::{
-        register_and_connect_to_server, server_info_skip_cert_verification, RegisterAndConnectItems,
+        exhaust_stream_to_file_completion, register_and_connect_to_server,
+        server_info_skip_cert_verification, RegisterAndConnectItems,
     };
     use citadel_internal_service::kernel::CitadelWorkspaceService;
+    use citadel_internal_service_types::{
+        FileTransferRequestNotification, FileTransferStatusNotification, InternalServiceRequest,
+        InternalServiceResponse,
+    };
     use citadel_sdk::prelude::*;
+    use std::path::PathBuf;
+    use uuid::Uuid;
 
     #[tokio::test]
     async fn test_2_peers_1_service() -> Result<(), Box<dyn std::error::Error>> {
@@ -73,4 +80,141 @@ mod tests {
         .await?;
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_loopback_send_file() -> Result<(), Box<dyn std::error::Error>> {
+        crate::common::setup_log();
+
+        let (server, server_bind_address) = server_info_skip_cert_verification();
+        tokio::task::spawn(server);
+
+        let service_addr = "127.0.0.1:55778".parse().unwrap();
+        let service = CitadelWorkspaceService::new(service_addr);
+
+        let internal_service = NodeBuilder::default()
+            .with_backend(BackendType::InMemory)
+            .with_node_type(NodeType::Peer)
+            .with_insecure_skip_cert_verification()
+            .build(service)?;
+
+        tokio::task::spawn(internal_service);
+        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
+
+        // Now with both the server and the IS running, we can test both peers trying to connect, then to each other
+        // via p2p
+        let to_spawn = vec![
+            RegisterAndConnectItems {
+                internal_service_addr: service_addr,
+                server_addr: server_bind_address,
+                full_name: "Peer 0".to_string(),
+                username: "peer.0".to_string(),
+                password: "secret_0".to_string().into_bytes().to_owned(),
+            },
+            RegisterAndConnectItems {
+                internal_service_addr: service_addr,
+                server_addr: server_bind_address,
+                full_name: "Peer 1".to_string(),
+                username: "peer.1".to_string(),
+                password: "secret_1".to_string().into_bytes().to_owned(),
+            },
+        ];
+
+        let mut returned_service_info = register_and_connect_to_server(to_spawn).await.unwrap();
+        let (mut peer_0_tx, mut peer_0_rx, peer_0_cid) = returned_service_info.remove(0);
+        let (mut peer_1_tx, mut peer_1_rx, peer_1_cid) = returned_service_info.remove(0);
+
+        crate::common::register_p2p(
+            &mut peer_0_tx,
+            &mut peer_0_rx,
+            peer_0_cid,
+            &mut peer_1_tx,
+            &mut peer_1_rx,
+            peer_1_cid,
+            SessionSecuritySettings::default(),
+        )
+        .await?;
+        citadel_logging::info!(target: "citadel", "P2P Register complete");
+        crate::common::connect_p2p(
+            &mut peer_0_tx,
+            &mut peer_0_rx,
+            peer_0_cid,
+            &mut peer_1_tx,
+            &mut peer_1_rx,
+            peer_1_cid,
+            SessionSecuritySettings::default(),
+        )
+        .await?;
+
+        let file_to_send = PathBuf::from("../resources/test.txt");
+
+        let send_file_to_service_1_payload = InternalServiceRequest::SendFile {
+            request_id: Uuid::new_v4(),
+            source: file_to_send,
+            cid: peer_0_cid,
+            transfer_type: TransferType::FileTransfer,
+            peer_cid: Some(peer_1_cid),
+            chunk_size: None,
+        };
+        peer_0_tx.send(send_file_to_service_1_payload).unwrap();
+        citadel_logging::info!(target:"citadel", "File Transfer Request Sent from {peer_0_cid:?}");
+
+        citadel_logging::info!(target:"citadel", "File Transfer Request Sent Successfully {peer_0_cid:?}");
+        let deserialized_service_1_payload_response = peer_1_rx.recv().await.unwrap();
+        if let InternalServiceResponse::FileTransferRequestNotification(
+            FileTransferRequestNotification { metadata, .. },
+        ) = deserialized_service_1_payload_response
+        {
+            citadel_logging::info!(target:"citadel", "File Transfer Request {peer_1_cid:?}");
+
+            let file_transfer_accept = InternalServiceRequest::RespondFileTransfer {
+                cid: peer_1_cid,
+                peer_cid: peer_0_cid,
+                object_id: metadata.object_id as _,
+                accept: true,
+                download_location: None,
+                request_id: Uuid::new_v4(),
+            };
+            peer_1_tx.send(file_transfer_accept).unwrap();
+            citadel_logging::info!(target:"citadel", "Accepted File Transfer {peer_1_cid:?}");
+
+            let file_transfer_accept = peer_1_rx.recv().await.unwrap();
+            if let InternalServiceResponse::FileTransferStatusNotification(
+                FileTransferStatusNotification {
+                    cid: _,
+                    object_id: _,
+                    success,
+                    response,
+                    message: _,
+                    request_id: _,
+                },
+            ) = file_transfer_accept
+            {
+                if success && response {
+                    citadel_logging::info!(target:"citadel", "File Transfer Accept Success {peer_1_cid:?}");
+                    // continue to status ticks
+                } else {
+                    panic!("Service 1 Accept Response Failure - Success: {success:?} Response {response:?}")
+                }
+            } else {
+                panic!("Unhandled Service 1 response")
+            }
+
+            // Exhaust the stream for the receiver
+            exhaust_stream_to_file_completion(
+                PathBuf::from("../resources/test.txt"),
+                &mut peer_1_rx,
+            )
+            .await;
+            // Exhaust the stream for the sender
+            exhaust_stream_to_file_completion(
+                PathBuf::from("../resources/test.txt"),
+                &mut peer_0_rx,
+            )
+            .await;
+        } else {
+            panic!("File Transfer P2P Failure");
+        };
+
+        Ok(())
+    }
 }

From 34877e74aa1a943c225a730e3e14d9cddcfac58d Mon Sep 17 00:00:00 2001
From: Tjemmmic
Date: Wed, 6 Mar 2024 23:14:15 -0600
Subject: [PATCH 08/17] Intra-Kernel Standard File Transfer Fix

---
 .../src/kernel/request_handler.rs             | 33 ++++++++++++-------
 citadel-internal-service/tests/common/mod.rs  |  1 +
 .../tests/file_transfer.rs                    |  2 ++
 citadel-internal-service/tests/loopback.rs    |  2 +-
 4 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/citadel-internal-service/src/kernel/request_handler.rs b/citadel-internal-service/src/kernel/request_handler.rs
index 49a1b6a..a7abb3c 100644
--- a/citadel-internal-service/src/kernel/request_handler.rs
+++ b/citadel-internal-service/src/kernel/request_handler.rs
@@ -31,7 +31,7 @@ pub async fn handle_request(
         Ok(peers) => {
             let mut accounts = HashMap::new();
             for peer in &peers {
-                // TOOD: Do not unwrap below
+                // TODO: Do not unwrap below
                 let peer_username = remote
                     .find_target(cid, peer.cid)
                     .await
@@ -470,13 +470,13 @@ pub async fn handle_request(
                         match status {
                             NodeResult::ObjectTransferHandle(ObjectTransferHandle {
                                 ticket: _ticket,
-                                mut handle,
+                                handle,
                             }) => {
                                 let original_target_cid = handle.receiver;
                                 let original_source_cid = handle.source;
                                 let handle_metadata = handle.metadata.clone();
 
-                                if let ObjectTransferOrientation::Receiver {
+                                return if let ObjectTransferOrientation::Receiver {
                                     is_revfs_pull: _,
                                 } = handle.orientation
                                 {
@@ -507,17 +507,26 @@ pub async fn handle_request(
                                             return Ok(Some(()));
                                         }
                                     }
-                                    return Ok(None);
+                                    Ok(None)
                                 } else {
-                                    while let Some(res) = handle.next().await {
-                                        log::trace!(target: "citadel", "Client received RES {:?}", res);
-                                        if let ObjectTransferStatus::TransferComplete = res
-                                        {
-                                            return Ok(Some(()));
-                                        }
+                                    let mut server_connection_map =
+                                        server_connection_map.lock().await;
+                                    if let Some(_conn) =
+                                        server_connection_map.get_mut(&original_target_cid)
+                                    {
+                                        // The Sender is a local user, so we start the tick updater
+                                        spawn_tick_updater(
+                                            handle,
+                                            original_target_cid,
+                                            original_source_cid,
+                                            &mut server_connection_map,
+                                            tcp_connection_map.clone(),
+                                        );
+                                        Ok(Some(()))
+                                    } else {
+                                        Ok(None)
                                     }
-                                }
-                                Err(NetworkError::msg("Failed To Receive TransferComplete Response During FileTransfer"))
+                                };
                             }
 
                             res => {
diff --git a/citadel-internal-service/tests/common/mod.rs b/citadel-internal-service/tests/common/mod.rs
index 46b8748..8cda5a1 100644
--- a/citadel-internal-service/tests/common/mod.rs
+++ b/citadel-internal-service/tests/common/mod.rs
@@ -194,6 +194,7 @@ pub async fn register_and_connect_to_server_then_peers(
     info!(target: "citadel", "Internal Service Spawning");
     let internal_service_kernel = CitadelWorkspaceService::new(bind_address_internal_service);
     let internal_service = NodeBuilder::default()
+        .with_backend(BackendType::Filesystem("filesystem".into()))
         .with_node_type(NodeType::Peer)
         .with_insecure_skip_cert_verification()
         .build(internal_service_kernel)
diff --git a/citadel-internal-service/tests/file_transfer.rs b/citadel-internal-service/tests/file_transfer.rs
index 5d1c19b..a000534 100644
--- a/citadel-internal-service/tests/file_transfer.rs
+++ b/citadel-internal-service/tests/file_transfer.rs
@@ -51,6 +51,7 @@ mod tests {
         info!(target: "citadel", "sub server spawn");
         let internal_service_kernel = CitadelWorkspaceService::new(bind_address_internal_service);
         let internal_service = NodeBuilder::default()
+            .with_backend(BackendType::Filesystem("filesystem".into()))
            .with_node_type(NodeType::Peer)
             .with_insecure_skip_cert_verification()
             .build(internal_service_kernel)?;
@@ -198,6 +199,7 @@ mod tests {
         info!(target: "citadel", "sub server spawn");
         let internal_service_kernel = CitadelWorkspaceService::new(bind_address_internal_service);
         let internal_service = NodeBuilder::default()
+            .with_backend(BackendType::Filesystem("filesystem".into()))
             .with_node_type(NodeType::Peer)
             .with_insecure_skip_cert_verification()
             .build(internal_service_kernel)?;
diff --git a/citadel-internal-service/tests/loopback.rs b/citadel-internal-service/tests/loopback.rs
index 3517323..532b66b 100644
--- a/citadel-internal-service/tests/loopback.rs
+++ b/citadel-internal-service/tests/loopback.rs
@@ -92,7 +92,7 @@ mod tests {
         let service = CitadelWorkspaceService::new(service_addr);
CitadelWorkspaceService::new(service_addr); let internal_service = NodeBuilder::default() - .with_backend(BackendType::InMemory) + .with_backend(BackendType::Filesystem("filesystem".into())) .with_node_type(NodeType::Peer) .with_insecure_skip_cert_verification() .build(service)?; From 037e1a51bfe165afb3806f6bbee7850b43b4c50c Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Sun, 10 Mar 2024 21:33:18 -0500 Subject: [PATCH 09/17] Intra-Kernel REVFS Test and Download File WIP --- .../src/kernel/request_handler.rs | 82 +++++++-- citadel-internal-service/tests/loopback.rs | 168 +++++++++++++++++- 2 files changed, 230 insertions(+), 20 deletions(-) diff --git a/citadel-internal-service/src/kernel/request_handler.rs b/citadel-internal-service/src/kernel/request_handler.rs index a7abb3c..03c9948 100644 --- a/citadel-internal-service/src/kernel/request_handler.rs +++ b/citadel-internal-service/src/kernel/request_handler.rs @@ -12,6 +12,7 @@ use citadel_sdk::prelude::results::PeerRegisterStatus; use citadel_sdk::prelude::*; use futures::StreamExt; use std::collections::HashMap; +use std::path::PathBuf; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::Mutex; @@ -685,31 +686,80 @@ pub async fn handle_request( request_id, } => { let security_level = security_level.unwrap_or_default(); + let scan_for_status = |status| async move { + match status { + NodeResult::ObjectTransferHandle(ObjectTransferHandle { + ticket: _ticket, + handle, + }) => { + let (implicated_cid, peer_cid) = + if let ObjectTransferOrientation::Receiver { + is_revfs_pull: true, + } = handle.orientation + { + (handle.receiver, handle.source) + } else { + (handle.source, handle.receiver) + }; + + // Both the Sender and Receiver should automatically Send/Receive here + let mut server_connection_map = server_connection_map.lock().await; + if let Some(_conn) = server_connection_map.get_mut(&implicated_cid) { + // The Receiver is a local user, so we start the tick updater + spawn_tick_updater( + handle, + implicated_cid, + peer_cid, + &mut server_connection_map, + tcp_connection_map.clone(), + ); + // We have to return Some, but we don't care what it is in this case + Ok(Some(PathBuf::default())) + } else { + Ok(None) + } + } + + res => { + log::error!(target: "citadel", "Invalid NodeResult for REVFS DownloadFile request received: {:?}", res); + Err(NetworkError::msg( + "Invalid Response Received During REVFS Pull", + )) + } + } + }; match server_connection_map.lock().await.get_mut(&cid) { Some(conn) => { let result = if let Some(peer_cid) = peer_cid { if let Some(peer_remote) = conn.peers.get_mut(&peer_cid) { - let request = NodeRequest::PullObject(PullObject { - v_conn: *peer_remote.remote.user(), - virtual_dir: virtual_directory, - delete_on_pull, - transfer_security_level: security_level, - }); - - peer_remote.remote.send(request).await + let current_security_settings = + peer_remote.remote.session_security_settings(); + info!(target: "citadel","Current Security Settings: {current_security_settings:?}"); + peer_remote + .remote + .remote_encrypted_virtual_filesystem_pull_with_fn( + virtual_directory, + security_level, + delete_on_pull, + scan_for_status, + ) + .await } else { Err(NetworkError::msg("Peer Connection Not Found")) } } else { - let request = NodeRequest::PullObject(PullObject { - v_conn: *conn.client_server_remote.user(), - virtual_dir: virtual_directory, - delete_on_pull, - transfer_security_level: security_level, - }); - - conn.client_server_remote.send(request).await + let current_security_settings 
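// Client-side view of the handler being reworked here: the REVFS pull request
// the internal service consumes. Field set taken from the loopback test later
// in this series; local_cid, remote_cid, and to_service are placeholders.
let download = InternalServiceRequest::DownloadFile {
    virtual_directory: PathBuf::from("/vfs/test.txt"),
    security_level: Default::default(),
    delete_on_pull: false,      // keep the remote copy after the pull
    cid: local_cid,
    peer_cid: Some(remote_cid), // None would presumably target the C2S connection
    request_id: Uuid::new_v4(),
};
to_service.send(download).map_err(|_| "service channel closed")?;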
= + conn.client_server_remote.session_security_settings(); + info!(target: "citadel","Current Security Settings: {current_security_settings:?}"); + conn.client_server_remote + .remote_encrypted_virtual_filesystem_pull_with_fn( + virtual_directory, + security_level, + delete_on_pull, + scan_for_status, + ) + .await }; match result { diff --git a/citadel-internal-service/tests/loopback.rs b/citadel-internal-service/tests/loopback.rs index 532b66b..ed39f00 100644 --- a/citadel-internal-service/tests/loopback.rs +++ b/citadel-internal-service/tests/loopback.rs @@ -8,15 +8,16 @@ mod tests { }; use citadel_internal_service::kernel::CitadelWorkspaceService; use citadel_internal_service_types::{ - FileTransferRequestNotification, FileTransferStatusNotification, InternalServiceRequest, - InternalServiceResponse, + DeleteVirtualFileSuccess, DownloadFileSuccess, FileTransferRequestNotification, + FileTransferStatusNotification, InternalServiceRequest, InternalServiceResponse, + SendFileRequestSuccess, }; use citadel_sdk::prelude::*; use std::path::PathBuf; use uuid::Uuid; #[tokio::test] - async fn test_2_peers_1_service() -> Result<(), Box> { + async fn test_intra_kernel_service_and_peers() -> Result<(), Box> { crate::common::setup_log(); let (server, server_bind_address) = server_info_skip_cert_verification(); @@ -82,7 +83,7 @@ mod tests { } #[tokio::test] - async fn test_loopback_send_file() -> Result<(), Box> { + async fn test_intra_kernel_send_file() -> Result<(), Box> { crate::common::setup_log(); let (server, server_bind_address) = server_info_skip_cert_verification(); @@ -217,4 +218,163 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_intra_kernel_revfs() -> Result<(), Box> { + crate::common::setup_log(); + + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + + let service_addr = "127.0.0.1:55778".parse().unwrap(); + let service = CitadelWorkspaceService::new(service_addr); + + let internal_service = NodeBuilder::default() + .with_backend(BackendType::Filesystem("filesystem".into())) + .with_node_type(NodeType::Peer) + .with_insecure_skip_cert_verification() + .build(service)?; + + tokio::task::spawn(internal_service); + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + + // Now with both the server and the IS running, we can test both peers trying to connect, then to each other + // via p2p + let to_spawn = vec![ + RegisterAndConnectItems { + internal_service_addr: service_addr, + server_addr: server_bind_address, + full_name: "Peer 0".to_string(), + username: "peer.0".to_string(), + password: "secret_0".to_string().into_bytes().to_owned(), + }, + RegisterAndConnectItems { + internal_service_addr: service_addr, + server_addr: server_bind_address, + full_name: "Peer 1".to_string(), + username: "peer.1".to_string(), + password: "secret_1".to_string().into_bytes().to_owned(), + }, + ]; + + let mut returned_service_info = register_and_connect_to_server(to_spawn).await.unwrap(); + let (mut peer_0_tx, mut peer_0_rx, peer_0_cid) = returned_service_info.remove(0); + let (mut peer_1_tx, mut peer_1_rx, peer_1_cid) = returned_service_info.remove(0); + + crate::common::register_p2p( + &mut peer_0_tx, + &mut peer_0_rx, + peer_0_cid, + &mut peer_1_tx, + &mut peer_1_rx, + peer_1_cid, + SessionSecuritySettingsBuilder::default().build().unwrap(), + ) + .await?; + citadel_logging::info!(target: "citadel", "P2P Register complete"); + crate::common::connect_p2p( + &mut peer_0_tx, + &mut peer_0_rx, + peer_0_cid, + &mut peer_1_tx, 
+ &mut peer_1_rx, + peer_1_cid, + SessionSecuritySettingsBuilder::default().build().unwrap(), //SessionSecuritySettings::default(), + ) + .await?; + + // Push file to REVFS on peer + let file_to_send = PathBuf::from("../resources/test.txt"); + let virtual_path = PathBuf::from("/vfs/virtual_test.txt"); + let send_file_peer_1_tx_payload = InternalServiceRequest::SendFile { + request_id: Uuid::new_v4(), + source: file_to_send.clone(), + cid: peer_0_cid, + transfer_type: TransferType::RemoteEncryptedVirtualFilesystem { + virtual_path: virtual_path.clone(), + security_level: Default::default(), + }, + peer_cid: Some(peer_1_cid), + chunk_size: None, + }; + peer_0_tx.send(send_file_peer_1_tx_payload).unwrap(); + let deserialized_service_a_payload_response = peer_0_rx.recv().await.unwrap(); + citadel_logging::info!(target: "citadel","{deserialized_service_a_payload_response:?}"); + + if let InternalServiceResponse::SendFileRequestSuccess(SendFileRequestSuccess { .. }) = + &deserialized_service_a_payload_response + { + citadel_logging::info!(target:"citadel", "File Transfer Request {peer_1_cid}"); + let deserialized_service_a_payload_response = peer_1_rx.recv().await.unwrap(); + if let InternalServiceResponse::FileTransferRequestNotification( + FileTransferRequestNotification { metadata, .. }, + ) = deserialized_service_a_payload_response + { + let file_transfer_accept_payload = InternalServiceRequest::RespondFileTransfer { + cid: peer_1_cid, + peer_cid: peer_0_cid, + object_id: metadata.object_id, + accept: true, + download_location: None, + request_id: Uuid::new_v4(), + }; + peer_1_tx.send(file_transfer_accept_payload).unwrap(); + citadel_logging::info!(target:"citadel", "Accepted File Transfer {peer_1_cid}"); + } else { + panic!("File Transfer P2P Failure"); + } + } else { + panic!("File Transfer Request failed: {deserialized_service_a_payload_response:?}"); + } + + exhaust_stream_to_file_completion(file_to_send.clone(), &mut peer_1_rx).await; + exhaust_stream_to_file_completion(file_to_send.clone(), &mut peer_0_rx).await; + + // Download P2P REVFS file - without delete on pull + let download_file_command = InternalServiceRequest::DownloadFile { + virtual_directory: virtual_path.clone(), + security_level: Default::default(), + delete_on_pull: false, + cid: peer_0_cid, + peer_cid: Some(peer_1_cid), + request_id: Uuid::new_v4(), + }; + peer_0_tx.send(download_file_command).unwrap(); + let download_file_response = peer_0_rx.recv().await.unwrap(); + match download_file_response { + InternalServiceResponse::DownloadFileSuccess(DownloadFileSuccess { + cid: response_cid, + request_id: _, + }) => { + assert_eq!(peer_0_cid, response_cid); + } + _ => { + panic!("Didn't get the REVFS DownloadFileSuccess - instead got {download_file_response:?}"); + } + } + + // Delete file on Peer REVFS + let delete_file_command = InternalServiceRequest::DeleteVirtualFile { + virtual_directory: virtual_path, + cid: peer_0_cid, + peer_cid: Some(peer_1_cid), + request_id: Uuid::new_v4(), + }; + peer_0_tx.send(delete_file_command).unwrap(); + let delete_file_response = peer_0_rx.recv().await.unwrap(); + match delete_file_response { + InternalServiceResponse::DeleteVirtualFileSuccess(DeleteVirtualFileSuccess { + cid: response_cid, + request_id: _, + }) => { + assert_eq!(peer_0_cid, response_cid); + } + _ => { + panic!("Didn't get the REVFS DeleteVirtualFileSuccess - instead got {delete_file_response:?}"); + } + } + citadel_logging::info!(target: "citadel","{delete_file_response:?}"); + + Ok(()) + } } From 
462551e4a64457743dbb41497d991dd567d17365 Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Wed, 13 Mar 2024 00:21:42 -0500 Subject: [PATCH 10/17] Peer Remote and File Transfer Fix WIP --- citadel-internal-service-types/src/lib.rs | 2 +- citadel-internal-service/src/kernel/mod.rs | 13 +-- .../src/kernel/request_handler.rs | 94 +++++++++++++------ citadel-internal-service/tests/common/mod.rs | 8 +- citadel-internal-service/tests/loopback.rs | 10 +- 5 files changed, 85 insertions(+), 42 deletions(-) diff --git a/citadel-internal-service-types/src/lib.rs b/citadel-internal-service-types/src/lib.rs index 65f5e33..8ad44ee 100644 --- a/citadel-internal-service-types/src/lib.rs +++ b/citadel-internal-service-types/src/lib.rs @@ -558,7 +558,7 @@ pub struct FileTransferStatusNotification { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct FileTransferTickNotification { pub cid: u64, - pub peer_cid: u64, + pub peer_cid: Option, pub status: ObjectTransferStatus, } diff --git a/citadel-internal-service/src/kernel/mod.rs b/citadel-internal-service/src/kernel/mod.rs index a2fbab4..7e69000 100644 --- a/citadel-internal-service/src/kernel/mod.rs +++ b/citadel-internal-service/src/kernel/mod.rs @@ -4,6 +4,7 @@ use citadel_internal_service_connector::util::wrap_tcp_conn; use citadel_internal_service_types::*; use citadel_logging::{error, info, warn}; use citadel_sdk::prefabs::ClientServerRemote; +use citadel_sdk::prelude::remote_specialization::PeerRemote; use citadel_sdk::prelude::VirtualTargetType; use citadel_sdk::prelude::*; use futures::stream::{SplitSink, StreamExt}; @@ -50,7 +51,7 @@ pub struct Connection { #[allow(dead_code)] struct PeerConnection { sink: PeerChannelSendHalf, - remote: SymmetricIdentifierHandle, + remote: PeerRemote, handler_map: HashMap>, associated_tcp_connection: Uuid, } @@ -82,7 +83,7 @@ impl Connection { &mut self, peer_cid: u64, sink: PeerChannelSendHalf, - remote: SymmetricIdentifierHandle, + remote: PeerRemote, ) { self.peers.insert( peer_cid, @@ -323,7 +324,7 @@ impl NetKernel for CitadelWorkspaceService { spawn_tick_updater( object_transfer_handler, implicated_cid, - peer_cid, + Some(peer_cid), &mut server_connection_map, self.tcp_connection_map.clone(), ); @@ -353,7 +354,7 @@ impl NetKernel for CitadelWorkspaceService { spawn_tick_updater( object_transfer_handler, implicated_cid, - peer_cid, + Some(peer_cid), &mut server_connection_map, self.tcp_connection_map.clone(), ); @@ -811,7 +812,7 @@ async fn handle_group_broadcast( fn spawn_tick_updater( object_transfer_handler: ObjectTransferHandler, implicated_cid: u64, - peer_cid: u64, + peer_cid: Option, server_connection_map: &mut HashMap, tcp_connection_map: Arc>>>, ) { @@ -832,7 +833,7 @@ fn spawn_tick_updater( ); match entry.send(message.clone()) { Ok(_res) => { - info!(target: "citadel", "File Transfer Status Tick Sent"); + info!(target: "citadel", "File Transfer Status Tick Sent {status:?}"); } Err(err) => { warn!(target: "citadel", "File Transfer Status Tick Not Sent: {err:?}"); diff --git a/citadel-internal-service/src/kernel/request_handler.rs b/citadel-internal-service/src/kernel/request_handler.rs index 03c9948..b88d99c 100644 --- a/citadel-internal-service/src/kernel/request_handler.rs +++ b/citadel-internal-service/src/kernel/request_handler.rs @@ -12,7 +12,6 @@ use citadel_sdk::prelude::results::PeerRegisterStatus; use citadel_sdk::prelude::*; use futures::StreamExt; use std::collections::HashMap; -use std::path::PathBuf; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::Mutex; @@ 
-519,7 +518,7 @@ pub async fn handle_request( spawn_tick_updater( handle, original_target_cid, - original_source_cid, + Some(original_source_cid), &mut server_connection_map, tcp_connection_map.clone(), ); @@ -550,11 +549,37 @@ pub async fn handle_request( Err(NetworkError::msg("Peer Connection Not Found")) } } else { + let send_file_c2s = |status| async move { + match status { + NodeResult::ObjectTransferHandle(ObjectTransferHandle { + ticket: _ticket, + handle, + }) => { + let mut server_connection_map = + server_connection_map.lock().await; + spawn_tick_updater( + handle, + cid, + None, + &mut server_connection_map, + tcp_connection_map.clone(), + ); + Ok(Some(())) + } + res => { + log::error!(target: "citadel", "Invalid NodeResult for FileTransfer request received: {:?}", res); + Err(NetworkError::msg( + "Invalid Response Received During FileTransfer", + )) + } + } + }; conn.client_server_remote - .send_file_with_custom_opts( + .send_file_with_custom_opts_and_with_fn( source, chunk_size.unwrap_or_default(), transfer_type, + send_file_c2s, ) .await }; @@ -626,7 +651,7 @@ pub async fn handle_request( spawn_tick_updater( owned_handler, cid, - peer_cid, + Some(peer_cid), &mut server_connection_map, tcp_connection_map.clone(), ); @@ -690,32 +715,41 @@ pub async fn handle_request( match status { NodeResult::ObjectTransferHandle(ObjectTransferHandle { ticket: _ticket, - handle, + mut handle, }) => { - let (implicated_cid, peer_cid) = + let (implicated_cid, _peer_cid) = if let ObjectTransferOrientation::Receiver { is_revfs_pull: true, } = handle.orientation { + info!(target: "citadel", "Download Orientation: Receiver"); (handle.receiver, handle.source) } else { + info!(target: "citadel", "Download Orientation: Sender"); (handle.source, handle.receiver) }; - // Both the Sender and Receiver should automatically Send/Receive here let mut server_connection_map = server_connection_map.lock().await; if let Some(_conn) = server_connection_map.get_mut(&implicated_cid) { - // The Receiver is a local user, so we start the tick updater - spawn_tick_updater( - handle, - implicated_cid, - peer_cid, - &mut server_connection_map, - tcp_connection_map.clone(), - ); - // We have to return Some, but we don't care what it is in this case - Ok(Some(PathBuf::default())) + let mut local_path = None; + while let Some(res) = handle.next().await { + match res { + ObjectTransferStatus::ReceptionBeginning(path, _) => { + local_path = Some(path) + } + ObjectTransferStatus::TransferComplete => { + break; + } + _ => {} + } + } + if local_path.is_some() { + Ok(local_path) + } else { + Err(NetworkError::InternalError("Local path never loaded")) + } } else { + info!(target: "citadel", "Download File - Returning None"); Ok(None) } } @@ -729,15 +763,14 @@ pub async fn handle_request( } }; - match server_connection_map.lock().await.get_mut(&cid) { + let mut lock = server_connection_map.lock().await; + match lock.get_mut(&cid) { Some(conn) => { let result = if let Some(peer_cid) = peer_cid { if let Some(peer_remote) = conn.peers.get_mut(&peer_cid) { - let current_security_settings = - peer_remote.remote.session_security_settings(); - info!(target: "citadel","Current Security Settings: {current_security_settings:?}"); + let peer_remote = peer_remote.remote.clone(); + drop(lock); peer_remote - .remote .remote_encrypted_virtual_filesystem_pull_with_fn( virtual_directory, security_level, @@ -749,10 +782,9 @@ pub async fn handle_request( Err(NetworkError::msg("Peer Connection Not Found")) } } else { - let current_security_settings = - 
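// The edits in this hunk share one shape: clone what is needed out of the
// connection map, then drop the MutexGuard before any .await on the remote.
// A minimal, self-contained illustration of that pattern with plain tokio
// types (not the service's own map):
use std::sync::Arc;
use tokio::sync::Mutex;

async fn call_without_holding_lock(map: Arc<Mutex<Vec<u64>>>) {
    let first = {
        let guard = map.lock().await;
        guard.first().copied() // copy the needed data out...
    }; // ...so the guard drops here, before the slow await below
    if let Some(ms) = first {
        tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
    }
}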
conn.client_server_remote.session_security_settings(); - info!(target: "citadel","Current Security Settings: {current_security_settings:?}"); - conn.client_server_remote + let client_server_remote = conn.client_server_remote.clone(); + drop(lock); + client_server_remote .remote_encrypted_virtual_filesystem_pull_with_fn( virtual_directory, security_level, @@ -1165,7 +1197,7 @@ pub async fn handle_request( .add_peer_connection( peer_cid, sink, - symmetric_identifier_handle.clone(), + peer_connect_success.remote, //symmetric_identifier_handle.clone(), ); let hm_for_conn = tcp_connection_map.clone(); @@ -1270,7 +1302,7 @@ pub async fn handle_request( None => { // TODO: handle none case } - Some(target_peer) => match target_peer.remote.send(request).await { + Some(target_peer) => match target_peer.remote.inner.send(request).await { Ok(ticket) => { conn.clear_peer_connection(peer_cid); let peer_disconnect_success = @@ -2017,7 +2049,7 @@ pub async fn handle_request( command: GroupBroadcast::ListGroupsFor { cid: peer_cid }, }); if let Ok(mut subscription) = - peer_remote.send_callback_subscription(request).await + peer_remote.inner.send_callback_subscription(request).await { if let Some(evt) = subscription.next().await { if let NodeResult::GroupEvent(GroupEvent { @@ -2136,7 +2168,7 @@ pub async fn handle_request( if let Some(connection) = server_connection_map.get_mut(&cid) { if let Some(peer_connection) = connection.peers.get_mut(&peer_cid) { let peer_remote = peer_connection.remote.clone(); - match peer_remote.send_callback_subscription(request).await { + match peer_remote.inner.send_callback_subscription(request).await { Ok(mut subscription) => { let mut result = false; if invitation { @@ -2276,7 +2308,7 @@ pub async fn handle_request( implicated_cid: cid, command: group_request, }); - match peer_remote.send_callback_subscription(request).await { + match peer_remote.inner.send_callback_subscription(request).await { Ok(mut subscription) => { let mut result = Err("Group Request Join Failed".to_string()); while let Some(evt) = subscription.next().await { diff --git a/citadel-internal-service/tests/common/mod.rs b/citadel-internal-service/tests/common/mod.rs index 8cda5a1..48dfcd9 100644 --- a/citadel-internal-service/tests/common/mod.rs +++ b/citadel-internal-service/tests/common/mod.rs @@ -557,6 +557,12 @@ pub async fn exhaust_stream_to_file_completion( // Exhaust the stream for the receiver let mut path = None; let mut is_revfs = false; + let cmp_file_name = cmp_path + .file_name() + .unwrap() + .to_os_string() + .into_string() + .unwrap(); loop { let tick_response = svc.recv().await.unwrap(); match tick_response { @@ -574,7 +580,7 @@ pub async fn exhaust_stream_to_file_completion( TransferType::RemoteEncryptedVirtualFilesystem { .. } ); info!(target: "citadel", "File Transfer (Receiving) Beginning"); - assert_eq!(vfm.name, "test.txt") + assert_eq!(vfm.name, cmp_file_name) } ObjectTransferStatus::ReceptionTick(..) 
=> { info!(target: "citadel", "File Transfer (Receiving) Tick"); diff --git a/citadel-internal-service/tests/loopback.rs b/citadel-internal-service/tests/loopback.rs index ed39f00..f7d69b4 100644 --- a/citadel-internal-service/tests/loopback.rs +++ b/citadel-internal-service/tests/loopback.rs @@ -268,7 +268,7 @@ mod tests { &mut peer_1_tx, &mut peer_1_rx, peer_1_cid, - SessionSecuritySettingsBuilder::default().build().unwrap(), + SessionSecuritySettings::default(), ) .await?; citadel_logging::info!(target: "citadel", "P2P Register complete"); @@ -279,13 +279,13 @@ mod tests { &mut peer_1_tx, &mut peer_1_rx, peer_1_cid, - SessionSecuritySettingsBuilder::default().build().unwrap(), //SessionSecuritySettings::default(), + SessionSecuritySettings::default(), ) .await?; // Push file to REVFS on peer let file_to_send = PathBuf::from("../resources/test.txt"); - let virtual_path = PathBuf::from("/vfs/virtual_test.txt"); + let virtual_path = PathBuf::from("/vfs/test.txt"); let send_file_peer_1_tx_payload = InternalServiceRequest::SendFile { request_id: Uuid::new_v4(), source: file_to_send.clone(), @@ -330,6 +330,8 @@ mod tests { exhaust_stream_to_file_completion(file_to_send.clone(), &mut peer_1_rx).await; exhaust_stream_to_file_completion(file_to_send.clone(), &mut peer_0_rx).await; + citadel_logging::info!(target: "citadel", "Peer 0 Requesting to Download File"); + // Download P2P REVFS file - without delete on pull let download_file_command = InternalServiceRequest::DownloadFile { virtual_directory: virtual_path.clone(), @@ -353,6 +355,8 @@ mod tests { } } + citadel_logging::info!(target: "citadel", "Peer 0 Requesting to Delete File"); + // Delete file on Peer REVFS let delete_file_command = InternalServiceRequest::DeleteVirtualFile { virtual_directory: virtual_path, From f5d0ea860863f6f2acb5f9b9eaabab18f57585e1 Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Fri, 15 Mar 2024 00:45:31 -0500 Subject: [PATCH 11/17] File Transfer Debugging --- .../src/kernel/request_handler.rs | 118 ++++++++++++------ .../tests/file_transfer.rs | 55 ++++---- 2 files changed, 113 insertions(+), 60 deletions(-) diff --git a/citadel-internal-service/src/kernel/request_handler.rs b/citadel-internal-service/src/kernel/request_handler.rs index b88d99c..d4cf1f2 100644 --- a/citadel-internal-service/src/kernel/request_handler.rs +++ b/citadel-internal-service/src/kernel/request_handler.rs @@ -12,6 +12,7 @@ use citadel_sdk::prelude::results::PeerRegisterStatus; use citadel_sdk::prelude::*; use futures::StreamExt; use std::collections::HashMap; +use std::path::PathBuf; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::Mutex; @@ -462,11 +463,15 @@ pub async fn handle_request( let lock = server_connection_map.lock().await; match lock.get(&cid) { Some(conn) => { + info!(target: "citadel", "Send File Server Connection Exists"); let result = if let Some(peer_cid) = peer_cid { + info!(target: "citadel", "Send File Peer Version"); if let Some(peer_remote) = conn.peers.get(&peer_cid) { + info!(target: "citadel", "Send File Peer Remote Exists"); let peer_remote = &peer_remote.remote.clone(); drop(lock); let scan_for_status = |status| async move { + info!(target: "citadel", "Send File Scan For Status Entered"); match status { NodeResult::ObjectTransferHandle(ObjectTransferHandle { ticket: _ticket, @@ -480,12 +485,15 @@ pub async fn handle_request( is_revfs_pull: _, } = handle.orientation { + info!(target: "citadel", "Send File Receiver Entered"); if original_target_cid == peer_cid { + info!(target: "citadel", 
"Send File Receiver is targeted at Peer CID"); let mut lock = server_connection_map.lock().await; // Route the signal to the intra-kernel user if possible if let Some(peer) = lock.get_mut(&original_target_cid) { + info!(target: "citadel", "Send File Receiver - Target is local, rerouting"); let peer_uuid = peer.associated_tcp_connection; send_response_to_tcp_client( tcp_connection_map, @@ -509,12 +517,15 @@ pub async fn handle_request( } Ok(None) } else { + info!(target: "citadel", "Sender should be spawning tick updater on Send File"); + info!(target: "citadel", "Sender Source CID: {original_source_cid:?} and Actual CID: {cid:?}"); let mut server_connection_map = server_connection_map.lock().await; if let Some(_conn) = server_connection_map.get_mut(&original_target_cid) { // The Sender is a local user, so we start the tick updater + info!(target: "citadel", "Spawning Tick Updater For Send file Target {original_target_cid:?} who is CID {cid:?}"); spawn_tick_updater( handle, original_target_cid, @@ -522,10 +533,9 @@ pub async fn handle_request( &mut server_connection_map, tcp_connection_map.clone(), ); - Ok(Some(())) - } else { - Ok(None) + return Ok(Some(())); } + Ok(None) }; } @@ -537,6 +547,7 @@ pub async fn handle_request( } } }; + info!(target: "citadel", "Send File Remote Method Call"); peer_remote .send_file_with_custom_opts_and_with_fn( source, @@ -574,7 +585,9 @@ pub async fn handle_request( } } }; - conn.client_server_remote + let client_server_remote = conn.client_server_remote.clone(); + drop(lock); + client_server_remote .send_file_with_custom_opts_and_with_fn( source, chunk_size.unwrap_or_default(), @@ -717,40 +730,73 @@ pub async fn handle_request( ticket: _ticket, mut handle, }) => { - let (implicated_cid, _peer_cid) = - if let ObjectTransferOrientation::Receiver { - is_revfs_pull: true, - } = handle.orientation - { - info!(target: "citadel", "Download Orientation: Receiver"); - (handle.receiver, handle.source) - } else { - info!(target: "citadel", "Download Orientation: Sender"); - (handle.source, handle.receiver) - }; - - let mut server_connection_map = server_connection_map.lock().await; - if let Some(_conn) = server_connection_map.get_mut(&implicated_cid) { - let mut local_path = None; - while let Some(res) = handle.next().await { - match res { - ObjectTransferStatus::ReceptionBeginning(path, _) => { - local_path = Some(path) - } - ObjectTransferStatus::TransferComplete => { - break; - } - _ => {} - } - } - if local_path.is_some() { - Ok(local_path) + if let Some(peer_cid) = peer_cid { + // P2P REVFS Pull - Reroute if needed, otherwise spawn tick updater + info!(target: "citadel", "Download Source: {:?} Receiver: {:?}", handle.source, handle.receiver); + info!(target: "citadel", "Download CID: {cid:?} Peer: {peer_cid:?}"); + let (implicated_cid, peer_cid) = + if let ObjectTransferOrientation::Receiver { + is_revfs_pull: true, + } = handle.orientation + { + info!(target: "citadel", "Download Orientation: Receiver"); + (handle.receiver, handle.source) + } else { + info!(target: "citadel", "Download Orientation: Sender"); + (handle.receiver, handle.source) + }; + let mut server_connection_map = server_connection_map.lock().await; + if let Some(_conn) = server_connection_map.get_mut(&implicated_cid) { + info!(target: "citadel", "Download Peer - Spawning Tick Updater as Receiver"); + // let mut local_path = None; + // while let Some(res) = handle.next().await { + // match res { + // ObjectTransferStatus::ReceptionBeginning(path, _) => { + // info!(target: "citadel", "Received 
ReceptionBeginning Status"); + // local_path = Some(path) + // } + // ObjectTransferStatus::ReceptionComplete => { + // info!(target: "citadel", "Received ReceptionComplete Status"); + // break; + // } + // ObjectTransferStatus::TransferComplete => { + // info!(target: "citadel", "Received TransferComplete Status"); + // break; + // } + // _ => { + // info!(target: "citadel", "Received {res:?} Status"); + // } + // } + // } + spawn_tick_updater( + handle, + implicated_cid, + Some(peer_cid), + &mut server_connection_map, + tcp_connection_map.clone(), + ); + // if local_path.is_some() { + // Ok(local_path) + // } else { + // Err(NetworkError::InternalError("Local path never loaded")) + // } + Ok(Some(PathBuf::default())) } else { - Err(NetworkError::InternalError("Local path never loaded")) + info!(target: "citadel", "Download File - Returning None"); + Ok(None) } } else { - info!(target: "citadel", "Download File - Returning None"); - Ok(None) + // C2S REVFS Pull - We can just start a tick updater + info!(target: "citadel", "Download C2S Receiver Starting Tick Updater"); + let mut server_connection_map = server_connection_map.lock().await; + spawn_tick_updater( + handle, + cid, + None, + &mut server_connection_map, + tcp_connection_map.clone(), + ); + Ok(Some(PathBuf::default())) } } @@ -794,6 +840,8 @@ pub async fn handle_request( .await }; + info!(target: "citadel", "Successfully Finished call to REVFS Pull Method"); + match result { Ok(_) => { send_response_to_tcp_client( diff --git a/citadel-internal-service/tests/file_transfer.rs b/citadel-internal-service/tests/file_transfer.rs index a000534..9c0deff 100644 --- a/citadel-internal-service/tests/file_transfer.rs +++ b/citadel-internal-service/tests/file_transfer.rs @@ -326,7 +326,7 @@ mod tests { // Push file to REVFS on peer let file_to_send = PathBuf::from("../resources/test.txt"); - let virtual_path = PathBuf::from("/vfs/virtual_test.txt"); + let virtual_path = PathBuf::from("/vfs/test.txt"); let send_file_to_service_b_payload = InternalServiceRequest::SendFile { request_id: Uuid::new_v4(), source: file_to_send.clone(), @@ -339,35 +339,37 @@ mod tests { chunk_size: None, }; to_service_a.send(send_file_to_service_b_payload).unwrap(); - let deserialized_service_a_payload_response = from_service_a.recv().await.unwrap(); - info!(target: "citadel","{deserialized_service_a_payload_response:?}"); - if let InternalServiceResponse::SendFileRequestSuccess(SendFileRequestSuccess { .. }) = - &deserialized_service_a_payload_response + info!(target:"citadel", "File Transfer Request {cid_b}"); + let deserialized_service_a_payload_response = from_service_b.recv().await.unwrap(); + if let InternalServiceResponse::FileTransferRequestNotification( + FileTransferRequestNotification { metadata, .. }, + ) = deserialized_service_a_payload_response { - info!(target:"citadel", "File Transfer Request {cid_b}"); - let deserialized_service_a_payload_response = from_service_b.recv().await.unwrap(); - if let InternalServiceResponse::FileTransferRequestNotification( - FileTransferRequestNotification { metadata, .. 
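// The reorder in the test hunk above is the substance of the change: the
// receiver-side stream is drained for the FileTransferRequestNotification and
// the accept is sent before the sender-side response is polled. Sequence
// sketch (channel names are this test's own; ordering inferred from the diff):
//
//   to_service_a.send(SendFile { .. })                           // sender initiates
//   from_service_b.recv().await                                  // receiver sees the request
//   to_service_b.send(RespondFileTransfer { accept: true, .. })  // receiver accepts
//   from_service_a.recv().await                                  // only now poll the sender
//
// Polling from_service_a first appears to be what stalled the earlier version
// of this test, since both halves run through the same kernel.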
}, - ) = deserialized_service_a_payload_response - { - let file_transfer_accept_payload = InternalServiceRequest::RespondFileTransfer { - cid: *cid_b, - peer_cid: *cid_a, - object_id: metadata.object_id, - accept: true, - download_location: None, - request_id: Uuid::new_v4(), - }; - to_service_b.send(file_transfer_accept_payload).unwrap(); - info!(target:"citadel", "Accepted File Transfer {cid_b}"); - } else { - panic!("File Transfer P2P Failure"); - } + let file_transfer_accept_payload = InternalServiceRequest::RespondFileTransfer { + cid: *cid_b, + peer_cid: *cid_a, + object_id: metadata.object_id, + accept: true, + download_location: None, + request_id: Uuid::new_v4(), + }; + to_service_b.send(file_transfer_accept_payload).unwrap(); + info!(target:"citadel", "Accepted File Transfer {cid_b}"); } else { - panic!("File Transfer Request failed: {deserialized_service_a_payload_response:?}"); + panic!("File Transfer P2P Failure"); } + let deserialized_service_a_payload_response = from_service_a.recv().await.unwrap(); + info!(target: "citadel","{deserialized_service_a_payload_response:?}"); + + // if let InternalServiceResponse::SendFileRequestSuccess(SendFileRequestSuccess { .. }) = + // &deserialized_service_a_payload_response + // { + // } else { + // panic!("File Transfer Request failed: {deserialized_service_a_payload_response:?}"); + // } + exhaust_stream_to_file_completion(file_to_send.clone(), from_service_b).await; exhaust_stream_to_file_completion(file_to_send.clone(), from_service_a).await; @@ -394,6 +396,9 @@ mod tests { } } + //exhaust_stream_to_file_completion(file_to_send.clone(), from_service_b).await; + //exhaust_stream_to_file_completion(file_to_send.clone(), from_service_a).await; + // Delete file on Peer REVFS let delete_file_command = InternalServiceRequest::DeleteVirtualFile { virtual_directory: virtual_path, From cb1d544fe2537f215211bd3e1a198828af2543bc Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Tue, 19 Mar 2024 23:04:59 -0500 Subject: [PATCH 12/17] File Transfer Fix WIP --- citadel-internal-service/src/kernel/mod.rs | 13 ++++++++++--- .../src/kernel/request_handler.rs | 10 +++++----- citadel-internal-service/tests/file_transfer.rs | 10 ++++------ citadel-internal-service/tests/loopback.rs | 4 ++++ 4 files changed, 23 insertions(+), 14 deletions(-) diff --git a/citadel-internal-service/src/kernel/mod.rs b/citadel-internal-service/src/kernel/mod.rs index 7e69000..6c9b806 100644 --- a/citadel-internal-service/src/kernel/mod.rs +++ b/citadel-internal-service/src/kernel/mod.rs @@ -282,13 +282,20 @@ impl NetKernel for CitadelWorkspaceService { let object_transfer_handler = object_transfer_handle.handle; let (implicated_cid, peer_cid) = if matches!( + // object_transfer_handler.orientation, + // ObjectTransferOrientation::Receiver { + // is_revfs_pull: true + // } object_transfer_handler.orientation, - ObjectTransferOrientation::Receiver { - is_revfs_pull: true - } + ObjectTransferOrientation::Sender, ) { // When this is a REVFS pull reception handle, THIS node is the source of the file. // The other node, i.e. the peer, is the receiver who is requesting the file. + //let (implicated_cid, peer_cid) = if matches!(object_transfer_handler.metadata.transfer_type, TransferType::RemoteEncryptedVirtualFilesystem { .. 
}) { + // ( + // object_transfer_handler.source, + // object_transfer_handler.receiver, + // ) ( object_transfer_handler.source, object_transfer_handler.receiver, diff --git a/citadel-internal-service/src/kernel/request_handler.rs b/citadel-internal-service/src/kernel/request_handler.rs index d4cf1f2..17da128 100644 --- a/citadel-internal-service/src/kernel/request_handler.rs +++ b/citadel-internal-service/src/kernel/request_handler.rs @@ -522,14 +522,14 @@ pub async fn handle_request( let mut server_connection_map = server_connection_map.lock().await; if let Some(_conn) = - server_connection_map.get_mut(&original_target_cid) + server_connection_map.get_mut(&original_source_cid) { // The Sender is a local user, so we start the tick updater info!(target: "citadel", "Spawning Tick Updater For Send file Target {original_target_cid:?} who is CID {cid:?}"); spawn_tick_updater( handle, - original_target_cid, - Some(original_source_cid), + original_source_cid, + Some(original_target_cid), &mut server_connection_map, tcp_connection_map.clone(), ); @@ -728,7 +728,7 @@ pub async fn handle_request( match status { NodeResult::ObjectTransferHandle(ObjectTransferHandle { ticket: _ticket, - mut handle, + handle, }) => { if let Some(peer_cid) = peer_cid { // P2P REVFS Pull - Reroute if needed, otherwise spawn tick updater @@ -743,7 +743,7 @@ pub async fn handle_request( (handle.receiver, handle.source) } else { info!(target: "citadel", "Download Orientation: Sender"); - (handle.receiver, handle.source) + (handle.source, handle.receiver) }; let mut server_connection_map = server_connection_map.lock().await; if let Some(_conn) = server_connection_map.get_mut(&implicated_cid) { diff --git a/citadel-internal-service/tests/file_transfer.rs b/citadel-internal-service/tests/file_transfer.rs index 9c0deff..eea6f53 100644 --- a/citadel-internal-service/tests/file_transfer.rs +++ b/citadel-internal-service/tests/file_transfer.rs @@ -10,9 +10,8 @@ mod tests { use citadel_internal_service::kernel::CitadelWorkspaceService; use citadel_internal_service_types::{ DeleteVirtualFileSuccess, DownloadFileFailure, DownloadFileSuccess, - FileTransferRequestNotification, FileTransferStatusNotification, - FileTransferTickNotification, InternalServiceRequest, InternalServiceResponse, - SendFileRequestFailure, SendFileRequestSuccess, + FileTransferRequestNotification, FileTransferStatusNotification, InternalServiceRequest, + InternalServiceResponse, SendFileRequestFailure, }; use citadel_logging::info; use citadel_sdk::prelude::*; @@ -25,7 +24,6 @@ mod tests { use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::Duration; - use tokio::sync::mpsc::UnboundedReceiver; use uuid::Uuid; #[tokio::test] @@ -396,8 +394,8 @@ mod tests { } } - //exhaust_stream_to_file_completion(file_to_send.clone(), from_service_b).await; - //exhaust_stream_to_file_completion(file_to_send.clone(), from_service_a).await; + exhaust_stream_to_file_completion(file_to_send.clone(), from_service_b).await; + exhaust_stream_to_file_completion(file_to_send.clone(), from_service_a).await; // Delete file on Peer REVFS let delete_file_command = InternalServiceRequest::DeleteVirtualFile { diff --git a/citadel-internal-service/tests/loopback.rs b/citadel-internal-service/tests/loopback.rs index f7d69b4..f0e4a91 100644 --- a/citadel-internal-service/tests/loopback.rs +++ b/citadel-internal-service/tests/loopback.rs @@ -342,6 +342,7 @@ mod tests { request_id: Uuid::new_v4(), }; peer_0_tx.send(download_file_command).unwrap(); + 
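// The one-line fix above restores the CID mapping the kernel relies on when a
// transfer handle arrives: which end is the local (implicated) user depends on
// the handle's orientation. Restated from the hunk as a sketch (field names as
// used in this patch):
fn implicated_and_peer(h: &ObjectTransferHandler) -> (u64, u64) {
    match &h.orientation {
        // Receiving a REVFS pull: the handle's receiver is the local user
        ObjectTransferOrientation::Receiver { is_revfs_pull: true } => (h.receiver, h.source),
        // Otherwise this node is the sender: source is local, receiver is the peer
        _ => (h.source, h.receiver),
    }
}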
citadel_logging::info!(target: "citadel", "Peer 0 Waiting for DownloadFileSuccess Response"); let download_file_response = peer_0_rx.recv().await.unwrap(); match download_file_response { InternalServiceResponse::DownloadFileSuccess(DownloadFileSuccess { @@ -355,6 +356,9 @@ mod tests { } } + exhaust_stream_to_file_completion(file_to_send.clone(), &mut peer_1_rx).await; + exhaust_stream_to_file_completion(file_to_send.clone(), &mut peer_0_rx).await; + citadel_logging::info!(target: "citadel", "Peer 0 Requesting to Delete File"); // Delete file on Peer REVFS From cd4f4f66adeb251445b309c120a4d09eb114031d Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Sun, 24 Mar 2024 00:20:36 -0500 Subject: [PATCH 13/17] Message Fix and Intra-Kernel Message Test --- .../src/kernel/request_handler.rs | 28 +++-- citadel-internal-service/tests/loopback.rs | 104 +++++++++++++++++- 2 files changed, 119 insertions(+), 13 deletions(-) diff --git a/citadel-internal-service/src/kernel/request_handler.rs b/citadel-internal-service/src/kernel/request_handler.rs index 17da128..b225fc2 100644 --- a/citadel-internal-service/src/kernel/request_handler.rs +++ b/citadel-internal-service/src/kernel/request_handler.rs @@ -368,12 +368,12 @@ pub async fn handle_request( } => { match server_connection_map.lock().await.get_mut(&cid) { Some(conn) => { - if let Some(peer_cid) = peer_cid { + let result = if let Some(peer_cid) = peer_cid { // send to peer if let Some(peer_conn) = conn.peers.get_mut(&peer_cid) { peer_conn.sink.set_security_level(security_level); // TODO no unwraps on send_message. We need to handle errors properly - peer_conn.sink.send_message(message.into()).await.unwrap(); + peer_conn.sink.send_message(message.into()).await } else { // TODO: refactor all connection not found messages, we have too many duplicates info!(target: "citadel","connection not found"); @@ -387,24 +387,28 @@ pub async fn handle_request( uuid, ) .await; + return; } } else { // send to server conn.sink_to_server.set_security_level(security_level); - conn.sink_to_server - .send_message(message.into()) - .await - .unwrap(); - } - - let response = - InternalServiceResponse::MessageSendSuccess(MessageSendSuccess { + conn.sink_to_server.send_message(message.into()).await + }; + let response = match result { + Ok(_) => InternalServiceResponse::MessageSendSuccess(MessageSendSuccess { cid, peer_cid, request_id: Some(request_id), - }); + }), + Err(err) => { + InternalServiceResponse::MessageSendFailure(MessageSendFailure { + cid, + message: format!("Message Send Failure: {err:?}"), + request_id: Some(request_id), + }) + } + }; send_response_to_tcp_client(tcp_connection_map, response, uuid).await; - info!(target: "citadel", "Into the message handler command send") } None => { info!(target: "citadel","connection not found"); diff --git a/citadel-internal-service/tests/loopback.rs b/citadel-internal-service/tests/loopback.rs index f0e4a91..aeed211 100644 --- a/citadel-internal-service/tests/loopback.rs +++ b/citadel-internal-service/tests/loopback.rs @@ -10,7 +10,7 @@ mod tests { use citadel_internal_service_types::{ DeleteVirtualFileSuccess, DownloadFileSuccess, FileTransferRequestNotification, FileTransferStatusNotification, InternalServiceRequest, InternalServiceResponse, - SendFileRequestSuccess, + MessageNotification, MessageSendFailure, MessageSendSuccess, SendFileRequestSuccess, }; use citadel_sdk::prelude::*; use std::path::PathBuf; @@ -82,6 +82,108 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_intra_kernel_peer_message() -> Result<(), Box> 
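// Client-side shape of the round trip validated below, using the Message
// request and the success/failure variants introduced in this patch. Channel
// and CID names are placeholders:
peer_tx
    .send(InternalServiceRequest::Message {
        request_id: Uuid::new_v4(),
        message: b"hello".to_vec(),
        cid: local_cid,
        peer_cid: Some(remote_cid),
        security_level: Default::default(),
    })
    .map_err(|_| "service channel closed")?;
match peer_rx.recv().await {
    Some(InternalServiceResponse::MessageSendSuccess(_)) => { /* handed to the peer sink */ }
    Some(InternalServiceResponse::MessageSendFailure(f)) => return Err(f.message.into()),
    other => return Err(format!("unexpected response: {other:?}").into()),
}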
{ + crate::common::setup_log(); + + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + + let service_addr = "127.0.0.1:55778".parse().unwrap(); + let service = CitadelWorkspaceService::new(service_addr); + + let internal_service = NodeBuilder::default() + .with_backend(BackendType::InMemory) + .with_node_type(NodeType::Peer) + .with_insecure_skip_cert_verification() + .build(service)?; + + tokio::task::spawn(internal_service); + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + + // Now with both the server and the IS running, we can test both peers trying to connect, then to each other + // via p2p + let to_spawn = vec![ + RegisterAndConnectItems { + internal_service_addr: service_addr, + server_addr: server_bind_address, + full_name: "Peer 0".to_string(), + username: "peer.0".to_string(), + password: "secret_0".to_string().into_bytes().to_owned(), + }, + RegisterAndConnectItems { + internal_service_addr: service_addr, + server_addr: server_bind_address, + full_name: "Peer 1".to_string(), + username: "peer.1".to_string(), + password: "secret_1".to_string().into_bytes().to_owned(), + }, + ]; + + let mut returned_service_info = register_and_connect_to_server(to_spawn).await.unwrap(); + let (mut peer_0_tx, mut peer_0_rx, peer_0_cid) = returned_service_info.remove(0); + let (mut peer_1_tx, mut peer_1_rx, peer_1_cid) = returned_service_info.remove(0); + + crate::common::register_p2p( + &mut peer_0_tx, + &mut peer_0_rx, + peer_0_cid, + &mut peer_1_tx, + &mut peer_1_rx, + peer_1_cid, + SessionSecuritySettings::default(), + ) + .await?; + citadel_logging::info!(target: "citadel", "P2P Register complete"); + crate::common::connect_p2p( + &mut peer_0_tx, + &mut peer_0_rx, + peer_0_cid, + &mut peer_1_tx, + &mut peer_1_rx, + peer_1_cid, + SessionSecuritySettings::default(), + ) + .await?; + let message_request = InternalServiceRequest::Message { + request_id: Uuid::new_v4(), + message: "Test Message From Peer 0.".to_string().into_bytes(), + cid: peer_0_cid, + peer_cid: Some(peer_1_cid), + security_level: Default::default(), + }; + peer_0_tx.send(message_request)?; + match peer_0_rx.recv().await.unwrap() { + InternalServiceResponse::MessageSendSuccess(MessageSendSuccess { .. }) => { + citadel_logging::info!(target: "citadel", "Message Successfully Sent from Peer 0 to Peer 1."); + } + InternalServiceResponse::MessageSendFailure(MessageSendFailure { + cid: _, + message, + request_id: _, + }) => { + panic!("Message Sending Failed With Error: {message:?}") + } + _ => { + panic!("Received Unexpected Response When Expecting MessageSend Response.") + } + } + match peer_1_rx.recv().await.unwrap() { + InternalServiceResponse::MessageNotification(MessageNotification { + message, + cid: _, + peer_cid: _, + request_id: _, + }) => { + citadel_logging::info!(target: "citadel", "Message from Peer 0 Successfully Received at Peer 1: {message:?}"); + } + _ => { + panic!("Received Unexpected Response When Expecting MessageSend Response.") + } + } + Ok(()) + } + #[tokio::test] async fn test_intra_kernel_send_file() -> Result<(), Box> { crate::common::setup_log(); From 1969582421e855349bd8f494456a9a1bc87d876c Mon Sep 17 00:00:00 2001 From: Thomas Braun Date: Thu, 28 Mar 2024 15:02:59 -0400 Subject: [PATCH 14/17] Cleanup and point to latest SDK master branch. 
Loopback fs/revfs not working yet --- Cargo.toml | 6 +- citadel-internal-service-types/src/lib.rs | 4 +- citadel-internal-service/Cargo.toml | 1 + .../src/kernel/request_handler.rs | 531 ++++-------------- citadel-internal-service/tests/group_chat.rs | 4 +- citadel-internal-service/tests/loopback.rs | 104 +++- 6 files changed, 216 insertions(+), 434 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7106e2f..bb339b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,9 +22,9 @@ citadel-internal-service-connector = { path = "./citadel-internal-service-connec citadel-internal-service-macros = { path = "./citadel-internal-service-macros", default-features = false, version = "0.1.0" } # Avarok deps -citadel_sdk = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/", branch = "intra-kernel-compat" } -citadel_types = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/", branch = "intra-kernel-compat" } -citadel_logging = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/", branch = "intra-kernel-compat", default-features = false } +citadel_sdk = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/" } +citadel_types = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/" } +citadel_logging = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/" } # Standard deps serde = { version = "1.0.104", features = ["derive"] } diff --git a/citadel-internal-service-types/src/lib.rs b/citadel-internal-service-types/src/lib.rs index 8ad44ee..cdef49b 100644 --- a/citadel-internal-service-types/src/lib.rs +++ b/citadel-internal-service-types/src/lib.rs @@ -360,7 +360,7 @@ pub struct GroupKickFailure { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct GroupListGroupsSuccess { pub cid: u64, - pub peer_cid: u64, + pub peer_cid: Option, pub group_list: Option>, pub request_id: Option, } @@ -814,7 +814,7 @@ pub enum InternalServiceRequest { }, GroupListGroupsFor { cid: u64, - peer_cid: u64, + peer_cid: Option, request_id: Uuid, }, GroupRequestJoin { diff --git a/citadel-internal-service/Cargo.toml b/citadel-internal-service/Cargo.toml index 850e12e..00b9861 100644 --- a/citadel-internal-service/Cargo.toml +++ b/citadel-internal-service/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] +default = [] vendored = ["citadel_sdk/vendored"] [dependencies] diff --git a/citadel-internal-service/src/kernel/request_handler.rs b/citadel-internal-service/src/kernel/request_handler.rs index 17da128..90fbe6d 100644 --- a/citadel-internal-service/src/kernel/request_handler.rs +++ b/citadel-internal-service/src/kernel/request_handler.rs @@ -7,12 +7,9 @@ use citadel_internal_service_types::*; use citadel_logging::tracing::log; use citadel_logging::{error, info}; use citadel_sdk::prefabs::ClientServerRemote; -use citadel_sdk::prelude::remote_specialization::PeerRemote; -use citadel_sdk::prelude::results::PeerRegisterStatus; use citadel_sdk::prelude::*; use futures::StreamExt; use std::collections::HashMap; -use std::path::PathBuf; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::Mutex; @@ -368,12 +365,12 @@ pub async fn handle_request( } => { match server_connection_map.lock().await.get_mut(&cid) { Some(conn) => { - if let Some(peer_cid) = peer_cid { + let result = if let Some(peer_cid) = peer_cid { // send to peer if let Some(peer_conn) = conn.peers.get_mut(&peer_cid) { 
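// With the peer_cid fields above widened to Option<u64>, a group listing can
// target either a specific peer connection or (presumably, when None) the
// client-server connection. Sketch of both request shapes; my_cid and peer_cid
// are placeholders:
let list_via_peer = InternalServiceRequest::GroupListGroupsFor {
    cid: my_cid,
    peer_cid: Some(peer_cid),
    request_id: Uuid::new_v4(),
};
let list_via_server = InternalServiceRequest::GroupListGroupsFor {
    cid: my_cid,
    peer_cid: None,
    request_id: Uuid::new_v4(),
};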
peer_conn.sink.set_security_level(security_level); // TODO no unwraps on send_message. We need to handle errors properly - peer_conn.sink.send_message(message.into()).await.unwrap(); + peer_conn.sink.send_message(message.into()).await } else { // TODO: refactor all connection not found messages, we have too many duplicates info!(target: "citadel","connection not found"); @@ -387,24 +384,28 @@ pub async fn handle_request( uuid, ) .await; + return; } } else { // send to server conn.sink_to_server.set_security_level(security_level); - conn.sink_to_server - .send_message(message.into()) - .await - .unwrap(); - } - - let response = - InternalServiceResponse::MessageSendSuccess(MessageSendSuccess { + conn.sink_to_server.send_message(message.into()).await + }; + let response = match result { + Ok(_) => InternalServiceResponse::MessageSendSuccess(MessageSendSuccess { cid, peer_cid, request_id: Some(request_id), - }); + }), + Err(err) => { + InternalServiceResponse::MessageSendFailure(MessageSendFailure { + cid, + message: format!("Message Send Failure: {err:?}"), + request_id: Some(request_id), + }) + } + }; send_response_to_tcp_client(tcp_connection_map, response, uuid).await; - info!(target: "citadel", "Into the message handler command send") } None => { info!(target: "citadel","connection not found"); @@ -470,129 +471,26 @@ pub async fn handle_request( info!(target: "citadel", "Send File Peer Remote Exists"); let peer_remote = &peer_remote.remote.clone(); drop(lock); - let scan_for_status = |status| async move { - info!(target: "citadel", "Send File Scan For Status Entered"); - match status { - NodeResult::ObjectTransferHandle(ObjectTransferHandle { - ticket: _ticket, - handle, - }) => { - let original_target_cid = handle.receiver; - let original_source_cid = handle.source; - let handle_metadata = handle.metadata.clone(); - - return if let ObjectTransferOrientation::Receiver { - is_revfs_pull: _, - } = handle.orientation - { - info!(target: "citadel", "Send File Receiver Entered"); - if original_target_cid == peer_cid { - info!(target: "citadel", "Send File Receiver is targeted at Peer CID"); - let mut lock = server_connection_map.lock().await; - // Route the signal to the intra-kernel user if possible - if let Some(peer) = - lock.get_mut(&original_target_cid) - { - info!(target: "citadel", "Send File Receiver - Target is local, rerouting"); - let peer_uuid = peer.associated_tcp_connection; - send_response_to_tcp_client( - tcp_connection_map, - InternalServiceResponse::FileTransferRequestNotification( - FileTransferRequestNotification { - cid: original_target_cid, - peer_cid: original_source_cid, - metadata: handle_metadata.clone(), - }, - ), - peer_uuid, - ) - .await; - peer.add_object_transfer_handler( - cid, - handle_metadata.object_id, - Some(handle), - ); - return Ok(Some(())); - } - } - Ok(None) - } else { - info!(target: "citadel", "Sender should be spawning tick updater on Send File"); - info!(target: "citadel", "Sender Source CID: {original_source_cid:?} and Actual CID: {cid:?}"); - let mut server_connection_map = - server_connection_map.lock().await; - if let Some(_conn) = - server_connection_map.get_mut(&original_source_cid) - { - // The Sender is a local user, so we start the tick updater - info!(target: "citadel", "Spawning Tick Updater For Send file Target {original_target_cid:?} who is CID {cid:?}"); - spawn_tick_updater( - handle, - original_source_cid, - Some(original_target_cid), - &mut server_connection_map, - tcp_connection_map.clone(), - ); - return Ok(Some(())); - } - 
Ok(None) - }; - } - res => { - log::error!(target: "citadel", "Invalid NodeResult for FileTransfer request received: {:?}", res); - Err(NetworkError::msg( - "Invalid Response Received During FileTransfer", - )) - } - } - }; info!(target: "citadel", "Send File Remote Method Call"); peer_remote - .send_file_with_custom_opts_and_with_fn( + .send_file_with_custom_opts( source, chunk_size.unwrap_or_default(), transfer_type, - scan_for_status, ) .await } else { Err(NetworkError::msg("Peer Connection Not Found")) } } else { - let send_file_c2s = |status| async move { - match status { - NodeResult::ObjectTransferHandle(ObjectTransferHandle { - ticket: _ticket, - handle, - }) => { - let mut server_connection_map = - server_connection_map.lock().await; - spawn_tick_updater( - handle, - cid, - None, - &mut server_connection_map, - tcp_connection_map.clone(), - ); - Ok(Some(())) - } - res => { - log::error!(target: "citadel", "Invalid NodeResult for FileTransfer request received: {:?}", res); - Err(NetworkError::msg( - "Invalid Response Received During FileTransfer", - )) - } - } - }; let client_server_remote = conn.client_server_remote.clone(); drop(lock); client_server_remote - .send_file_with_custom_opts_and_with_fn( + .send_file_with_custom_opts( source, chunk_size.unwrap_or_default(), transfer_type, - send_file_c2s, ) .await }; @@ -724,91 +622,6 @@ pub async fn handle_request( request_id, } => { let security_level = security_level.unwrap_or_default(); - let scan_for_status = |status| async move { - match status { - NodeResult::ObjectTransferHandle(ObjectTransferHandle { - ticket: _ticket, - handle, - }) => { - if let Some(peer_cid) = peer_cid { - // P2P REVFS Pull - Reroute if needed, otherwise spawn tick updater - info!(target: "citadel", "Download Source: {:?} Receiver: {:?}", handle.source, handle.receiver); - info!(target: "citadel", "Download CID: {cid:?} Peer: {peer_cid:?}"); - let (implicated_cid, peer_cid) = - if let ObjectTransferOrientation::Receiver { - is_revfs_pull: true, - } = handle.orientation - { - info!(target: "citadel", "Download Orientation: Receiver"); - (handle.receiver, handle.source) - } else { - info!(target: "citadel", "Download Orientation: Sender"); - (handle.source, handle.receiver) - }; - let mut server_connection_map = server_connection_map.lock().await; - if let Some(_conn) = server_connection_map.get_mut(&implicated_cid) { - info!(target: "citadel", "Download Peer - Spawning Tick Updater as Receiver"); - // let mut local_path = None; - // while let Some(res) = handle.next().await { - // match res { - // ObjectTransferStatus::ReceptionBeginning(path, _) => { - // info!(target: "citadel", "Received ReceptionBeginning Status"); - // local_path = Some(path) - // } - // ObjectTransferStatus::ReceptionComplete => { - // info!(target: "citadel", "Received ReceptionComplete Status"); - // break; - // } - // ObjectTransferStatus::TransferComplete => { - // info!(target: "citadel", "Received TransferComplete Status"); - // break; - // } - // _ => { - // info!(target: "citadel", "Received {res:?} Status"); - // } - // } - // } - spawn_tick_updater( - handle, - implicated_cid, - Some(peer_cid), - &mut server_connection_map, - tcp_connection_map.clone(), - ); - // if local_path.is_some() { - // Ok(local_path) - // } else { - // Err(NetworkError::InternalError("Local path never loaded")) - // } - Ok(Some(PathBuf::default())) - } else { - info!(target: "citadel", "Download File - Returning None"); - Ok(None) - } - } else { - // C2S REVFS Pull - We can just start a tick updater 
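// With the scanner closures deleted in this hunk, the pull path reduces to the
// SDK's plain entry point. Sketch of the surviving call shape (method and
// argument names as they appear in the following hunks):
peer_remote
    .remote_encrypted_virtual_filesystem_pull(
        virtual_directory, // the /vfs/... path to pull
        security_level,
        delete_on_pull,
    )
    .await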
- info!(target: "citadel", "Download C2S Receiver Starting Tick Updater"); - let mut server_connection_map = server_connection_map.lock().await; - spawn_tick_updater( - handle, - cid, - None, - &mut server_connection_map, - tcp_connection_map.clone(), - ); - Ok(Some(PathBuf::default())) - } - } - - res => { - log::error!(target: "citadel", "Invalid NodeResult for REVFS DownloadFile request received: {:?}", res); - Err(NetworkError::msg( - "Invalid Response Received During REVFS Pull", - )) - } - } - }; - let mut lock = server_connection_map.lock().await; match lock.get_mut(&cid) { Some(conn) => { @@ -817,11 +630,10 @@ pub async fn handle_request( let peer_remote = peer_remote.remote.clone(); drop(lock); peer_remote - .remote_encrypted_virtual_filesystem_pull_with_fn( + .remote_encrypted_virtual_filesystem_pull( virtual_directory, security_level, delete_on_pull, - scan_for_status, ) .await } else { @@ -831,11 +643,10 @@ pub async fn handle_request( let client_server_remote = conn.client_server_remote.clone(); drop(lock); client_server_remote - .remote_encrypted_virtual_filesystem_pull_with_fn( + .remote_encrypted_virtual_filesystem_pull( virtual_directory, security_level, delete_on_pull, - scan_for_status, ) .await }; @@ -982,68 +793,9 @@ pub async fn handle_request( session_security_settings, ); - let remote_for_scanner = &remote; - let scan_for_status = |status| async move { - if let NodeResult::PeerEvent(PeerEvent { - event: - PeerSignal::PostRegister { - peer_conn_type, - inviter_username: _, - invitee_username: _, - ticket_opt: _, - invitee_response: resp, - }, - ticket: _, - }) = status - { - match resp { - Some(PeerResponse::Accept(..)) => Ok(Some(PeerRegisterStatus::Accepted)), - Some(PeerResponse::Decline) => Ok(Some(PeerRegisterStatus::Declined)), - Some(PeerResponse::Timeout) => Ok(Some(PeerRegisterStatus::Failed { reason: Some("Timeout on register request. Peer did not accept in time. Try again later".to_string()) })), - _ => { - // This may be a signal needing to be routed - if peer_conn_type.get_original_target_cid() == peer_cid { - // If the target peer is the same as the peer_cid, that means that - // the peer is in the same kernel space. Route the signal through the TCP client - let lock = server_connection_map.lock().await; - if let Some(peer) = lock.get(&peer_cid) { - let username_of_sender = remote_for_scanner.account_manager().get_username_by_cid(cid).await - .map_err(|err| NetworkError::msg(format!("Unable to get username for cid: {cid}. Error: {err:?}")))? 
- .ok_or_else(|| NetworkError::msg(format!("Unable to get username for cid: {cid}")))?; - - let peer_uuid = peer.associated_tcp_connection; - drop(lock); - - log::debug!(target: "citadel", "Routing signal meant for other peer in intra-kernel space"); - send_response_to_tcp_client( - tcp_connection_map, - InternalServiceResponse::PeerRegisterNotification( - PeerRegisterNotification { - cid: peer_cid, - peer_cid: cid, - peer_username: username_of_sender, - request_id: Some(request_id), - }, - ), - peer_uuid - ).await - } - } - - Ok(None) - } - } - } else { - Ok(None) - } - }; - match client_to_server_remote.propose_target(cid, peer_cid).await { Ok(symmetric_identifier_handle_ref) => { - match symmetric_identifier_handle_ref - .register_to_peer_with_fn(scan_for_status) - .await - { + match symmetric_identifier_handle_ref.register_to_peer().await { Ok(_peer_register_success) => { let account_manager = symmetric_identifier_handle_ref.account_manager(); if let Ok(target_information) = @@ -1144,95 +896,8 @@ pub async fn handle_request( match client_to_server_remote.find_target(cid, peer_cid).await { Ok(symmetric_identifier_handle) => { - let symmetric_identifier_handle = &symmetric_identifier_handle.into_owned(); - - let remote_for_scanner = &remote; - let signal_scanner = |status| async move { - match status { - NodeResult::PeerChannelCreated(PeerChannelCreated { - ticket: _, - channel, - udp_rx_opt, - }) => { - let username = - symmetric_identifier_handle.target_username().cloned(); - - let remote = PeerRemote { - inner: (*remote_for_scanner).clone(), - peer: symmetric_identifier_handle - .try_as_peer_connection() - .await? - .as_virtual_connection(), - username, - session_security_settings, - }; - - Ok(Some(results::PeerConnectSuccess { - remote, - channel, - udp_channel_rx: udp_rx_opt, - incoming_object_transfer_handles: None, - })) - } - - NodeResult::PeerEvent(PeerEvent { - event: - PeerSignal::PostConnect { - peer_conn_type: _, - ticket_opt: _, - invitee_response: Some(PeerResponse::Decline), - .. - }, - .. - }) => Err(NetworkError::msg(format!( - "Peer {peer_cid} declined connection" - ))), - - NodeResult::PeerEvent(PeerEvent { - event: - PeerSignal::PostConnect { - peer_conn_type, - ticket_opt: _, - invitee_response: None, - .. - }, - .. 
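Peer registration likewise loses its closure: the intra-kernel routing the scanner performed (forwarding a PostRegister signal to a peer whose session lives in the same kernel) is expected to happen in the kernel's central event handler after this series. A minimal sketch of the remaining register flow, assuming `cid` and `peer_cid` identify two accounts on this node and `client_to_server_remote` is the handler's remote:

    // Hedged sketch: propose the target, then register, with no status closure.
    let handle = client_to_server_remote.propose_target(cid, peer_cid).await?;
    let _registration = handle.register_to_peer().await?;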
- }) => { - // Route the signal to the intra-kernel user - if peer_conn_type.get_original_target_cid() == peer_cid { - let lock = server_connection_map.lock().await; - if let Some(conn) = lock.get(&peer_cid) { - let peer_uuid = conn.associated_tcp_connection; - send_response_to_tcp_client( - tcp_connection_map, - InternalServiceResponse::PeerConnectNotification( - PeerConnectNotification { - cid: peer_cid, - peer_cid: cid, - session_security_settings, - udp_mode, - request_id: Some(request_id), - }, - ), - peer_uuid, - ) - .await - } - } - - Ok(None) - } - - _ => Ok(None), - } - }; - match symmetric_identifier_handle - .connect_to_peer_with_fn( - session_security_settings, - udp_mode, - signal_scanner, - ) + .connect_to_peer_custom(session_security_settings, udp_mode) .await { Ok(peer_connect_success) => { @@ -1350,7 +1015,7 @@ pub async fn handle_request( None => { // TODO: handle none case } - Some(target_peer) => match target_peer.remote.inner.send(request).await { + Some(target_peer) => match target_peer.remote.remote().send(request).await { Ok(ticket) => { conn.clear_peer_connection(peer_cid); let peer_disconnect_success = @@ -2089,29 +1754,18 @@ pub async fn handle_request( } => { let mut server_connection_map = server_connection_map.lock().await; if let Some(connection) = server_connection_map.get_mut(&cid) { - if let Some(peer_connection) = connection.peers.get_mut(&peer_cid) { - let peer_remote = peer_connection.remote.clone(); - drop(server_connection_map); - let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand { - implicated_cid: cid, - command: GroupBroadcast::ListGroupsFor { cid: peer_cid }, - }); - if let Ok(mut subscription) = - peer_remote.inner.send_callback_subscription(request).await - { - if let Some(evt) = subscription.next().await { - if let NodeResult::GroupEvent(GroupEvent { - implicated_cid: _, - ticket: _, - event: GroupBroadcast::ListResponse { groups }, - }) = evt - { + if let Some(peer_cid) = peer_cid { + if let Some(peer_connection) = connection.peers.get_mut(&peer_cid) { + let peer_remote = peer_connection.remote.clone(); + drop(server_connection_map); + match peer_remote.list_owned_groups().await { + Ok(groups) => { send_response_to_tcp_client( tcp_connection_map, InternalServiceResponse::GroupListGroupsSuccess( GroupListGroupsSuccess { cid, - peer_cid, + peer_cid: Some(peer_cid), group_list: Some(groups), request_id: Some(request_id), }, @@ -2119,13 +1773,14 @@ pub async fn handle_request( uuid, ) .await; - } else { + } + Err(err) => { send_response_to_tcp_client( tcp_connection_map, InternalServiceResponse::GroupListGroupsFailure( GroupListGroupsFailure { cid, - message: "Could Not List Groups - Failed".to_string(), + message: err.to_string(), request_id: Some(request_id), }, ), @@ -2133,20 +1788,6 @@ pub async fn handle_request( ) .await; } - } else { - send_response_to_tcp_client( - tcp_connection_map, - InternalServiceResponse::GroupListGroupsFailure( - GroupListGroupsFailure { - cid, - message: "Could Not List Groups - Subscription Error" - .to_string(), - request_id: Some(request_id), - }, - ), - uuid, - ) - .await; } } else { send_response_to_tcp_client( @@ -2154,8 +1795,7 @@ pub async fn handle_request( InternalServiceResponse::GroupListGroupsFailure( GroupListGroupsFailure { cid, - message: "Could Not List Groups - Subscription Error" - .to_string(), + message: "Could Not List Groups - Peer not found".to_string(), request_id: Some(request_id), }, ), @@ -2164,16 +1804,39 @@ pub async fn handle_request( .await; } } else { - 
send_response_to_tcp_client( - tcp_connection_map, - InternalServiceResponse::GroupListGroupsFailure(GroupListGroupsFailure { - cid, - message: "Could Not List Groups - Peer not found".to_string(), - request_id: Some(request_id), - }), - uuid, - ) - .await; + let remote = connection.client_server_remote.clone(); + drop(server_connection_map); + match remote.list_owned_groups().await { + Ok(groups) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupListGroupsSuccess( + GroupListGroupsSuccess { + cid, + peer_cid: None, + group_list: Some(groups), + request_id: Some(request_id), + }, + ), + uuid, + ) + .await; + } + Err(err) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupListGroupsFailure( + GroupListGroupsFailure { + cid, + message: err.to_string(), + request_id: Some(request_id), + }, + ), + uuid, + ) + .await; + } + } } } else { send_response_to_tcp_client( @@ -2212,11 +1875,17 @@ pub async fn handle_request( implicated_cid: cid, command: group_request, }); - let mut server_connection_map = server_connection_map.lock().await; - if let Some(connection) = server_connection_map.get_mut(&cid) { + + let mut server_connection_map_lock = server_connection_map.lock().await; + if let Some(connection) = server_connection_map_lock.get_mut(&cid) { if let Some(peer_connection) = connection.peers.get_mut(&peer_cid) { let peer_remote = peer_connection.remote.clone(); - match peer_remote.inner.send_callback_subscription(request).await { + drop(server_connection_map_lock); + match peer_remote + .remote() + .send_callback_subscription(request) + .await + { Ok(mut subscription) => { let mut result = false; if invitation { @@ -2226,30 +1895,37 @@ pub async fn handle_request( NodeResult::GroupChannelCreated(GroupChannelCreated { ticket: _, channel, + implicated_cid: _, }) => { let key = channel.key(); let group_cid = channel.cid(); let (tx, rx) = channel.split(); - connection.add_group_channel( - key, - GroupConnection { + let mut server_connection_map_lock = + server_connection_map.lock().await; + if let Some(connection) = + server_connection_map_lock.get_mut(&cid) + { + connection.add_group_channel( key, - tx, - cid: group_cid, - }, - ); + GroupConnection { + key, + tx, + cid: group_cid, + }, + ); - let uuid = connection.associated_tcp_connection; - spawn_group_channel_receiver( - key, - cid, - uuid, - rx, - tcp_connection_map.clone(), - ); + let uuid = connection.associated_tcp_connection; + spawn_group_channel_receiver( + key, + cid, + uuid, + rx, + tcp_connection_map.clone(), + ); - result = true; - break; + result = true; + break; + } } NodeResult::GroupEvent(GroupEvent { implicated_cid: _, @@ -2281,7 +1957,6 @@ pub async fn handle_request( _ => {} }; } - drop(server_connection_map); } else { // For now we return a Success response - we did, in fact, receive the KernelStreamSubscription result = true; @@ -2356,7 +2031,11 @@ pub async fn handle_request( implicated_cid: cid, command: group_request, }); - match peer_remote.inner.send_callback_subscription(request).await { + match peer_remote + .remote() + .send_callback_subscription(request) + .await + { Ok(mut subscription) => { let mut result = Err("Group Request Join Failed".to_string()); while let Some(evt) = subscription.next().await { diff --git a/citadel-internal-service/tests/group_chat.rs b/citadel-internal-service/tests/group_chat.rs index 965fd3b..6b802a9 100644 --- a/citadel-internal-service/tests/group_chat.rs +++ b/citadel-internal-service/tests/group_chat.rs @@ 
-373,7 +373,7 @@ mod tests {
         // Service B Requests to Join and Service A Accepts
         let service_b_group_outbound = InternalServiceRequest::GroupListGroupsFor {
             cid: cid_b,
-            peer_cid: cid_a,
+            peer_cid: Some(cid_a),
             request_id: Uuid::new_v4(),
         };
         to_service_b.send(service_b_group_outbound).unwrap();
@@ -475,7 +475,7 @@ mod tests {
         // Service C Requests to Join and Service A Declines
         let service_c_group_outbound = InternalServiceRequest::GroupListGroupsFor {
             cid: cid_c,
-            peer_cid: cid_a,
+            peer_cid: Some(cid_a),
             request_id: Uuid::new_v4(),
         };
         to_service_c.send(service_c_group_outbound).unwrap();
diff --git a/citadel-internal-service/tests/loopback.rs b/citadel-internal-service/tests/loopback.rs
index f0e4a91..aeed211 100644
--- a/citadel-internal-service/tests/loopback.rs
+++ b/citadel-internal-service/tests/loopback.rs
@@ -10,7 +10,7 @@ mod tests {
     use citadel_internal_service_types::{
         DeleteVirtualFileSuccess, DownloadFileSuccess, FileTransferRequestNotification,
         FileTransferStatusNotification, InternalServiceRequest, InternalServiceResponse,
-        SendFileRequestSuccess,
+        MessageNotification, MessageSendFailure, MessageSendSuccess, SendFileRequestSuccess,
     };
     use citadel_sdk::prelude::*;
     use std::path::PathBuf;
@@ -82,6 +82,108 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_intra_kernel_peer_message() -> Result<(), Box<dyn Error>> {
+        crate::common::setup_log();
+
+        let (server, server_bind_address) = server_info_skip_cert_verification();
+        tokio::task::spawn(server);
+
+        let service_addr = "127.0.0.1:55778".parse().unwrap();
+        let service = CitadelWorkspaceService::new(service_addr);
+
+        let internal_service = NodeBuilder::default()
+            .with_backend(BackendType::InMemory)
+            .with_node_type(NodeType::Peer)
+            .with_insecure_skip_cert_verification()
+            .build(service)?;
+
+        tokio::task::spawn(internal_service);
+        tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
+
+        // Now that both the server and the internal service are running, connect each peer
+        // to the server, then connect the two peers to each other via p2p
+        let to_spawn = vec![
+            RegisterAndConnectItems {
+                internal_service_addr: service_addr,
+                server_addr: server_bind_address,
+                full_name: "Peer 0".to_string(),
+                username: "peer.0".to_string(),
+                password: "secret_0".to_string().into_bytes().to_owned(),
+            },
+            RegisterAndConnectItems {
+                internal_service_addr: service_addr,
+                server_addr: server_bind_address,
+                full_name: "Peer 1".to_string(),
+                username: "peer.1".to_string(),
+                password: "secret_1".to_string().into_bytes().to_owned(),
+            },
+        ];
+
+        let mut returned_service_info = register_and_connect_to_server(to_spawn).await.unwrap();
+        let (mut peer_0_tx, mut peer_0_rx, peer_0_cid) = returned_service_info.remove(0);
+        let (mut peer_1_tx, mut peer_1_rx, peer_1_cid) = returned_service_info.remove(0);
+
+        crate::common::register_p2p(
+            &mut peer_0_tx,
+            &mut peer_0_rx,
+            peer_0_cid,
+            &mut peer_1_tx,
+            &mut peer_1_rx,
+            peer_1_cid,
+            SessionSecuritySettings::default(),
+        )
+        .await?;
+        citadel_logging::info!(target: "citadel", "P2P Register complete");
+        crate::common::connect_p2p(
+            &mut peer_0_tx,
+            &mut peer_0_rx,
+            peer_0_cid,
+            &mut peer_1_tx,
+            &mut peer_1_rx,
+            peer_1_cid,
+            SessionSecuritySettings::default(),
+        )
+        .await?;
+        let message_request = InternalServiceRequest::Message {
+            request_id: Uuid::new_v4(),
+            message: "Test Message From Peer 0.".to_string().into_bytes(),
+            cid: peer_0_cid,
+            peer_cid: Some(peer_1_cid),
+            security_level: Default::default(),
+        };
+        peer_0_tx.send(message_request)?;
+        match peer_0_rx.recv().await.unwrap() {
+            InternalServiceResponse::MessageSendSuccess(MessageSendSuccess { .. }) => {
+                citadel_logging::info!(target: "citadel", "Message Successfully Sent from Peer 0 to Peer 1.");
+            }
+            InternalServiceResponse::MessageSendFailure(MessageSendFailure {
+                cid: _,
+                message,
+                request_id: _,
+            }) => {
+                panic!("Message Sending Failed With Error: {message:?}")
+            }
+            _ => {
+                panic!("Received Unexpected Response When Expecting MessageSendSuccess Response.")
+            }
+        }
+        match peer_1_rx.recv().await.unwrap() {
+            InternalServiceResponse::MessageNotification(MessageNotification {
+                message,
+                cid: _,
+                peer_cid: _,
+                request_id: _,
+            }) => {
+                citadel_logging::info!(target: "citadel", "Message from Peer 0 Successfully Received at Peer 1: {message:?}");
+            }
+            _ => {
+                panic!("Received Unexpected Response When Expecting MessageNotification.")
+            }
+        }
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_intra_kernel_send_file() -> Result<(), Box<dyn Error>> {
         crate::common::setup_log();

From fb79b0d85d3f58e41f86c4f230bb7a9988df53a6 Mon Sep 17 00:00:00 2001
From: Tjemmmic
Date: Sat, 30 Mar 2024 18:04:12 -0500
Subject: [PATCH 15/17] File Transfer intra-kernel tests

---
 citadel-internal-service/src/kernel/mod.rs        | 27 +------
 .../src/kernel/request_handler.rs                 | 75 +++++++++----------
 2 files changed, 38 insertions(+), 64 deletions(-)

diff --git a/citadel-internal-service/src/kernel/mod.rs b/citadel-internal-service/src/kernel/mod.rs
index 6c9b806..bad1279 100644
--- a/citadel-internal-service/src/kernel/mod.rs
+++ b/citadel-internal-service/src/kernel/mod.rs
@@ -281,30 +281,11 @@ impl NetKernel for CitadelWorkspaceService {
                     let object_id = metadata.object_id;
                     let object_transfer_handler = object_transfer_handle.handle;
-                    let (implicated_cid, peer_cid) = if matches!(
-                        // object_transfer_handler.orientation,
-                        // ObjectTransferOrientation::Receiver {
-                        //     is_revfs_pull: true
-                        // }
-                        object_transfer_handler.orientation,
-                        ObjectTransferOrientation::Sender,
-                    ) {
-                        // When this is a REVFS pull reception handle, THIS node is the source of the file.
-                        // The other node, i.e. the peer, is the receiver who is requesting the file.
-                        //let (implicated_cid, peer_cid) = if matches!(object_transfer_handler.metadata.transfer_type, TransferType::RemoteEncryptedVirtualFilesystem { ..
}) { - // ( - // object_transfer_handler.source, - // object_transfer_handler.receiver, - // ) - ( - object_transfer_handler.source, - object_transfer_handler.receiver, - ) + let implicated_cid = object_transfer_handle.implicated_cid; + let peer_cid = if object_transfer_handler.receiver != implicated_cid { + object_transfer_handler.receiver } else { - ( - object_transfer_handler.receiver, - object_transfer_handler.source, - ) + object_transfer_handler.source }; citadel_logging::info!(target: "citadel", "Orientation: {:?}", object_transfer_handler.orientation); diff --git a/citadel-internal-service/src/kernel/request_handler.rs b/citadel-internal-service/src/kernel/request_handler.rs index 90fbe6d..77c4002 100644 --- a/citadel-internal-service/src/kernel/request_handler.rs +++ b/citadel-internal-service/src/kernel/request_handler.rs @@ -464,40 +464,38 @@ pub async fn handle_request( let lock = server_connection_map.lock().await; match lock.get(&cid) { Some(conn) => { - info!(target: "citadel", "Send File Server Connection Exists"); let result = if let Some(peer_cid) = peer_cid { - info!(target: "citadel", "Send File Peer Version"); if let Some(peer_remote) = conn.peers.get(&peer_cid) { - info!(target: "citadel", "Send File Peer Remote Exists"); let peer_remote = &peer_remote.remote.clone(); drop(lock); - - info!(target: "citadel", "Send File Remote Method Call"); - peer_remote - .send_file_with_custom_opts( - source, - chunk_size.unwrap_or_default(), - transfer_type, - ) - .await + let request = NodeRequest::SendObject(SendObject { + source: Box::new(source), + chunk_size, + implicated_cid: cid, + v_conn_type: *peer_remote.user(), + transfer_type, + }); + peer_remote.remote().send(request).await } else { Err(NetworkError::msg("Peer Connection Not Found")) } } else { let client_server_remote = conn.client_server_remote.clone(); drop(lock); - client_server_remote - .send_file_with_custom_opts( - source, - chunk_size.unwrap_or_default(), - transfer_type, - ) - .await + let request = NodeRequest::SendObject(SendObject { + source: Box::new(source), + chunk_size, + implicated_cid: cid, + v_conn_type: VirtualTargetType::LocalGroupServer { + implicated_cid: cid, + }, + transfer_type, + }); + client_server_remote.remote().send(request).await }; match result { Ok(_) => { - info!(target: "citadel","InternalServiceRequest Send File Success"); send_response_to_tcp_client( tcp_connection_map, InternalServiceResponse::SendFileRequestSuccess( @@ -512,7 +510,6 @@ pub async fn handle_request( } Err(err) => { - info!(target: "citadel","InternalServiceRequest Send File Failure"); send_response_to_tcp_client( tcp_connection_map, InternalServiceResponse::SendFileRequestFailure( @@ -529,7 +526,6 @@ pub async fn handle_request( } } None => { - info!(target: "citadel","server connection not found"); send_response_to_tcp_client( tcp_connection_map, InternalServiceResponse::SendFileRequestFailure(SendFileRequestFailure { @@ -627,32 +623,31 @@ pub async fn handle_request( Some(conn) => { let result = if let Some(peer_cid) = peer_cid { if let Some(peer_remote) = conn.peers.get_mut(&peer_cid) { - let peer_remote = peer_remote.remote.clone(); + let peer_remote = &peer_remote.remote.clone(); drop(lock); - peer_remote - .remote_encrypted_virtual_filesystem_pull( - virtual_directory, - security_level, - delete_on_pull, - ) - .await + let request = NodeRequest::PullObject(PullObject { + v_conn: *peer_remote.user(), + virtual_dir: virtual_directory, + delete_on_pull, + transfer_security_level: security_level, + }); + + 
peer_remote.remote().send(request).await } else { Err(NetworkError::msg("Peer Connection Not Found")) } } else { let client_server_remote = conn.client_server_remote.clone(); drop(lock); - client_server_remote - .remote_encrypted_virtual_filesystem_pull( - virtual_directory, - security_level, - delete_on_pull, - ) - .await + let request = NodeRequest::PullObject(PullObject { + v_conn: *client_server_remote.user(), + virtual_dir: virtual_directory, + delete_on_pull, + transfer_security_level: security_level, + }); + client_server_remote.remote().send(request).await }; - info!(target: "citadel", "Successfully Finished call to REVFS Pull Method"); - match result { Ok(_) => { send_response_to_tcp_client( @@ -681,7 +676,6 @@ pub async fn handle_request( } } None => { - info!(target: "citadel","server connection not found"); send_response_to_tcp_client( tcp_connection_map, InternalServiceResponse::DownloadFileFailure(DownloadFileFailure { @@ -761,7 +755,6 @@ pub async fn handle_request( } } None => { - info!(target: "citadel","server connection not found"); send_response_to_tcp_client( tcp_connection_map, InternalServiceResponse::DeleteVirtualFileFailure( From 41a926d1373eb0d600ba14ac140b99ed845685da Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Sat, 30 Mar 2024 18:11:35 -0500 Subject: [PATCH 16/17] Intra-kernel tests file rename --- citadel-internal-service/tests/{loopback.rs => intra_kernel.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename citadel-internal-service/tests/{loopback.rs => intra_kernel.rs} (100%) diff --git a/citadel-internal-service/tests/loopback.rs b/citadel-internal-service/tests/intra_kernel.rs similarity index 100% rename from citadel-internal-service/tests/loopback.rs rename to citadel-internal-service/tests/intra_kernel.rs From 85d2236b6b591166e0d494570ec5f5daa699fe87 Mon Sep 17 00:00:00 2001 From: Tjemmmic Date: Sat, 30 Mar 2024 19:07:52 -0500 Subject: [PATCH 17/17] Cleanup and Fixes --- .../src/kernel/requests/file/respond_file_transfer.rs | 2 +- .../src/kernel/requests/group/request_join.rs | 8 ++++++-- .../src/kernel/requests/group/respond_request.rs | 9 +++++++-- .../src/kernel/requests/peer/connect.rs | 8 +++----- .../src/kernel/responses/object_transfer_handle.rs | 4 ++-- citadel-internal-service/tests/file_transfer.rs | 10 +--------- 6 files changed, 20 insertions(+), 21 deletions(-) diff --git a/citadel-internal-service/src/kernel/requests/file/respond_file_transfer.rs b/citadel-internal-service/src/kernel/requests/file/respond_file_transfer.rs index 5907f63..b5bf8fd 100644 --- a/citadel-internal-service/src/kernel/requests/file/respond_file_transfer.rs +++ b/citadel-internal-service/src/kernel/requests/file/respond_file_transfer.rs @@ -32,7 +32,7 @@ pub async fn handle( spawn_tick_updater( owned_handler, cid, - peer_cid, + Some(peer_cid), &mut server_connection_map, this.tcp_connection_map.clone(), ); diff --git a/citadel-internal-service/src/kernel/requests/group/request_join.rs b/citadel-internal-service/src/kernel/requests/group/request_join.rs index 02d8a0d..eeeb78b 100644 --- a/citadel-internal-service/src/kernel/requests/group/request_join.rs +++ b/citadel-internal-service/src/kernel/requests/group/request_join.rs @@ -5,7 +5,7 @@ use citadel_internal_service_types::{ InternalServiceResponse, }; use citadel_sdk::prelude::{ - GroupBroadcast, GroupBroadcastCommand, GroupEvent, NodeRequest, NodeResult, + GroupBroadcast, GroupBroadcastCommand, GroupEvent, NodeRequest, NodeResult, TargetLockedRemote, }; use futures::StreamExt; use uuid::Uuid; @@ 
-38,7 +38,11 @@ pub async fn handle( implicated_cid: cid, command: group_request, }); - match peer_remote.send_callback_subscription(request).await { + match peer_remote + .remote() + .send_callback_subscription(request) + .await + { Ok(mut subscription) => { let mut result = Err("Group Request Join Failed".to_string()); while let Some(evt) = subscription.next().await { diff --git a/citadel-internal-service/src/kernel/requests/group/respond_request.rs b/citadel-internal-service/src/kernel/requests/group/respond_request.rs index a3151e9..3508be7 100644 --- a/citadel-internal-service/src/kernel/requests/group/respond_request.rs +++ b/citadel-internal-service/src/kernel/requests/group/respond_request.rs @@ -5,7 +5,8 @@ use citadel_internal_service_types::{ InternalServiceResponse, }; use citadel_sdk::prelude::{ - GroupBroadcast, GroupBroadcastCommand, GroupChannelCreated, GroupEvent, NodeRequest, NodeResult, + GroupBroadcast, GroupBroadcastCommand, GroupChannelCreated, GroupEvent, NodeRequest, + NodeResult, TargetLockedRemote, }; use futures::StreamExt; use uuid::Uuid; @@ -52,7 +53,11 @@ pub async fn handle( let peer_remote = peer_connection.remote.clone(); drop(server_connection_map); - match peer_remote.send_callback_subscription(request).await { + match peer_remote + .remote() + .send_callback_subscription(request) + .await + { Ok(mut subscription) => { let mut result = false; if invitation { diff --git a/citadel-internal-service/src/kernel/requests/peer/connect.rs b/citadel-internal-service/src/kernel/requests/peer/connect.rs index 0cb3915..83228a5 100644 --- a/citadel-internal-service/src/kernel/requests/peer/connect.rs +++ b/citadel-internal-service/src/kernel/requests/peer/connect.rs @@ -38,6 +38,7 @@ pub async fn handle( if already_connected { let response = InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { cid, + peer_cid, request_id: Some(request_id), }); @@ -67,11 +68,7 @@ pub async fn handle( .await .get_mut(&cid) .unwrap() - .add_peer_connection( - peer_cid, - sink, - symmetric_identifier_handle_ref.into_owned(), - ); + .add_peer_connection(peer_cid, sink, peer_connect_success.remote); let hm_for_conn = this.tcp_connection_map.clone(); @@ -101,6 +98,7 @@ pub async fn handle( InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { cid, + peer_cid, request_id: Some(request_id), }) } diff --git a/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs b/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs index 4475f84..bdceb9e 100644 --- a/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs +++ b/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs @@ -41,7 +41,7 @@ pub async fn handle( spawn_tick_updater( object_transfer_handler, implicated_cid, - peer_cid, + Some(peer_cid), &mut server_connection_map, this.tcp_connection_map.clone(), ); @@ -74,7 +74,7 @@ pub async fn handle( spawn_tick_updater( object_transfer_handler, implicated_cid, - peer_cid, + Some(peer_cid), &mut server_connection_map, this.tcp_connection_map.clone(), ); diff --git a/citadel-internal-service/tests/file_transfer.rs b/citadel-internal-service/tests/file_transfer.rs index 659a982..5dfcadf 100644 --- a/citadel-internal-service/tests/file_transfer.rs +++ b/citadel-internal-service/tests/file_transfer.rs @@ -11,7 +11,7 @@ mod tests { use citadel_internal_service_types::{ DeleteVirtualFileSuccess, DownloadFileFailure, DownloadFileSuccess, FileTransferRequestNotification, FileTransferStatusNotification, 
InternalServiceRequest, - InternalServiceResponse, SendFileRequestFailure, + InternalServiceResponse, SendFileRequestFailure, SendFileRequestSuccess, }; use citadel_logging::info; use citadel_sdk::prelude::*; @@ -24,7 +24,6 @@ mod tests { use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::Duration; - use tokio::sync::mpsc::UnboundedReceiver; use uuid::Uuid; #[tokio::test] @@ -370,13 +369,6 @@ mod tests { let deserialized_service_a_payload_response = from_service_a.recv().await.unwrap(); info!(target: "citadel","{deserialized_service_a_payload_response:?}"); - // if let InternalServiceResponse::SendFileRequestSuccess(SendFileRequestSuccess { .. }) = - // &deserialized_service_a_payload_response - // { - // } else { - // panic!("File Transfer Request failed: {deserialized_service_a_payload_response:?}"); - // } - exhaust_stream_to_file_completion(file_to_send.clone(), from_service_b).await; exhaust_stream_to_file_completion(file_to_send.clone(), from_service_a).await;
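Across patch 17, spawn_tick_updater's peer argument becomes an explicit Option, so every call site states whether a transfer is client-to-server or peer-to-peer. A hedged sketch of the resulting call shape, using the names from the hunks above:

    // Hedged sketch: c2s transfers pass None, p2p transfers pass Some(peer_cid).
    spawn_tick_updater(
        object_transfer_handler,         // the transfer handle to drive
        implicated_cid,                  // local session CID
        Some(peer_cid),                  // None for client-to-server transfers
        &mut server_connection_map,      // locked connection map
        this.tcp_connection_map.clone(), // lookup for TCP response channels
    );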