Skip to content

Commit

Permalink
Select links based on message Priority & Reliability (#1398)
Browse files Browse the repository at this point in the history
* Add wip `QoS`-based priority-to-link dispatch impl

* Improve `QoS` state machine

* Add `PriorityRange` negotiation tests

* Refactor link selection function

* Minor edits

* Add Link selectioh tests

* Minor edits

* More minor edits

* Never disobey Clippy

* Implement Reliability negotiation

* Apply negotiated Reliability to Link config

* Document Endpoint `reliability` metadata

* I'm sorry Clippy

* Make `PriorityRange` inclusive

* Clippy lints are inevitable

* Make Reliability negotiation stricter

* Refactor negotiation tests

* We are still not `core::error::Error`

* Use `RangeInclusive`

* Clippy at it again

* Split `State` into `StateOpen` and `StateAccept`

* Remove `NewLinkUnicast`

* Fix test typos

* Patch `Link::src` and `Link::dst` with negotiated metadata

* Optimize `QoS` extension overhead

* Implement `Display` instead of `ToString` for `PriorityRange`

* Fix typo (metdata -> metadata)

* Fix `n_exts` in `INIT` codec

* Add missing `'static` lifetime in const

* Don't compare `Link` to `TransportLinkUnicast`

* Don't set Link Reliability if not configured

* Update DEFAULT_CONFIG

* Move metadata docs to `Endpoint`

* Add Endpoint examples

* Fix doc list items without indentation

* Update Endpoint links in DEFAULT_CONFIG

* Change `x..=y` syntax to `x-y`

* Connect to multiple links with distinct priorities when scouting (wip)

* Add `LocatorInspector::is_reliable`

* Compare locator Reliability even if not set

* Apply `cargo +stable clippy --fix`

* Get transport at each locator iteration

* Don't try to connect to peer whaile already connecting to it

* Fix return value of `Runtime::connect`

* Fix `PriorityRange` formatting

---------

Co-authored-by: OlivierHecart <olivier.hecart@adlinktech.com>
  • Loading branch information
fuzzypixelz and OlivierHecart authored Sep 24, 2024
1 parent bf010dc commit 965e905
Show file tree
Hide file tree
Showing 42 changed files with 1,511 additions and 309 deletions.
16 changes: 10 additions & 6 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
/// The list of endpoints to connect to.
/// Accepts a single list (e.g. endpoints: ["tcp/10.10.10.10:7447", "tcp/11.11.11.11:7447"])
/// or different lists for router, peer and client (e.g. endpoints: { router: ["tcp/10.10.10.10:7447"], peer: ["tcp/11.11.11.11:7447"] }).
///
/// See https://docs.rs/zenoh/latest/zenoh/config/struct.EndPoint.html
endpoints: [
// "<proto>/<address>"
],
Expand Down Expand Up @@ -67,6 +69,8 @@
/// The list of endpoints to listen on.
/// Accepts a single list (e.g. endpoints: ["tcp/[::]:7447", "udp/[::]:7447"])
/// or different lists for router, peer and client (e.g. endpoints: { router: ["tcp/[::]:7447"], peer: ["tcp/[::]:0"] }).
///
/// See https://docs.rs/zenoh/latest/zenoh/config/struct.EndPoint.html
endpoints: { router: ["tcp/[::]:7447"], peer: ["tcp/[::]:0"] },

/// Global listen configuration,
Expand Down Expand Up @@ -333,11 +337,11 @@
},
},
link: {
/// An optional whitelist of protocols to be used for accepting and opening sessions.
/// If not configured, all the supported protocols are automatically whitelisted.
/// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"]
/// For example, to only enable "tls" and "quic":
// protocols: ["tls", "quic"],
/// An optional whitelist of protocols to be used for accepting and opening sessions. If not
/// configured, all the supported protocols are automatically whitelisted. The supported
/// protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"] For
/// example, to only enable "tls" and "quic": protocols: ["tls", "quic"],
///
/// Configure the zenoh TX parameters of a link
tx: {
/// The resolution in bits to be used for the message sequence numbers.
Expand Down Expand Up @@ -394,7 +398,7 @@
enabled: true,
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: 1,
}
},
},
},
/// Configure the zenoh RX parameters of a link
Expand Down
26 changes: 26 additions & 0 deletions commons/zenoh-codec/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ where
resolution,
batch_size,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand All @@ -59,6 +60,7 @@ where
header |= flag::S;
}
let mut n_exts = (ext_qos.is_some() as u8)
+ (ext_qos_optimized.is_some() as u8)
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
Expand Down Expand Up @@ -98,6 +100,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (qos, n_exts != 0))?;
}
if let Some(qos_optimized) = ext_qos_optimized.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (qos_optimized, n_exts != 0))?;
}
#[cfg(feature = "shared-memory")]
if let Some(shm) = ext_shm.as_ref() {
n_exts -= 1;
Expand Down Expand Up @@ -173,6 +179,7 @@ where

// Extensions
let mut ext_qos = None;
let mut ext_qos_optimized = None;
#[cfg(feature = "shared-memory")]
let mut ext_shm = None;
let mut ext_auth = None;
Expand All @@ -190,6 +197,11 @@ where
ext_qos = Some(q);
has_ext = ext;
}
ext::QoSOptimized::ID => {
let (q, ext): (ext::QoSOptimized, bool) = eodec.read(&mut *reader)?;
ext_qos_optimized = Some(q);
has_ext = ext;
}
#[cfg(feature = "shared-memory")]
ext::Shm::ID => {
let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
Expand Down Expand Up @@ -229,6 +241,7 @@ where
resolution,
batch_size,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand All @@ -255,6 +268,7 @@ where
batch_size,
cookie,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand All @@ -269,6 +283,7 @@ where
header |= flag::S;
}
let mut n_exts = (ext_qos.is_some() as u8)
+ (ext_qos_optimized.is_some() as u8)
+ (ext_auth.is_some() as u8)
+ (ext_mlink.is_some() as u8)
+ (ext_lowlatency.is_some() as u8)
Expand Down Expand Up @@ -311,6 +326,10 @@ where
n_exts -= 1;
self.write(&mut *writer, (qos, n_exts != 0))?;
}
if let Some(qos_optimized) = ext_qos_optimized.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (qos_optimized, n_exts != 0))?;
}
#[cfg(feature = "shared-memory")]
if let Some(shm) = ext_shm.as_ref() {
n_exts -= 1;
Expand Down Expand Up @@ -389,6 +408,7 @@ where

// Extensions
let mut ext_qos = None;
let mut ext_qos_optimized = None;
#[cfg(feature = "shared-memory")]
let mut ext_shm = None;
let mut ext_auth = None;
Expand All @@ -406,6 +426,11 @@ where
ext_qos = Some(q);
has_ext = ext;
}
ext::QoSOptimized::ID => {
let (q, ext): (ext::QoSOptimized, bool) = eodec.read(&mut *reader)?;
ext_qos_optimized = Some(q);
has_ext = ext;
}
#[cfg(feature = "shared-memory")]
ext::Shm::ID => {
let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?;
Expand Down Expand Up @@ -446,6 +471,7 @@ where
batch_size,
cookie,
ext_qos,
ext_qos_optimized,
#[cfg(feature = "shared-memory")]
ext_shm,
ext_auth,
Expand Down
26 changes: 24 additions & 2 deletions commons/zenoh-protocol/src/core/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ impl fmt::Debug for AddressMut<'_> {
pub struct Metadata<'a>(pub(super) &'a str);

impl<'a> Metadata<'a> {
pub const RELIABILITY: &'static str = "reliability";
pub const PRIORITIES: &'static str = "priorities";

pub fn as_str(&self) -> &'a str {
self.0
}
Expand Down Expand Up @@ -443,10 +446,29 @@ impl fmt::Debug for ConfigMut<'_> {

/// A string that respects the [`EndPoint`] canon form: `<locator>[#<config>]`.
///
/// `<locator>` is a valid [`Locator`] and `<config>` is of the form `<key1>=<value1>;...;<keyN>=<valueN>` where keys are alphabetically sorted.
/// `<config>` is optional and can be provided to configure some aspectes for an [`EndPoint`], e.g. the interface to listen on or connect to.
/// `<locator>` is a valid [`Locator`] and `<config>` is of the form
/// `<key1>=<value1>;...;<keyN>=<valueN>` where keys are alphabetically sorted. `<config>` is
/// optional and can be provided to configure some aspects for an [`EndPoint`], e.g. the interface
/// to listen on or connect to.
///
/// A full [`EndPoint`] string is hence in the form of `<proto>/<address>[?<metadata>][#config]`.
///
/// ## Metadata
///
/// - **`priorities`**: a range bounded inclusively below and above (e.g. `2-4` signifies
/// priorities 2, 3 and 4). This value is used to select the link used for transmission based on the
/// Priority of the message in question.
///
/// For example, `tcp/localhost:7447?priorities=1-3` assigns priorities
/// [`crate::core::Priority::RealTime`], [`crate::core::Priority::InteractiveHigh`] and
/// [`crate::core::Priority::InteractiveLow`] to the established link.
///
/// - **`reliability`**: either "best_effort" or "reliable". This value is used to select the link
/// used for transmission based on the Reliability of the message in question.
///
/// For example, `tcp/localhost:7447?priorities=6-7;reliability=best_effort` assigns priorities
/// [`crate::core::Priority::DataLow`] and [`crate::core::Priority::Background`], and
/// [`crate::core::Reliability::BestEffort`] to the established link.
#[derive(Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(into = "String")]
#[serde(try_from = "String")]
Expand Down
4 changes: 4 additions & 0 deletions commons/zenoh-protocol/src/core/locator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ impl Locator {
pub fn as_str(&self) -> &str {
self.0.as_str()
}

pub fn to_endpoint(&self) -> EndPoint {
self.0.clone()
}
}

impl From<EndPoint> for Locator {
Expand Down
Loading

0 comments on commit 965e905

Please sign in to comment.