diff --git a/commons/zenoh-protocol/src/core/wire_expr.rs b/commons/zenoh-protocol/src/core/wire_expr.rs index 9f5c43266..7a70f1006 100644 --- a/commons/zenoh-protocol/src/core/wire_expr.rs +++ b/commons/zenoh-protocol/src/core/wire_expr.rs @@ -56,7 +56,7 @@ pub const EMPTY_EXPR_ID: ExprId = 0; // ~ suffix ~ if flag K==1 in Message's header // +---------------+ // -#[derive(PartialEq, Eq, Hash, Clone)] +#[derive(PartialEq, Eq, Hash, Clone, Debug)] pub struct WireExpr<'a> { pub scope: ExprId, // 0 marks global scope pub suffix: Cow<'a, str>, @@ -178,16 +178,6 @@ impl<'a> From<&'a keyexpr> for WireExpr<'a> { } } -impl fmt::Debug for WireExpr<'_> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - if self.scope == 0 { - write!(f, "{}", self.suffix) - } else { - write!(f, "{}:{:?}:{}", self.scope, self.mapping, self.suffix) - } - } -} - impl fmt::Display for WireExpr<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { if self.scope == 0 { diff --git a/commons/zenoh-protocol/src/network/interest.rs b/commons/zenoh-protocol/src/network/interest.rs index ad4976bc7..8b7dd52cb 100644 --- a/commons/zenoh-protocol/src/network/interest.rs +++ b/commons/zenoh-protocol/src/network/interest.rs @@ -31,7 +31,7 @@ pub mod flag { /// /// The behaviour of a INTEREST depends on the INTEREST MODE. /// -/// E.g., the message flow is the following for an [`Interest`] with mode `Current`: +/// E.g., the message flow is the following for an [`Interest`] with mode [`InterestMode::Current`]: /// /// ```text /// A B @@ -51,7 +51,7 @@ pub mod flag { /// | | /// ``` /// -/// And the message flow is the following for an [`Interest`] with mode `CurrentFuture`: +/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::CurrentFuture`]: /// /// ```text /// A B @@ -78,8 +78,30 @@ pub mod flag { /// | INTEREST FINAL | /// |------------------>| -- Mode: Final /// | | This stops the transmission of subscriber declarations/undeclarations. -/// | | +/// | | +/// ``` +/// +/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::Future`]: +/// +/// ```text +/// A B +/// | INTEREST | +/// |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations. +/// | | +/// | DECL SUBSCRIBER | +/// |<------------------| -- With interest_id field not set +/// | UNDECL SUBSCRIBER | +/// |<------------------| -- With interest_id field not set +/// | | +/// | ... | +/// | | +/// | INTEREST FINAL | +/// |------------------>| -- Mode: Final +/// | | This stops the transmission of subscriber declarations/undeclarations. +/// | | +/// ``` /// +/// ```text /// Flags: /// - |: Mode The mode of the interest* /// -/ @@ -100,7 +122,7 @@ pub mod flag { /// ~ [int_exts] ~ if Z==1 /// +---------------+ /// -/// *Mode of declaration: +/// Mode of declaration: /// - Mode 0b00: Final /// - Mode 0b01: Current /// - Mode 0b10: Future @@ -116,6 +138,16 @@ pub mod flag { /// If R==0 then M should be set to 0. /// - if A==1 then the replies SHOULD be aggregated /// ``` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Interest { + pub id: InterestId, + pub mode: InterestMode, + pub options: InterestOptions, + pub wire_expr: Option>, + pub ext_qos: ext::QoSType, + pub ext_tstamp: Option, + pub ext_nodeid: ext::NodeIdType, +} /// The resolution of a RequestId pub type DeclareRequestId = u32; @@ -146,17 +178,6 @@ impl InterestMode { } } -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Interest { - pub id: InterestId, - pub mode: InterestMode, - pub options: InterestOptions, - pub wire_expr: Option>, - pub ext_qos: ext::QoSType, - pub ext_tstamp: Option, - pub ext_nodeid: ext::NodeIdType, -} - pub mod ext { use crate::{ common::{ZExtZ64, ZExtZBuf}, diff --git a/zenoh/src/net/routing/dispatcher/interests.rs b/zenoh/src/net/routing/dispatcher/interests.rs index 6180828f1..17f5883fc 100644 --- a/zenoh/src/net/routing/dispatcher/interests.rs +++ b/zenoh/src/net/routing/dispatcher/interests.rs @@ -13,6 +13,7 @@ // use std::{ + fmt, sync::{Arc, Weak}, time::Duration, }; @@ -49,6 +50,28 @@ pub(crate) struct CurrentInterest { pub(crate) mode: InterestMode, } +#[derive(PartialEq, Clone)] +pub(crate) struct RemoteInterest { + pub(crate) res: Option>, + pub(crate) options: InterestOptions, + pub(crate) mode: InterestMode, +} + +impl fmt::Debug for RemoteInterest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RemoteInterest") + .field("res", &self.res.as_ref().map(|res| res.expr())) + .field("options", &self.options) + .finish() + } +} + +impl RemoteInterest { + pub(crate) fn matches(&self, res: &Arc) -> bool { + self.res.as_ref().map(|r| r.matches(res)).unwrap_or(true) + } +} + pub(crate) fn declare_final( face: &mut Arc, id: InterestId, diff --git a/zenoh/src/net/routing/hat/client/interests.rs b/zenoh/src/net/routing/hat/client/interests.rs index eab03fc99..c6cfcf94f 100644 --- a/zenoh/src/net/routing/hat/client/interests.rs +++ b/zenoh/src/net/routing/hat/client/interests.rs @@ -27,7 +27,7 @@ use super::{face_hat, face_hat_mut, token::declare_token_interest, HatCode, HatF use crate::net::routing::{ dispatcher::{ face::{FaceState, InterestState}, - interests::{CurrentInterest, CurrentInterestCleanup}, + interests::{CurrentInterest, CurrentInterestCleanup, RemoteInterest}, resource::Resource, tables::{Tables, TablesLock}, }, @@ -43,7 +43,9 @@ pub(super) fn interests_new_face(tables: &mut Tables, face: &mut Arc) .cloned() .collect::>>() { - for (res, options) in face_hat_mut!(&mut src_face).remote_interests.values() { + for RemoteInterest { res, options, .. } in + face_hat_mut!(&mut src_face).remote_interests.values() + { let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); get_mut_unchecked(face).local_interests.insert( id, @@ -94,9 +96,14 @@ impl HatInterestTrait for HatCode { send_declare, ) } - face_hat_mut!(face) - .remote_interests - .insert(id, (res.as_ref().map(|res| (*res).clone()), options)); + face_hat_mut!(face).remote_interests.insert( + id, + RemoteInterest { + res: res.as_ref().map(|res| (*res).clone()), + options, + mode, + }, + ); let interest = Arc::new(CurrentInterest { src_face: face.clone(), @@ -198,7 +205,8 @@ impl HatInterestTrait for HatCode { .collect::>() { let local_interest = dst_face.local_interests.get(&id).unwrap(); - if local_interest.res == interest.0 && local_interest.options == interest.1 + if local_interest.res == interest.res + && local_interest.options == interest.options { dst_face.primitives.send_interest(RoutingContext::with_expr( Interest { diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index 169b6ccbf..5c540fd80 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -27,7 +27,7 @@ use token::{token_new_face, undeclare_simple_token}; use zenoh_config::WhatAmI; use zenoh_protocol::network::{ declare::{queryable::ext::QueryableInfoType, QueryableId, SubscriberId, TokenId}, - interest::{InterestId, InterestOptions}, + interest::InterestId, Oam, }; use zenoh_result::ZResult; @@ -48,7 +48,7 @@ use super::{ }; use crate::net::{ routing::{ - dispatcher::face::Face, + dispatcher::{face::Face, interests::RemoteInterest}, router::{compute_data_routes, compute_query_routes, RoutesIndexes}, }, runtime::Runtime, @@ -296,7 +296,7 @@ impl HatContext { struct HatFace { next_id: AtomicU32, // @TODO: manage rollover and uniqueness - remote_interests: HashMap>, InterestOptions)>, + remote_interests: HashMap, local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, local_qabls: HashMap, (QueryableId, QueryableInfoType)>, diff --git a/zenoh/src/net/routing/hat/client/token.rs b/zenoh/src/net/routing/hat/client/token.rs index 52677cc07..098515c03 100644 --- a/zenoh/src/net/routing/hat/client/token.rs +++ b/zenoh/src/net/routing/hat/client/token.rs @@ -191,7 +191,7 @@ fn propagate_forget_simple_token( } else if face_hat!(face) .remote_interests .values() - .any(|(r, o)| o.tokens() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)) + .any(|i| i.options.tokens() && i.matches(res)) { // Token has never been declared on this face. // Send an Undeclare with a one shot generated id and a WireExpr ext. diff --git a/zenoh/src/net/routing/hat/linkstate_peer/interests.rs b/zenoh/src/net/routing/hat/linkstate_peer/interests.rs index 6b23f8d65..27194c276 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/interests.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/interests.rs @@ -27,6 +27,7 @@ use super::{ use crate::net::routing::{ dispatcher::{ face::FaceState, + interests::RemoteInterest, resource::Resource, tables::{Tables, TablesLock}, }, @@ -80,9 +81,14 @@ impl HatInterestTrait for HatCode { ) } if mode.future() { - face_hat_mut!(face) - .remote_interests - .insert(id, (res.cloned(), options)); + face_hat_mut!(face).remote_interests.insert( + id, + RemoteInterest { + res: res.cloned(), + options, + mode, + }, + ); } if mode.current() { send_declare( diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index a5d160827..a2cb1cf1b 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -30,7 +30,7 @@ use zenoh_protocol::{ core::ZenohIdProto, network::{ declare::{queryable::ext::QueryableInfoType, QueryableId, SubscriberId}, - interest::{InterestId, InterestOptions}, + interest::InterestId, oam::id::OAM_LINKSTATE, Oam, }, @@ -56,7 +56,7 @@ use crate::net::{ codec::Zenoh080Routing, protocol::linkstate::LinkStateList, routing::{ - dispatcher::face::Face, + dispatcher::{face::Face, interests::RemoteInterest}, hat::TREES_COMPUTATION_DELAY_MS, router::{compute_data_routes, compute_query_routes, RoutesIndexes}, }, @@ -489,7 +489,7 @@ impl HatContext { struct HatFace { link_id: usize, next_id: AtomicU32, // @TODO: manage rollover and uniqueness - remote_interests: HashMap>, InterestOptions)>, + remote_interests: HashMap, local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, local_tokens: HashMap, SubscriberId>, diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index 8b7cce47f..fc2074c0b 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -25,7 +25,7 @@ use zenoh_protocol::{ common::ext::WireExprType, ext, Declare, DeclareBody, DeclareSubscriber, SubscriberId, UndeclareSubscriber, }, - interest::{InterestId, InterestMode, InterestOptions}, + interest::{InterestId, InterestMode}, }, }; use zenoh_sync::get_mut_unchecked; @@ -39,6 +39,7 @@ use crate::key_expr::KeyExpr; use crate::net::routing::{ dispatcher::{ face::FaceState, + interests::RemoteInterest, pubsub::{update_data_routes_from, update_matches_data_routes, SubscriberInfo}, resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, @@ -129,13 +130,16 @@ fn propagate_simple_subscription_to( let matching_interests = face_hat!(dst_face) .remote_interests .values() - .filter(|(r, o)| { - o.subscribers() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) - }) + .filter(|i| i.options.subscribers() && i.matches(res)) .cloned() - .collect::>, InterestOptions)>>(); + .collect::>(); - for (int_res, options) in matching_interests { + for RemoteInterest { + res: int_res, + options, + .. + } in matching_interests + { let res = if options.aggregate() { int_res.as_ref().unwrap_or(res) } else { diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 181db04f9..8668c01ba 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -183,7 +183,7 @@ fn propagate_simple_queryable( && face_hat!(dst_face) .remote_interests .values() - .any(|(r, o)| o.queryables() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)) + .any(|i| i.options.queryables() && i.matches(res)) { let id = current .map(|c| c.0) diff --git a/zenoh/src/net/routing/hat/linkstate_peer/token.rs b/zenoh/src/net/routing/hat/linkstate_peer/token.rs index 9741df474..e016b70de 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/token.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/token.rs @@ -20,7 +20,7 @@ use zenoh_protocol::{ network::{ declare::{common::ext::WireExprType, TokenId}, ext, - interest::{InterestId, InterestMode, InterestOptions}, + interest::{InterestId, InterestMode}, Declare, DeclareBody, DeclareToken, UndeclareToken, }, }; @@ -31,7 +31,7 @@ use super::{ HatCode, HatContext, HatFace, HatTables, }; use crate::net::routing::{ - dispatcher::{face::FaceState, tables::Tables}, + dispatcher::{face::FaceState, interests::RemoteInterest, tables::Tables}, hat::{CurrentFutureTrait, HatTokenTrait, SendDeclare}, router::{NodeId, Resource, SessionContext}, RoutingContext, @@ -116,11 +116,16 @@ fn propagate_simple_token_to( let matching_interests = face_hat!(dst_face) .remote_interests .values() - .filter(|(r, o)| o.tokens() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)) + .filter(|i| i.options.tokens() && i.matches(res)) .cloned() - .collect::>, InterestOptions)>>(); - - for (int_res, options) in matching_interests { + .collect::>(); + + for RemoteInterest { + res: int_res, + options, + .. + } in matching_interests + { let res = if options.aggregate() { int_res.as_ref().unwrap_or(res) } else { diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 860cd7227..aaf24dd5b 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -30,7 +30,7 @@ use super::{ use crate::net::routing::{ dispatcher::{ face::{FaceState, InterestState}, - interests::{CurrentInterest, CurrentInterestCleanup}, + interests::{CurrentInterest, CurrentInterestCleanup, RemoteInterest}, resource::Resource, tables::{Tables, TablesLock}, }, @@ -47,7 +47,9 @@ pub(super) fn interests_new_face(tables: &mut Tables, face: &mut Arc) .collect::>>() { if face.whatami == WhatAmI::Router { - for (res, _, options) in face_hat_mut!(&mut src_face).remote_interests.values() { + for RemoteInterest { res, options, .. } in + face_hat_mut!(&mut src_face).remote_interests.values() + { let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); get_mut_unchecked(face).local_interests.insert( id, @@ -123,9 +125,14 @@ impl HatInterestTrait for HatCode { send_declare, ) } - face_hat_mut!(face) - .remote_interests - .insert(id, (res.as_ref().map(|res| (*res).clone()), mode, options)); + face_hat_mut!(face).remote_interests.insert( + id, + RemoteInterest { + res: res.as_ref().map(|res| (*res).clone()), + options, + mode, + }, + ); let interest = Arc::new(CurrentInterest { src_face: face.clone(), @@ -221,7 +228,8 @@ impl HatInterestTrait for HatCode { .collect::>() { let local_interest = dst_face.local_interests.get(&id).unwrap(); - if local_interest.res == interest.0 && local_interest.options == interest.2 + if local_interest.res == interest.res + && local_interest.options == interest.options { dst_face.primitives.send_interest(RoutingContext::with_expr( Interest { diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index b4da9a241..92f60ab2f 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -33,7 +33,7 @@ use zenoh_protocol::{ queryable::ext::QueryableInfoType, QueryableId, SubscriberId, TokenId, }, - interest::{InterestId, InterestMode, InterestOptions}, + interest::{InterestId, InterestOptions}, oam::id::OAM_LINKSTATE, Declare, DeclareBody, DeclareFinal, Oam, }, @@ -59,7 +59,10 @@ use crate::net::{ codec::Zenoh080Routing, protocol::linkstate::LinkStateList, routing::{ - dispatcher::face::{Face, InterestState}, + dispatcher::{ + face::{Face, InterestState}, + interests::RemoteInterest, + }, router::{compute_data_routes, compute_query_routes, RoutesIndexes}, RoutingContext, }, @@ -398,7 +401,7 @@ impl HatContext { struct HatFace { next_id: AtomicU32, // @TODO: manage rollover and uniqueness - remote_interests: HashMap>, InterestMode, InterestOptions)>, + remote_interests: HashMap, local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, local_tokens: HashMap, TokenId>, diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 56a3419b8..776b472c8 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -24,7 +24,7 @@ use zenoh_protocol::{ common::ext::WireExprType, ext, Declare, DeclareBody, DeclareSubscriber, SubscriberId, UndeclareSubscriber, }, - interest::{InterestId, InterestMode, InterestOptions}, + interest::{InterestId, InterestMode}, }, }; use zenoh_sync::get_mut_unchecked; @@ -35,6 +35,7 @@ use crate::{ net::routing::{ dispatcher::{ face::FaceState, + interests::RemoteInterest, pubsub::SubscriberInfo, resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, @@ -84,13 +85,16 @@ fn propagate_simple_subscription_to( let matching_interests = face_hat!(dst_face) .remote_interests .values() - .filter(|(r, _, o)| { - o.subscribers() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) - }) + .filter(|i| i.options.subscribers() && i.matches(res)) .cloned() - .collect::>, InterestMode, InterestOptions)>>(); + .collect::>(); - for (int_res, _, options) in matching_interests { + for RemoteInterest { + res: int_res, + options, + .. + } in matching_interests + { let res = if options.aggregate() { int_res.as_ref().unwrap_or(res) } else { diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index c75f498ff..052d40169 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -97,9 +97,7 @@ fn propagate_simple_queryable_to( || face_hat!(dst_face) .remote_interests .values() - .any(|(r, _, o)| { - o.queryables() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) - })) + .any(|i| i.options.queryables() && i.matches(res))) && src_face .as_ref() .map(|src_face| { diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index 76fd87854..490cd5b5a 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -18,14 +18,14 @@ use zenoh_config::WhatAmI; use zenoh_protocol::network::{ declare::{common::ext::WireExprType, TokenId}, ext, - interest::{InterestId, InterestMode, InterestOptions}, + interest::{InterestId, InterestMode}, Declare, DeclareBody, DeclareToken, UndeclareToken, }; use zenoh_sync::get_mut_unchecked; use super::{face_hat, face_hat_mut, HatCode, HatFace, INITIAL_INTEREST_ID}; use crate::net::routing::{ - dispatcher::{face::FaceState, tables::Tables}, + dispatcher::{face::FaceState, interests::RemoteInterest, tables::Tables}, hat::{CurrentFutureTrait, HatTokenTrait, SendDeclare}, router::{NodeId, Resource, SessionContext}, RoutingContext, @@ -86,15 +86,20 @@ fn propagate_simple_token_to( let matching_interests = face_hat!(dst_face) .remote_interests .values() - .filter(|(r, m, o)| { - o.tokens() - && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) - && (m.current() || src_interest_id.is_none()) + .filter(|i| { + i.options.tokens() + && i.matches(res) + && (i.mode.current() || src_interest_id.is_none()) }) .cloned() - .collect::>, InterestMode, InterestOptions)>>(); + .collect::>(); - for (int_res, _, options) in matching_interests { + for RemoteInterest { + res: int_res, + options, + .. + } in matching_interests + { let res = if options.aggregate() { int_res.as_ref().unwrap_or(res) } else { @@ -268,9 +273,10 @@ fn propagate_forget_simple_token( ), ); } else if src_face.id != face.id - && face_hat!(face).remote_interests.values().any(|(r, _, o)| { - o.tokens() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) && !o.aggregate() - }) + && face_hat!(face) + .remote_interests + .values() + .any(|i| i.options.tokens() && i.matches(res) && !i.options.aggregate()) { // Token has never been declared on this face. // Send an Undeclare with a one shot generated id and a WireExpr ext. @@ -320,11 +326,11 @@ fn propagate_forget_simple_token( res.expr(), ), ); - } else if face_hat!(face).remote_interests.values().any(|(r, _, o)| { - o.tokens() - && r.as_ref().map(|r| r.matches(&res)).unwrap_or(true) - && !o.aggregate() - }) { + } else if face_hat!(face) + .remote_interests + .values() + .any(|i| i.options.tokens() && i.matches(&res) && !i.options.aggregate()) + { // Token has never been declared on this face. // Send an Undeclare with a one shot generated id and a WireExpr ext. send_declare( diff --git a/zenoh/src/net/routing/hat/router/interests.rs b/zenoh/src/net/routing/hat/router/interests.rs index e3c74b8c9..18929ba72 100644 --- a/zenoh/src/net/routing/hat/router/interests.rs +++ b/zenoh/src/net/routing/hat/router/interests.rs @@ -30,6 +30,7 @@ use super::{ use crate::net::routing::{ dispatcher::{ face::FaceState, + interests::RemoteInterest, resource::Resource, tables::{Tables, TablesLock}, }, @@ -90,9 +91,14 @@ impl HatInterestTrait for HatCode { ) } if mode.future() { - face_hat_mut!(face) - .remote_interests - .insert(id, (res.cloned(), options)); + face_hat_mut!(face).remote_interests.insert( + id, + RemoteInterest { + res: res.cloned(), + options, + mode, + }, + ); } if mode.current() { send_declare( diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index 5e1906920..d383076a4 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -31,7 +31,7 @@ use zenoh_protocol::{ core::ZenohIdProto, network::{ declare::{queryable::ext::QueryableInfoType, QueryableId, SubscriberId, TokenId}, - interest::{InterestId, InterestOptions}, + interest::InterestId, oam::id::OAM_LINKSTATE, Oam, }, @@ -57,7 +57,7 @@ use crate::net::{ codec::Zenoh080Routing, protocol::linkstate::LinkStateList, routing::{ - dispatcher::face::Face, + dispatcher::{face::Face, interests::RemoteInterest}, hat::TREES_COMPUTATION_DELAY_MS, router::{compute_data_routes, compute_query_routes, RoutesIndexes}, }, @@ -854,7 +854,7 @@ impl HatContext { struct HatFace { link_id: usize, next_id: AtomicU32, // @TODO: manage rollover and uniqueness - remote_interests: HashMap>, InterestOptions)>, + remote_interests: HashMap, local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, local_qabls: HashMap, (QueryableId, QueryableInfoType)>, diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index 45524f0e2..a1fb8de16 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -25,7 +25,7 @@ use zenoh_protocol::{ common::ext::WireExprType, ext, Declare, DeclareBody, DeclareSubscriber, SubscriberId, UndeclareSubscriber, }, - interest::{InterestId, InterestMode, InterestOptions}, + interest::{InterestId, InterestMode}, }, }; use zenoh_sync::get_mut_unchecked; @@ -40,6 +40,7 @@ use crate::key_expr::KeyExpr; use crate::net::routing::{ dispatcher::{ face::FaceState, + interests::RemoteInterest, pubsub::{update_data_routes_from, update_matches_data_routes, SubscriberInfo}, resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, @@ -117,11 +118,16 @@ fn propagate_simple_subscription_to( let matching_interests = face_hat!(dst_face) .remote_interests .values() - .filter(|(r, o)| o.subscribers() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)) + .filter(|i| i.options.subscribers() && i.matches(res)) .cloned() - .collect::>, InterestOptions)>>(); + .collect::>(); - for (int_res, options) in matching_interests { + for RemoteInterest { + res: int_res, + options, + .. + } in matching_interests + { let res = if options.aggregate() { int_res.as_ref().unwrap_or(res) } else { diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 2be4c1d5f..34e05c43c 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -252,7 +252,7 @@ fn propagate_simple_queryable( && face_hat!(dst_face) .remote_interests .values() - .any(|(r, o)| o.queryables() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)) + .any(|i| i.options.queryables() && i.matches(res)) && if full_peers_net { dst_face.whatami == WhatAmI::Client } else { diff --git a/zenoh/src/net/routing/hat/router/token.rs b/zenoh/src/net/routing/hat/router/token.rs index b506764bf..ef60133d1 100644 --- a/zenoh/src/net/routing/hat/router/token.rs +++ b/zenoh/src/net/routing/hat/router/token.rs @@ -20,7 +20,7 @@ use zenoh_protocol::{ network::{ declare::{common::ext::WireExprType, TokenId}, ext, - interest::{InterestId, InterestMode, InterestOptions}, + interest::{InterestId, InterestMode}, Declare, DeclareBody, DeclareToken, UndeclareToken, }, }; @@ -32,7 +32,7 @@ use super::{ HatContext, HatFace, HatTables, }; use crate::net::routing::{ - dispatcher::{face::FaceState, tables::Tables}, + dispatcher::{face::FaceState, interests::RemoteInterest, tables::Tables}, hat::{CurrentFutureTrait, HatTokenTrait, SendDeclare}, router::{NodeId, Resource, SessionContext}, RoutingContext, @@ -104,11 +104,16 @@ fn propagate_simple_token_to( let matching_interests = face_hat!(dst_face) .remote_interests .values() - .filter(|(r, o)| o.tokens() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)) + .filter(|i| i.options.tokens() && i.matches(res)) .cloned() - .collect::>, InterestOptions)>>(); + .collect::>(); - for (int_res, options) in matching_interests { + for RemoteInterest { + res: int_res, + options, + .. + } in matching_interests + { let res = if options.aggregate() { int_res.as_ref().unwrap_or(res) } else { @@ -420,9 +425,11 @@ fn propagate_forget_simple_token( && (src_face.whatami != WhatAmI::Peer || face.whatami != WhatAmI::Peer || hat!(tables).failover_brokering(src_face.zid, face.zid)) - }) && face_hat!(face).remote_interests.values().any(|(r, o)| { - o.tokens() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) && !o.aggregate() - }) { + }) && face_hat!(face) + .remote_interests + .values() + .any(|i| i.options.tokens() && i.matches(res) && !i.options.aggregate()) + { // Token has never been declared on this face. // Send an Undeclare with a one shot generated id and a WireExpr ext. send_declare( @@ -475,15 +482,16 @@ fn propagate_forget_simple_token( res.expr(), ), ); - } else if face_hat!(face).remote_interests.values().any(|(r, o)| { - o.tokens() - && r.as_ref().map(|r| r.matches(&res)).unwrap_or(true) - && !o.aggregate() - }) && src_face.map_or(true, |src_face| { - src_face.whatami != WhatAmI::Peer - || face.whatami != WhatAmI::Peer - || hat!(tables).failover_brokering(src_face.zid, face.zid) - }) { + } else if face_hat!(face) + .remote_interests + .values() + .any(|i| i.options.tokens() && i.matches(&res) && !i.options.aggregate()) + && src_face.map_or(true, |src_face| { + src_face.whatami != WhatAmI::Peer + || face.whatami != WhatAmI::Peer + || hat!(tables).failover_brokering(src_face.zid, face.zid) + }) + { // Token has never been declared on this face. // Send an Undeclare with a one shot generated id and a WireExpr ext. send_declare(