Skip to content

Commit cb27b8c

Browse files
committed
add work in progress blocking and non-blocking receive
1 parent 4b18dbc commit cb27b8c

File tree

5 files changed

+168
-17
lines changed

5 files changed

+168
-17
lines changed

examples/blocking_receive.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#[macro_use]
2+
extern crate log;
3+
extern crate fern;
4+
extern crate time;
5+
extern crate mqtt;
6+
7+
use std::thread;
8+
use std::char;
9+
use mqtt::async::{PersistenceType, Qos, MqttError, AsyncClient, AsyncConnectOptions, AsyncDisconnectOptions};
10+
use std::error::Error;
11+
12+
13+
fn conf_logger() {
14+
let logger_config = fern::DispatchConfig {
15+
format: Box::new(|msg: &str, level: &log::LogLevel, _location: &log::LogLocation| {
16+
let t = time::now();
17+
let ms = t.tm_nsec/1000_000;
18+
format!("{}.{:3} [{}] {}", t.strftime("%Y-%m-%dT%H:%M:%S").unwrap(), ms, level, msg)
19+
}),
20+
output: vec![fern::OutputConfig::stderr()],
21+
level: log::LogLevelFilter::Trace,
22+
};
23+
24+
if let Err(e) = fern::init_global_logger(logger_config, log::LogLevelFilter::Trace) {
25+
panic!("Failed to initialize global logger: {}", e);
26+
}
27+
}
28+
29+
fn setup_mqtt(server_address: &str, topic: &str, client_id: &str) -> Result<AsyncClient, MqttError> {
30+
let connect_options = AsyncConnectOptions::new();
31+
let mut client = try!(AsyncClient::new(server_address, client_id, PersistenceType::Nothing));
32+
try!(client.connect(&connect_options));
33+
try!(client.subscribe(topic, Qos::FireAndForget));
34+
Ok(client)
35+
}
36+
37+
fn main() {
38+
// setup fern logger
39+
conf_logger();
40+
41+
// start processing
42+
info!("blocking receive test started");
43+
info!("run: mosquitto_pub -t TestTopic -m somedata to send some messages to the test");
44+
45+
let topic = "TestTopic";
46+
match setup_mqtt("tcp://localhost:1883", &topic, "TestClientId") {
47+
Ok(mut client) => {
48+
49+
// thread blocks here until message is received
50+
for message in client.messages(None) {
51+
info!("{:?}", message);
52+
}
53+
54+
let disconnect_options = AsyncDisconnectOptions::new();
55+
client.disconnect(&disconnect_options).unwrap();
56+
},
57+
Err(e) => error!("{}; raw error: {}", e.description(), e)
58+
}
59+
info!("blocking receive test ended");
60+
}

examples/loopback.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ fn main() {
4646
match setup_mqtt("tcp://localhost:1883", &topic, "TestClientId") {
4747
Ok(mut client) => {
4848
for i in 0..10 {
49-
info!("data len: {}", i);
49+
info!("send data len: {}", i);
5050
data.push(char::from_digit(i % 10, 10).unwrap() as u8);
5151
client.send(&data, &topic, Qos::FireAndForget).unwrap();
52-
for message in client.messages() {
52+
for message in client.messages(Some(100)) {
5353
info!("{:?}", message);
5454
}
55-
thread::sleep_ms(200);
55+
//thread::sleep_ms(200);
5656
}
5757

5858
let disconnect_options = AsyncDisconnectOptions::new();

examples/nonblocking_receive.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#[macro_use]
2+
extern crate log;
3+
extern crate fern;
4+
extern crate time;
5+
extern crate mqtt;
6+
7+
use std::thread;
8+
use std::char;
9+
use mqtt::async::{PersistenceType, Qos, MqttError, AsyncClient, AsyncConnectOptions, AsyncDisconnectOptions};
10+
use std::error::Error;
11+
12+
13+
fn conf_logger() {
14+
let logger_config = fern::DispatchConfig {
15+
format: Box::new(|msg: &str, level: &log::LogLevel, _location: &log::LogLocation| {
16+
let t = time::now();
17+
let ms = t.tm_nsec/1000_000;
18+
format!("{}.{:3} [{}] {}", t.strftime("%Y-%m-%dT%H:%M:%S").unwrap(), ms, level, msg)
19+
}),
20+
output: vec![fern::OutputConfig::stderr()],
21+
level: log::LogLevelFilter::Trace,
22+
};
23+
24+
if let Err(e) = fern::init_global_logger(logger_config, log::LogLevelFilter::Trace) {
25+
panic!("Failed to initialize global logger: {}", e);
26+
}
27+
}
28+
29+
fn setup_mqtt(server_address: &str, topic: &str, client_id: &str) -> Result<AsyncClient, MqttError> {
30+
let connect_options = AsyncConnectOptions::new();
31+
let mut client = try!(AsyncClient::new(server_address, client_id, PersistenceType::Nothing));
32+
try!(client.connect(&connect_options));
33+
try!(client.subscribe(topic, Qos::FireAndForget));
34+
Ok(client)
35+
}
36+
37+
fn main() {
38+
// setup fern logger
39+
conf_logger();
40+
41+
// start processing
42+
info!("non-blocking receive test started");
43+
info!("run: mosquitto_pub -t TestTopic -m somedata to send some messages to the test");
44+
45+
let topic = "TestTopic";
46+
match setup_mqtt("tcp://localhost:1883", &topic, "TestClientId") {
47+
Ok(mut client) => {
48+
49+
loop {
50+
info!("wait for a message..");
51+
let timeout_ms = Some(1);
52+
for message in client.messages(timeout_ms) {
53+
info!("{:?}", message);
54+
}
55+
}
56+
57+
let disconnect_options = AsyncDisconnectOptions::new();
58+
client.disconnect(&disconnect_options).unwrap();
59+
},
60+
Err(e) => error!("{}; raw error: {}", e.description(), e)
61+
}
62+
info!("non-blocking receive test ended");
63+
}

src/async/client.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use libc::{c_char, c_int, c_void};
2727
use std::ffi::{CStr, CString};
2828
use std::mem;
2929
use std::slice;
30-
use std::sync::{Barrier, Mutex, Arc};
30+
use std::sync::{Barrier, Arc, Mutex, Condvar};
3131

3232
use super::Message;
3333
use super::options::{PersistenceType, Qos, AsyncConnectOptions, AsyncDisconnectOptions};
@@ -62,8 +62,8 @@ impl AsyncClient {
6262
pub fn subscribe(&mut self, topic: &str, qos: Qos) -> Result<(), MqttError> {
6363
self.inner.subscribe(topic, qos)
6464
}
65-
pub fn messages(&mut self) -> AsyncClientIntoIterator {
66-
AsyncClientIntoIterator::new(self.inner.messages.clone())
65+
pub fn messages(&mut self, timeout_ms: Option<u32>) -> AsyncClientIntoIterator {
66+
AsyncClientIntoIterator::new(self.inner.messages.clone(), timeout_ms)
6767
}
6868
}
6969

@@ -77,7 +77,7 @@ struct ImmovableClient {
7777

7878
barrier : Barrier,
7979
action_result : Option<Result<(), CallbackError>>,
80-
pub messages : Arc<Mutex<Vec<Message>>>,
80+
pub messages : Arc<(Mutex<Vec<Message>>, Condvar)>,
8181
}
8282
impl ImmovableClient {
8383
fn context(&mut self) -> *mut c_void {
@@ -94,7 +94,7 @@ impl ImmovableClient {
9494

9595
barrier : Barrier::new(2),
9696
action_result : None,
97-
messages : Arc::new(Mutex::new(Vec::new())),
97+
messages : Arc::new((Mutex::new(Vec::new()), Condvar::new())),
9898
}
9999
}
100100

@@ -354,8 +354,12 @@ impl ImmovableClient {
354354
duplicate : duplicate,
355355
};
356356

357-
let mut messages = selfclient.messages.lock().unwrap();
357+
info!("RX");
358+
let &(ref msglock, ref cvar) = &*selfclient.messages;
359+
let mut messages = msglock.lock().unwrap();
358360
messages.push(msg);
361+
info!("RX queue len {:}", messages.len());
362+
cvar.notify_one();
359363

360364
let mut msg = amessage;
361365
unsafe{ffiasync::MQTTAsync_freeMessage(&mut msg)};

src/async/iterator.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,54 @@
2222
* SOFTWARE.
2323
*/
2424

25-
use std::sync::{Mutex, Arc};
25+
use std::sync::{Arc, Mutex, Condvar};
2626
use super::Message;
2727

2828

2929
pub struct AsyncClientIntoIterator {
30-
messages: Arc<Mutex<Vec<Message>>>,
30+
messages : Arc<(Mutex<Vec<Message>>, Condvar)>,
31+
timeout_ms : Option<u32>,
3132
}
3233

3334
impl AsyncClientIntoIterator {
34-
pub fn new(messages: Arc<Mutex<Vec<Message>>>) -> Self {
35-
AsyncClientIntoIterator{ messages: messages }
35+
pub fn new(messages: Arc<(Mutex<Vec<Message>>, Condvar)>, timeout_ms: Option<u32>) -> Self {
36+
AsyncClientIntoIterator{ messages : messages,
37+
timeout_ms : timeout_ms
38+
}
3639
}
3740
}
3841

3942
impl Iterator for AsyncClientIntoIterator {
4043
type Item = Message;
4144

4245
fn next(&mut self) -> Option<Message> {
43-
let mut messages = self.messages.lock().unwrap();
44-
if messages.len() > 0 {
45-
Some(messages.remove(0))
46+
let &(ref msglock, ref cvar) = &*self.messages;
47+
debug!("next");
48+
if self.timeout_ms.is_some() {
49+
// non-blocking
50+
let mut messages = msglock.lock().unwrap();
51+
if messages.len() > 0 {
52+
Some(messages.remove(0))
53+
}
54+
else {
55+
let (mut messages, is_timeout) = cvar.wait_timeout_ms(messages, self.timeout_ms.unwrap()).unwrap();
56+
if is_timeout {
57+
debug!("timeout");
58+
None
59+
}
60+
else {
61+
debug!("no timeout");
62+
assert!(messages.len() > 0);
63+
Some(messages.remove(0))
64+
}
65+
}
4666
}
4767
else {
48-
None
68+
// blocking
69+
let mut messages = msglock.lock().unwrap();
70+
messages = cvar.wait(messages).unwrap();
71+
assert!(messages.len() > 0);
72+
Some(messages.remove(0))
4973
}
5074
}
5175
}

0 commit comments

Comments
 (0)