forked from eclipse-paho/paho.mqtt.rust
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdyn_subscribe.rs
177 lines (152 loc) · 6.98 KB
/
dyn_subscribe.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// paho-mqtt/examples/dyn_subscribe.rs
// This is a Paho MQTT Rust client, sample application.
//
//! This application is an MQTT subscriber using the asynchronous client
//! interface of the Paho Rust client library. It maintains a dynamic
//! set of subscription topics in the client's user data, and employs
//! callbacks to receive messages and status updates. It also monitors
//! for disconnects and performs manual re-connections.
//!
//! The MQTT client lets an application keep a single "User Data" item as a
//! boxed `Any`. It is set when the client is created and can be accessed
//! from the various callbacks. It must adhere to Rust's usual concurrency
//! and safety rules, so if it is to be updated by the application and
//! callbacks, then it must be thread protected. In this example, we use a
//! `RwLock` to provide thread safety.
//!
//! This sample demonstrates:
//! - Connecting to an MQTT server/broker.
//! - Client user data
//! - Subscribing to multiple topics simultaneously
//! - Receiving messages through the callback API
//! - Receiving network disconnect updates and attempting manual reconnects.
//! - Using a "clean session" and manually re-subscribing to topics on
//! reconnect.
//! - Last will and testament
//!
/*******************************************************************************
* Copyright (c) 2017-2020 Frank Pagliughi <fpagliughi@mindspring.com>
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Frank Pagliughi - initial implementation and documentation
*******************************************************************************/
use paho_mqtt as mqtt;
use std::{env, process, sync::RwLock, thread, time::Duration};
// The topics to which we subscribe.
const DFLT_TOPICS: &[&str] = &["requests/subscription/add", "test", "hello"];
const QOS: i32 = 1;
// The type we'll use to keep our dynamic list of topics inside the
// MQTT client. Since we want to update it after creating the client,
// we need to wrap the data in a lock, like a Mutex or RwLock.
type UserTopics = RwLock<Vec<String>>;
/////////////////////////////////////////////////////////////////////////////
// Callback for a successful connection to the broker.
// We subscribe to the topic(s) we want here.
fn on_connect_success(cli: &mqtt::AsyncClient, _msgid: u16) {
println!("Connection succeeded");
let data = cli.user_data().unwrap();
if let Some(lock) = data.downcast_ref::<UserTopics>() {
let topics = lock.read().unwrap();
println!("Subscribing to topics: {:?}", topics);
// Create a QoS vector, same len as # topics
let qos = vec![QOS; topics.len()];
// Subscribe to the desired topic(s).
cli.subscribe_many(&topics, &qos);
// TODO: This doesn't yet handle a failed subscription.
}
}
// Callback for a failed attempt to connect to the server.
// We simply sleep and then try again.
//
// Note that normally we don't want to do a blocking operation or sleep
// from within a callback. But in this case, we know that the client is
// *not* conected, and thus not doing anything important. So we don't worry
// too much about stopping its callback thread.
fn on_connect_failure(cli: &mqtt::AsyncClient, _msgid: u16, rc: i32) {
println!("Connection attempt failed with error code {}.\n", rc);
thread::sleep(Duration::from_millis(2500));
cli.reconnect_with_callbacks(on_connect_success, on_connect_failure);
}
/////////////////////////////////////////////////////////////////////////////
fn main() {
// Initialize the logger from the environment
env_logger::init();
let host = env::args()
.nth(1)
.unwrap_or_else(|| "tcp://localhost:1883".to_string());
let topics: Vec<String> = DFLT_TOPICS.iter().map(|s| s.to_string()).collect();
// Create the client. Use an ID for a persistent session.
// A real system should try harder to use a unique ID.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id("rust_dyn_subscribe")
.user_data(Box::new(RwLock::new(topics)))
.finalize();
// Create the client connection
let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| {
println!("Error creating the client: {:?}", e);
process::exit(1);
});
// Set a closure to be called whenever the client connection is established.
cli.set_connected_callback(|_cli: &mqtt::AsyncClient| {
println!("Connected.");
});
// Set a closure to be called whenever the client loses the connection.
// It will attempt to reconnect, and set up function callbacks to keep
// retrying until the connection is re-established.
cli.set_connection_lost_callback(|cli: &mqtt::AsyncClient| {
println!("Connection lost. Attempting reconnect.");
thread::sleep(Duration::from_millis(2500));
cli.reconnect_with_callbacks(on_connect_success, on_connect_failure);
});
// Attach a closure to the client to receive callback
// on incoming messages.
cli.set_message_callback(|cli, msg| {
if let Some(msg) = msg {
let topic = msg.topic();
let payload_str = msg.payload_str();
if topic == "requests/subscription/add" {
let data = cli.user_data().unwrap();
if let Some(lock) = data.downcast_ref::<UserTopics>() {
let mut topics = lock.write().unwrap();
let new_topic = payload_str.to_owned().to_string();
println!("Adding topic: {}", new_topic);
cli.subscribe(&new_topic, QOS);
topics.push(new_topic);
}
else {
println!("Failed to add topic: {}", payload_str);
}
}
else {
println!("{} - {}", topic, payload_str);
}
}
});
// Define the set of options for the connection
let lwt = mqtt::Message::new("test", "Async subscriber lost connection", 1);
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.mqtt_version(mqtt::MQTT_VERSION_3_1_1)
.clean_session(true)
.will_message(lwt)
.finalize();
// Make the connection to the broker
println!("Connecting to the MQTT server...");
cli.connect_with_callbacks(conn_opts, on_connect_success, on_connect_failure);
// Just wait for incoming messages.
loop {
thread::sleep(Duration::from_millis(1000));
}
// Hitting ^C will exit the app and cause the broker to publish
// the LWT message since we're not disconnecting cleanly.
}