Skip to content

Commit

Permalink
refactor: remove state from entities (#1407)
Browse files Browse the repository at this point in the history
* refactor: remove state from entities

Entities having having a reference to their state prevented to drop their
callback when closing the session.

* fix: fix unstable
  • Loading branch information
wyfo authored Sep 12, 2024
1 parent cb64ddb commit ac36506
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 60 deletions.
6 changes: 3 additions & 3 deletions zenoh/src/api/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ lazy_static::lazy_static!(
);

pub(crate) fn init(session: WeakSession) {
if let Ok(own_zid) = keyexpr::new(&session.runtime.zid().to_string()) {
if let Ok(own_zid) = keyexpr::new(&session.zid().to_string()) {
let admin_key = KeyExpr::from(*KE_PREFIX / own_zid / *KE_SESSION / *KE_STARSTAR)
.to_wire(&session)
.to_owned();
Expand Down Expand Up @@ -104,7 +104,7 @@ pub(crate) fn on_admin_query(session: &WeakSession, query: Query) {
}
}

if let Ok(own_zid) = keyexpr::new(&session.runtime.zid().to_string()) {
if let Ok(own_zid) = keyexpr::new(&session.zid().to_string()) {
for transport in zenoh_runtime::ZRuntime::Net
.block_in_place(session.runtime.manager().get_transports_unicast())
{
Expand Down Expand Up @@ -155,7 +155,7 @@ impl TransportMulticastEventHandler for Handler {
&self,
peer: zenoh_transport::TransportPeer,
) -> ZResult<Arc<dyn TransportPeerEventHandler>> {
if let Ok(own_zid) = keyexpr::new(&self.session.runtime.zid().to_string()) {
if let Ok(own_zid) = keyexpr::new(&self.session.zid().to_string()) {
if let Ok(zid) = keyexpr::new(&peer.zid.to_string()) {
let expr = WireExpr::from(
&(*KE_PREFIX / own_zid / *KE_SESSION / *KE_TRANSPORT_UNICAST / zid),
Expand Down
4 changes: 0 additions & 4 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,6 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
// internal function for performing the publication
fn create_one_shot_publisher(self) -> ZResult<Publisher<'b>> {
Ok(Publisher {
#[cfg(feature = "unstable")]
session_id: self.session.0.runtime.zid(),
session: self.session.downgrade(),
id: 0, // This is a one shot Publisher
key_expr: self.key_expr?,
Expand Down Expand Up @@ -394,8 +392,6 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
.0
.declare_publisher_inner(key_expr.clone(), self.destination)?;
Ok(Publisher {
#[cfg(feature = "unstable")]
session_id: self.session.0.runtime.zid(),
session: self.session.downgrade(),
id,
key_expr,
Expand Down
5 changes: 2 additions & 3 deletions zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,10 +597,9 @@ where
)
.map(|sub_state| Subscriber {
inner: SubscriberInner {
#[cfg(feature = "unstable")]
session_id: session.zid(),
session: self.session.downgrade(),
state: sub_state,
id: sub_state.id,
key_expr: sub_state.key_expr.clone(),
kind: SubscriberKind::LivelinessSubscriber,
undeclare_on_drop: self.undeclare_on_drop,
},
Expand Down
13 changes: 5 additions & 8 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use {
},
std::{collections::HashSet, sync::Arc, sync::Mutex},
zenoh_config::wrappers::EntityGlobalId,
zenoh_config::ZenohId,
zenoh_protocol::core::EntityGlobalIdProto,
};

Expand Down Expand Up @@ -104,8 +103,6 @@ impl fmt::Debug for PublisherState {
/// ```
#[derive(Debug)]
pub struct Publisher<'a> {
#[cfg(feature = "unstable")]
pub(crate) session_id: ZenohId,
pub(crate) session: WeakSession,
pub(crate) id: Id,
pub(crate) key_expr: KeyExpr<'a>,
Expand Down Expand Up @@ -139,7 +136,7 @@ impl<'a> Publisher<'a> {
#[zenoh_macros::unstable]
pub fn id(&self) -> EntityGlobalId {
EntityGlobalIdProto {
zid: self.session_id.into(),
zid: self.session.zid().into(),
eid: self.id,
}
.into()
Expand Down Expand Up @@ -783,7 +780,7 @@ where
inner: MatchingListenerInner {
session: self.publisher.session.clone(),
matching_listeners: self.publisher.matching_listeners.clone(),
state,
id: state.id,
undeclare_on_drop: self.undeclare_on_drop,
},
handler,
Expand Down Expand Up @@ -829,7 +826,7 @@ impl fmt::Debug for MatchingListenerState {
pub(crate) struct MatchingListenerInner {
pub(crate) session: WeakSession,
pub(crate) matching_listeners: Arc<Mutex<HashSet<Id>>>,
pub(crate) state: Arc<MatchingListenerState>,
pub(crate) id: Id,
pub(crate) undeclare_on_drop: bool,
}

Expand Down Expand Up @@ -889,10 +886,10 @@ impl<Handler> MatchingListener<Handler> {
fn undeclare_impl(&mut self) -> ZResult<()> {
// set the flag first to avoid double panic if this function panic
self.inner.undeclare_on_drop = false;
zlock!(self.inner.matching_listeners).remove(&self.inner.state.id);
zlock!(self.inner.matching_listeners).remove(&self.inner.id);
self.inner
.session
.undeclare_matches_listener_inner(self.inner.state.id)
.undeclare_matches_listener_inner(self.inner.id)
}
}

Expand Down
17 changes: 6 additions & 11 deletions zenoh/src/api/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use zenoh_result::ZResult;
#[zenoh_macros::unstable]
use {
crate::api::{query::ReplyKeyExpr, sample::SourceInfo},
zenoh_config::wrappers::{EntityGlobalId, ZenohId},
zenoh_config::wrappers::EntityGlobalId,
zenoh_protocol::core::EntityGlobalIdProto,
};

Expand Down Expand Up @@ -545,11 +545,8 @@ impl fmt::Debug for QueryableState {

#[derive(Debug)]
pub(crate) struct QueryableInner {
#[cfg(feature = "unstable")]
pub(crate) session_id: ZenohId,
pub(crate) session: WeakSession,
pub(crate) state: Arc<QueryableState>,
// Queryable is undeclared on drop unless its handler is a ZST, i.e. it is callback-only
pub(crate) id: Id,
pub(crate) undeclare_on_drop: bool,
}

Expand Down Expand Up @@ -823,8 +820,8 @@ impl<Handler> Queryable<Handler> {
#[zenoh_macros::unstable]
pub fn id(&self) -> EntityGlobalId {
EntityGlobalIdProto {
zid: self.inner.session_id.into(),
eid: self.inner.state.id,
zid: self.inner.session.zid().into(),
eid: self.inner.id,
}
.into()
}
Expand Down Expand Up @@ -868,7 +865,7 @@ impl<Handler> Queryable<Handler> {
fn undeclare_impl(&mut self) -> ZResult<()> {
// set the flag first to avoid double panic if this function panic
self.inner.undeclare_on_drop = false;
self.inner.session.close_queryable(self.inner.state.id)
self.inner.session.close_queryable(self.inner.id)
}
}

Expand Down Expand Up @@ -930,10 +927,8 @@ where
)
.map(|qable_state| Queryable {
inner: QueryableInner {
#[cfg(feature = "unstable")]
session_id: session.zid(),
session: self.session.downgrade(),
state: qable_state,
id: qable_state.id,
undeclare_on_drop: self.undeclare_on_drop,
},
handler: receiver,
Expand Down
49 changes: 36 additions & 13 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,7 @@ pub(crate) struct SessionInner {

impl fmt::Debug for SessionInner {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Session")
.field("id", &self.runtime.zid())
.finish()
f.debug_struct("Session").field("id", &self.zid()).finish()
}
}

Expand Down Expand Up @@ -671,7 +669,7 @@ impl Session {
// Called in the case that the runtime is not initialized with an hlc
// UNIX_EPOCH is Returns a Timespec::zero(), Unwrap Should be permissable here
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().into();
Timestamp::new(now, self.0.runtime.zid().into())
Timestamp::new(now, self.0.zid().into())
}
}
}
Expand Down Expand Up @@ -1058,21 +1056,35 @@ impl Session {
}
}
impl SessionInner {
pub fn zid(&self) -> ZenohId {
self.runtime.zid()
}

fn close(&self) -> impl Resolve<ZResult<()>> + '_ {
ResolveFuture::new(async move {
let Some(primitives) = zwrite!(self.state).primitives.take() else {
return Ok(());
};
if self.owns_runtime {
info!(zid = %self.runtime.zid(), "close session");
info!(zid = %self.zid(), "close session");
}
self.task_controller.terminate_all(Duration::from_secs(10));
if self.owns_runtime {
self.runtime.close().await?;
} else {
primitives.send_close();
}
zwrite!(self.state).queryables.clear();
let mut state = zwrite!(self.state);
state.queryables.clear();
state.subscribers.clear();
state.liveliness_subscribers.clear();
state.local_resources.clear();
state.remote_resources.clear();
#[cfg(feature = "unstable")]
{
state.tokens.clear();
state.matching_listeners.clear();
}
Ok(())
})
}
Expand Down Expand Up @@ -1196,6 +1208,9 @@ impl SessionInner {

pub(crate) fn undeclare_publisher_inner(&self, pid: Id) -> ZResult<()> {
let mut state = zwrite!(self.state);
let Ok(primitives) = state.primitives() else {
return Ok(());
};
if let Some(pub_state) = state.publishers.remove(&pid) {
trace!("undeclare_publisher({:?})", pub_state);
if pub_state.destination != Locality::SessionLocal {
Expand All @@ -1204,7 +1219,6 @@ impl SessionInner {
if !state.publishers.values().any(|p| {
p.destination != Locality::SessionLocal && p.remote_id == pub_state.remote_id
}) {
let primitives = state.primitives()?;
drop(state);
primitives.send_interest(Interest {
id: pub_state.remote_id,
Expand Down Expand Up @@ -1359,6 +1373,9 @@ impl SessionInner {
kind: SubscriberKind,
) -> ZResult<()> {
let mut state = zwrite!(self.state);
let Ok(primitives) = state.primitives() else {
return Ok(());
};
if let Some(sub_state) = state.subscribers_mut(kind).remove(&sid) {
trace!("undeclare_subscriber({:?})", sub_state);
for res in state
Expand All @@ -1384,7 +1401,6 @@ impl SessionInner {
if !state.subscribers(kind).values().any(|s| {
s.origin != Locality::SessionLocal && s.remote_id == sub_state.remote_id
}) {
let primitives = state.primitives()?;
drop(state);
primitives.send_declare(Declare {
interest_id: None,
Expand Down Expand Up @@ -1472,10 +1488,12 @@ impl SessionInner {

pub(crate) fn close_queryable(&self, qid: Id) -> ZResult<()> {
let mut state = zwrite!(self.state);
let Ok(primitives) = state.primitives() else {
return Ok(());
};
if let Some(qable_state) = state.queryables.remove(&qid) {
trace!("undeclare_queryable({:?})", qable_state);
if qable_state.origin != Locality::SessionLocal {
let primitives = state.primitives()?;
drop(state);
primitives.send_declare(Declare {
interest_id: None,
Expand Down Expand Up @@ -1596,13 +1614,15 @@ impl SessionInner {
#[zenoh_macros::unstable]
pub(crate) fn undeclare_liveliness(&self, tid: Id) -> ZResult<()> {
let mut state = zwrite!(self.state);
let Ok(primitives) = state.primitives() else {
return Ok(());
};
if let Some(tok_state) = state.tokens.remove(&tid) {
trace!("undeclare_liveliness({:?})", tok_state);
// Note: there might be several Tokens on the same KeyExpr.
let key_expr = &tok_state.key_expr;
let twin_tok = state.tokens.values().any(|s| s.key_expr == *key_expr);
if !twin_tok {
let primitives = state.primitives()?;
drop(state);
primitives.send_declare(Declare {
interest_id: None,
Expand Down Expand Up @@ -1771,6 +1791,9 @@ impl SessionInner {
#[zenoh_macros::unstable]
pub(crate) fn undeclare_matches_listener_inner(&self, sid: Id) -> ZResult<()> {
let mut state = zwrite!(self.state);
if state.primitives.is_none() {
return Ok(());
}
if let Some(state) = state.matching_listeners.remove(&sid) {
trace!("undeclare_matches_listener_inner({:?})", state);
Ok(())
Expand Down Expand Up @@ -1896,7 +1919,7 @@ impl SessionInner {
.spawn_with_rt(zenoh_runtime::ZRuntime::Net, {
let session = WeakSession::new(self);
#[cfg(feature = "unstable")]
let zid = self.runtime.zid();
let zid = self.zid();
async move {
tokio::select! {
_ = tokio::time::sleep(timeout) => {
Expand Down Expand Up @@ -2001,7 +2024,7 @@ impl SessionInner {
self.task_controller
.spawn_with_rt(zenoh_runtime::ZRuntime::Net, {
let session = WeakSession::new(self);
let zid = self.runtime.zid();
let zid = self.zid();
async move {
tokio::select! {
_ = tokio::time::sleep(timeout) => {
Expand Down Expand Up @@ -2094,7 +2117,7 @@ impl SessionInner {
}
};

let zid = self.runtime.zid();
let zid = self.zid();

let query_inner = Arc::new(QueryInner {
key_expr,
Expand Down
Loading

0 comments on commit ac36506

Please sign in to comment.