forked from damienpontifex/azure-iot-sdk-rs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtemperature-client.rs
168 lines (139 loc) · 5.04 KB
/
temperature-client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
use azure_iot_sdk::{
DeviceKeyTokenSource, DirectMethodResponse, IoTHubClient, Message, MessageType,
};
use chrono::{DateTime, Utc};
use log::{error, info};
use rand_distr::{Distribution, Normal};
use serde::Serialize;
use tokio::time;
#[derive(Serialize, Debug)]
struct TemperatureReading {
timestamp: DateTime<Utc>,
temperature: f32,
}
struct TemperatureSensor {
distribution: Normal<f32>,
}
impl Default for TemperatureSensor {
fn default() -> Self {
TemperatureSensor {
distribution: Normal::new(25.0, 7.0).unwrap(),
}
}
}
impl TemperatureSensor {
fn get_reading(&self) -> TemperatureReading {
TemperatureReading {
timestamp: Utc::now(),
temperature: self.distribution.sample(&mut rand::thread_rng()),
}
}
}
#[derive(Serialize, Debug)]
struct HumidityReading {
timestamp: DateTime<Utc>,
humidity: f32,
}
struct HumiditySensor {
distribution: Normal<f32>,
}
impl Default for HumiditySensor {
fn default() -> Self {
Self {
distribution: Normal::new(50.0, 7.0).unwrap(),
}
}
}
impl HumiditySensor {
fn get_reading(&self) -> HumidityReading {
HumidityReading {
timestamp: Utc::now(),
humidity: self.distribution.sample(&mut rand::thread_rng()),
}
}
}
#[tokio::main]
async fn main() -> azure_iot_sdk::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let hostname = std::env::var("IOTHUB_HOSTNAME")
.expect("Set IoT Hub hostname in the IOTHUB_HOSTNAME environment variable");
let device_id = std::env::var("DEVICE_ID")
.expect("Set the device id in the DEVICE_ID environment variable");
let shared_access_key = std::env::var("SHARED_ACCESS_KEY")
.expect("Set the device shared access key in the SHARED_ACCESS_KEY environment variable");
let token_source =
DeviceKeyTokenSource::new(&hostname, &device_id, &shared_access_key).unwrap();
let mut client = IoTHubClient::new(&hostname, device_id, None, token_source).await?;
info!("Initialized client");
let mut recv_client = client.clone();
let mut receiver = client.get_receiver().await;
let receive_loop = async {
loop {
while let Some(msg) = receiver.recv().await {
match msg {
MessageType::C2DMessage(msg) => info!("Received message {:?}", msg),
MessageType::DirectMethod(msg) => {
info!("Received direct method {:?}", msg);
recv_client
.respond_to_direct_method(DirectMethodResponse::new(
msg.request_id,
0,
Some(std::str::from_utf8(&msg.message.body).unwrap().to_string()),
))
.await
.unwrap();
}
MessageType::DesiredPropertyUpdate(msg) => {
info!("Desired properties updated {:?}", msg)
}
MessageType::ErrorReceive(err) => error!("Error during receive {:?}", err),
}
}
}
};
let mut interval = time::interval(time::Duration::from_secs(2));
let mut count = 0u32;
let sensor = TemperatureSensor::default();
let mut temp_client = client.clone();
let temp_sender = async {
loop {
interval.tick().await;
let temp = sensor.get_reading();
let msg = Message::builder()
.set_body(serde_json::to_vec(&temp).unwrap())
.set_message_id(format!("{}-t", count))
.build();
temp_client.send_message(msg).await?;
count += 1;
}
/*
* We need the explicit return so the compiler can figure out the return type of the async block.
* https://rust-lang.github.io/async-book/07_workarounds/03_err_in_async_blocks.html
*/
#[allow(unreachable_code)]
Ok::<(), Box<dyn std::error::Error>>(())
};
let mut interval = time::interval(time::Duration::from_secs(5));
let mut count = 0u32;
let sensor = HumiditySensor::default();
let humidity_sender = async move {
loop {
interval.tick().await;
let humidity = sensor.get_reading();
let msg = Message::builder()
.set_body(serde_json::to_vec(&humidity).unwrap())
.set_message_id(format!("{}-h", count))
.build();
client.send_message(msg).await?;
count += 1;
}
#[allow(unreachable_code)]
Ok::<(), Box<dyn std::error::Error>>(())
};
let (_, temp_sender_result, humidity_sender_result) =
tokio::join!(receive_loop, temp_sender, humidity_sender);
//in real life you'd do something useful, like restart the process
temp_sender_result.unwrap();
humidity_sender_result.unwrap();
Ok(())
}