Skip to content

Commit af71b12

Browse files
committed
break async.rs contents into several files
1 parent f41087b commit af71b12

File tree

5 files changed

+325
-200
lines changed

5 files changed

+325
-200
lines changed

src/async.rs renamed to src/async/client.rs

Lines changed: 16 additions & 200 deletions
Original file line numberDiff line numberDiff line change
@@ -23,117 +23,17 @@
2323
*/
2424

2525
use ffiasync;
26-
2726
use libc::{c_char, c_int, c_void};
2827
use std::ffi::{CStr, CString};
2928
use std::mem;
30-
use std::ptr;
3129
use std::slice;
3230
use std::sync::{Barrier, Mutex, Arc};
33-
use std::error::Error;
34-
use std::fmt;
35-
36-
pub enum PersistenceType {
37-
Default = 0,
38-
Nothing = 1,
39-
User = 2,
40-
}
41-
42-
#[derive(Debug)]
43-
pub enum Qos {
44-
FireAndForget = 0,
45-
AtLeastOnce = 1,
46-
OnceAndOneOnly = 2,
47-
}
48-
impl Qos {
49-
fn from_int(i:i32) -> Self {
50-
match i {
51-
0 => Qos::FireAndForget,
52-
1 => Qos::AtLeastOnce,
53-
2 => Qos::OnceAndOneOnly,
54-
_ => unreachable!(),
55-
}
56-
}
57-
}
58-
59-
#[derive(Debug)]
60-
pub struct Message {
61-
pub topic : String,
62-
pub payload : Option<Vec<u8>>,
63-
pub qos : Qos,
64-
pub retained : bool,
65-
pub duplicate : bool,
66-
}
67-
68-
#[derive(Debug, Clone)]
69-
pub enum MqttError {
70-
Create(i32),
71-
Connect(ConnectError),
72-
Subscribe(CommandError),
73-
Send(CommandError),
74-
}
75-
impl fmt::Display for MqttError {
76-
fn fmt(&self, f:&mut fmt::Formatter) -> Result<(), fmt::Error> {
77-
match *self {
78-
MqttError::Create(ref x) => fmt::Display::fmt(&format!("MqttError::Create({:?})", x), f),
79-
MqttError::Connect(ref x) => fmt::Display::fmt(&format!("MqttError::Connect({:?})", x), f),
80-
MqttError::Subscribe(ref x) => fmt::Display::fmt(&format!("MqttError::Subscribe({:?})", x), f),
81-
MqttError::Send(ref x) => fmt::Display::fmt(&format!("MqttError::Send({:?})", x), f),
82-
}
83-
}
84-
}
85-
impl Error for MqttError {
86-
fn description(&self) -> &str {
87-
match *self {
88-
MqttError::Create(_) => "Mqtt creation failed",
89-
MqttError::Connect(_) => "Mqtt connect failed",
90-
MqttError::Subscribe(_) => "Mqtt subscribe failed",
91-
MqttError::Send(_) => "Mqtt send failed",
92-
}
93-
}
94-
}
95-
96-
#[derive(Debug, Clone)]
97-
pub enum CommandError {
98-
ReturnCode(i32),
99-
CallbackResponse(i32),
100-
CallbackNullPtr
101-
}
10231

103-
#[derive(Debug, Clone)]
104-
pub enum ConnectError {
105-
ReturnCode(ConnectErrReturnCode),
106-
CallbackResponse(i32),
107-
CallbackNullPtr
108-
}
32+
use super::Message;
33+
use super::options::{PersistenceType, Qos, AsyncConnectOptions};
34+
use super::error::{MqttError, CommandError, ConnectError, ConnectErrReturnCode, CallbackError};
35+
use super::iterator::AsyncClientIntoIterator;
10936

110-
#[derive(Debug, Clone)]
111-
pub enum ConnectErrReturnCode {
112-
UnacceptableProtocol = 1,
113-
IdentifierRejected = 2,
114-
ServerUnavailable = 3,
115-
BadUsernameOrPassword = 4,
116-
NotAuthorized = 5,
117-
Reserved = 6,
118-
}
119-
impl ConnectErrReturnCode {
120-
fn from_int(i:i32) -> Self {
121-
match i {
122-
1 => ConnectErrReturnCode::UnacceptableProtocol,
123-
2 => ConnectErrReturnCode::IdentifierRejected,
124-
3 => ConnectErrReturnCode::ServerUnavailable,
125-
4 => ConnectErrReturnCode::BadUsernameOrPassword,
126-
5 => ConnectErrReturnCode::NotAuthorized,
127-
6 => ConnectErrReturnCode::Reserved,
128-
_ => unreachable!()
129-
}
130-
}
131-
}
132-
133-
enum CallbackError {
134-
Response(i32),
135-
NullPtr
136-
}
13737

13838
pub struct AsyncClient {
13939
handle : ffiasync::MQTTAsync,
@@ -192,21 +92,23 @@ impl AsyncClient {
19292
None);
19393
}
19494

95+
let mut async_opts = ffiasync::MQTTAsync_connectOptions::new();
96+
19597
// fill in FFI private struct
196-
options.options.keepAliveInterval = options.keep_alive_interval;
197-
options.options.cleansession = options.cleansession;
198-
options.options.maxInflight = options.max_in_flight;
199-
options.options.connectTimeout = options.connect_timeout;
200-
options.options.retryInterval = options.retry_interval;
98+
async_opts.keepAliveInterval = options.keep_alive_interval;
99+
async_opts.cleansession = options.cleansession;
100+
async_opts.maxInflight = options.max_in_flight;
101+
async_opts.connectTimeout = options.connect_timeout;
102+
async_opts.retryInterval = options.retry_interval;
201103

202104
// register callbacks
203-
options.options.context = self.context();
204-
options.options.onSuccess = Some(Self::action_succeeded);
205-
options.options.onFailure = Some(Self::action_failed);
105+
async_opts.context = self.context();
106+
async_opts.onSuccess = Some(Self::action_succeeded);
107+
async_opts.onFailure = Some(Self::action_failed);
206108

207109
self.action_result = None;
208110
let error = unsafe {
209-
ffiasync::MQTTAsync_connect(self.handle, &options.options)
111+
ffiasync::MQTTAsync_connect(self.handle, &async_opts)
210112
};
211113
if error == 0 {
212114
self.barrier.wait();
@@ -396,26 +298,7 @@ impl AsyncClient {
396298
}
397299

398300
pub fn messages(&mut self) -> AsyncClientIntoIterator {
399-
AsyncClientIntoIterator {
400-
messages: self.messages.clone(),
401-
}
402-
}
403-
}
404-
405-
pub struct AsyncClientIntoIterator {
406-
messages: Arc<Mutex<Vec<Message>>>,
407-
}
408-
409-
impl Iterator for AsyncClientIntoIterator {
410-
type Item = Message;
411-
fn next(&mut self) -> Option<Message> {
412-
let mut messages = self.messages.lock().unwrap();
413-
if messages.len() > 0 {
414-
Some(messages.remove(0))
415-
}
416-
else {
417-
None
418-
}
301+
AsyncClientIntoIterator::new(self.messages.clone())
419302
}
420303
}
421304

@@ -424,70 +307,3 @@ impl Drop for AsyncClient {
424307
unsafe{ffiasync::MQTTAsync_destroy(&mut self.handle)};
425308
}
426309
}
427-
428-
pub struct AsyncConnectOptions {
429-
options: ffiasync::MQTTAsync_connectOptions,
430-
431-
pub keep_alive_interval : i32,
432-
pub cleansession : i32,
433-
pub max_in_flight : i32,
434-
pub connect_timeout : i32,
435-
pub retry_interval : i32,
436-
}
437-
impl AsyncConnectOptions {
438-
pub fn new() -> AsyncConnectOptions {
439-
let ffioptions = ffiasync::MQTTAsync_connectOptions {
440-
struct_id : ['M' as i8, 'Q' as i8, 'T' as i8, 'C' as i8],
441-
struct_version : 3,
442-
keepAliveInterval : 60,
443-
cleansession : 1,
444-
maxInflight : 10,
445-
will : ptr::null_mut(),
446-
username : ptr::null_mut(),
447-
password : ptr::null_mut(),
448-
connectTimeout : 30,
449-
retryInterval : 0,
450-
ssl : ptr::null_mut(),
451-
onSuccess : None,
452-
onFailure : None,
453-
context : ptr::null_mut(),
454-
serverURIcount : 0,
455-
serverURIs : ptr::null_mut(),
456-
MQTTVersion : 0,
457-
};
458-
459-
let options = AsyncConnectOptions {
460-
options : ffioptions,
461-
462-
keep_alive_interval : 20,
463-
cleansession : 1,
464-
max_in_flight : 10,
465-
connect_timeout : 30,
466-
retry_interval : 0,
467-
};
468-
469-
options
470-
}
471-
}
472-
473-
// it is going to be used in the future
474-
// just silence warning for now
475-
#[allow(dead_code)]
476-
pub struct AsyncDisconnectOptions {
477-
options: ffiasync::MQTTAsync_disconnectOptions,
478-
}
479-
impl AsyncDisconnectOptions {
480-
481-
pub fn new() -> ffiasync::MQTTAsync_disconnectOptions {
482-
let options = ffiasync::MQTTAsync_disconnectOptions {
483-
struct_id : ['M' as i8, 'Q' as i8, 'T' as i8, 'D' as i8],
484-
struct_version : 0,
485-
timeout : 0,
486-
onSuccess : ptr::null_mut(),
487-
onFailure : ptr::null_mut(),
488-
context : ptr::null_mut(),
489-
};
490-
491-
options
492-
}
493-
}

src/async/error.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* The MIT License (MIT)
3+
*
4+
* Copyright (c) 2015 Andres Vahter (andres.vahter@gmail.com)
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
25+
use std::fmt;
26+
use std::error::Error;
27+
28+
29+
#[derive(Debug, Clone)]
30+
pub enum MqttError {
31+
Create(i32),
32+
Connect(ConnectError),
33+
Subscribe(CommandError),
34+
Send(CommandError),
35+
}
36+
impl fmt::Display for MqttError {
37+
fn fmt(&self, f:&mut fmt::Formatter) -> Result<(), fmt::Error> {
38+
match *self {
39+
MqttError::Create(ref x) => fmt::Display::fmt(&format!("MqttError::Create({:?})", x), f),
40+
MqttError::Connect(ref x) => fmt::Display::fmt(&format!("MqttError::Connect({:?})", x), f),
41+
MqttError::Subscribe(ref x) => fmt::Display::fmt(&format!("MqttError::Subscribe({:?})", x), f),
42+
MqttError::Send(ref x) => fmt::Display::fmt(&format!("MqttError::Send({:?})", x), f),
43+
}
44+
}
45+
}
46+
impl Error for MqttError {
47+
fn description(&self) -> &str {
48+
match *self {
49+
MqttError::Create(_) => "Mqtt creation failed",
50+
MqttError::Connect(_) => "Mqtt connect failed",
51+
MqttError::Subscribe(_) => "Mqtt subscribe failed",
52+
MqttError::Send(_) => "Mqtt send failed",
53+
}
54+
}
55+
}
56+
57+
#[derive(Debug, Clone)]
58+
pub enum CommandError {
59+
ReturnCode(i32),
60+
CallbackResponse(i32),
61+
CallbackNullPtr
62+
}
63+
64+
#[derive(Debug, Clone)]
65+
pub enum ConnectError {
66+
ReturnCode(ConnectErrReturnCode),
67+
CallbackResponse(i32),
68+
CallbackNullPtr
69+
}
70+
71+
#[derive(Debug, Clone)]
72+
pub enum ConnectErrReturnCode {
73+
UnacceptableProtocol = 1,
74+
IdentifierRejected = 2,
75+
ServerUnavailable = 3,
76+
BadUsernameOrPassword = 4,
77+
NotAuthorized = 5,
78+
Reserved = 6,
79+
}
80+
impl ConnectErrReturnCode {
81+
pub fn from_int(i:i32) -> Self {
82+
match i {
83+
1 => ConnectErrReturnCode::UnacceptableProtocol,
84+
2 => ConnectErrReturnCode::IdentifierRejected,
85+
3 => ConnectErrReturnCode::ServerUnavailable,
86+
4 => ConnectErrReturnCode::BadUsernameOrPassword,
87+
5 => ConnectErrReturnCode::NotAuthorized,
88+
6 => ConnectErrReturnCode::Reserved,
89+
_ => unreachable!()
90+
}
91+
}
92+
}
93+
94+
pub enum CallbackError {
95+
Response(i32),
96+
NullPtr
97+
}

0 commit comments

Comments
 (0)