Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group Chat #26

Merged
merged 29 commits into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
04acfc0
Starting Group Functions
Tjemmmic Sep 6, 2023
53c8a80
Group functions WIP
Tjemmmic Sep 12, 2023
9e92d90
Group functions cleanup
Tjemmmic Sep 27, 2023
628a6fe
Group Broadcast handling WIP
Tjemmmic Oct 4, 2023
3c6213a
cleanup
Oct 9, 2023
c191728
Commit to merge
Tjemmmic Oct 10, 2023
a10c93c
Merge remote-tracking branch 'origin/group_chat' into group_chat
Tjemmmic Oct 10, 2023
318ae2e
Group fixes
Tjemmmic Oct 16, 2023
6b72b4b
Group more fixes and cleanup
Tjemmmic Oct 16, 2023
91489c1
spawn_services refactor
Tjemmmic Oct 24, 2023
9f4b360
test common refactor
Tjemmmic Oct 25, 2023
2fe1537
Finished Test Refactor
Tjemmmic Oct 29, 2023
98ecdbf
Create group test file
Tjemmmic Oct 29, 2023
4bf2e03
SessionSecuritySettings Fix
Tjemmmic Nov 1, 2023
be23227
Group Creation Test Working
Tjemmmic Nov 11, 2023
2e4ce90
resolve conflicts
Nov 11, 2023
a8af842
clippy lints
Nov 11, 2023
c8eb89a
Group Invite Decline Fix
Tjemmmic Nov 13, 2023
ffff1ed
Merge remote-tracking branch 'origin/group_chat' into group_chat
Tjemmmic Nov 13, 2023
59c65f4
Request Join WIP
Tjemmmic Nov 27, 2023
4da47d5
Group Broadcast Handling and Tests WIP
Tjemmmic Dec 4, 2023
98debd5
Group Tests - Invite, Join, Leave, and End
Tjemmmic Dec 9, 2023
3958f81
Group Kick Test
Tjemmmic Dec 13, 2023
8f4947a
Finalizing Group Tests
Tjemmmic Dec 17, 2023
ed17a91
Group Tests Workflow Fix
Tjemmmic Dec 17, 2023
f8505d3
Group Kick response fix
Tjemmmic Dec 17, 2023
5d370a1
Review Cleanup
Tjemmmic Dec 20, 2023
02b974d
Resolving Review Issues
Tjemmmic Dec 22, 2023
e916124
Protocol dependency to Master
Tjemmmic Jan 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ citadel_workspace_lib = { path = "./citadel_workspace_lib", default-features = f

# standard deps
serde = { version = "1.0.104", features = ["derive"] }
citadel_sdk = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol", branch = "master"}
citadel_sdk = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol", branch = "group_channel_update" }
Tjemmmic marked this conversation as resolved.
Show resolved Hide resolved
tokio = { version = "1.28.1", default-features = false }
tokio-util = { version = "0.7.8", default-features = false }
bincode2 = { version = "2.0.1", default-features = false }
Expand Down
272 changes: 256 additions & 16 deletions citadel_workspace_service/src/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct Connection {
peers: HashMap<u64, PeerConnection>,
associated_tcp_connection: Uuid,
c2s_file_transfer_handlers: HashMap<u64, Option<ObjectTransferHandler>>,
groups: HashMap<MessageGroupKey, GroupConnection>,
}

#[allow(dead_code)]
Expand All @@ -54,6 +55,13 @@ struct PeerConnection {
associated_tcp_connection: Uuid,
}

#[allow(dead_code)]
Tjemmmic marked this conversation as resolved.
Show resolved Hide resolved
pub struct GroupConnection {
key: MessageGroupKey,
tx: GroupChannelSendHalf,
cid: u64,
}

impl Connection {
fn new(
sink: PeerChannelSendHalf,
Expand All @@ -66,6 +74,7 @@ impl Connection {
client_server_remote,
associated_tcp_connection,
c2s_file_transfer_handlers: HashMap::new(),
groups: HashMap::new(),
}
}

Expand Down Expand Up @@ -107,18 +116,13 @@ impl Connection {
}
}

// fn remove_object_transfer_handler(&mut self, peer_cid: u64, object_id: u32) -> Option<Option<ObjectTransferHandler>> {
// if self.implicated_cid() == peer_cid {
// // C2S
// self.c2s_file_transfer_handlers.remove(&object_id)
// } else {
// // P2P
// if let Some(peer_connection) = self.peers.get_mut(&peer_cid) {
// peer_connection.handler_map.remove(&object_id)
// }
// else{None}
// }
// }
pub fn add_group_channel(
&mut self,
group_key: MessageGroupKey,
group_channel: GroupConnection,
) {
self.groups.insert(group_key, group_channel);
}

fn take_file_transfer_handle(
&mut self,
Expand Down Expand Up @@ -217,6 +221,7 @@ impl NetKernel for CitadelWorkspaceService {
}

async fn on_node_event_received(&self, message: NodeResult) -> Result<(), NetworkError> {
info!(target: "citadel", "NODE EVENT RECEIVED WITH MESSAGE: {message:?}");
Tjemmmic marked this conversation as resolved.
Show resolved Hide resolved
match message {
NodeResult::Disconnect(disconnect) => {
if let Some(conn) = disconnect.v_conn_type {
Expand Down Expand Up @@ -343,16 +348,48 @@ impl NetKernel for CitadelWorkspaceService {
);
}
}
NodeResult::PeerEvent(event) => {
if let PeerSignal::Disconnect {
NodeResult::GroupChannelCreated(group_channel_created) => {
let channel = group_channel_created.channel;
let cid = channel.cid();
let key = channel.key();
let (tx, rx) = channel.split();

let mut server_connection_map = self.server_connection_map.lock().await;
if let Some(connection) = server_connection_map.get_mut(&cid) {
connection.add_group_channel(key, GroupConnection { key, tx, cid });

let uuid = connection.associated_tcp_connection;
request_handler::spawn_group_channel_receiver(
key,
cid,
uuid,
rx,
self.tcp_connection_map.clone(),
);

send_response_to_tcp_client(
&self.tcp_connection_map,
InternalServiceResponse::GroupChannelCreateSuccess(
GroupChannelCreateSuccess {
cid,
group_key: key,
request_id: None,
},
),
connection.associated_tcp_connection,
)
.await;
}
}
NodeResult::PeerEvent(event) => match event.event {
PeerSignal::Disconnect {
peer_conn_type:
PeerConnectionType::LocalGroupPeer {
implicated_cid,
peer_cid,
},
disconnect_response: _,
} = event.event
{
} => {
if let Some(conn) = self.clear_peer_connection(implicated_cid, peer_cid).await {
let response = InternalServiceResponse::Disconnected(Disconnected {
cid: implicated_cid,
Expand All @@ -367,6 +404,31 @@ impl NetKernel for CitadelWorkspaceService {
.await;
}
}
PeerSignal::BroadcastConnected {
implicated_cid,
group_broadcast,
} => {
let mut server_connection_map = self.server_connection_map.lock().await;
handle_group_broadcast(
group_broadcast,
implicated_cid,
&mut server_connection_map,
self.tcp_connection_map.clone(),
)
.await;
}
_ => {}
},

NodeResult::GroupEvent(group_event) => {
let mut server_connection_map = self.server_connection_map.lock().await;
handle_group_broadcast(
group_event.event,
group_event.implicated_cid,
&mut server_connection_map,
self.tcp_connection_map.clone(),
)
.await;
}
_ => {}
}
Expand Down Expand Up @@ -478,6 +540,184 @@ fn handle_connection(
});
}

async fn handle_group_broadcast(
group_broadcast: GroupBroadcast,
implicated_cid: u64,
server_connection_map: &mut HashMap<u64, Connection>,
tcp_connection_map: Arc<Mutex<HashMap<Uuid, UnboundedSender<InternalServiceResponse>>>>,
) {
if let Some(connection) = server_connection_map.get_mut(&implicated_cid) {
let response = match group_broadcast {
GroupBroadcast::Invitation {
sender: peer_cid,
key: group_key,
} => Some(InternalServiceResponse::GroupInvitation(GroupInvitation {
cid: implicated_cid,
peer_cid,
group_key,
request_id: None,
})),

GroupBroadcast::RequestJoin {
sender: peer_cid,
key: group_key,
} => connection
.groups
.get_mut(&group_key)
.map(|_group_connection| {
InternalServiceResponse::GroupJoinRequestReceived(GroupJoinRequestReceived {
cid: implicated_cid,
peer_cid,
group_key,
request_id: None,
})
}),

GroupBroadcast::AcceptMembership { target: _, key: _ } => None,

GroupBroadcast::DeclineMembership { target: _, key } => Some(
InternalServiceResponse::GroupRequestDeclined(GroupRequestDeclined {
cid: implicated_cid,
group_key: key,
request_id: None,
}),
),

GroupBroadcast::Message {
sender: peer_cid,
key: group_key,
message,
} => connection
.groups
.get_mut(&group_key)
.map(|_group_connection| {
InternalServiceResponse::GroupMessageReceived(GroupMessageReceived {
cid: implicated_cid,
peer_cid,
message: message.into_buffer(),
group_key,
request_id: None,
})
}),

GroupBroadcast::MessageResponse {
key: group_key,
success,
} => connection
.groups
.get_mut(&group_key)
.map(|_group_connection| {
InternalServiceResponse::GroupMessageResponse(GroupMessageResponse {
cid: implicated_cid,
success,
group_key,
request_id: None,
})
}),

GroupBroadcast::MemberStateChanged {
key: group_key,
state,
} => Some(InternalServiceResponse::GroupMemberStateChanged(
GroupMemberStateChanged {
cid: implicated_cid,
group_key,
state,
request_id: None,
},
)),

GroupBroadcast::LeaveRoomResponse {
key: group_key,
success,
message,
} => Some(InternalServiceResponse::GroupLeft(GroupLeft {
cid: implicated_cid,
group_key,
success,
message,
request_id: None,
})),

GroupBroadcast::EndResponse {
key: group_key,
success,
} => Some(InternalServiceResponse::GroupEnded(GroupEnded {
cid: implicated_cid,
group_key,
success,
request_id: None,
})),

GroupBroadcast::Disconnected { key: group_key } => connection
.groups
.get_mut(&group_key)
.map(|_group_connection| {
InternalServiceResponse::GroupDisconnected(GroupDisconnected {
cid: implicated_cid,
group_key,
request_id: None,
})
}),

GroupBroadcast::AddResponse {
key: _group_key,
failed_to_invite_list: _failed_to_invite_list,
} => None,

GroupBroadcast::AcceptMembershipResponse { key, success } => {
connection.groups.get_mut(&key).map(|_group_connection| {
InternalServiceResponse::GroupMembershipResponse(GroupMembershipResponse {
cid: implicated_cid,
group_key: key,
success,
request_id: None,
})
})
}

GroupBroadcast::KickResponse {
key: _group_key,
success: _success,
} => None,

GroupBroadcast::ListResponse {
groups: _group_list,
} => None,

GroupBroadcast::CreateResponse { key: _group_key } => None,

GroupBroadcast::GroupNonExists { key: _group_key } => None,

GroupBroadcast::RequestJoinPending { result, key } => Some(
InternalServiceResponse::GroupRequestJoinPending(GroupRequestJoinPending {
cid: implicated_cid,
group_key: key,
result,
request_id: None,
}),
),

_ => None,
};
match response {
Some(internal_service_response) => {
if let Some(connection) = server_connection_map.get_mut(&implicated_cid) {
send_response_to_tcp_client(
&tcp_connection_map,
internal_service_response,
connection.associated_tcp_connection,
)
.await;
}
}
None => {
todo!()
}
}
}
}

fn spawn_tick_updater(
object_transfer_handler: ObjectTransferHandler,
implicated_cid: u64,
Expand Down
Loading
Loading