diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..2345b209 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +target +*~ +*.log +*.lock \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 49e542e1..5552b1e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,26 +12,41 @@ # ADLINK zenoh team, # [package] -name = "zplugin-cdds" -version = "0.5.0" -authors = ["Esteve Fernandez "] +name = "zplugin-dds" +version = "0.5.0-beta.2" +authors = [ + "kydos ", + "Esteve Fernandez "] edition = "2018" +repository = "https://github.com/eclipse-zenoh/zenoh-plugin-dds" +homepage = "http://zenoh.io" +license = " EPL-2.0 OR Apache-2.0" +categories = ["network-programming"] +description = "Zenoh plugin for ROS2 and DDS in general" [lib] -name = "zplugin_cdds" -crate-type = ["cdylib"] +name = "zplugin_dds" +# crate-type = ["cdylib"] [dependencies] -zenoh = { version = "0.5.0-beta.1" } -zenoh-router = { version = "0.5.0-beta.1" } -futures = "0.3.5" -clap = "2" +zenoh = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "rust-master" } +zenoh-router = { git = "https://github.com/eclipse-zenoh/zenoh", branch = "rust-master" } +cyclors = { version = "0.1.0" } log = "0.4" -env_logger = "0.7.1" -ddsc = "0.1" +async-trait = "0.1.38" +futures = "0.3.5" + [dependencies.async-std] -version = "~1.6.0" -features = ["unstable"] +version = "=1.6.4" +features = ["unstable", "attributes"] + +[dev-dependencies] +clap = "2.33" +env_logger = "0.7.1" + +[[example]] +name = "dzd" +path = "examples/dzd.rs" \ No newline at end of file diff --git a/build.rs b/build.rs new file mode 100644 index 00000000..eb916279 --- /dev/null +++ b/build.rs @@ -0,0 +1,4 @@ +fn main() { + println!("cargo:rustc-link-search=/usr/local/lib"); + println!("cargo:rustc-link-lib=cdds-util"); +} \ No newline at end of file diff --git a/examples/dzd.rs b/examples/dzd.rs new file mode 100644 index 00000000..e6e87fd2 --- /dev/null +++ b/examples/dzd.rs @@ -0,0 +1,147 @@ +#![feature(vec_into_raw_parts)] + +use zplugin_dds::*; +use clap::{App, Arg}; +use futures::prelude::*; +use futures::select; +use zenoh::net::queryable::EVAL; +use zenoh::net::*; +use cyclors::*; +use std::collections::HashMap; +use log::{debug, info}; +use std::sync::Arc; +use async_std::task; +use async_std::sync::Mutex; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::ffi::CString; + +fn parse_args() -> (Config, String) { + let args = App::new("dzd zenoh router for DDS") + .arg(Arg::from_usage( + "-e, --peer=[LOCATOR]... 'Peer locators used to initiate the zenoh session.'", + )) + .arg(Arg::from_usage( + "-l, --listener=[LOCATOR]... 'Locators to listen on.'", + )) + .arg(Arg::from_usage( + "-s, --scope=[String]... 'A string used as prefix to scope DDS traffic.'", + )) + .get_matches(); + + let scope: String = args.value_of("scope") + .map(|s| String::from(s)) + .or_else(|| Some(String::from(""))) + .unwrap(); + + let config = Config::default() + .add_peers( + args.values_of("peer") + .map(|p| p.collect()) + .or_else(|| Some(vec![])) + .unwrap() + ) + .add_listeners( + args.values_of("listener") + .map(|p| p.collect()) + .or_else(|| Some(vec![])) + .unwrap()) + .local_routing(false); + + (config, scope) +} + + +#[async_std::main] +async fn main() { + env_logger::init(); + let mut rid_map = HashMap::::new(); + let mut rd_map = HashMap::::new(); + let mut wr_map = HashMap::::new(); + let (config, scope) = parse_args(); + let dp = unsafe { + dds_create_participant(DDS_DOMAIN_DEFAULT, std::ptr::null(), std::ptr::null()) + }; + let z = Arc::new(open(config, None).await.unwrap()); + let (tx, rx): (Sender, Receiver) = channel(); + run_discovery(dp, tx); + while let Ok(me) = rx.recv() { + match me { + MatchedEntity::DiscoveredPublication {topic_name, type_name, keyless, partition, qos} => { + debug!("DiscoveredPublication({}, {}, {:?}", topic_name, type_name, partition); + // keyless: bool, qos: dds_qos_t, z_key: ResKey, z: Arc) -> dds_entity_t { + let key = match partition { + Some(p) => format!("{}/{}/{}", scope, p, topic_name), + None => format!("{}/{}", scope, topic_name) + }; + debug!("Declaring resource {}", key); + match rd_map.get(&key) { + None => { + let rkey = ResKey::RName(key.clone()); + let nrid = z.declare_resource(&rkey).await.unwrap(); + let rid = ResKey::RId(nrid); + let _ = z.declare_publisher(&rid).await; + rid_map.insert(key.clone(), nrid); + debug!("Creating Forwarding Reader for: {}", key); + let dr: dds_entity_t = unsafe { + create_forwarding_dds_reader(dp, topic_name, type_name, keyless, qos, rid, z.clone()) + }; + rd_map.insert(key, dr); + }, + _ => { + debug!("Already forwarding matching subscription {} -- ignoring", topic_name); + } + + } + }, + MatchedEntity::UndiscoveredPublication {topic_name, type_name, partition} => { + debug!("UndiscoveredPublication({}, {}, {:?}", topic_name, type_name, partition); + }, + MatchedEntity::DiscoveredSubscription {topic_name, type_name, keyless, partition, qos} => { + debug!("DiscoveredSubscription({}, {}, {:?}", topic_name, type_name, partition); + let key = match partition { + Some(p) => format!("{}/{}/{}", scope, p, topic_name), + None => format!("{}/{}", scope, topic_name) + }; + debug!("Creating Subscriber for {}", key); + let nrid = match rid_map.get(&key) { + Some(nrid) => *nrid, + None => { + let rkey = ResKey::RName(key.clone()); + z.declare_resource(&rkey).await.unwrap() + } + }; + rid_map.insert(key.clone(), nrid); + let rkey = ResKey::RId(nrid); + let sub_info = SubInfo { + reliability: Reliability::Reliable, + mode: SubMode::Push, + period: None, + }; + let rsel = rkey.into(); + let zc = z.clone(); + task::spawn(async move { + let wr = create_forwarding_dds_writer(dp, topic_name.clone(), type_name.clone(), keyless, qos); + let mut sub = zc.declare_subscriber(&rsel, &sub_info).await.unwrap(); + let stream = sub.stream(); + while let Some(d) = stream.next().await { + debug!("Received data on zenoh subscriber for resource {}", d.res_name); + unsafe { + let bs = d.payload.to_vec(); + let (ptr, len, _) = bs.into_raw_parts(); + let cton = CString::new(topic_name.clone()).unwrap().into_raw(); + let ctyn = CString::new(type_name.clone()).unwrap().into_raw(); + let st = cdds_create_blob_sertopic(dp, cton, ctyn, keyless); + let fwdp = cdds_ddsi_payload_create(st, ddsi_serdata_kind_SDK_DATA, ptr, len as u64); + dds_writecdr(wr, fwdp as *mut ddsi_serdata); + }; + + } + }); + }, + MatchedEntity::UndiscoveredSubscription {topic_name, type_name, partition} => { + debug!("UndiscoveredSubscription({}, {}, {:?}", topic_name, type_name, partition); + }, + + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 2cf224ae..c15b94d4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,245 @@ // Initial commit +use std::mem::MaybeUninit; +use cyclors::*; +use std::ffi::{CStr, CString}; +use std::sync::mpsc::Sender; +use std::sync::Arc; +use std::os::raw; +use zenoh::net::{ResKey, Session, RBuf}; +use log::{debug}; +use async_std::task; + +const MAX_SAMPLES: usize = 32; + +#[derive(Debug)] +pub struct QosHolder(*mut dds_qos_t); +unsafe impl Send for QosHolder {} +unsafe impl Sync for QosHolder {} + +#[derive(Debug)] +pub enum MatchedEntity { + DiscoveredPublication { + topic_name: String, + type_name: String, + keyless: bool, + partition: Option, + qos: QosHolder + }, + UndiscoveredPublication { + topic_name: String, + type_name: String, + partition: Option, + }, + DiscoveredSubscription { + topic_name: String, + type_name: String, + keyless: bool, + partition: Option, + qos: QosHolder + }, + UndiscoveredSubscription { + topic_name: String, + type_name: String, + partition: Option, + }, +} + +unsafe extern "C" fn on_data(dr: dds_entity_t, arg: *mut std::os::raw::c_void) { + let btx = Box::from_raw(arg as *mut (bool, Sender)); + let dp = dds_get_participant(dr); + let mut dpih: dds_instance_handle_t = 0; + let _ = dds_get_instance_handle(dp, &mut dpih); + debug!("Local Domain Participant IH = {:?}", dpih); + + #[allow(clippy::uninit_assumed_init)] + let mut si: [dds_sample_info_t; MAX_SAMPLES] = { MaybeUninit::uninit().assume_init() }; + let mut samples: [*mut ::std::os::raw::c_void; MAX_SAMPLES] = + [std::ptr::null_mut(); MAX_SAMPLES as usize]; + samples[0] = std::ptr::null_mut(); + + let n = dds_take( + dr, + samples.as_mut_ptr() as *mut *mut raw::c_void, + si.as_mut_ptr() as *mut dds_sample_info_t, + MAX_SAMPLES as u64, + MAX_SAMPLES as u32, + ); + for i in 0..n { + if si[i as usize].valid_data { + let sample = samples[i as usize] as *mut dds_builtintopic_endpoint_t; + debug!("Discovery data from Participant with IH = {:?}", (*sample).participant_instance_handle); + let topic_name = CStr::from_ptr((*sample).topic_name).to_str().unwrap(); + if topic_name.contains("DCPS") || (*sample).participant_instance_handle == dpih { + debug!("Ignoring discovery from local participant: {}", topic_name); + continue + } + let type_name = CStr::from_ptr((*sample).type_name).to_str().unwrap(); + let mut n = 0u32; + let mut ps: *mut *mut ::std::os::raw::c_char = std::ptr::null_mut(); + let qos = dds_create_qos(); + dds_copy_qos(qos, (*sample).qos); + + let _ = dds_qget_partition( + (*sample).qos, + &mut n as *mut u32, + &mut ps as *mut *mut *mut ::std::os::raw::c_char, + ); + if n > 0 { + for k in 0..n { + let p = CStr::from_ptr(*(ps.offset(k as isize))).to_str().unwrap(); + if si[i as usize].instance_state == dds_instance_state_DDS_IST_ALIVE { + if btx.0 { + (btx.1) + .send(MatchedEntity::DiscoveredPublication { + topic_name: String::from(topic_name), + type_name: String::from(type_name), + keyless: true, + partition: Some(String::from(p)), + qos: QosHolder(qos) + }) + .unwrap(); + } else { + (btx.1) + .send(MatchedEntity::DiscoveredSubscription { + topic_name: String::from(topic_name), + type_name: String::from(type_name), + keyless: true, + partition: Some(String::from(p)), + qos: QosHolder(qos) + }) + .unwrap(); + } + } else if btx.0 { + (btx.1) + .send(MatchedEntity::UndiscoveredPublication { + topic_name: String::from(topic_name), + type_name: String::from(type_name), + partition: Some(String::from(p)), + }) + .unwrap(); + } else { + (btx.1) + .send(MatchedEntity::UndiscoveredSubscription { + topic_name: String::from(topic_name), + type_name: String::from(type_name), + partition: Some(String::from(p)), + }) + .unwrap(); + } + } + } else if si[i as usize].instance_state == dds_instance_state_DDS_IST_ALIVE { + if btx.0 { + (btx.1) + .send(MatchedEntity::DiscoveredPublication { + topic_name: String::from(topic_name), + type_name: String::from(type_name), + keyless: true, + partition: None, + qos: QosHolder(qos) + }) + .unwrap(); + } else { + (btx.1) + .send(MatchedEntity::DiscoveredSubscription { + topic_name: String::from(topic_name), + type_name: String::from(type_name), + keyless: true, + partition: None, + qos: QosHolder(qos) + }) + .unwrap(); + } + } else if btx.0 { + (btx.1) + .send(MatchedEntity::UndiscoveredPublication { + topic_name: String::from(topic_name), + type_name: String::from(type_name), + partition: None, + }) + .unwrap(); + } else { + (btx.1) + .send(MatchedEntity::UndiscoveredSubscription { + topic_name: String::from(topic_name), + type_name: String::from(type_name), + partition: None, + }) + .unwrap(); + } + } + } + dds_return_loan( + dr, + samples.as_mut_ptr() as *mut *mut raw::c_void, + MAX_SAMPLES as i32, + ); + Box::into_raw(btx); +} +pub fn run_discovery(dp: dds_entity_t, tx: Sender) { + unsafe { + let ptx = Box::new((true, tx.clone())); + let stx = Box::new((false, tx)); + let sub_listener = dds_create_listener(Box::into_raw(ptx) as *mut std::os::raw::c_void); + dds_lset_data_available(sub_listener, Some(on_data)); + + let _pr = dds_create_reader( + dp, + DDS_BUILTIN_TOPIC_DCPSPUBLICATION, + std::ptr::null(), + sub_listener, + ); + + let sub_listener = dds_create_listener(Box::into_raw(stx) as *mut std::os::raw::c_void); + dds_lset_data_available(sub_listener, Some(on_data)); + let _sr = dds_create_reader( + dp, + DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, + std::ptr::null(), + sub_listener, + ); + + } +} + +unsafe extern "C" fn data_forwarder_listener(dr: dds_entity_t, arg: *mut std::os::raw::c_void) { + debug!("data_forwarder_listener: triggered\n"); + let pa = arg as *mut (ResKey, Arc); + let mut zp: *mut cdds_ddsi_payload = std::ptr::null_mut(); + let mut si: [dds_sample_info_t; 1] = { MaybeUninit::uninit().assume_init() }; + let rc = cdds_take_blob(dr, &mut zp, si.as_mut_ptr()); + if rc > 0 { + debug!("data_forwarder_listener: forwarding data on zenoh\n"); + let xs = std::slice::from_raw_parts((*zp).payload, (*zp).size as usize); + let bs = Vec::from(xs); + let rbuf = RBuf::from(bs); + let _ = task::block_on(async { (*pa).1.write(&(*pa).0, rbuf).await }); + } + +} +pub fn create_forwarding_dds_reader(dp: dds_entity_t,topic_name: String, type_name: String, keyless: bool, qos: QosHolder, z_key: ResKey, z: Arc) -> dds_entity_t { + let cton = CString::new(topic_name).unwrap().into_raw(); + let ctyn = CString::new(type_name).unwrap().into_raw(); + + unsafe { + let t = cdds_create_blob_topic(dp, cton, ctyn, keyless); + let arg = Box::new((z_key, z)); + let sub_listener = dds_create_listener(Box::into_raw(arg) as *mut std::os::raw::c_void); + dds_lset_data_available(sub_listener, Some(data_forwarder_listener)); + dds_create_reader(dp, t, qos.0, sub_listener) + } +} + +pub fn create_forwarding_dds_writer(dp: dds_entity_t,topic_name: String, type_name: String, keyless: bool, qos: QosHolder) -> dds_entity_t { + let cton = CString::new(topic_name).unwrap().into_raw(); + let ctyn = CString::new(type_name).unwrap().into_raw(); + + unsafe { + let t = cdds_create_blob_topic(dp, cton, ctyn, keyless); + dds_create_writer(dp, t, qos.0, std::ptr::null_mut()) + } +} + + +// pub fn make_payload(st: *const std::ffi::c_void, kind: ddsi_serdata_kind, buf: *const u8, len: usize) -> *const std::ffi::c_void { +// unsafe { cdds_ddsi_payload_create(st as *mut ddsi_sertopic, ddsi_serdata_kind_SDK_DATA, buf as *mut u8, len as u64) as *const std::ffi::c_void } +// } \ No newline at end of file