Skip to content

Commit

Permalink
Make FIFO (try)recv{,_deadline,_timeout} return ZResult<Option<T>>
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzzypixelz committed Oct 8, 2024
1 parent 3740564 commit 8cab592
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 35 deletions.
30 changes: 24 additions & 6 deletions zenoh/src/api/handlers/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,14 @@ impl<T: Send + 'static> IntoHandler<T> for FifoChannel {
impl<T> FifoChannelHandler<T> {
/// Attempt to fetch an incoming value from the channel associated with this receiver, returning
/// an error if the channel is empty or if all senders have been dropped.
pub fn try_recv(&self) -> ZResult<T> {
self.0.try_recv().map_err(Into::into)
///
/// If the channel is empty, this will return [`None`].
pub fn try_recv(&self) -> ZResult<Option<T>> {
match self.0.try_recv() {
Ok(value) => Ok(Some(value)),
Err(flume::TryRecvError::Empty) => Ok(None),
Err(err) => Err(err.into()),
}
}

/// Wait for an incoming value from the channel associated with this receiver, returning an
Expand All @@ -84,14 +90,26 @@ impl<T> FifoChannelHandler<T> {

/// Wait for an incoming value from the channel associated with this receiver, returning an
/// error if all senders have been dropped or the deadline has passed.
pub fn recv_deadline(&self, deadline: Instant) -> ZResult<T> {
self.0.recv_deadline(deadline).map_err(Into::into)
///
/// If the deadline has expired, this will return [`None`].
pub fn recv_deadline(&self, deadline: Instant) -> ZResult<Option<T>> {
match self.0.recv_deadline(deadline) {
Ok(value) => Ok(Some(value)),
Err(flume::RecvTimeoutError::Timeout) => Ok(None),
Err(err) => Err(err.into()),
}
}

/// Wait for an incoming value from the channel associated with this receiver, returning an
/// error if all senders have been dropped or the timeout has expired.
pub fn recv_timeout(&self, duration: Duration) -> ZResult<T> {
self.0.recv_timeout(duration).map_err(Into::into)
///
/// If the timeout has expired, this will return [`None`].
pub fn recv_timeout(&self, duration: Duration) -> ZResult<Option<T>> {
match self.0.recv_timeout(duration) {
Ok(value) => Ok(Some(value)),
Err(flume::RecvTimeoutError::Timeout) => Ok(None),
Err(err) => Err(err.into()),
}
}

/// Create a blocking iterator over the values received on the channel that finishes iteration
Expand Down
76 changes: 47 additions & 29 deletions zenoh/tests/matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,41 +58,55 @@ async fn zenoh_matching_status_any() -> ZResult<()> {
let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);
assert!(received_status.unwrap().is_none());

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session1.declare_subscriber("zenoh_matching_status_any_test")).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true));
assert!(received_status
.ok()
.flatten()
.map(|s| s.matching_subscribers())
.eq(&Some(true)));

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(matching_status.matching_subscribers());

ztimeout!(sub.undeclare()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false));
assert!(received_status
.ok()
.flatten()
.map(|s| s.matching_subscribers())
.eq(&Some(false)));

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session2.declare_subscriber("zenoh_matching_status_any_test")).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true));
assert!(received_status
.ok()
.flatten()
.map(|s| s.matching_subscribers())
.eq(&Some(true)));

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(matching_status.matching_subscribers());

ztimeout!(sub.undeclare()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false));
assert!(received_status
.ok()
.flatten()
.map(|s| s.matching_subscribers())
.eq(&Some(false)));

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());
Expand All @@ -114,45 +128,47 @@ async fn zenoh_matching_status_remote() -> ZResult<()> {
let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);
assert!(received_status.unwrap().is_none());

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session1.declare_subscriber("zenoh_matching_status_remote_test")).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);
assert!(received_status.unwrap().is_none());

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());

ztimeout!(sub.undeclare()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);
assert!(received_status.unwrap().is_none());

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session2.declare_subscriber("zenoh_matching_status_remote_test")).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true));
assert!(received_status
.ok()
.flatten()
.map(|s| s.matching_subscribers())
.eq(&Some(true)));

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(matching_status.matching_subscribers());

ztimeout!(sub.undeclare()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false));
assert!(received_status
.ok()
.flatten()
.map(|s| s.matching_subscribers())
.eq(&Some(false)));

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());
Expand All @@ -175,45 +191,47 @@ async fn zenoh_matching_status_local() -> ZResult<()> {
let matching_listener = ztimeout!(publisher1.matching_listener()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);
assert!(received_status.unwrap().is_none());

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session1.declare_subscriber("zenoh_matching_status_local_test")).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true));
assert!(received_status
.ok()
.flatten()
.map(|s| s.matching_subscribers())
.eq(&Some(true)));

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(matching_status.matching_subscribers());

ztimeout!(sub.undeclare()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false));
assert!(received_status
.ok()
.flatten()
.map(|s| s.matching_subscribers())
.eq(&Some(false)));

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());

let sub = ztimeout!(session2.declare_subscriber("zenoh_matching_status_local_test")).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);
assert!(received_status.unwrap().is_none());

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());

ztimeout!(sub.undeclare()).unwrap();

let received_status = matching_listener.recv_timeout(RECV_TIMEOUT);
assert!(
received_status.err().unwrap().downcast_ref() == Some(&flume::RecvTimeoutError::Timeout)
);
assert!(received_status.unwrap().is_none());

let matching_status = ztimeout!(publisher1.matching_status()).unwrap();
assert!(!matching_status.matching_subscribers());
Expand Down

0 comments on commit 8cab592

Please sign in to comment.