Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions doc/design-documents/async_api/assets/event_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
pub struct Listener<Service>
where
Service: service::Service,
{
listener: iceoryx2::port::listener::Listener<Service>,
io: BridgedFd<RawFdBridge<AsyncSelector>>,
}

impl<Service> Listener<Service>
where
Service: service::Service,
<Service::Event as iceoryx2_cal::event::Event>::Listener: FileDescriptorBased,
{
pub(crate) fn from(listener: iceoryx2::port::listener::Listener<Service>) -> Result<Self, CommonErrors> {
// Safety:
// - This FD is owned by iceoryx2 listener and we don't close it on drop of RawFdBridge
// - The FD is kept along with listener so lifetime is take care of
// - Each Listener has its own FD so no sharing is done in iceoryx2 layer
let fd = unsafe { listener.file_descriptor().native_handle() };

Ok(Self {
listener,
io: BridgedFd::new_with_interest(RawFdBridge::from(fd)?, IoEventInterest::READABLE)?,
})
}

/// Returns the [`UniqueListenerId`] of the [`Listener`]
pub fn id(&self) -> UniqueListenerId {
self.listener.id()
}

/// Returns the deadline of the corresponding [`Service`](crate::service::Service).
pub fn deadline(&self) -> Option<Duration> {
self.listener.deadline()
}

/// Async wait for a new [`EventId`]. On error it returns [`ListenerWaitError`] is returned which describes
/// the error in detail.
pub async fn wait_one(&self) -> Result<EventId, ListenerWaitError> {
self.io
.async_call(IoEventInterest::READABLE, |raw_fd| {
raw_fd.io_call(|fd| {
info!("Checking for Iceoryx event on fd: {}", fd);
self.wait_one_internal()
})
})
.await
.map_err(|_| ListenerWaitError::InternalFailure)
.and_then(|r| match r {
Ok(event) => Ok(event),
Err(e) => Err(e),
})
}

fn wait_one_internal(&self) -> IoResult<Result<EventId, ListenerWaitError>> {
loop {
match self.listener.try_wait_one() {
Ok(event) if event.is_some() => return Ok(Ok(event.unwrap())),
Ok(_) => {
// This is None, so there was and error, probably EAGAIN or EWOULDBLOCK
if std::io::Error::last_os_error().kind() == std::io::ErrorKind::WouldBlock {
error!("Iceoryx listener would block, should do re-register!... {}", unsafe {
self.listener.file_descriptor().native_handle()
});
return Err(std::io::ErrorKind::WouldBlock.into());
} else {
panic!("Something went wrong!");
}
}
Err(ListenerWaitError::InterruptSignal) => {
continue;
}
Err(e) => {
error!("Error waiting for Iceoryx event: {}", e);
return Ok(Err(e));
}
}
}
}
}

impl<T> Drop for Listener<T>
where
T: service::Service,
{
fn drop(&mut self) {
// Leave the underlying fd open, as we don't own it and let iceoryx2 handle it
self.io.close_on_drop(false);
}
}

254 changes: 254 additions & 0 deletions doc/design-documents/async_api/async_api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
# Async API Design Document

## Remarks

For this document currently we describe `Events` and `PubSub` however this analogy shall continue for other messaging patterns later on.

## Terminology

## Overview

A high-level pitch of the feature:

- The new `async` API will provide non_blocking, but still linear, behavior in the code thanks to `async/await`
- The new `async` API will solve a usage of `iceoryx2` API in `async` code which currently need to be handmade manually either via:
- polling and repetition logic within functions, causing additional not needed code on user side
- custom WaitSet usage in separate thread and a bridge to `async` world.
- or some other custom work
- The `async` usage in Rust is already well established technique that is adopted by many crates, including those with highest usage

## Requirements

- **R1: Async API look and feel** - The new `async` API shall provide the same look and feel as standard one

## Use Cases

### Use-Case 1: Waiting for an event

- **As a** developer
- **I want** to wait on `event` API
- **So that** it does not block current thread and continues only once event is delivered

### Use-Case 2: Waiting for a new sample

- **As a** developer
- **I want** to wait on `new sample` in pub-sub
- **So that** it does not block current thread and continues only once sample is delivered

## Usage

### Example: Await on events

```rust
let node = NodeBuilder::new().create::<ipc_threadsafe::Service>().unwrap();

let event = node
.service_builder(&"MyEventName".try_into().unwrap())
.event()
.open_or_create()
.unwrap();

let listener = event.listener_builder().create_async().unwrap();

println!("Awaiting for Iceoryx event in batches while doing something else ...");

listener
.wait_all(&mut |event_id| {
print!("Received Iceoryx event: {:?}\n", event_id);
})
.await
.unwrap();
```

## Implementation

### Achieving async API

Since all iceoryx2 messaging patterns (except `Event`) are poll based, we need to pair them them with the `Event` to achieve possibility
to react in `async` API only on change of data. This means:

- `PubSub` should be paired with `Event`
- `RequestResponse` should be paired with `Event`
- and so on

Due to this, further document assumes direct usage of high level `Event` messaging pattern to facilitate the feature.

> DISCLAIMER: IT may be that iceoryx2 authors will have better idea than that. The only issue I do see now is probably impact on
> zero-trust deployment during configuration where ie. async PubSub Producer shall also have rights to be Event Notifier on some connected
> topic.

### Split of code

The main idea is to split source code into two parts:

1. The Event implementation that is both `OS` dependent and `runtime` dependent since it incurs some IO call
2. All the other API that do need `Event` implementation, but the rest is purely `runtime` independent and can be pure `async`

To facilitate above, below class diagram is showing one of solution

![Alt text](new_classes.svg)

```plantuml
package iceoryx2 {

interface AsyncListenerTrait<Trait> {
' /// Create new instance of async Listener
+ fn new(listener: iceoryx2::Listener) -> Self;
' /// Returns the [`UniqueListenerId`] of the [`Listener`]
+ fn id(&self) -> UniqueListenerId;
' /// Returns the deadline of the corresponding [`Service`](crate::service::Service).
+ fn deadline(&self) -> Option<Duration>;
' /// Async wait for a new [`EventId`]. On error it returns [`AsyncListenerWaitError`] is returned which describes
' /// the error in detail.
+ fn wait_one(&self) -> impl Future<Output = Result<EventId, AsyncListenerWaitError>>;
' /// Async wait for new [`EventId`]s. Collects all [`EventId`]s that were received and
' /// calls the provided callback is with the [`EventId`] as input argument. This will `await` until callback is called at least once
+ fn wait_all<F: FnMut(EventId)>(&self, callback: &mut F) -> impl Future<Output = Result<(), AsyncListenerWaitError>>;
}

enum AsyncListenerWaitError {
ContractViolation = ListenerWaitError::ContractViolation,
InternalFailure = ListenerWaitError::InternalFailure,
...
}


class AsyncSubscriber <<Service,Payload,UserHeader, EventListener: AsyncListenerTrait>> {
sync_subscriber: iceoryx2::port::subscriber::Subscriber<Service, Payload, UserHeader>,
listener: EventListener,

+ API follows as in usual Subscriber ie:
+ async fn receive(&self) -> Result<Sample<Service, Payload, UserHeader>, AsyncReceiveError>

}

class AsyncConnector<Trait> {
type ListenerType: AsyncListenerTrait;

}

note top of AsyncConnector
Let us group external async properties in one place.
This will come in handy if it happens
that we need to know more unknown types that depend on runtime implementation.
Good example can be `Sleep`.
end note

}

package "Some Runtime / Runtime Adapter Crate" {

class AsyncListener {
- listener: iceoryx2::Listener
}

AsyncListener ..|> AsyncListenerTrait
AsyncSubscriber --> AsyncListenerTrait
}

```

### Building objects

The next step is to provide a way to build objects that provide `async` API.

There are two approaches that can be chosen for implementation:

##### 1. Use custom event for each pair (messaging patern, data type) based on `ServiceName`

###### Pros

- all messaging pattern in service will work as usual
- no limitations

###### Cons

- dynamic Service creation to obtain event for each messaging pattern, like for `ServiceNameABC` (PubSub, int) we need to create also
internally service `ServiceNameABC/__internal_pubsub_event` to obtain event for notifications

##### 2. Use event from the service `ServiceName`

###### Pros

- no need to create dynamic event name

###### Cons

- limit a service to only single messaging pattern as using event will cause no way to use it again

Considering above, continuation is done based on option 1. Below shows only small snippet where extension for
creating object can be placed.

![Alt text](port_factory.svg)

```plantuml

class PortFactory {
...
+ fn create_**subscriber/listener**_async<T: AsyncConnector>()
}

note top of PortFactory
Extend each **messaging_pattern* PortFactory by creational method with _async suffix.
This can be done directly or over **some trait** definition.
The **PortFactory** does have access to **service** itself so it shall be possible
to create required instance of internal Event along with wrapping using **T**
end note

```

### Implementation

#### AsyncListener - `messaging-patter == event`

This is purely `runtime` specific implementation but is currently doable with `non-blocking` api of `Listener`. Working example: [Code sample](assets/event_example.rs)
During implementation it may come beneficial to either expose some properties (of current listener) or `add` new sync api with different signature.

#### AsyncSubscriber - `messaging-patter == pubsub`

Pure `async` implementation not using any specifics of `runtime` shall be doable. In case some unexpected dependency will be needed it has to
be exposed over defined abstraction same as `AsyncListenerTrait`

#### Builder

Implement the `Builders` extensions to they can provide `async` versions of objects

## Certification & Safety-Critical Usage

Answer:

- Applicable standards (e.g., ASIL-D, ISO 26262)
- Support for **zero-trust deployments**: can rogue processes break it?
- Evidence for claims (e.g., subscribers cannot corrupt publisher-owned
read-only memory)
- Real-time suitability: any blocking calls, background threads, or indeterminism?
- If unsuitable for zero-trust or real-time use, how do we prevent accidental misuse?

PR: To me idea, we will build async API on top of existing non-blocking, non async implementation and the API will be just a thin wrapper.
The connection to specific `runtime` is outside of project and in gesture of specific `runtime` to guarantee any of above.

## Milestones

### Milestone 1 – Provide Traits and object skeletons

- TBD later

**Results:**

- User will see that there is ongoing work on `async` API

### Milestone 2 – Implement Event in Async Runtime

- TBD later

**Results:**

- User will get first support for async API for specific runtime. This will also open a way for other to implement a bridge to other runtime like `tokio` as basic idea will be shown

### Milestone 2 – Implement PubSub

- TBD later

**Results:**

- PubSub API will have `async` API available
Loading
Loading