Skip to content

Commit

Permalink
Routing refactoring I (#1486)
Browse files Browse the repository at this point in the history
* Make `Debug` impl of `WireExpr` always show all fields

* Improve `INTEREST` message description

* Add `RemoteInterest` struct
  • Loading branch information
fuzzypixelz authored Sep 30, 2024
1 parent d60ff4d commit 09d1d1d
Show file tree
Hide file tree
Showing 21 changed files with 213 additions and 117 deletions.
12 changes: 1 addition & 11 deletions commons/zenoh-protocol/src/core/wire_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub const EMPTY_EXPR_ID: ExprId = 0;
// ~ suffix ~ if flag K==1 in Message's header
// +---------------+
//
#[derive(PartialEq, Eq, Hash, Clone)]
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct WireExpr<'a> {
pub scope: ExprId, // 0 marks global scope
pub suffix: Cow<'a, str>,
Expand Down Expand Up @@ -178,16 +178,6 @@ impl<'a> From<&'a keyexpr> for WireExpr<'a> {
}
}

impl fmt::Debug for WireExpr<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.scope == 0 {
write!(f, "{}", self.suffix)
} else {
write!(f, "{}:{:?}:{}", self.scope, self.mapping, self.suffix)
}
}
}

impl fmt::Display for WireExpr<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.scope == 0 {
Expand Down
51 changes: 36 additions & 15 deletions commons/zenoh-protocol/src/network/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub mod flag {
///
/// The behaviour of a INTEREST depends on the INTEREST MODE.
///
/// E.g., the message flow is the following for an [`Interest`] with mode `Current`:
/// E.g., the message flow is the following for an [`Interest`] with mode [`InterestMode::Current`]:
///
/// ```text
/// A B
Expand All @@ -51,7 +51,7 @@ pub mod flag {
/// | |
/// ```
///
/// And the message flow is the following for an [`Interest`] with mode `CurrentFuture`:
/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::CurrentFuture`]:
///
/// ```text
/// A B
Expand All @@ -78,8 +78,30 @@ pub mod flag {
/// | INTEREST FINAL |
/// |------------------>| -- Mode: Final
/// | | This stops the transmission of subscriber declarations/undeclarations.
/// | |
/// | |
/// ```
///
/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::Future`]:
///
/// ```text
/// A B
/// | INTEREST |
/// |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
/// | |
/// | DECL SUBSCRIBER |
/// |<------------------| -- With interest_id field not set
/// | UNDECL SUBSCRIBER |
/// |<------------------| -- With interest_id field not set
/// | |
/// | ... |
/// | |
/// | INTEREST FINAL |
/// |------------------>| -- Mode: Final
/// | | This stops the transmission of subscriber declarations/undeclarations.
/// | |
/// ```
///
/// ```text
/// Flags:
/// - |: Mode The mode of the interest*
/// -/
Expand All @@ -100,7 +122,7 @@ pub mod flag {
/// ~ [int_exts] ~ if Z==1
/// +---------------+
///
/// *Mode of declaration:
/// Mode of declaration:
/// - Mode 0b00: Final
/// - Mode 0b01: Current
/// - Mode 0b10: Future
Expand All @@ -116,6 +138,16 @@ pub mod flag {
/// If R==0 then M should be set to 0.
/// - if A==1 then the replies SHOULD be aggregated
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Interest {
pub id: InterestId,
pub mode: InterestMode,
pub options: InterestOptions,
pub wire_expr: Option<WireExpr<'static>>,
pub ext_qos: ext::QoSType,
pub ext_tstamp: Option<ext::TimestampType>,
pub ext_nodeid: ext::NodeIdType,
}

/// The resolution of a RequestId
pub type DeclareRequestId = u32;
Expand Down Expand Up @@ -146,17 +178,6 @@ impl InterestMode {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Interest {
pub id: InterestId,
pub mode: InterestMode,
pub options: InterestOptions,
pub wire_expr: Option<WireExpr<'static>>,
pub ext_qos: ext::QoSType,
pub ext_tstamp: Option<ext::TimestampType>,
pub ext_nodeid: ext::NodeIdType,
}

pub mod ext {
use crate::{
common::{ZExtZ64, ZExtZBuf},
Expand Down
23 changes: 23 additions & 0 deletions zenoh/src/net/routing/dispatcher/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//

use std::{
fmt,
sync::{Arc, Weak},
time::Duration,
};
Expand Down Expand Up @@ -49,6 +50,28 @@ pub(crate) struct CurrentInterest {
pub(crate) mode: InterestMode,
}

#[derive(PartialEq, Clone)]
pub(crate) struct RemoteInterest {
pub(crate) res: Option<Arc<Resource>>,
pub(crate) options: InterestOptions,
pub(crate) mode: InterestMode,
}

impl fmt::Debug for RemoteInterest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RemoteInterest")
.field("res", &self.res.as_ref().map(|res| res.expr()))
.field("options", &self.options)
.finish()
}
}

impl RemoteInterest {
pub(crate) fn matches(&self, res: &Arc<Resource>) -> bool {
self.res.as_ref().map(|r| r.matches(res)).unwrap_or(true)
}
}

pub(crate) fn declare_final(
face: &mut Arc<FaceState>,
id: InterestId,
Expand Down
20 changes: 14 additions & 6 deletions zenoh/src/net/routing/hat/client/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use super::{face_hat, face_hat_mut, token::declare_token_interest, HatCode, HatF
use crate::net::routing::{
dispatcher::{
face::{FaceState, InterestState},
interests::{CurrentInterest, CurrentInterestCleanup},
interests::{CurrentInterest, CurrentInterestCleanup, RemoteInterest},
resource::Resource,
tables::{Tables, TablesLock},
},
Expand All @@ -43,7 +43,9 @@ pub(super) fn interests_new_face(tables: &mut Tables, face: &mut Arc<FaceState>)
.cloned()
.collect::<Vec<Arc<FaceState>>>()
{
for (res, options) in face_hat_mut!(&mut src_face).remote_interests.values() {
for RemoteInterest { res, options, .. } in
face_hat_mut!(&mut src_face).remote_interests.values()
{
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
get_mut_unchecked(face).local_interests.insert(
id,
Expand Down Expand Up @@ -94,9 +96,14 @@ impl HatInterestTrait for HatCode {
send_declare,
)
}
face_hat_mut!(face)
.remote_interests
.insert(id, (res.as_ref().map(|res| (*res).clone()), options));
face_hat_mut!(face).remote_interests.insert(
id,
RemoteInterest {
res: res.as_ref().map(|res| (*res).clone()),
options,
mode,
},
);

let interest = Arc::new(CurrentInterest {
src_face: face.clone(),
Expand Down Expand Up @@ -198,7 +205,8 @@ impl HatInterestTrait for HatCode {
.collect::<Vec<InterestId>>()
{
let local_interest = dst_face.local_interests.get(&id).unwrap();
if local_interest.res == interest.0 && local_interest.options == interest.1
if local_interest.res == interest.res
&& local_interest.options == interest.options
{
dst_face.primitives.send_interest(RoutingContext::with_expr(
Interest {
Expand Down
6 changes: 3 additions & 3 deletions zenoh/src/net/routing/hat/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use token::{token_new_face, undeclare_simple_token};
use zenoh_config::WhatAmI;
use zenoh_protocol::network::{
declare::{queryable::ext::QueryableInfoType, QueryableId, SubscriberId, TokenId},
interest::{InterestId, InterestOptions},
interest::InterestId,
Oam,
};
use zenoh_result::ZResult;
Expand All @@ -48,7 +48,7 @@ use super::{
};
use crate::net::{
routing::{
dispatcher::face::Face,
dispatcher::{face::Face, interests::RemoteInterest},
router::{compute_data_routes, compute_query_routes, RoutesIndexes},
},
runtime::Runtime,
Expand Down Expand Up @@ -296,7 +296,7 @@ impl HatContext {

struct HatFace {
next_id: AtomicU32, // @TODO: manage rollover and uniqueness
remote_interests: HashMap<InterestId, (Option<Arc<Resource>>, InterestOptions)>,
remote_interests: HashMap<InterestId, RemoteInterest>,
local_subs: HashMap<Arc<Resource>, SubscriberId>,
remote_subs: HashMap<SubscriberId, Arc<Resource>>,
local_qabls: HashMap<Arc<Resource>, (QueryableId, QueryableInfoType)>,
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/hat/client/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ fn propagate_forget_simple_token(
} else if face_hat!(face)
.remote_interests
.values()
.any(|(r, o)| o.tokens() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true))
.any(|i| i.options.tokens() && i.matches(res))
{
// Token has never been declared on this face.
// Send an Undeclare with a one shot generated id and a WireExpr ext.
Expand Down
12 changes: 9 additions & 3 deletions zenoh/src/net/routing/hat/linkstate_peer/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use super::{
use crate::net::routing::{
dispatcher::{
face::FaceState,
interests::RemoteInterest,
resource::Resource,
tables::{Tables, TablesLock},
},
Expand Down Expand Up @@ -80,9 +81,14 @@ impl HatInterestTrait for HatCode {
)
}
if mode.future() {
face_hat_mut!(face)
.remote_interests
.insert(id, (res.cloned(), options));
face_hat_mut!(face).remote_interests.insert(
id,
RemoteInterest {
res: res.cloned(),
options,
mode,
},
);
}
if mode.current() {
send_declare(
Expand Down
6 changes: 3 additions & 3 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use zenoh_protocol::{
core::ZenohIdProto,
network::{
declare::{queryable::ext::QueryableInfoType, QueryableId, SubscriberId},
interest::{InterestId, InterestOptions},
interest::InterestId,
oam::id::OAM_LINKSTATE,
Oam,
},
Expand All @@ -56,7 +56,7 @@ use crate::net::{
codec::Zenoh080Routing,
protocol::linkstate::LinkStateList,
routing::{
dispatcher::face::Face,
dispatcher::{face::Face, interests::RemoteInterest},
hat::TREES_COMPUTATION_DELAY_MS,
router::{compute_data_routes, compute_query_routes, RoutesIndexes},
},
Expand Down Expand Up @@ -489,7 +489,7 @@ impl HatContext {
struct HatFace {
link_id: usize,
next_id: AtomicU32, // @TODO: manage rollover and uniqueness
remote_interests: HashMap<InterestId, (Option<Arc<Resource>>, InterestOptions)>,
remote_interests: HashMap<InterestId, RemoteInterest>,
local_subs: HashMap<Arc<Resource>, SubscriberId>,
remote_subs: HashMap<SubscriberId, Arc<Resource>>,
local_tokens: HashMap<Arc<Resource>, SubscriberId>,
Expand Down
16 changes: 10 additions & 6 deletions zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use zenoh_protocol::{
common::ext::WireExprType, ext, Declare, DeclareBody, DeclareSubscriber, SubscriberId,
UndeclareSubscriber,
},
interest::{InterestId, InterestMode, InterestOptions},
interest::{InterestId, InterestMode},
},
};
use zenoh_sync::get_mut_unchecked;
Expand All @@ -39,6 +39,7 @@ use crate::key_expr::KeyExpr;
use crate::net::routing::{
dispatcher::{
face::FaceState,
interests::RemoteInterest,
pubsub::{update_data_routes_from, update_matches_data_routes, SubscriberInfo},
resource::{NodeId, Resource, SessionContext},
tables::{Route, RoutingExpr, Tables},
Expand Down Expand Up @@ -129,13 +130,16 @@ fn propagate_simple_subscription_to(
let matching_interests = face_hat!(dst_face)
.remote_interests
.values()
.filter(|(r, o)| {
o.subscribers() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)
})
.filter(|i| i.options.subscribers() && i.matches(res))
.cloned()
.collect::<Vec<(Option<Arc<Resource>>, InterestOptions)>>();
.collect::<Vec<_>>();

for (int_res, options) in matching_interests {
for RemoteInterest {
res: int_res,
options,
..
} in matching_interests
{
let res = if options.aggregate() {
int_res.as_ref().unwrap_or(res)
} else {
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/hat/linkstate_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ fn propagate_simple_queryable(
&& face_hat!(dst_face)
.remote_interests
.values()
.any(|(r, o)| o.queryables() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true))
.any(|i| i.options.queryables() && i.matches(res))
{
let id = current
.map(|c| c.0)
Expand Down
17 changes: 11 additions & 6 deletions zenoh/src/net/routing/hat/linkstate_peer/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use zenoh_protocol::{
network::{
declare::{common::ext::WireExprType, TokenId},
ext,
interest::{InterestId, InterestMode, InterestOptions},
interest::{InterestId, InterestMode},
Declare, DeclareBody, DeclareToken, UndeclareToken,
},
};
Expand All @@ -31,7 +31,7 @@ use super::{
HatCode, HatContext, HatFace, HatTables,
};
use crate::net::routing::{
dispatcher::{face::FaceState, tables::Tables},
dispatcher::{face::FaceState, interests::RemoteInterest, tables::Tables},
hat::{CurrentFutureTrait, HatTokenTrait, SendDeclare},
router::{NodeId, Resource, SessionContext},
RoutingContext,
Expand Down Expand Up @@ -116,11 +116,16 @@ fn propagate_simple_token_to(
let matching_interests = face_hat!(dst_face)
.remote_interests
.values()
.filter(|(r, o)| o.tokens() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true))
.filter(|i| i.options.tokens() && i.matches(res))
.cloned()
.collect::<Vec<(Option<Arc<Resource>>, InterestOptions)>>();

for (int_res, options) in matching_interests {
.collect::<Vec<_>>();

for RemoteInterest {
res: int_res,
options,
..
} in matching_interests
{
let res = if options.aggregate() {
int_res.as_ref().unwrap_or(res)
} else {
Expand Down
Loading

0 comments on commit 09d1d1d

Please sign in to comment.