Skip to content

Commit

Permalink
Fix bug in return_conditions declares leading to wait for the full de…
Browse files Browse the repository at this point in the history
…lay (#1557)

* P2p peers don't send OAM messages when gossip disabled

* Treminate start conditions on declare final

* Avoid warning when scouting configured in connect peer

* Fix peers start_conditions on non peer gossip
  • Loading branch information
OlivierHecart authored Oct 22, 2024
1 parent 9fd0b36 commit f264361
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 42 deletions.
12 changes: 8 additions & 4 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,15 +359,19 @@ impl Primitives for Face {
.entry(id)
.and_modify(|interest| interest.finalized = true);

let mut wtables = zwrite!(self.tables.tables);
let mut declares = vec![];
declare_final(&mut self.state.clone(), id, &mut |p, m| {
declares.push((p.clone(), m))
});
declare_final(
ctrl_lock.as_ref(),
&mut wtables,
&mut self.state.clone(),
id,
&mut |p, m| declares.push((p.clone(), m)),
);

// recompute routes
// TODO: disable routes and recompute them in parallel to avoid holding
// tables write lock for a long time.
let mut wtables = zwrite!(self.tables.tables);
let mut root_res = wtables.root_res.clone();
update_data_routes_from(&mut wtables, &mut root_res);
update_query_routes_from(&mut wtables, &mut root_res);
Expand Down
6 changes: 5 additions & 1 deletion zenoh/src/net/routing/dispatcher/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use zenoh_util::Timed;

use super::{
face::FaceState,
tables::{register_expr_interest, TablesLock},
tables::{register_expr_interest, Tables, TablesLock},
};
use crate::net::routing::{
hat::{HatTrait, SendDeclare},
Expand Down Expand Up @@ -73,6 +73,8 @@ impl RemoteInterest {
}

pub(crate) fn declare_final(
hat_code: &(dyn HatTrait + Send + Sync),
wtables: &mut Tables,
face: &mut Arc<FaceState>,
id: InterestId,
send_declare: &mut SendDeclare,
Expand All @@ -83,6 +85,8 @@ pub(crate) fn declare_final(
{
finalize_pending_interest(interest, send_declare);
}

hat_code.declare_final(wtables, face, id);
}

pub(crate) fn finalize_pending_interests(
Expand Down
11 changes: 8 additions & 3 deletions zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ use zenoh_sync::get_mut_unchecked;

use super::face::FaceState;
pub use super::{pubsub::*, queries::*, resource::*};
use crate::net::routing::{
hat::{self, HatTrait},
interceptor::{interceptor_factories, InterceptorFactory},
use crate::net::{
routing::{
hat::{self, HatTrait},
interceptor::{interceptor_factories, InterceptorFactory},
},
runtime::WeakRuntime,
};

pub(crate) struct RoutingExpr<'a> {
Expand Down Expand Up @@ -62,6 +65,7 @@ impl<'a> RoutingExpr<'a> {
pub struct Tables {
pub(crate) zid: ZenohIdProto,
pub(crate) whatami: WhatAmI,
pub(crate) runtime: Option<WeakRuntime>,
pub(crate) face_counter: usize,
#[allow(dead_code)]
pub(crate) hlc: Option<Arc<HLC>>,
Expand Down Expand Up @@ -93,6 +97,7 @@ impl Tables {
Ok(Tables {
zid,
whatami,
runtime: None,
face_counter: 0,
hlc,
drop_future_timestamp,
Expand Down
4 changes: 4 additions & 0 deletions zenoh/src/net/routing/hat/client/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,8 @@ impl HatInterestTrait for HatCode {
}
}
}

fn declare_final(&self, _tables: &mut Tables, _face: &mut Arc<FaceState>, _id: InterestId) {
// Nothing
}
}
4 changes: 4 additions & 0 deletions zenoh/src/net/routing/hat/linkstate_peer/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,8 @@ impl HatInterestTrait for HatCode {
fn undeclare_interest(&self, _tables: &mut Tables, face: &mut Arc<FaceState>, id: InterestId) {
face_hat_mut!(face).remote_interests.remove(&id);
}

fn declare_final(&self, _tables: &mut Tables, _face: &mut Arc<FaceState>, _id: InterestId) {
// Nothing
}
}
1 change: 1 addition & 0 deletions zenoh/src/net/routing/hat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ pub(crate) trait HatInterestTrait {
send_declare: &mut SendDeclare,
);
fn undeclare_interest(&self, tables: &mut Tables, face: &mut Arc<FaceState>, id: InterestId);
fn declare_final(&self, tables: &mut Tables, face: &mut Arc<FaceState>, id: InterestId);
}

pub(crate) trait HatPubSubTrait {
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/hat/p2p_peer/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ impl Network {
}
}
}
if self.wait_declares && src_whatami != WhatAmI::Peer {
if (!self.wait_declares) || src_whatami != WhatAmI::Peer {
zenoh_runtime::ZRuntime::Net.block_in_place(
strong_runtime
.start_conditions()
Expand Down
11 changes: 7 additions & 4 deletions zenoh/src/net/routing/hat/p2p_peer/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use zenoh_protocol::{
use zenoh_sync::get_mut_unchecked;

use super::{
face_hat, face_hat_mut, hat, initial_interest, pubsub::declare_sub_interest,
queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace, HatTables,
face_hat, face_hat_mut, initial_interest, pubsub::declare_sub_interest,
queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace,
INITIAL_INTEREST_ID,
};
use crate::net::routing::{
Expand Down Expand Up @@ -256,10 +256,13 @@ impl HatInterestTrait for HatCode {
}
}
}
}

fn declare_final(&self, tables: &mut Tables, face: &mut Arc<FaceState>, id: InterestId) {
if id == INITIAL_INTEREST_ID {
zenoh_runtime::ZRuntime::Net.block_in_place(async move {
if let Some(net) = &hat!(tables).gossip {
if let Some(runtime) = net.runtime.upgrade() {
if let Some(runtime) = &tables.runtime {
if let Some(runtime) = runtime.upgrade() {
runtime
.start_conditions()
.terminate_peer_connector_zid(face.zid)
Expand Down
49 changes: 21 additions & 28 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,6 @@ mod pubsub;
mod queries;
mod token;

macro_rules! hat {
($t:expr) => {
$t.hat.downcast_ref::<HatTables>().unwrap()
};
}
use hat;

macro_rules! hat_mut {
($t:expr) => {
$t.hat.downcast_mut::<HatTables>().unwrap()
Expand Down Expand Up @@ -132,16 +125,18 @@ impl HatBaseTrait for HatCode {
unwrap_or_default!(config.routing().router().peers_failover_brokering());
drop(config_guard);

hat_mut!(tables).gossip = Some(Network::new(
"[Gossip]".to_string(),
tables.zid,
runtime,
router_peers_failover_brokering,
gossip,
gossip_multihop,
autoconnect,
wait_declares,
));
if gossip {
hat_mut!(tables).gossip = Some(Network::new(
"[Gossip]".to_string(),
tables.zid,
runtime,
router_peers_failover_brokering,
gossip,
gossip_multihop,
autoconnect,
wait_declares,
));
}
}

fn new_tables(&self, _router_peers_failover_brokering: bool) -> Box<dyn Any + Send + Sync> {
Expand Down Expand Up @@ -326,11 +321,9 @@ impl HatBaseTrait for HatCode {
wtables.faces.remove(&face.id);

if face.whatami != WhatAmI::Client {
hat_mut!(wtables)
.gossip
.as_mut()
.unwrap()
.remove_link(&face.zid);
if let Some(net) = hat_mut!(wtables).gossip.as_mut() {
net.remove_link(&face.zid);
}
};
drop(wtables);
}
Expand All @@ -346,15 +339,15 @@ impl HatBaseTrait for HatCode {
if oam.id == OAM_LINKSTATE {
if let ZExtBody::ZBuf(buf) = oam.body {
if let Ok(zid) = transport.get_zid() {
use zenoh_buffers::reader::HasReader;
use zenoh_codec::RCodec;
let codec = Zenoh080Routing::new();
let mut reader = buf.reader();
let list: LinkStateList = codec.read(&mut reader).unwrap();

let whatami = transport.get_whatami()?;
if whatami != WhatAmI::Client {
if let Some(net) = hat_mut!(tables).gossip.as_mut() {
use zenoh_buffers::reader::HasReader;
use zenoh_codec::RCodec;
let codec = Zenoh080Routing::new();
let mut reader = buf.reader();
let list: LinkStateList = codec.read(&mut reader).unwrap();

net.link_states(list.link_states, zid, whatami);
}
};
Expand Down
4 changes: 4 additions & 0 deletions zenoh/src/net/routing/hat/router/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ impl HatInterestTrait for HatCode {
fn undeclare_interest(&self, _tables: &mut Tables, face: &mut Arc<FaceState>, id: InterestId) {
face_hat_mut!(face).remote_interests.remove(&id);
}

fn declare_final(&self, _tables: &mut Tables, _face: &mut Arc<FaceState>, _id: InterestId) {
// Nothing
}
}

#[inline]
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/net/routing/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl Router {
pub fn init_link_state(&mut self, runtime: Runtime) {
let ctrl_lock = zlock!(self.tables.ctrl_lock);
let mut tables = zwrite!(self.tables.tables);
tables.runtime = Some(Runtime::downgrade(&runtime));
ctrl_lock.init(&mut tables, runtime)
}

Expand Down
11 changes: 10 additions & 1 deletion zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,16 @@ impl Runtime {

let locators = scouted_locators
.iter()
.filter(|l| !configured_locators.contains(l));
.filter(|l| !configured_locators.contains(l))
.collect::<Vec<&Locator>>();

if locators.is_empty() {
tracing::debug!(
"Already connecting to locators of {} (connect configuration). Ignore.",
zid
);
return false;
}

let manager = self.manager();

Expand Down

0 comments on commit f264361

Please sign in to comment.