Skip to content

Commit

Permalink
Bugfixing - nonblocking code
Browse files Browse the repository at this point in the history
  • Loading branch information
ivmarkov committed Feb 9, 2022
1 parent 9c06a74 commit 3e822e7
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 75 deletions.
9 changes: 0 additions & 9 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,6 @@ pub mod nonblocking {
fn recv(&mut self) -> Self::RecvFuture<'_>;
}

pub trait Channel: Service {
type Data;

type Sender: Sender<Data = Self::Data, Error = Self::Error>;
type Receiver: Receiver<Data = Self::Data, Error = Self::Error>;

fn split(self) -> (Self::Sender, Self::Receiver);
}

// TODO: Not clear yet if necessary
// pub mod adapter {
// use core::future::Future;
Expand Down
6 changes: 4 additions & 2 deletions src/mqtt/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,11 @@ pub mod nonblocking {
/// core.stream.Stream is not stable yet and on top of that it has an Item which is not
/// parameterizable by lifetime (GATs). Therefore, we have to use a Future instead
pub trait Connection: Service {
type Message: Message;
type Message<'a>: Message
where
Self: 'a;

type Reference<'a>: Deref<Target = Option<Result<Event<Self::Message>, Self::Error>>>
type Reference<'a>: Deref<Target = Option<Result<Event<Self::Message<'a>>, Self::Error>>>
where
Self: 'a;

Expand Down
187 changes: 123 additions & 64 deletions src/utils/nonblocking/mqtt/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use core::fmt::{Debug, Display};
use core::future::Future;
use core::marker::PhantomData;
use core::mem;
use core::ops::Deref;
use core::pin::Pin;
Expand Down Expand Up @@ -50,87 +51,98 @@ where
}
}

pub struct Payload<M, E>
where
M: Message,
{
event: Option<Option<Result<Event<M>, E>>>,
pub struct Payload {
event: Option<*const core::ffi::c_void>,
waker: Option<Waker>,
processed: bool,
}

struct ConnectionState<CV, M, E>
unsafe impl Send for Payload {}
unsafe impl Sync for Payload {}

struct ConnectionState<CV>
where
CV: Condvar,
M: Message + Send,
E: Send,
{
payload: CV::Mutex<Payload<M, E>>,
payload: CV::Mutex<Payload>,
processed: CV,
}

#[derive(Clone)]
pub struct Connection<CV, M, E>(Arc<ConnectionState<CV, M, E>>)
where
CV: Condvar,
M: Message + Send,
E: Send;

pub struct NextFuture<'a, CV, M, E>(&'a ConnectionState<CV, M, E>)
pub struct EventRef<'a, CV, M, E>
where
CV: Condvar,
M: Message + Send,
E: Send;
<CV as Condvar>::Mutex<Payload>: 'a,
M: Message + 'a,
E: 'a,
{
payload: <<CV as Condvar>::Mutex<Payload> as Mutex>::Guard<'a>,
_message: PhantomData<fn() -> M>,
_error: PhantomData<fn() -> E>,
}

impl<'a, CV, M, E> Drop for NextFuture<'a, CV, M, E>
impl<'a, CV, M, E> Deref for EventRef<'a, CV, M, E>
where
CV: Condvar,
M: Message + Send,
E: Send,
<CV as Condvar>::Mutex<Payload>: 'a,
M: Message + 'a,
E: 'a,
{
fn drop(&mut self) {
let mut payload = self.0.payload.lock();
type Target = Option<Result<Event<M>, E>>;

payload.event = None;
payload.waker = None;
fn deref(&self) -> &Self::Target {
let event = self.payload.event.unwrap() as *const Self::Target;

self.0.processed.notify_all();
unsafe { event.as_ref().unwrap() }
}
}

pub struct EventRef<'a, CV, M, E>(<<CV as Condvar>::Mutex<Payload<M, E>> as Mutex>::Guard<'a>)
pub struct NextFuture<'a, CV, M, E>
where
CV: Condvar,
<CV as Condvar>::Mutex<Payload<M, E>>: 'a,
M: Message + Send + 'a,
E: Send + 'a;
CV: Condvar + 'a,
M: Message + 'a,
E: Send + 'a,
{
connection_state: &'a ConnectionState<CV>,
_message: PhantomData<fn() -> M>,
_error: PhantomData<fn() -> E>,
}

impl<'a, CV, M, E> Deref for EventRef<'a, CV, M, E>
impl<'a, CV, M, E> Drop for NextFuture<'a, CV, M, E>
where
CV: Condvar,
<CV as Condvar>::Mutex<Payload<M, E>>: 'a,
M: Message + Send + 'a,
CV: Condvar + 'a,
M: Message + 'a,
E: Send + 'a,
{
type Target = Option<Result<Event<M>, E>>;
fn drop(&mut self) {
let mut payload = self.connection_state.payload.lock();

fn deref(&self) -> &Self::Target {
self.0.event.as_ref().unwrap()
payload.waker = None;

if payload.processed {
payload.event = None;
self.connection_state.processed.notify_all();
}
}
}

impl<'a, CV, M, E> Future for NextFuture<'a, CV, M, E>
where
CV: Condvar,
M: Message + Send,
E: Send,
CV: Condvar + 'a,
M: Message + 'a,
E: Send + 'a,
{
type Output = EventRef<'a, CV, M, E>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut payload = self.0.payload.lock();
let mut payload = self.connection_state.payload.lock();

if payload.event.is_some() {
Poll::Ready(EventRef(payload))
payload.processed = true;
Poll::Ready(EventRef {
payload,
_message: PhantomData,
_error: PhantomData,
})
} else {
payload.waker = Some(cx.waker().clone());

Expand All @@ -139,54 +151,93 @@ where
}
}

pub struct Connection<CV, M, E>
where
CV: Condvar,
M: Message,
E: Send,
{
connection_state: Arc<ConnectionState<CV>>,
_message: PhantomData<fn() -> M>,
_error: PhantomData<fn() -> E>,
}

impl<CV, M, E> Connection<CV, M, E>
where
CV: Condvar,
M: Message + Send,
M: Message,
E: Send,
{
pub fn new() -> Self {
Self(Arc::new(ConnectionState {
payload: CV::Mutex::new(Payload {
event: None,
waker: None,
Self {
connection_state: Arc::new(ConnectionState {
payload: CV::Mutex::new(Payload {
event: None,
waker: None,
processed: false,
}),
processed: CV::new(),
}),
processed: CV::new(),
}))
_message: PhantomData,
_error: PhantomData,
}
}

pub fn post(&self, event: Option<Result<Event<M>, E>>) {
let mut payload = self.0.payload.lock();
pub fn post<'a>(&'a self, event: Option<Result<Event<M>, E>>)
where
M: 'a,
E: 'a,
{
let mut payload = self.connection_state.payload.lock();

while payload.event.is_some() {
payload = self.0.processed.wait(payload);
payload = self.connection_state.processed.wait(payload);
}

payload.event = Some(event);
payload.event = Some(&event as *const _ as *const _);

let waker = mem::replace(&mut payload.waker, None);

if let Some(waker) = waker {
waker.wake();
}

while payload.event.is_some() {
payload = self.connection_state.processed.wait(payload);
}
}
}

impl<CV, M, E> Default for Connection<CV, M, E>
where
CV: Condvar,
M: Message + Send,
M: Message,
E: Send,
{
fn default() -> Self {
Self::new()
}
}

impl<CV, M, E> Clone for Connection<CV, M, E>
where
CV: Condvar,
M: Message,
E: Send,
{
fn clone(&self) -> Self {
Self {
connection_state: self.connection_state.clone(),
_message: PhantomData,
_error: PhantomData,
}
}
}

impl<CV, M, E> crate::service::Service for Connection<CV, M, E>
where
CV: Condvar,
M: Message + Send,
M: Message,
E: Debug + Display + Send + Sync + 'static,
{
type Error = E;
Expand All @@ -195,25 +246,33 @@ where
impl<CV, M, E> crate::mqtt::client::nonblocking::Connection for Connection<CV, M, E>
where
CV: Condvar,
M: Message + Send,
M: Message,
E: Debug + Display + Send + Sync + 'static,
{
type Message = M;
type Message<'a>
where
CV: 'a,
M: 'a,
= M;

type Reference<'a>
where
CV: 'a,
<CV as Condvar>::Mutex<Payload<M, E>>: 'a,
M: Message + Send + 'a,
<CV as Condvar>::Mutex<Payload>: 'a,
M: Message + 'a,
E: Send + 'a,
= EventRef<'a, CV, Self::Message, Self::Error>;
= EventRef<'a, CV, Self::Message<'a>, Self::Error>;

type NextFuture<'a>
where
Self: 'a,
= NextFuture<'a, CV, Self::Message, Self::Error>;
= NextFuture<'a, CV, Self::Message<'a>, Self::Error>;

fn next(&mut self) -> Self::NextFuture<'_> {
NextFuture(&self.0)
NextFuture {
connection_state: &self.connection_state,
_message: PhantomData,
_error: PhantomData,
}
}
}

0 comments on commit 3e822e7

Please sign in to comment.