Skip to content

Commit b4c46fd

Browse files
committed
fix move error by splitting asyncclient into two
1 parent beb4d13 commit b4c46fd

File tree

3 files changed

+64
-38
lines changed

3 files changed

+64
-38
lines changed

src/async/client.rs

Lines changed: 62 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -36,48 +36,77 @@ use super::iterator::AsyncClientIntoIterator;
3636

3737

3838
pub struct AsyncClient {
39-
handle : ffiasync::MQTTAsync,
40-
barrier : Barrier,
41-
action_result : Option<Result<(), CallbackError>>,
42-
messages : Arc<Mutex<Vec<Message>>>,
39+
inner: Box<ImmovableClient>
4340
}
4441

4542
impl AsyncClient {
43+
pub fn new(address: &str, clientid: &str, persistence: PersistenceType) -> Result<Self, MqttError> {
44+
let mut ac = AsyncClient {
45+
inner: Box::new(ImmovableClient::new(address, clientid, persistence))
46+
};
47+
try!(ac.inner.create());
48+
Ok(ac)
49+
}
50+
pub fn connect(&mut self, options: &AsyncConnectOptions) -> Result<(), MqttError> {
51+
self.inner.connect(options)
52+
}
53+
pub fn is_connected(&self) -> bool {
54+
self.inner.is_connected()
55+
}
56+
pub fn send(&mut self, data: &[u8], topic: &str, qos: Qos) -> Result<(), MqttError> {
57+
self.inner.send(data, topic, qos)
58+
}
59+
pub fn subscribe(&mut self, topic: &str, qos: Qos) -> Result<(), MqttError> {
60+
self.inner.subscribe(topic, qos)
61+
}
62+
pub fn messages(&mut self) -> AsyncClientIntoIterator {
63+
AsyncClientIntoIterator::new(self.inner.messages.clone())
64+
}
65+
}
66+
4667

47-
/// Ensures the FFI struct is consistent for callback invocation.
48-
///
49-
/// Because the user may move this struct, the `context` pointer
50-
/// passed back to FFI callbacks might be invalidated. This function
51-
/// should be called before FFI actions that might fire callbacks to ensure
52-
/// the self-pointer is valid.
68+
struct ImmovableClient {
69+
c_url : CString,
70+
c_clientid : CString,
71+
handle : ffiasync::MQTTAsync,
72+
persistence_context : c_void,
73+
persistence : PersistenceType,
74+
75+
barrier : Barrier,
76+
action_result : Option<Result<(), CallbackError>>,
77+
pub messages : Arc<Mutex<Vec<Message>>>,
78+
}
79+
impl ImmovableClient {
5380
fn context(&mut self) -> *mut c_void {
5481
self as *mut _ as *mut c_void
5582
}
5683

57-
pub fn new(address: &str, clientid: &str, persistence: PersistenceType) -> Result<AsyncClient, MqttError> {
58-
let mut handle = unsafe{mem::zeroed()};
59-
let mut persistence_context: c_void = unsafe{mem::zeroed()};
60-
61-
let c_url = CString::new(address).unwrap();
62-
let c_clientid = CString::new(clientid).unwrap();
63-
let array_url = c_url.as_bytes_with_nul();
64-
let array_clientid = c_clientid.as_bytes_with_nul();
84+
pub fn new(address: &str, clientid: &str, persistence: PersistenceType) -> Self {
85+
ImmovableClient {
86+
c_url : CString::new(address).unwrap(),
87+
c_clientid : CString::new(clientid).unwrap(),
88+
handle : unsafe{mem::zeroed()},
89+
persistence_context : unsafe{mem::zeroed()},
90+
persistence : persistence,
91+
92+
barrier : Barrier::new(2),
93+
action_result : None,
94+
messages : Arc::new(Mutex::new(Vec::new())),
95+
}
96+
}
6597

98+
pub fn create(&mut self) -> Result<(), MqttError> {
99+
let array_url = self.c_url.as_bytes_with_nul();
100+
let array_clientid = self.c_clientid.as_bytes_with_nul();
66101
let error = unsafe {
67-
ffiasync::MQTTAsync_create(&mut handle,
102+
ffiasync::MQTTAsync_create(&mut self.handle,
68103
mem::transmute::<&u8, *const c_char>(&array_url[0]),
69104
mem::transmute::<&u8, *const c_char>(&array_clientid[0]),
70-
persistence as i32,
71-
&mut persistence_context)
105+
self.persistence as i32,
106+
&mut self.persistence_context)
72107
};
73-
74108
match error {
75-
0 => { Ok( AsyncClient {
76-
handle : handle,
77-
barrier : Barrier::new(2),
78-
action_result : None,
79-
messages : Arc::new(Mutex::new(Vec::new())),
80-
})},
109+
0 => { Ok(())},
81110
err => Err(MqttError::Create(err))
82111
}
83112
}
@@ -128,7 +157,7 @@ impl AsyncClient {
128157
assert!(!context.is_null());
129158
}
130159

131-
pub fn is_connected(&mut self) -> bool {
160+
pub fn is_connected(&self) -> bool {
132161
let ret = unsafe {
133162
ffiasync::MQTTAsync_isConnected(self.handle)
134163
};
@@ -227,15 +256,15 @@ impl AsyncClient {
227256
extern "C" fn action_succeeded(context: *mut ::libc::c_void, response: *mut ffiasync::MQTTAsync_successData) -> () {
228257
debug!("success callback");
229258
assert!(!context.is_null());
230-
let selfclient: &mut AsyncClient = unsafe {mem::transmute(context)};
259+
let selfclient: &mut ImmovableClient = unsafe {mem::transmute(context)};
231260
selfclient.action_result = Some(Ok(()));
232261
selfclient.barrier.wait();
233262
}
234263

235264
extern "C" fn action_failed(context: *mut ::libc::c_void, response: *mut ffiasync::MQTTAsync_failureData) -> () {
236265
debug!("failure callback");
237266
assert!(!context.is_null());
238-
let selfclient : &mut AsyncClient = unsafe {mem::transmute(context)};
267+
let selfclient : &mut ImmovableClient = unsafe {mem::transmute(context)};
239268
if response.is_null() {
240269
selfclient.action_result = Some(Err(CallbackError::NullPtr));
241270
} else {
@@ -264,7 +293,7 @@ impl AsyncClient {
264293
};
265294

266295
assert!(!context.is_null());
267-
let selfclient : &mut AsyncClient = unsafe {mem::transmute(context)};
296+
let selfclient : &mut ImmovableClient = unsafe {mem::transmute(context)};
268297

269298
let qos = Qos::from_int(transmessage.qos);
270299

@@ -297,12 +326,8 @@ impl AsyncClient {
297326
1
298327
}
299328

300-
pub fn messages(&mut self) -> AsyncClientIntoIterator {
301-
AsyncClientIntoIterator::new(self.messages.clone())
302-
}
303329
}
304-
305-
impl Drop for AsyncClient {
330+
impl Drop for ImmovableClient {
306331
fn drop(&mut self) {
307332
unsafe{ffiasync::MQTTAsync_destroy(&mut self.handle)};
308333
}

src/async/error.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ impl ConnectErrReturnCode {
9191
}
9292
}
9393

94+
#[derive(Debug, Copy, Clone)]
9495
pub enum CallbackError {
9596
Response(i32),
9697
NullPtr

src/async/options.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
use ffiasync;
2626
use std::ptr;
2727

28-
28+
#[derive(Debug, Copy, Clone)]
2929
pub enum PersistenceType {
3030
Default = 0,
3131
Nothing = 1,

0 commit comments

Comments
 (0)