Skip to content

Commit

Permalink
Align examples and remove reading from stdin (eclipse-zenoh#768)
Browse files Browse the repository at this point in the history
* Remove reading from stdin, align example implementations

* Change counter start to 0

* Replace sleep with thread-parking

* Remove select, format files

* Uncomment liveliness token undeclaration
  • Loading branch information
oteffahi authored Mar 13, 2024
1 parent c75401b commit 61838d5
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 192 deletions.
28 changes: 8 additions & 20 deletions examples/examples/z_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use async_std::task::sleep;
use clap::Parser;
use futures::prelude::*;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh_examples::CommonArgs;
Expand All @@ -39,23 +36,14 @@ async fn main() {
.unwrap(),
);

println!("Enter 'd' to undeclare LivelinessToken, 'q' to quit...");
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
loop {
let _ = stdin.read_exact(&mut input).await;
match input[0] {
b'q' => break,
b'd' => {
if let Some(token) = token.take() {
println!("Undeclaring LivelinessToken...");
token.undeclare().res().await.unwrap();
}
}
0 => sleep(Duration::from_secs(1)).await,
_ => (),
}
}
println!("Press CTRL-C to undeclare LivelinessToken and quit...");
std::thread::park();
// LivelinessTokens are automatically closed when dropped
// Use the code below to manually undeclare it if needed
if let Some(token) = token.take() {
println!("Undeclaring LivelinessToken...");
token.undeclare().res().await.unwrap();
};
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
Expand Down
4 changes: 1 addition & 3 deletions examples/examples/z_pong.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::io::{stdin, Read};

//
// Copyright (c) 2023 ZettaScale Technology
//
Expand Down Expand Up @@ -44,7 +42,7 @@ fn main() {
.callback(move |sample| publisher.put(sample.value).res().unwrap())
.res()
.unwrap();
for _ in stdin().bytes().take_while(|b| !matches!(b, Ok(b'q'))) {}
std::thread::park();
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
Expand Down
1 change: 1 addition & 0 deletions examples/examples/z_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ async fn main() {
println!("Declaring Publisher on '{key_expr}'...");
let publisher = session.declare_publisher(&key_expr).res().await.unwrap();

println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {value}");
Expand Down
1 change: 1 addition & 0 deletions examples/examples/z_pub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async fn main() -> Result<(), zenoh::Error> {
println!("Allocating Shared Memory Buffer...");
let publisher = session.declare_publisher(&path).res().await.unwrap();

println!("Press CTRL-C to quit...");
for idx in 0..(K * N as u32) {
sleep(Duration::from_secs(1)).await;
let mut sbuf = match shm.alloc(1024) {
Expand Down
1 change: 1 addition & 0 deletions examples/examples/z_pub_shm_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ async fn main() {
// Make sure to not drop messages because of congestion control
.congestion_control(CongestionControl::Block).res().await.unwrap();

println!("Press CTRL-C to quit...");
loop {
publisher.put(buf.clone()).res().await.unwrap();
}
Expand Down
1 change: 1 addition & 0 deletions examples/examples/z_pub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ fn main() {
.res()
.unwrap();

println!("Press CTRL-C to quit...");
let mut count: usize = 0;
let mut start = std::time::Instant::now();
loop {
Expand Down
40 changes: 11 additions & 29 deletions examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use async_std::prelude::FutureExt;
use async_std::task::sleep;
use clap::Parser;
use futures::prelude::*;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
Expand All @@ -35,40 +33,24 @@ async fn main() {
let subscriber = session
.declare_subscriber(&key_expr)
.pull_mode()
.res()
.await
.unwrap();

println!("Press <enter> to pull data...");

// Define the future to handle incoming samples of the subscription.
let subs = async {
while let Ok(sample) = subscriber.recv_async().await {
.callback(|sample| {
println!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind,
sample.key_expr.as_str(),
sample.value,
);
}
};

// Define the future to handle keyboard's input.
let keyb = async {
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
loop {
stdin.read_exact(&mut input).await.unwrap();
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
_ => subscriber.pull().res().await.unwrap(),
}
}
};
})
.res()
.await
.unwrap();

// Execute both futures concurrently until one of them returns.
subs.race(keyb).await;
println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
sleep(Duration::from_secs(1)).await;
println!("[{idx:4}] Pulling...");
subscriber.pull().res().await.unwrap();
}
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
Expand Down
66 changes: 20 additions & 46 deletions examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use async_std::task::sleep;
use clap::Parser;
use futures::prelude::*;
use futures::select;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh_examples::CommonArgs;
Expand All @@ -27,7 +22,6 @@ async fn main() {
env_logger::init();

let (config, key_expr, value, complete) = parse_args();
let send_errors = std::sync::atomic::AtomicBool::new(false);

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
Expand All @@ -40,47 +34,27 @@ async fn main() {
.await
.unwrap();

println!("Enter 'q' to quit, 'e' to reply an error to next query...");
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
loop {
select!(
query = queryable.recv_async() => {
let query = query.unwrap();
match query.value() {
None => println!(">> [Queryable ] Received Query '{}'", query.selector()),
Some(value) => println!(">> [Queryable ] Received Query '{}' with value '{}'", query.selector(), value),
}
let reply = if send_errors.swap(false, Relaxed) {
println!(
">> [Queryable ] Replying (ERROR: '{}')",
value,
);
Err(value.clone().into())
} else {
println!(
">> [Queryable ] Responding ('{}': '{}')",
key_expr.as_str(),
value,
);
Ok(Sample::new(key_expr.clone(), value.clone()))
};
query
.reply(reply)
.res()
.await
.unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}"));
},

_ = stdin.read_exact(&mut input).fuse() => {
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
b'e' => send_errors.store(true, Relaxed),
_ => (),
}
}
println!("Press CTRL-C to quit...");
while let Ok(query) = queryable.recv_async().await {
match query.value() {
None => println!(">> [Queryable ] Received Query '{}'", query.selector()),
Some(value) => println!(
">> [Queryable ] Received Query '{}' with value '{}'",
query.selector(),
value
),
}
println!(
">> [Queryable ] Responding ('{}': '{}')",
key_expr.as_str(),
value,
);
let reply = Ok(Sample::new(key_expr.clone(), value.clone()));
query
.reply(reply)
.res()
.await
.unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}"));
}
}

Expand Down
15 changes: 1 addition & 14 deletions examples/examples/z_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@
//
#![recursion_limit = "256"]

use async_std::task::sleep;
use clap::Parser;
use futures::prelude::*;
use futures::select;
use std::collections::HashMap;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh_examples::CommonArgs;
Expand Down Expand Up @@ -46,9 +43,7 @@ async fn main() {
.await
.unwrap();

println!("Enter 'q' to quit...");
let mut stdin = async_std::io::stdin();
let mut input = [0u8];
println!("Press CTRL-C to quit...");
loop {
select!(
sample = subscriber.recv_async() => {
Expand All @@ -70,14 +65,6 @@ async fn main() {
query.reply(Ok(sample.clone())).res().await.unwrap();
}
}
},

_ = stdin.read_exact(&mut input).fuse() => {
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
_ => (),
}
}
);
}
Expand Down
29 changes: 7 additions & 22 deletions examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use async_std::task::sleep;
use clap::Parser;
use futures::prelude::*;
use futures::select;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh_examples::CommonArgs;
Expand All @@ -39,24 +35,13 @@ async fn main() {

let subscriber = session.declare_subscriber(&key_expr).res().await.unwrap();

println!("Enter 'q' to quit...");
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
loop {
select!(
sample = subscriber.recv_async() => {
let sample = sample.unwrap();
println!(">> [Subscriber] Received {} ('{}': '{}')",
sample.kind, sample.key_expr.as_str(), sample.value);
},

_ = stdin.read_exact(&mut input).fuse() => {
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
_ => (),
}
}
println!("Press CTRL-C to quit...");
while let Ok(sample) = subscriber.recv_async().await {
println!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind,
sample.key_expr.as_str(),
sample.value
);
}
}
Expand Down
41 changes: 12 additions & 29 deletions examples/examples/z_sub_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use async_std::task::sleep;
use clap::Parser;
use futures::prelude::*;
use futures::select;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh_examples::CommonArgs;
Expand All @@ -39,31 +35,18 @@ async fn main() {
.await
.unwrap();

println!("Enter 'q' to quit...");
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
loop {
select!(
sample = subscriber.recv_async() => {
let sample = sample.unwrap();
match sample.kind {
SampleKind::Put => println!(
">> [LivelinessSubscriber] New alive token ('{}')",
sample.key_expr.as_str()),
SampleKind::Delete => println!(
">> [LivelinessSubscriber] Dropped token ('{}')",
sample.key_expr.as_str()),
}
},

_ = stdin.read_exact(&mut input).fuse() => {
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
_ => (),
}
}
);
println!("Press CTRL-C to quit...");
while let Ok(sample) = subscriber.recv_async().await {
match sample.kind {
SampleKind::Put => println!(
">> [LivelinessSubscriber] New alive token ('{}')",
sample.key_expr.as_str()
),
SampleKind::Delete => println!(
">> [LivelinessSubscriber] Dropped token ('{}')",
sample.key_expr.as_str()
),
}
}
}

Expand Down
Loading

0 comments on commit 61838d5

Please sign in to comment.