Skip to content

Commit

Permalink
Fix KeepAlive transmission interval
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Oct 21, 2024
1 parent d3998f3 commit 8913f67
Showing 1 changed file with 32 additions and 29 deletions.
61 changes: 32 additions & 29 deletions io/zenoh-transport/src/unicast/universal/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,39 +178,42 @@ async fn tx_task(
token: CancellationToken,
#[cfg(feature = "stats")] stats: Arc<TransportStats>,
) -> ZResult<()> {
let mut interval =
tokio::time::interval_at(tokio::time::Instant::now() + keep_alive, keep_alive);
loop {
tokio::select! {
res = pipeline.pull() => {
if let Some((mut batch, priority)) = res {
link.send_batch(&mut batch).await?;

#[cfg(feature = "stats")]
{
stats.inc_tx_t_msgs(batch.stats.t_msgs);
stats.inc_tx_bytes(batch.len() as usize);
res = tokio::time::timeout(keep_alive, pipeline.pull()) => {
match res {
Ok(Some((mut batch, priority))) => {
link.send_batch(&mut batch).await?;

#[cfg(feature = "stats")]
{
stats.inc_tx_t_msgs(batch.stats.t_msgs);
stats.inc_tx_bytes(batch.len() as usize);
}

// Reinsert the batch into the queue
pipeline.refill(batch, priority);
},
Ok(None) => {
// The queue has been disabled: break the tx loop, drain the queue, and exit
break;
},
Err(_) => {
// A timeout occured, no control/data messages have been sent during

Check warning on line 202 in io/zenoh-transport/src/unicast/universal/link.rs

View workflow job for this annotation

GitHub Actions / Typos Check

"occured" should be "occurred".

Check warning on line 202 in io/zenoh-transport/src/unicast/universal/link.rs

View workflow job for this annotation

GitHub Actions / Typos Check

"occured" should be "occurred".
// the keep_alive period, we need to send a KeepAlive message
let message: TransportMessage = KeepAlive.into();

#[allow(unused_variables)] // Used when stats feature is enabled
let n = link.send(&message).await?;

#[cfg(feature = "stats")]
{
stats.inc_tx_t_msgs(1);
stats.inc_tx_bytes(n);
}
}

// Reinsert the batch into the queue
pipeline.refill(batch, priority);
} else {
break
}
}

_ = interval.tick() => {
let message: TransportMessage = KeepAlive.into();

#[allow(unused_variables)] // Used when stats feature is enabled
let n = link.send(&message).await?;

#[cfg(feature = "stats")]
{
stats.inc_tx_t_msgs(1);
stats.inc_tx_bytes(n);
}
}
},

_ = token.cancelled() => break
}
Expand Down

0 comments on commit 8913f67

Please sign in to comment.