Skip to content

Commit a17c6b2

Browse files
authored
Merge pull request #443 from redboltz/accept_cb_shared_ptr
Updated the parameter of set_accept_handler() from endpoint_t to std:…
2 parents 10830cd + 932357d commit a17c6b2

18 files changed

+674
-446
lines changed

example/no_tls_both.cpp

+46-28
Original file line numberDiff line numberDiff line change
@@ -167,36 +167,46 @@ void server_proc(Server& s, std::set<con_sp_t>& connections, mi_sub_con& subs) {
167167
}
168168
);
169169
s.set_accept_handler(
170-
[&](con_t& ep) {
170+
[&s, &connections, &subs](con_sp_t spep) {
171+
auto& ep = *spep;
172+
std::weak_ptr<con_t> wp(spep);
173+
171174
using packet_id_t = typename std::remove_reference_t<decltype(ep)>::packet_id_t;
172175
std::cout << "[server] accept" << std::endl;
173-
auto sp = ep.shared_from_this();
174176
// For server close if ep is closed.
175177
auto g = MQTT_NS::shared_scope_guard(
176-
[&] {
178+
[&s] {
177179
std::cout << "[server] session end" << std::endl;
178180
s.close();
179181
}
180182
);
181-
ep.start_session(std::make_tuple(std::move(sp), std::move(g)));
183+
184+
// Pass spep to keep lifetime.
185+
// It makes sure wp.lock() never return nullptr in the handlers below
186+
// including close_handler and error_handler.
187+
ep.start_session(std::make_tuple(std::move(spep), std::move(g)));
182188

183189
// set connection (lower than MQTT) level handlers
184190
ep.set_close_handler(
185-
[&]
191+
[&connections, &subs, wp]
186192
(){
187193
std::cout << "[server] closed." << std::endl;
188-
close_proc(connections, subs, ep.shared_from_this());
194+
auto sp = wp.lock();
195+
BOOST_ASSERT(sp);
196+
close_proc(connections, subs, sp);
189197
});
190198
ep.set_error_handler(
191-
[&]
199+
[&connections, &subs, wp]
192200
(boost::system::error_code const& ec){
193201
std::cout << "[server] error: " << ec.message() << std::endl;
194-
close_proc(connections, subs, ep.shared_from_this());
202+
auto sp = wp.lock();
203+
BOOST_ASSERT(sp);
204+
close_proc(connections, subs, sp);
195205
});
196206

197207
// set MQTT level handlers
198208
ep.set_connect_handler(
199-
[&]
209+
[&connections, wp]
200210
(MQTT_NS::buffer client_id,
201211
MQTT_NS::optional<MQTT_NS::buffer> username,
202212
MQTT_NS::optional<MQTT_NS::buffer> password,
@@ -209,53 +219,57 @@ void server_proc(Server& s, std::set<con_sp_t>& connections, mi_sub_con& subs) {
209219
std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl;
210220
std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl;
211221
std::cout << "[server] keep_alive : " << keep_alive << std::endl;
212-
connections.insert(ep.shared_from_this());
213-
ep.connack(false, MQTT_NS::connect_return_code::accepted);
222+
auto sp = wp.lock();
223+
BOOST_ASSERT(sp);
224+
connections.insert(sp);
225+
sp->connack(false, MQTT_NS::connect_return_code::accepted);
214226
return true;
215227
}
216228
);
217229
ep.set_disconnect_handler(
218-
[&]
230+
[&connections, &subs, wp]
219231
(){
220232
std::cout << "[server] disconnect received." << std::endl;
221-
close_proc(connections, subs, ep.shared_from_this());
233+
auto sp = wp.lock();
234+
BOOST_ASSERT(sp);
235+
close_proc(connections, subs, sp);
222236
});
223237
ep.set_puback_handler(
224-
[&]
238+
[]
225239
(packet_id_t packet_id){
226240
std::cout << "[server] puback received. packet_id: " << packet_id << std::endl;
227241
return true;
228242
});
229243
ep.set_pubrec_handler(
230-
[&]
244+
[]
231245
(packet_id_t packet_id){
232246
std::cout << "[server] pubrec received. packet_id: " << packet_id << std::endl;
233247
return true;
234248
});
235249
ep.set_pubrel_handler(
236-
[&]
250+
[]
237251
(packet_id_t packet_id){
238252
std::cout << "[server] pubrel received. packet_id: " << packet_id << std::endl;
239253
return true;
240254
});
241255
ep.set_pubcomp_handler(
242-
[&]
256+
[]
243257
(packet_id_t packet_id){
244258
std::cout << "[server] pubcomp received. packet_id: " << packet_id << std::endl;
245259
return true;
246260
});
247261
ep.set_publish_handler(
248-
[&]
249-
(bool dup,
262+
[&subs]
263+
(bool is_dup,
250264
MQTT_NS::qos qos_value,
251-
bool retain,
265+
bool is_retain,
252266
MQTT_NS::optional<packet_id_t> packet_id,
253267
MQTT_NS::buffer topic_name,
254268
MQTT_NS::buffer contents){
255269
std::cout << "[server] publish received."
256-
<< " dup: " << std::boolalpha << dup
270+
<< " dup: " << std::boolalpha << is_dup
257271
<< " qos: " << qos_value
258-
<< " retain: " << std::boolalpha << retain << std::endl;
272+
<< " retain: " << std::boolalpha << is_retain << std::endl;
259273
if (packet_id)
260274
std::cout << "[server] packet_id: " << *packet_id << std::endl;
261275
std::cout << "[server] topic_name: " << topic_name << std::endl;
@@ -268,38 +282,42 @@ void server_proc(Server& s, std::set<con_sp_t>& connections, mi_sub_con& subs) {
268282
boost::asio::buffer(contents),
269283
std::make_pair(topic_name, contents),
270284
std::min(r.first->qos_value, qos_value),
271-
retain
285+
is_retain
272286
);
273287
}
274288
return true;
275289
});
276290
ep.set_subscribe_handler(
277-
[&]
291+
[&subs, wp]
278292
(packet_id_t packet_id,
279293
std::vector<std::tuple<MQTT_NS::buffer, MQTT_NS::subscribe_options>> entries) {
280294
std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl;
281295
std::vector<MQTT_NS::suback_reason_code> res;
282296
res.reserve(entries.size());
297+
auto sp = wp.lock();
298+
BOOST_ASSERT(sp);
283299
for (auto const& e : entries) {
284300
MQTT_NS::buffer topic = std::get<0>(e);
285301
MQTT_NS::qos qos_value = std::get<1>(e).get_qos();
286302
std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl;
287303
res.emplace_back(static_cast<MQTT_NS::suback_reason_code>(qos_value));
288-
subs.emplace(std::move(topic), ep.shared_from_this(), qos_value);
304+
subs.emplace(std::move(topic), sp, qos_value);
289305
}
290-
ep.suback(packet_id, res);
306+
sp->suback(packet_id, res);
291307
return true;
292308
}
293309
);
294310
ep.set_unsubscribe_handler(
295-
[&]
311+
[&subs, wp]
296312
(packet_id_t packet_id,
297313
std::vector<MQTT_NS::buffer> topics) {
298314
std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl;
299315
for (auto const& topic : topics) {
300316
subs.erase(topic);
301317
}
302-
ep.unsuback(packet_id);
318+
auto sp = wp.lock();
319+
BOOST_ASSERT(sp);
320+
sp->unsuback(packet_id);
303321
return true;
304322
}
305323
);

example/no_tls_server.cpp

+59-41
Original file line numberDiff line numberDiff line change
@@ -79,92 +79,106 @@ int main(int argc, char** argv) {
7979
mi_sub_con subs;
8080

8181
s.set_accept_handler(
82-
[&](con_t& ep) {
82+
[&connections, &subs](con_sp_t spep) {
83+
auto& ep = *spep;
84+
std::weak_ptr<con_t> wp(spep);
85+
8386
using packet_id_t = typename std::remove_reference_t<decltype(ep)>::packet_id_t;
8487
std::cout << "accept" << std::endl;
85-
auto sp = ep.shared_from_this();
86-
ep.start_session(sp); // keeping ep's lifetime as sp until session finished
88+
89+
// Pass spep to keep lifetime.
90+
// It makes sure wp.lock() never return nullptr in the handlers below
91+
// including close_handler and error_handler.
92+
ep.start_session(std::move(spep));
8793

8894
// set connection (lower than MQTT) level handlers
8995
ep.set_close_handler(
90-
[&]
96+
[&connections, &subs, wp]
9197
(){
92-
std::cout << "closed." << std::endl;
93-
close_proc(connections, subs, ep.shared_from_this());
98+
std::cout << "[server] closed." << std::endl;
99+
auto sp = wp.lock();
100+
BOOST_ASSERT(sp);
101+
close_proc(connections, subs, sp);
94102
});
95103
ep.set_error_handler(
96-
[&]
104+
[&connections, &subs, wp]
97105
(boost::system::error_code const& ec){
98-
std::cout << "error: " << ec.message() << std::endl;
99-
close_proc(connections, subs, ep.shared_from_this());
106+
std::cout << "[server] error: " << ec.message() << std::endl;
107+
auto sp = wp.lock();
108+
BOOST_ASSERT(sp);
109+
close_proc(connections, subs, sp);
100110
});
101111

102112
// set MQTT level handlers
103113
ep.set_connect_handler(
104-
[&]
114+
[&connections, wp]
105115
(MQTT_NS::buffer client_id,
106116
MQTT_NS::optional<MQTT_NS::buffer> username,
107117
MQTT_NS::optional<MQTT_NS::buffer> password,
108118
MQTT_NS::optional<MQTT_NS::will>,
109119
bool clean_session,
110120
std::uint16_t keep_alive) {
111121
using namespace MQTT_NS::literals;
112-
std::cout << "client_id : " << client_id << std::endl;
113-
std::cout << "username : " << (username ? username.value() : "none"_mb) << std::endl;
114-
std::cout << "password : " << (password ? password.value() : "none"_mb) << std::endl;
115-
std::cout << "clean_session: " << std::boolalpha << clean_session << std::endl;
116-
std::cout << "keep_alive : " << keep_alive << std::endl;
117-
connections.insert(ep.shared_from_this());
118-
ep.connack(false, MQTT_NS::connect_return_code::accepted);
122+
std::cout << "[server] client_id : " << client_id << std::endl;
123+
std::cout << "[server] username : " << (username ? username.value() : "none"_mb) << std::endl;
124+
std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl;
125+
std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl;
126+
std::cout << "[server] keep_alive : " << keep_alive << std::endl;
127+
auto sp = wp.lock();
128+
BOOST_ASSERT(sp);
129+
connections.insert(sp);
130+
sp->connack(false, MQTT_NS::connect_return_code::accepted);
119131
return true;
120132
}
121133
);
122134
ep.set_disconnect_handler(
123-
[&]
135+
[&connections, &subs, wp]
124136
(){
125-
std::cout << "disconnect received." << std::endl;
126-
close_proc(connections, subs, ep.shared_from_this());
137+
std::cout << "[server] disconnect received." << std::endl;
138+
auto sp = wp.lock();
139+
BOOST_ASSERT(sp);
140+
close_proc(connections, subs, sp);
127141
});
128142
ep.set_puback_handler(
129-
[&]
143+
[]
130144
(packet_id_t packet_id){
131-
std::cout << "puback received. packet_id: " << packet_id << std::endl;
145+
std::cout << "[server] puback received. packet_id: " << packet_id << std::endl;
132146
return true;
133147
});
134148
ep.set_pubrec_handler(
135-
[&]
149+
[]
136150
(packet_id_t packet_id){
137-
std::cout << "pubrec received. packet_id: " << packet_id << std::endl;
151+
std::cout << "[server] pubrec received. packet_id: " << packet_id << std::endl;
138152
return true;
139153
});
140154
ep.set_pubrel_handler(
141-
[&]
155+
[]
142156
(packet_id_t packet_id){
143-
std::cout << "pubrel received. packet_id: " << packet_id << std::endl;
157+
std::cout << "[server] pubrel received. packet_id: " << packet_id << std::endl;
144158
return true;
145159
});
146160
ep.set_pubcomp_handler(
147-
[&]
161+
[]
148162
(packet_id_t packet_id){
149-
std::cout << "pubcomp received. packet_id: " << packet_id << std::endl;
163+
std::cout << "[server] pubcomp received. packet_id: " << packet_id << std::endl;
150164
return true;
151165
});
152166
ep.set_publish_handler(
153-
[&]
167+
[&subs]
154168
(bool is_dup,
155169
MQTT_NS::qos qos_value,
156170
bool is_retain,
157171
MQTT_NS::optional<packet_id_t> packet_id,
158172
MQTT_NS::buffer topic_name,
159173
MQTT_NS::buffer contents){
160-
std::cout << "publish received."
174+
std::cout << "[server] publish received."
161175
<< " dup: " << std::boolalpha << is_dup
162176
<< " qos: " << qos_value
163177
<< " retain: " << std::boolalpha << is_retain << std::endl;
164178
if (packet_id)
165-
std::cout << "packet_id: " << *packet_id << std::endl;
166-
std::cout << "topic_name: " << topic_name << std::endl;
167-
std::cout << "contents: " << contents << std::endl;
179+
std::cout << "[server] packet_id: " << *packet_id << std::endl;
180+
std::cout << "[server] topic_name: " << topic_name << std::endl;
181+
std::cout << "[server] contents: " << contents << std::endl;
168182
auto const& idx = subs.get<tag_topic>();
169183
auto r = idx.equal_range(topic_name);
170184
for (; r.first != r.second; ++r.first) {
@@ -179,32 +193,36 @@ int main(int argc, char** argv) {
179193
return true;
180194
});
181195
ep.set_subscribe_handler(
182-
[&]
196+
[&subs, wp]
183197
(packet_id_t packet_id,
184198
std::vector<std::tuple<MQTT_NS::buffer, MQTT_NS::subscribe_options>> entries) {
185-
std::cout << "subscribe received. packet_id: " << packet_id << std::endl;
199+
std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl;
186200
std::vector<MQTT_NS::suback_reason_code> res;
187201
res.reserve(entries.size());
202+
auto sp = wp.lock();
203+
BOOST_ASSERT(sp);
188204
for (auto const& e : entries) {
189205
MQTT_NS::buffer topic = std::get<0>(e);
190206
MQTT_NS::qos qos_value = std::get<1>(e).get_qos();
191-
std::cout << "topic: " << topic << " qos: " << qos_value << std::endl;
207+
std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl;
192208
res.emplace_back(static_cast<MQTT_NS::suback_reason_code>(qos_value));
193-
subs.emplace(std::move(topic), ep.shared_from_this(), qos_value);
209+
subs.emplace(std::move(topic), sp, qos_value);
194210
}
195-
ep.suback(packet_id, res);
211+
sp->suback(packet_id, res);
196212
return true;
197213
}
198214
);
199215
ep.set_unsubscribe_handler(
200-
[&]
216+
[&subs, wp]
201217
(packet_id_t packet_id,
202218
std::vector<MQTT_NS::buffer> topics) {
203-
std::cout << "unsubscribe received. packet_id: " << packet_id << std::endl;
219+
std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl;
204220
for (auto const& topic : topics) {
205221
subs.erase(topic);
206222
}
207-
ep.unsuback(packet_id);
223+
auto sp = wp.lock();
224+
BOOST_ASSERT(sp);
225+
sp->unsuback(packet_id);
208226
return true;
209227
}
210228
);

0 commit comments

Comments
 (0)