Skip to content

Commit

Permalink
Make SHM sub\queryable examples more robust (#1261)
Browse files Browse the repository at this point in the history
* Make SHM sub\queryable examples more robust

* fix clippy

* fix clippy
  • Loading branch information
yellowhatter authored Jul 24, 2024
1 parent 7aeb4ad commit 4827f39
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 16 deletions.
69 changes: 59 additions & 10 deletions examples/examples/z_queryable_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//
use clap::Parser;
use zenoh::{
bytes::ZBytes,
key_expr::KeyExpr,
prelude::*,
shm::{
Expand Down Expand Up @@ -63,18 +64,29 @@ async fn main() {

println!("Press CTRL-C to quit...");
while let Ok(query) = queryable.recv_async().await {
print!(
">> [Queryable] Received Query '{}' ('{}'",
query.selector(),
query.key_expr().as_str(),
);
if let Some(query_payload) = query.payload() {
match query_payload.deserialize::<&zshm>() {
Ok(p) => print!(": '{}'", String::from_utf8_lossy(p)),
Err(e) => print!(": 'Not a ShmBufInner: {:?}'", e),
// Print overall query payload information
match query.payload() {
Some(payload) => {
let (payload_type, payload) = handle_bytes(payload);
print!(
">> [Queryable] Received Query [{}] ('{}': '{}')",
payload_type,
query.selector(),
payload
);
}
None => {
print!(">> Received Query '{}'", query.selector());
}
};

// Print attachment information
if let Some(att) = query.attachment() {
let (attachment_type, attachment) = handle_bytes(att);
print!(" ({}: {})", attachment_type, attachment);
}
println!(")");

println!();

// Allocate an SHM buffer
// NOTE: For allocation API please check z_alloc_shm.rs example
Expand Down Expand Up @@ -119,3 +131,40 @@ fn parse_args() -> (Config, KeyExpr<'static>, String, bool) {
let args = Args::parse();
(args.common.into(), args.key, args.payload, args.complete)
}

fn handle_bytes(bytes: &ZBytes) -> (&str, String) {
// Determine buffer type for indication purpose
let bytes_type = {
// if Zenoh is built without SHM support, the only buffer type it can receive is RAW
#[cfg(not(feature = "shared-memory"))]
{
"RAW"
}

// if Zenoh is built with SHM support but without SHM API (that is unstable), it can
// receive buffers of any type, but there is no way to detect the buffer type
#[cfg(all(feature = "shared-memory", not(feature = "unstable")))]
{
"UNKNOWN"
}

// if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
match bytes.deserialize::<&zshm>() {
Ok(_) => "SHM",
Err(_) => "RAW",
}
};

// In order to indicate the real underlying buffer type the code above is written ^^^
// Sample is SHM-agnostic: Sample handling code works both with SHM and RAW data transparently.
// In other words, the common application compiled with "shared-memory" feature will be able to
// handle incoming SHM data without any changes in the application code.
//
// Refer to z_bytes.rs to see how to deserialize different types of message
let bytes_string = bytes
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));

(bytes_type, bytes_string)
}
58 changes: 52 additions & 6 deletions examples/examples/z_sub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use clap::Parser;
use zenoh::{config::Config, key_expr::KeyExpr, prelude::*, shm::zshm};
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
use zenoh::shm::zshm;
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr, prelude::*};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand All @@ -35,16 +37,23 @@ async fn main() {

println!("Press CTRL-C to quit...");
while let Ok(sample) = subscriber.recv_async().await {
// Print overall payload information
let (payload_type, payload) = handle_bytes(sample.payload());
print!(
">> [Subscriber] Received {} ('{}': ",
">> [Subscriber] Received [{}] {} ('{}': '{}')",
payload_type,
sample.kind(),
sample.key_expr().as_str(),
payload
);
match sample.payload().deserialize::<&zshm>() {
Ok(payload) => print!("'{}'", String::from_utf8_lossy(payload)),
Err(e) => print!("'Not a ShmBufInner: {:?}'", e),

// Print attachment information
if let Some(att) = sample.attachment() {
let (attachment_type, attachment) = handle_bytes(att);
print!(" ({}: {})", attachment_type, attachment);
}
println!(")");

println!();
}

// // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber
Expand Down Expand Up @@ -81,3 +90,40 @@ fn parse_args() -> (Config, KeyExpr<'static>) {
let args = SubArgs::parse();
(args.common.into(), args.key)
}

fn handle_bytes(bytes: &ZBytes) -> (&str, String) {
// Determine buffer type for indication purpose
let bytes_type = {
// if Zenoh is built without SHM support, the only buffer type it can receive is RAW
#[cfg(not(feature = "shared-memory"))]
{
"RAW"
}

// if Zenoh is built with SHM support but without SHM API (that is unstable), it can
// receive buffers of any type, but there is no way to detect the buffer type
#[cfg(all(feature = "shared-memory", not(feature = "unstable")))]
{
"UNKNOWN"
}

// if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
match bytes.deserialize::<&zshm>() {
Ok(_) => "SHM",
Err(_) => "RAW",
}
};

// In order to indicate the real underlying buffer type the code above is written ^^^
// Sample is SHM-agnostic: Sample handling code works both with SHM and RAW data transparently.
// In other words, the common application compiled with "shared-memory" feature will be able to
// handle incoming SHM data without any changes in the application code.
//
// Refer to z_bytes.rs to see how to deserialize different types of message
let bytes_string = bytes
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));

(bytes_type, bytes_string)
}

0 comments on commit 4827f39

Please sign in to comment.