Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Intra-kernel compatability #36

Merged
merged 19 commits into from
Mar 31, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ citadel-internal-service-connector = { path = "./citadel-internal-service-connec
citadel-internal-service-macros = { path = "./citadel-internal-service-macros", default-features = false, version = "0.1.0" }

# Avarok deps
citadel_sdk = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol" }
citadel_types = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol" }
citadel_logging = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol", default-features = false }
citadel_sdk = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/", branch = "intra-kernel-compat" }
citadel_types = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/", branch = "intra-kernel-compat" }
citadel_logging = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/", branch = "intra-kernel-compat", default-features = false }

# Standard deps
serde = { version = "1.0.104", features = ["derive"] }
Expand All @@ -41,4 +41,5 @@ uuid = { version="1.3.3", features = [
anyhow = "1.0.71"
async-recursion = { version = "1.0.4" }
parking_lot = { version = "0.12.1" }
structopt = { version = "0.3.26" }
structopt = { version = "0.3.26" }
lazy_static = "1.4.0"
1 change: 1 addition & 0 deletions citadel-internal-service-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ pub struct DeleteVirtualFileFailure {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PeerConnectSuccess {
pub cid: u64,
pub peer_cid: u64,
pub request_id: Option<Uuid>,
}

Expand Down
29 changes: 17 additions & 12 deletions citadel-internal-service/src/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl NetKernel for CitadelWorkspaceService {
}

async fn on_start(&self) -> Result<(), NetworkError> {
let mut remote = self.remote.clone().unwrap();
let remote = self.remote.clone().unwrap();
let remote_for_closure = remote.clone();
let listener = tokio::net::TcpListener::bind(self.bind_address).await?;

Expand Down Expand Up @@ -198,14 +198,20 @@ impl NetKernel for CitadelWorkspaceService {
let inbound_command_task = async move {
while let Some((command, conn_id)) = rx.recv().await {
// TODO: handle error once payload_handler is fallible
handle_request(
command,
conn_id,
server_connection_map,
&mut remote,
tcp_connection_map,
)
.await;
let mut remote = remote.clone();
let server_connection_map = server_connection_map.clone();
let tcp_connection_map = tcp_connection_map.clone();
// Spawn the task, that way, we can handle multiple requests in parallel
tokio::task::spawn(async move {
handle_request(
command,
conn_id,
&server_connection_map,
&mut remote,
&tcp_connection_map,
)
.await;
});
}
Ok(())
};
Expand All @@ -221,7 +227,6 @@ impl NetKernel for CitadelWorkspaceService {
}

async fn on_node_event_received(&self, message: NodeResult) -> Result<(), NetworkError> {
info!(target: "citadel", "NODE EVENT RECEIVED WITH MESSAGE: {message:?}");
match message {
NodeResult::Disconnect(disconnect) => {
if let Some(conn) = disconnect.v_conn_type {
Expand Down Expand Up @@ -591,8 +596,8 @@ fn handle_connection(
}
}
}
Err(_) => {
warn!(target: "citadel", "Bad message from client");
Err(err) => {
warn!(target: "citadel", "Bad message from client: {err:?}");
}
}
}
Expand Down
205 changes: 182 additions & 23 deletions citadel-internal-service/src/kernel/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use citadel_internal_service_types::*;
use citadel_logging::tracing::log;
use citadel_logging::{error, info};
use citadel_sdk::prefabs::ClientServerRemote;
use citadel_sdk::prelude::remote_specialization::PeerRemote;
use citadel_sdk::prelude::results::PeerRegisterStatus;
use citadel_sdk::prelude::*;
use futures::StreamExt;
use std::collections::HashMap;
Expand Down Expand Up @@ -299,7 +301,6 @@ pub async fn handle_request(
session_security_settings,
request_id,
} => {
info!(target: "citadel", "About to connect to server {server_addr:?} for user {username}");
match remote
.register(
server_addr,
Expand Down Expand Up @@ -330,17 +331,23 @@ pub async fn handle_request(
request_id,
};

handle_request(
connect_command,
uuid,
server_connection_map,
remote,
tcp_connection_map,
)
.await
let server_connection_map = server_connection_map.clone();
let mut remote = remote.clone();
let tcp_connection_map = tcp_connection_map.clone();
tokio::task::spawn(async move {
handle_request(
connect_command,
uuid,
&server_connection_map,
&mut remote,
&tcp_connection_map,
)
.await
});
}
},
Err(err) => {
citadel_logging::error!(target: "citadel", "Failure on register: {err:?}");
let response = InternalServiceResponse::RegisterFailure(
citadel_internal_service_types::RegisterFailure {
message: err.into_string(),
Expand Down Expand Up @@ -777,9 +784,68 @@ pub async fn handle_request(
session_security_settings,
);

let remote_for_scanner = &remote;
let scan_for_status = |status| async move {
if let NodeResult::PeerEvent(PeerEvent {
event:
PeerSignal::PostRegister {
peer_conn_type,
inviter_username: _,
invitee_username: _,
ticket_opt: _,
invitee_response: resp,
},
ticket: _,
}) = status
{
match resp {
Some(PeerResponse::Accept(..)) => Ok(Some(PeerRegisterStatus::Accepted)),
Some(PeerResponse::Decline) => Ok(Some(PeerRegisterStatus::Declined)),
Some(PeerResponse::Timeout) => Ok(Some(PeerRegisterStatus::Failed { reason: Some("Timeout on register request. Peer did not accept in time. Try again later".to_string()) })),
_ => {
// This may be a signal needing to be routed
if peer_conn_type.get_original_target_cid() == peer_cid {
// If the target peer is the same as the peer_cid, that means that
// the peer is in the same kernel space. Route the signal through the TCP client
let lock = server_connection_map.lock().await;
if let Some(peer) = lock.get(&peer_cid) {
let username_of_sender = remote_for_scanner.account_manager().get_username_by_cid(cid).await
.map_err(|err| NetworkError::msg(format!("Unable to get username for cid: {cid}. Error: {err:?}")))?
.ok_or_else(|| NetworkError::msg(format!("Unable to get username for cid: {cid}")))?;

let peer_uuid = peer.associated_tcp_connection;
drop(lock);

log::debug!(target: "citadel", "Routing signal meant for other peer in intra-kernel space");
send_response_to_tcp_client(
tcp_connection_map,
InternalServiceResponse::PeerRegisterNotification(
PeerRegisterNotification {
cid: peer_cid,
peer_cid: cid,
peer_username: username_of_sender,
request_id: Some(request_id),
},
),
peer_uuid
).await
}
}

Ok(None)
}
}
} else {
Ok(None)
}
};

match client_to_server_remote.propose_target(cid, peer_cid).await {
Ok(symmetric_identifier_handle_ref) => {
match symmetric_identifier_handle_ref.register_to_peer().await {
match symmetric_identifier_handle_ref
.register_to_peer_with_fn(scan_for_status)
.await
{
Ok(_peer_register_success) => {
let account_manager = symmetric_identifier_handle_ref.account_manager();
if let Ok(target_information) =
Expand All @@ -796,14 +862,19 @@ pub async fn handle_request(
request_id,
};

handle_request(
connect_command,
uuid,
server_connection_map,
remote,
tcp_connection_map,
)
.await;
let server_connection_map = server_connection_map.clone();
let mut remote = remote.clone();
let tcp_connection_map = tcp_connection_map.clone();
tokio::task::spawn(async move {
handle_request(
connect_command,
uuid,
&server_connection_map,
&mut remote,
&tcp_connection_map,
)
.await;
});
}
false => {
send_response_to_tcp_client(
Expand Down Expand Up @@ -864,7 +935,6 @@ pub async fn handle_request(
session_security_settings,
request_id,
} => {
// TODO: check to see if peer is already in the hashmap
let client_to_server_remote = ClientServerRemote::new(
VirtualTargetType::LocalGroupPeer {
implicated_cid: cid,
Expand All @@ -873,10 +943,98 @@ pub async fn handle_request(
remote.clone(),
session_security_settings,
);

match client_to_server_remote.find_target(cid, peer_cid).await {
Ok(symmetric_identifier_handle_ref) => {
match symmetric_identifier_handle_ref
.connect_to_peer_custom(session_security_settings, udp_mode)
Ok(symmetric_identifier_handle) => {
let symmetric_identifier_handle = &symmetric_identifier_handle.into_owned();

let remote_for_scanner = &remote;
let signal_scanner = |status| async move {
match status {
NodeResult::PeerChannelCreated(PeerChannelCreated {
ticket: _,
channel,
udp_rx_opt,
}) => {
let username =
symmetric_identifier_handle.target_username().cloned();

let remote = PeerRemote {
inner: (*remote_for_scanner).clone(),
peer: symmetric_identifier_handle
.try_as_peer_connection()
.await?
.as_virtual_connection(),
username,
session_security_settings,
};

Ok(Some(results::PeerConnectSuccess {
remote,
channel,
udp_channel_rx: udp_rx_opt,
incoming_object_transfer_handles: None,
}))
}

NodeResult::PeerEvent(PeerEvent {
event:
PeerSignal::PostConnect {
peer_conn_type: _,
ticket_opt: _,
invitee_response: Some(PeerResponse::Decline),
..
},
..
}) => Err(NetworkError::msg(format!(
"Peer {peer_cid} declined connection"
))),

NodeResult::PeerEvent(PeerEvent {
event:
PeerSignal::PostConnect {
peer_conn_type,
ticket_opt: _,
invitee_response: None,
..
},
..
}) => {
// Route the signal to the intra-kernel user
if peer_conn_type.get_original_target_cid() == peer_cid {
let lock = server_connection_map.lock().await;
if let Some(conn) = lock.get(&peer_cid) {
let peer_uuid = conn.associated_tcp_connection;
send_response_to_tcp_client(
tcp_connection_map,
InternalServiceResponse::PeerConnectNotification(
PeerConnectNotification {
cid: peer_cid,
peer_cid: cid,
session_security_settings,
udp_mode,
request_id: Some(request_id),
},
),
peer_uuid,
)
.await
}
}

Ok(None)
}

_ => Ok(None),
}
};

match symmetric_identifier_handle
.connect_to_peer_with_fn(
session_security_settings,
udp_mode,
signal_scanner,
)
.await
{
Ok(peer_connect_success) => {
Expand All @@ -890,7 +1048,7 @@ pub async fn handle_request(
.add_peer_connection(
peer_cid,
sink,
symmetric_identifier_handle_ref.into_owned(),
symmetric_identifier_handle.clone(),
);

let hm_for_conn = tcp_connection_map.clone();
Expand All @@ -899,6 +1057,7 @@ pub async fn handle_request(
tcp_connection_map,
InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess {
cid,
peer_cid,
request_id: Some(request_id),
}),
uuid,
Expand Down
Loading
Loading