Skip to content

Commit 81217c7

Browse files
Admin space: Show known origins of Subscribers and Queryables. (#959)
* Subscribers are reported with known sources in adminspace * Return valid empty json in case of serialisation failure * Queryables are reported with known sources in adminspace * Address review comments
1 parent 9ecc903 commit 81217c7

File tree

10 files changed

+216
-39
lines changed

10 files changed

+216
-39
lines changed

zenoh/src/net/routing/hat/client/pubsub.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ use crate::net::routing::dispatcher::face::FaceState;
1717
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
1818
use crate::net::routing::dispatcher::tables::Tables;
1919
use crate::net::routing::dispatcher::tables::{Route, RoutingExpr};
20-
use crate::net::routing::hat::HatPubSubTrait;
20+
use crate::net::routing::hat::{HatPubSubTrait, Sources};
2121
use crate::net::routing::router::RoutesIndexes;
2222
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
2323
use std::borrow::Cow;
24-
use std::collections::{HashMap, HashSet};
24+
use std::collections::HashMap;
2525
use std::sync::Arc;
2626
use zenoh_protocol::core::key_expr::OwnedKeyExpr;
2727
use zenoh_protocol::{
@@ -274,11 +274,19 @@ impl HatPubSubTrait for HatCode {
274274
forget_client_subscription(tables, face, res);
275275
}
276276

277-
fn get_subscriptions(&self, tables: &Tables) -> Vec<Arc<Resource>> {
278-
let mut subs = HashSet::new();
277+
fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
278+
// Compute the list of known suscriptions (keys)
279+
let mut subs = HashMap::new();
279280
for src_face in tables.faces.values() {
280281
for sub in &face_hat!(src_face).remote_subs {
281-
subs.insert(sub.clone());
282+
// Insert the key in the list of known suscriptions
283+
let srcs = subs.entry(sub.clone()).or_insert_with(Sources::empty);
284+
// Append src_face as a suscription source in the proper list
285+
match src_face.whatami {
286+
WhatAmI::Router => srcs.routers.push(src_face.zid),
287+
WhatAmI::Peer => srcs.peers.push(src_face.zid),
288+
WhatAmI::Client => srcs.clients.push(src_face.zid),
289+
}
282290
}
283291
}
284292
Vec::from_iter(subs)

zenoh/src/net/routing/hat/client/queries.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ use crate::net::routing::dispatcher::face::FaceState;
1717
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
1818
use crate::net::routing::dispatcher::tables::Tables;
1919
use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr};
20-
use crate::net::routing::hat::HatQueriesTrait;
20+
use crate::net::routing::hat::{HatQueriesTrait, Sources};
2121
use crate::net::routing::router::RoutesIndexes;
2222
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
2323
use ordered_float::OrderedFloat;
2424
use std::borrow::Cow;
25-
use std::collections::{HashMap, HashSet};
25+
use std::collections::HashMap;
2626
use std::sync::Arc;
2727
use zenoh_buffers::ZBuf;
2828
use zenoh_protocol::core::key_expr::include::{Includer, DEFAULT_INCLUDER};
@@ -272,11 +272,19 @@ impl HatQueriesTrait for HatCode {
272272
forget_client_queryable(tables, face, res);
273273
}
274274

275-
fn get_queryables(&self, tables: &Tables) -> Vec<Arc<Resource>> {
276-
let mut qabls = HashSet::new();
275+
fn get_queryables(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
276+
// Compute the list of known queryables (keys)
277+
let mut qabls = HashMap::new();
277278
for src_face in tables.faces.values() {
278279
for qabl in &face_hat!(src_face).remote_qabls {
279-
qabls.insert(qabl.clone());
280+
// Insert the key in the list of known queryables
281+
let srcs = qabls.entry(qabl.clone()).or_insert_with(Sources::empty);
282+
// Append src_face as a queryable source in the proper list
283+
match src_face.whatami {
284+
WhatAmI::Router => srcs.routers.push(src_face.zid),
285+
WhatAmI::Peer => srcs.peers.push(src_face.zid),
286+
WhatAmI::Client => srcs.clients.push(src_face.zid),
287+
}
280288
}
281289
}
282290
Vec::from_iter(qabls)

zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::pubsub::*;
1919
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
2020
use crate::net::routing::dispatcher::tables::Tables;
2121
use crate::net::routing::dispatcher::tables::{Route, RoutingExpr};
22-
use crate::net::routing::hat::HatPubSubTrait;
22+
use crate::net::routing::hat::{HatPubSubTrait, Sources};
2323
use crate::net::routing::router::RoutesIndexes;
2424
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
2525
use petgraph::graph::NodeIndex;
@@ -605,8 +605,31 @@ impl HatPubSubTrait for HatCode {
605605
}
606606
}
607607

608-
fn get_subscriptions(&self, tables: &Tables) -> Vec<Arc<Resource>> {
609-
hat!(tables).peer_subs.iter().cloned().collect()
608+
fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
609+
// Compute the list of known suscriptions (keys)
610+
hat!(tables)
611+
.peer_subs
612+
.iter()
613+
.map(|s| {
614+
(
615+
s.clone(),
616+
// Compute the list of routers, peers and clients that are known
617+
// sources of those subscriptions
618+
Sources {
619+
routers: vec![],
620+
peers: Vec::from_iter(res_hat!(s).peer_subs.iter().cloned()),
621+
clients: s
622+
.session_ctxs
623+
.values()
624+
.filter_map(|f| {
625+
(f.face.whatami == WhatAmI::Client && f.subs.is_some())
626+
.then_some(f.face.zid)
627+
})
628+
.collect(),
629+
},
630+
)
631+
})
632+
.collect()
610633
}
611634

612635
fn compute_data_route(

zenoh/src/net/routing/hat/linkstate_peer/queries.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::queries::*;
1919
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
2020
use crate::net::routing::dispatcher::tables::Tables;
2121
use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr};
22-
use crate::net::routing::hat::HatQueriesTrait;
22+
use crate::net::routing::hat::{HatQueriesTrait, Sources};
2323
use crate::net::routing::router::RoutesIndexes;
2424
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
2525
use ordered_float::OrderedFloat;
@@ -670,8 +670,31 @@ impl HatQueriesTrait for HatCode {
670670
}
671671
}
672672

673-
fn get_queryables(&self, tables: &Tables) -> Vec<Arc<Resource>> {
674-
hat!(tables).peer_qabls.iter().cloned().collect()
673+
fn get_queryables(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
674+
// Compute the list of known queryables (keys)
675+
hat!(tables)
676+
.peer_qabls
677+
.iter()
678+
.map(|s| {
679+
(
680+
s.clone(),
681+
// Compute the list of routers, peers and clients that are known
682+
// sources of those queryables
683+
Sources {
684+
routers: vec![],
685+
peers: Vec::from_iter(res_hat!(s).peer_qabls.keys().cloned()),
686+
clients: s
687+
.session_ctxs
688+
.values()
689+
.filter_map(|f| {
690+
(f.face.whatami == WhatAmI::Client && f.qabl.is_some())
691+
.then_some(f.face.zid)
692+
})
693+
.collect(),
694+
},
695+
)
696+
})
697+
.collect()
675698
}
676699

677700
fn compute_query_route(

zenoh/src/net/routing/hat/mod.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use super::{
2727
use crate::runtime::Runtime;
2828
use std::{any::Any, sync::Arc};
2929
use zenoh_buffers::ZBuf;
30-
use zenoh_config::{unwrap_or_default, Config, WhatAmI};
30+
use zenoh_config::{unwrap_or_default, Config, WhatAmI, ZenohId};
3131
use zenoh_protocol::{
3232
core::WireExpr,
3333
network::{
@@ -47,6 +47,23 @@ zconfigurable! {
4747
pub static ref TREES_COMPUTATION_DELAY_MS: u64 = 100;
4848
}
4949

50+
#[derive(serde::Serialize)]
51+
pub(crate) struct Sources {
52+
routers: Vec<ZenohId>,
53+
peers: Vec<ZenohId>,
54+
clients: Vec<ZenohId>,
55+
}
56+
57+
impl Sources {
58+
pub(crate) fn empty() -> Self {
59+
Self {
60+
routers: vec![],
61+
peers: vec![],
62+
clients: vec![],
63+
}
64+
}
65+
}
66+
5067
pub(crate) trait HatTrait: HatBaseTrait + HatPubSubTrait + HatQueriesTrait {}
5168

5269
pub(crate) trait HatBaseTrait {
@@ -129,7 +146,7 @@ pub(crate) trait HatPubSubTrait {
129146
node_id: NodeId,
130147
);
131148

132-
fn get_subscriptions(&self, tables: &Tables) -> Vec<Arc<Resource>>;
149+
fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)>;
133150

134151
fn compute_data_route(
135152
&self,
@@ -159,7 +176,7 @@ pub(crate) trait HatQueriesTrait {
159176
node_id: NodeId,
160177
);
161178

162-
fn get_queryables(&self, tables: &Tables) -> Vec<Arc<Resource>>;
179+
fn get_queryables(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)>;
163180

164181
fn compute_query_route(
165182
&self,

zenoh/src/net/routing/hat/p2p_peer/pubsub.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ use crate::net::routing::dispatcher::face::FaceState;
1717
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
1818
use crate::net::routing::dispatcher::tables::Tables;
1919
use crate::net::routing::dispatcher::tables::{Route, RoutingExpr};
20-
use crate::net::routing::hat::HatPubSubTrait;
20+
use crate::net::routing::hat::{HatPubSubTrait, Sources};
2121
use crate::net::routing::router::RoutesIndexes;
2222
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
2323
use std::borrow::Cow;
24-
use std::collections::{HashMap, HashSet};
24+
use std::collections::HashMap;
2525
use std::sync::Arc;
2626
use zenoh_protocol::core::key_expr::OwnedKeyExpr;
2727
use zenoh_protocol::{
@@ -275,11 +275,19 @@ impl HatPubSubTrait for HatCode {
275275
forget_client_subscription(tables, face, res);
276276
}
277277

278-
fn get_subscriptions(&self, tables: &Tables) -> Vec<Arc<Resource>> {
279-
let mut subs = HashSet::new();
278+
fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
279+
// Compute the list of known suscriptions (keys)
280+
let mut subs = HashMap::new();
280281
for src_face in tables.faces.values() {
281282
for sub in &face_hat!(src_face).remote_subs {
282-
subs.insert(sub.clone());
283+
// Insert the key in the list of known suscriptions
284+
let srcs = subs.entry(sub.clone()).or_insert_with(Sources::empty);
285+
// Append src_face as a suscription source in the proper list
286+
match src_face.whatami {
287+
WhatAmI::Router => srcs.routers.push(src_face.zid),
288+
WhatAmI::Peer => srcs.peers.push(src_face.zid),
289+
WhatAmI::Client => srcs.clients.push(src_face.zid),
290+
}
283291
}
284292
}
285293
Vec::from_iter(subs)

zenoh/src/net/routing/hat/p2p_peer/queries.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ use crate::net::routing::dispatcher::face::FaceState;
1717
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
1818
use crate::net::routing::dispatcher::tables::Tables;
1919
use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr};
20-
use crate::net::routing::hat::HatQueriesTrait;
20+
use crate::net::routing::hat::{HatQueriesTrait, Sources};
2121
use crate::net::routing::router::RoutesIndexes;
2222
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
2323
use ordered_float::OrderedFloat;
2424
use std::borrow::Cow;
25-
use std::collections::{HashMap, HashSet};
25+
use std::collections::HashMap;
2626
use std::sync::Arc;
2727
use zenoh_buffers::ZBuf;
2828
use zenoh_protocol::core::key_expr::include::{Includer, DEFAULT_INCLUDER};
@@ -272,11 +272,19 @@ impl HatQueriesTrait for HatCode {
272272
forget_client_queryable(tables, face, res);
273273
}
274274

275-
fn get_queryables(&self, tables: &Tables) -> Vec<Arc<Resource>> {
276-
let mut qabls = HashSet::new();
275+
fn get_queryables(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
276+
// Compute the list of known queryables (keys)
277+
let mut qabls = HashMap::new();
277278
for src_face in tables.faces.values() {
278279
for qabl in &face_hat!(src_face).remote_qabls {
279-
qabls.insert(qabl.clone());
280+
// Insert the key in the list of known queryables
281+
let srcs = qabls.entry(qabl.clone()).or_insert_with(Sources::empty);
282+
// Append src_face as a queryable source in the proper list
283+
match src_face.whatami {
284+
WhatAmI::Router => srcs.routers.push(src_face.zid),
285+
WhatAmI::Peer => srcs.peers.push(src_face.zid),
286+
WhatAmI::Client => srcs.clients.push(src_face.zid),
287+
}
280288
}
281289
}
282290
Vec::from_iter(qabls)

zenoh/src/net/routing/hat/router/pubsub.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::pubsub::*;
1919
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
2020
use crate::net::routing::dispatcher::tables::Tables;
2121
use crate::net::routing::dispatcher::tables::{Route, RoutingExpr};
22-
use crate::net::routing::hat::HatPubSubTrait;
22+
use crate::net::routing::hat::{HatPubSubTrait, Sources};
2323
use crate::net::routing::router::RoutesIndexes;
2424
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
2525
use petgraph::graph::NodeIndex;
@@ -925,8 +925,41 @@ impl HatPubSubTrait for HatCode {
925925
}
926926
}
927927

928-
fn get_subscriptions(&self, tables: &Tables) -> Vec<Arc<Resource>> {
929-
hat!(tables).router_subs.iter().cloned().collect()
928+
fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
929+
// Compute the list of known suscriptions (keys)
930+
hat!(tables)
931+
.router_subs
932+
.iter()
933+
.map(|s| {
934+
(
935+
s.clone(),
936+
// Compute the list of routers, peers and clients that are known
937+
// sources of those subscriptions
938+
Sources {
939+
routers: Vec::from_iter(res_hat!(s).router_subs.iter().cloned()),
940+
peers: if hat!(tables).full_net(WhatAmI::Peer) {
941+
Vec::from_iter(res_hat!(s).peer_subs.iter().cloned())
942+
} else {
943+
s.session_ctxs
944+
.values()
945+
.filter_map(|f| {
946+
(f.face.whatami == WhatAmI::Peer && f.subs.is_some())
947+
.then_some(f.face.zid)
948+
})
949+
.collect()
950+
},
951+
clients: s
952+
.session_ctxs
953+
.values()
954+
.filter_map(|f| {
955+
(f.face.whatami == WhatAmI::Client && f.subs.is_some())
956+
.then_some(f.face.zid)
957+
})
958+
.collect(),
959+
},
960+
)
961+
})
962+
.collect()
930963
}
931964

932965
fn compute_data_route(

zenoh/src/net/routing/hat/router/queries.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::queries::*;
1919
use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext};
2020
use crate::net::routing::dispatcher::tables::Tables;
2121
use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr};
22-
use crate::net::routing::hat::HatQueriesTrait;
22+
use crate::net::routing::hat::{HatQueriesTrait, Sources};
2323
use crate::net::routing::router::RoutesIndexes;
2424
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
2525
use ordered_float::OrderedFloat;
@@ -1073,8 +1073,41 @@ impl HatQueriesTrait for HatCode {
10731073
}
10741074
}
10751075

1076-
fn get_queryables(&self, tables: &Tables) -> Vec<Arc<Resource>> {
1077-
hat!(tables).router_qabls.iter().cloned().collect()
1076+
fn get_queryables(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
1077+
// Compute the list of known queryables (keys)
1078+
hat!(tables)
1079+
.router_qabls
1080+
.iter()
1081+
.map(|s| {
1082+
(
1083+
s.clone(),
1084+
// Compute the list of routers, peers and clients that are known
1085+
// sources of those queryables
1086+
Sources {
1087+
routers: Vec::from_iter(res_hat!(s).router_qabls.keys().cloned()),
1088+
peers: if hat!(tables).full_net(WhatAmI::Peer) {
1089+
Vec::from_iter(res_hat!(s).peer_qabls.keys().cloned())
1090+
} else {
1091+
s.session_ctxs
1092+
.values()
1093+
.filter_map(|f| {
1094+
(f.face.whatami == WhatAmI::Peer && f.qabl.is_some())
1095+
.then_some(f.face.zid)
1096+
})
1097+
.collect()
1098+
},
1099+
clients: s
1100+
.session_ctxs
1101+
.values()
1102+
.filter_map(|f| {
1103+
(f.face.whatami == WhatAmI::Client && f.qabl.is_some())
1104+
.then_some(f.face.zid)
1105+
})
1106+
.collect(),
1107+
},
1108+
)
1109+
})
1110+
.collect()
10781111
}
10791112

10801113
fn compute_query_route(

0 commit comments

Comments
 (0)