Skip to content

Commit

Permalink
Add generics to InternalServiceConnector
Browse files Browse the repository at this point in the history
  • Loading branch information
tbraun96 committed Mar 31, 2024
1 parent 184a642 commit 3f9a6af
Show file tree
Hide file tree
Showing 50 changed files with 198 additions and 152 deletions.
5 changes: 4 additions & 1 deletion citadel-internal-service-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ tokio = { workspace = true, features = ["net", "rt", "macros"] }
tokio-util = { workspace = true, features = ["codec"] }
bincode2 = { workspace = true }
serde = { workspace = true }
futures = { workspace = true, features = ["alloc"] }
futures = { workspace = true, features = ["alloc"] }
uuid = { workspace = true }
citadel_logging = { workspace = true }
async-trait = "0.1.79"
Original file line number Diff line number Diff line change
@@ -1,56 +1,28 @@
use crate::codec::SerializingCodec;
use crate::io_interface::IOInterface;
use citadel_internal_service_types::{
InternalServicePayload, InternalServiceRequest, InternalServiceResponse,
};
use futures::stream::{SplitSink, SplitStream};
use futures::{Sink, Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::net::TcpStream;
use tokio_util::codec::{Decoder, Framed, LengthDelimitedCodec};

pub struct InternalServiceConnector {
pub sink: WrappedSink,
pub stream: WrappedStream,
pub struct InternalServiceConnector<T: IOInterface> {
pub sink: WrappedSink<T>,
pub stream: WrappedStream<T>,
}

pub struct WrappedStream {
inner: SplitStream<Framed<TcpStream, SerializingCodec<InternalServicePayload>>>,
pub struct WrappedStream<T: IOInterface> {
pub inner: T::Stream,
}

pub struct WrappedSink {
inner: SplitSink<
Framed<TcpStream, SerializingCodec<InternalServicePayload>>,
InternalServicePayload,
>,
pub struct WrappedSink<T: IOInterface> {
pub inner: T::Sink,
}

impl InternalServiceConnector {
pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Self, Box<dyn std::error::Error>> {
let conn = TcpStream::connect(addr).await?;
let (sink, mut stream) = wrap_tcp_conn(conn).split();
let greeter_packet = stream
.next()
.await
.ok_or("Failed to receive greeting packet")??;
if matches!(
greeter_packet,
InternalServicePayload::Response(InternalServiceResponse::ServiceConnectionAccepted(_))
) {
let stream = WrappedStream { inner: stream };
let sink = WrappedSink { inner: sink };
Ok(Self { sink, stream })
} else {
Err("Failed to receive greeting packet")?
}
}

pub fn split(self) -> (WrappedSink, WrappedStream) {
(self.sink, self.stream)
}
}

impl Stream for WrappedStream {
impl<T: IOInterface> Stream for WrappedStream<T> {
type Item = InternalServiceResponse;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand All @@ -63,7 +35,7 @@ impl Stream for WrappedStream {
}
}

impl Sink<InternalServiceRequest> for WrappedSink {
impl<T: IOInterface> Sink<InternalServiceRequest> for WrappedSink<T> {
type Error = std::io::Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::io_interface::IOInterface;
use async_trait::async_trait;
use citadel_internal_service_types::InternalServicePayload;
use citadel_sdk::async_trait;
use futures::Sink;
use std::pin::Pin;
use std::task::{Context, Poll};

pub struct InMemoryInterface {
sink: Option<futures::channel::mpsc::UnboundedSender<InternalServicePayload>>,
stream: Option<futures::channel::mpsc::UnboundedReceiver<InternalServicePayload>>,
pub sink: Option<futures::channel::mpsc::UnboundedSender<InternalServicePayload>>,
pub stream: Option<futures::channel::mpsc::UnboundedReceiver<InternalServicePayload>>,
}

#[async_trait]
Expand All @@ -25,7 +25,7 @@ impl IOInterface for InMemoryInterface {
}
}

pub struct InMemorySink(futures::channel::mpsc::UnboundedSender<InternalServicePayload>);
pub struct InMemorySink(pub futures::channel::mpsc::UnboundedSender<InternalServicePayload>);

impl Sink<InternalServicePayload> for InMemorySink {
type Error = std::io::Error;
Expand Down Expand Up @@ -58,7 +58,7 @@ impl Sink<InternalServicePayload> for InMemorySink {
}
}

pub struct InMemoryStream(futures::channel::mpsc::UnboundedReceiver<InternalServicePayload>);
pub struct InMemoryStream(pub futures::channel::mpsc::UnboundedReceiver<InternalServicePayload>);

impl futures::Stream for InMemoryStream {
type Item = std::io::Result<InternalServicePayload>;
Expand Down
13 changes: 13 additions & 0 deletions citadel-internal-service-connector/src/io_interface/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use async_trait::async_trait;
use citadel_internal_service_types::InternalServicePayload;
use futures::{Sink, Stream};

pub mod in_memory;
pub mod tcp;

#[async_trait]
pub trait IOInterface: Sized + Send + 'static {
type Sink: Sink<InternalServicePayload, Error = std::io::Error> + Unpin + Send + 'static;
type Stream: Stream<Item = std::io::Result<InternalServicePayload>> + Unpin + Send + 'static;
async fn next_connection(&mut self) -> Option<(Self::Sink, Self::Stream)>;
}
62 changes: 62 additions & 0 deletions citadel-internal-service-connector/src/io_interface/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use crate::codec::SerializingCodec;
use crate::connector::{wrap_tcp_conn, InternalServiceConnector, WrappedSink, WrappedStream};
use crate::io_interface::IOInterface;
use async_trait::async_trait;
use citadel_internal_service_types::{InternalServicePayload, InternalServiceResponse};
use futures::stream::{SplitSink, SplitStream};
use futures::StreamExt;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
use tokio_util::codec::Framed;

pub struct TcpIOInterface {
pub listener: TcpListener,
}

impl TcpIOInterface {
pub async fn new<T: ToSocketAddrs>(bind_address: T) -> std::io::Result<Self> {
let listener = TcpListener::bind(bind_address).await?;
Ok(Self { listener })
}
}

impl InternalServiceConnector<TcpIOInterface> {
pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Self, Box<dyn std::error::Error>> {
let conn = TcpStream::connect(addr).await?;
let (sink, mut stream) = wrap_tcp_conn(conn).split();
let greeter_packet = stream
.next()
.await
.ok_or("Failed to receive greeting packet")??;
if matches!(
greeter_packet,
InternalServicePayload::Response(InternalServiceResponse::ServiceConnectionAccepted(_))
) {
let stream = WrappedStream { inner: stream };
let sink = WrappedSink { inner: sink };
Ok(Self { sink, stream })
} else {
Err("Failed to receive greeting packet")?
}
}

pub fn split(self) -> (WrappedSink<TcpIOInterface>, WrappedStream<TcpIOInterface>) {
(self.sink, self.stream)
}
}

#[async_trait]
impl IOInterface for TcpIOInterface {
type Sink = SplitSink<
Framed<TcpStream, SerializingCodec<InternalServicePayload>>,
InternalServicePayload,
>;
type Stream = SplitStream<Framed<TcpStream, SerializingCodec<InternalServicePayload>>>;

async fn next_connection(&mut self) -> Option<(Self::Sink, Self::Stream)> {
self.listener
.accept()
.await
.ok()
.map(|(stream, _)| wrap_tcp_conn(stream).split())
}
}
4 changes: 3 additions & 1 deletion citadel-internal-service-connector/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
pub mod codec;
pub mod util;
pub mod connector;

pub mod io_interface;
2 changes: 2 additions & 0 deletions citadel-internal-service-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::path::PathBuf;
use std::time::Duration;
use uuid::Uuid;

pub mod service;

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ConnectSuccess {
pub cid: u64,
Expand Down
1 change: 1 addition & 0 deletions citadel-internal-service-types/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

37 changes: 0 additions & 37 deletions citadel-internal-service/src/io_interface/tcp.rs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,28 +1,18 @@
use crate::kernel::{send_to_kernel, sink_send_payload, CitadelWorkspaceService, Connection};
use crate::kernel::{send_to_kernel, sink_send_payload, Connection};
use citadel_internal_service_connector::io_interface::IOInterface;
use citadel_internal_service_types::{
InternalServicePayload, InternalServiceRequest, InternalServiceResponse,
ServiceConnectionAccepted,
};
use citadel_logging::{error, info, warn};
use citadel_sdk::async_trait;
use futures::{Sink, Stream, StreamExt};
use futures::StreamExt;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tcp::TcpIOInterface;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::Mutex;
use uuid::Uuid;

pub mod in_memory;
pub mod tcp;

#[async_trait]
pub trait IOInterface: Sized + Send + 'static {
type Sink: Sink<InternalServicePayload, Error = std::io::Error> + Unpin + Send + 'static;
type Stream: Stream<Item = std::io::Result<InternalServicePayload>> + Unpin + Send + 'static;
async fn next_connection(&mut self) -> Option<(Self::Sink, Self::Stream)>;

pub trait IOInterfaceExt: IOInterface {
#[allow(clippy::too_many_arguments)]
fn spawn_connection_handler(
&mut self,
Expand Down Expand Up @@ -84,10 +74,4 @@ pub trait IOInterface: Sized + Send + 'static {
}
}

impl CitadelWorkspaceService<TcpIOInterface> {
pub async fn new_tcp(
bind_address: SocketAddr,
) -> std::io::Result<CitadelWorkspaceService<TcpIOInterface>> {
Ok(TcpIOInterface::new(bind_address).await?.into())
}
}
impl<T: IOInterface> IOInterfaceExt for T {}
58 changes: 51 additions & 7 deletions citadel-internal-service/src/kernel/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
use crate::io_interface::IOInterface;
use crate::kernel::ext::IOInterfaceExt;
use crate::kernel::requests::{handle_request, HandledRequestResult};
use citadel_internal_service_connector::connector::{
InternalServiceConnector, WrappedSink, WrappedStream,
};
use citadel_internal_service_connector::io_interface::in_memory::{
InMemoryInterface, InMemorySink, InMemoryStream,
};
use citadel_internal_service_connector::io_interface::tcp::TcpIOInterface;
use citadel_internal_service_connector::io_interface::IOInterface;
use citadel_internal_service_types::*;
use citadel_logging::{error, info, warn};
use citadel_sdk::prefabs::ClientServerRemote;
Expand All @@ -9,11 +17,13 @@ use citadel_sdk::prelude::*;
use futures::stream::StreamExt;
use futures::{Sink, SinkExt};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;
use uuid::Uuid;

pub(crate) mod ext;
pub(crate) mod requests;
pub(crate) mod responses;

Expand Down Expand Up @@ -56,18 +66,52 @@ impl<T: IOInterface> CitadelWorkspaceService<T> {
}
}

impl CitadelWorkspaceService<TcpIOInterface> {
pub async fn new_tcp(
bind_address: SocketAddr,
) -> std::io::Result<CitadelWorkspaceService<TcpIOInterface>> {
Ok(TcpIOInterface::new(bind_address).await?.into())
}
}

impl CitadelWorkspaceService<InMemoryInterface> {
/// Generates an in-memory service connector and kernel. This is useful for programs that do not need
/// networking to connect between the application and the internal service
pub fn new_in_memory() -> (
InternalServiceConnector<InMemoryInterface>,
CitadelWorkspaceService<InMemoryInterface>,
) {
let (tx_to_consumer, rx_from_consumer) = futures::channel::mpsc::unbounded();
let (tx_to_svc, rx_from_svc) = futures::channel::mpsc::unbounded();
let connector = InternalServiceConnector {
sink: WrappedSink {
inner: InMemorySink(tx_to_svc),
},
stream: WrappedStream {
inner: InMemoryStream(rx_from_svc),
},
};
let kernel = InMemoryInterface {
sink: Some(tx_to_consumer),
stream: Some(rx_from_consumer),
}
.into();
(connector, kernel)
}
}

#[allow(dead_code)]
pub struct Connection {
sink_to_server: PeerChannelSendHalf,
client_server_remote: ClientServerRemote,
peers: HashMap<u64, PeerConnection>,
pub sink_to_server: PeerChannelSendHalf,
pub client_server_remote: ClientServerRemote,
pub peers: HashMap<u64, PeerConnection>,
pub(crate) associated_tcp_connection: Uuid,
c2s_file_transfer_handlers: HashMap<u64, Option<ObjectTransferHandler>>,
groups: HashMap<MessageGroupKey, GroupConnection>,
pub c2s_file_transfer_handlers: HashMap<u64, Option<ObjectTransferHandler>>,
pub groups: HashMap<MessageGroupKey, GroupConnection>,
}

#[allow(dead_code)]
struct PeerConnection {
pub struct PeerConnection {
sink: PeerChannelSendHalf,
remote: PeerRemote,
handler_map: HashMap<u64, Option<ObjectTransferHandler>>,
Expand Down
2 changes: 1 addition & 1 deletion citadel-internal-service/src/kernel/requests/connect.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::io_interface::IOInterface;
use crate::kernel::requests::HandledRequestResult;
use crate::kernel::{create_client_server_remote, CitadelWorkspaceService, Connection};
use citadel_internal_service_connector::io_interface::IOInterface;
use citadel_internal_service_types::{
ConnectFailure, InternalServiceRequest, InternalServiceResponse, MessageNotification,
};
Expand Down
Loading

0 comments on commit 3f9a6af

Please sign in to comment.