Skip to content

Commit 65f2784

Browse files
committed
Added shared subscription support preparation code.
Broker module files are separated by responsibility. Fixed invalid memory access on endpoint_t. https://github.com/redboltz/mqtt_cpp/blob/f659bee42b9fac76d6b13dc99cf8e379671ce16c/include/mqtt/broker/broker.hpp#L1186-L1205
1 parent 45af140 commit 65f2784

38 files changed

+1845
-759
lines changed

example/broker.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
int main() {
1515
MQTT_NS::setup_log();
1616
boost::asio::io_context ioc;
17-
MQTT_NS::broker b(ioc);
17+
MQTT_NS::broker::broker_t b(ioc);
1818
test_server_no_tls s(ioc, b);
1919
ioc.run();
2020
}

include/mqtt/broker/broker.hpp

+129-650
Large diffs are not rendered by default.
+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright Takatoshi Kondo 2020
2+
//
3+
// Distributed under the Boost Software License, Version 1.0.
4+
// (See accompanying file LICENSE_1_0.txt or copy at
5+
// http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#if !defined(MQTT_BROKER_BROKER_NAMESPACE_HPP)
8+
#define MQTT_BROKER_BROKER_NAMESPACE_HPP
9+
10+
#include <mqtt/config.hpp>
11+
12+
#include <mqtt/broker/broker_namespace.hpp>
13+
14+
#if __cplusplus >= 201703L
15+
16+
#define MQTT_BROKER_NS_BEGIN namespace MQTT_NS::broker {
17+
#define MQTT_BROKER_NS_END }
18+
19+
#else // __cplusplus >= 201703L
20+
21+
#define MQTT_BROKER_NS_BEGIN namespace MQTT_NS { namespace broker {
22+
#define MQTT_BROKER_NS_END } }
23+
24+
#endif // __cplusplus >= 201703L
25+
26+
#endif // MQTT_BROKER_BROKER_NAMESPACE_HPP

include/mqtt/broker/common_type.hpp

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright Takatoshi Kondo 2020
2+
//
3+
// Distributed under the Boost Software License, Version 1.0.
4+
// (See accompanying file LICENSE_1_0.txt or copy at
5+
// http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#if !defined(MQTT_BROKER_COMMON_TYPE_HPP)
8+
#define MQTT_BROKER_COMMON_TYPE_HPP
9+
10+
#include <mqtt/config.hpp>
11+
12+
#include <mqtt/broker/broker_namespace.hpp>
13+
#include <mqtt/server.hpp>
14+
15+
MQTT_BROKER_NS_BEGIN
16+
17+
using endpoint_t = server<>::endpoint_t;
18+
using con_sp_t = std::shared_ptr<endpoint_t>;
19+
using con_wp_t = std::weak_ptr<endpoint_t>;
20+
using packet_id_t = endpoint_t::packet_id_t;
21+
22+
MQTT_BROKER_NS_END
23+
24+
#endif // MQTT_BROKER_COMMON_TYPE_HPP
+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Copyright Takatoshi Kondo 2020
2+
//
3+
// Distributed under the Boost Software License, Version 1.0.
4+
// (See accompanying file LICENSE_1_0.txt or copy at
5+
// http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#if !defined(MQTT_BROKER_INFLIGHT_MESSAGE_HPP)
8+
#define MQTT_BROKER_INFLIGHT_MESSAGE_HPP
9+
10+
#include <mqtt/config.hpp>
11+
12+
#include <chrono>
13+
14+
#include <boost/asio/steady_timer.hpp>
15+
16+
#include <mqtt/broker/broker_namespace.hpp>
17+
#include <mqtt/message_variant.hpp>
18+
#include <mqtt/any.hpp>
19+
#include <mqtt/visitor_util.hpp>
20+
21+
#include <mqtt/broker/common_type.hpp>
22+
#include <mqtt/broker/tags.hpp>
23+
#include <mqtt/broker/property_util.hpp>
24+
25+
MQTT_BROKER_NS_BEGIN
26+
27+
class inflight_messages;
28+
29+
class inflight_message {
30+
public:
31+
inflight_message(
32+
store_message_variant msg,
33+
any life_keeper,
34+
std::shared_ptr<as::steady_timer> tim_message_expiry)
35+
:msg_ { force_move(msg) },
36+
life_keeper_ { force_move(life_keeper) },
37+
tim_message_expiry_ { force_move(tim_message_expiry) }
38+
{}
39+
40+
packet_id_t packet_id() const {
41+
return
42+
MQTT_NS::visit(
43+
make_lambda_visitor(
44+
[](auto const& m) {
45+
return m.packet_id();
46+
}
47+
),
48+
msg_
49+
);
50+
}
51+
52+
void send(endpoint_t& ep) const {
53+
optional<store_message_variant> msg_opt;
54+
if (tim_message_expiry_) {
55+
MQTT_NS::visit(
56+
make_lambda_visitor(
57+
[&](v5::basic_publish_message<sizeof(packet_id_t)> const& m) {
58+
auto updated_msg = m;
59+
auto& props = updated_msg.props();
60+
61+
auto d =
62+
std::chrono::duration_cast<std::chrono::seconds>(
63+
tim_message_expiry_->expiry() - std::chrono::steady_clock::now()
64+
).count();
65+
if (d < 0) d = 0;
66+
set_property<v5::property::message_expiry_interval>(
67+
props,
68+
v5::property::message_expiry_interval(
69+
static_cast<uint32_t>(d)
70+
)
71+
);
72+
msg_opt.emplace(force_move(updated_msg));
73+
},
74+
[](auto const&) {
75+
}
76+
),
77+
msg_
78+
);
79+
}
80+
ep.send_store_message(msg_opt ? msg_opt.value() : msg_, life_keeper_);
81+
}
82+
83+
private:
84+
friend class inflight_messages;
85+
86+
store_message_variant msg_;
87+
any life_keeper_;
88+
std::shared_ptr<as::steady_timer> tim_message_expiry_;
89+
};
90+
91+
class inflight_messages {
92+
public:
93+
void insert(
94+
store_message_variant msg,
95+
any life_keeper,
96+
std::shared_ptr<as::steady_timer> tim_message_expiry
97+
) {
98+
messages_.emplace_back(
99+
force_move(msg),
100+
force_move(life_keeper),
101+
force_move(tim_message_expiry)
102+
);
103+
}
104+
105+
void send_all_messages(endpoint_t& ep) {
106+
for (auto const& ifm : messages_) {
107+
ifm.send(ep);
108+
}
109+
}
110+
111+
void clear() {
112+
messages_.clear();
113+
}
114+
115+
template <typename Tag>
116+
decltype(auto) get() {
117+
return messages_.get<Tag>();
118+
}
119+
120+
template <typename Tag>
121+
decltype(auto) get() const {
122+
return messages_.get<Tag>();
123+
}
124+
125+
private:
126+
using mi_inflight_message = mi::multi_index_container<
127+
inflight_message,
128+
mi::indexed_by<
129+
mi::sequenced<
130+
mi::tag<tag_seq>
131+
>,
132+
mi::ordered_unique<
133+
mi::tag<tag_pid>,
134+
BOOST_MULTI_INDEX_CONST_MEM_FUN(inflight_message, packet_id_t, packet_id)
135+
>,
136+
mi::ordered_non_unique<
137+
mi::tag<tag_tim>,
138+
BOOST_MULTI_INDEX_MEMBER(inflight_message, std::shared_ptr<as::steady_timer>, tim_message_expiry_)
139+
>
140+
>
141+
>;
142+
143+
mi_inflight_message messages_;
144+
};
145+
146+
MQTT_BROKER_NS_END
147+
148+
#endif // MQTT_BROKER_INFLIGHT_MESSAGE_HPP
+139
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Copyright Takatoshi Kondo 2020
2+
//
3+
// Distributed under the Boost Software License, Version 1.0.
4+
// (See accompanying file LICENSE_1_0.txt or copy at
5+
// http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#if !defined(MQTT_BROKER_OFFLINE_MESSAGE_HPP)
8+
#define MQTT_BROKER_OFFLINE_MESSAGE_HPP
9+
10+
#include <mqtt/config.hpp>
11+
12+
#include <boost/asio/steady_timer.hpp>
13+
#include <boost/multi_index_container.hpp>
14+
#include <boost/multi_index/ordered_index.hpp>
15+
#include <boost/multi_index/sequenced_index.hpp>
16+
#include <boost/multi_index/member.hpp>
17+
18+
#include <mqtt/buffer.hpp>
19+
#include <mqtt/property_variant.hpp>
20+
#include <mqtt/publish.hpp>
21+
22+
#include <mqtt/broker/broker_namespace.hpp>
23+
#include <mqtt/broker/common_type.hpp>
24+
#include <mqtt/broker/tags.hpp>
25+
#include <mqtt/broker/property_util.hpp>
26+
27+
MQTT_BROKER_NS_BEGIN
28+
29+
namespace mi = boost::multi_index;
30+
31+
class offline_messages;
32+
33+
// The offline_message structure holds messages that have been published on a
34+
// topic that a not-currently-connected client is subscribed to.
35+
// When a new connection is made with the client id for this saved data,
36+
// these messages will be published to that client, and only that client.
37+
class offline_message {
38+
public:
39+
offline_message(
40+
buffer topic,
41+
buffer contents,
42+
v5::properties props,
43+
publish_options pubopts,
44+
std::shared_ptr<as::steady_timer> tim_message_expiry)
45+
: topic_(force_move(topic)),
46+
contents_(force_move(contents)),
47+
props_(force_move(props)),
48+
pubopts_(pubopts),
49+
tim_message_expiry_(force_move(tim_message_expiry))
50+
{ }
51+
52+
void send(endpoint_t& ep) const {
53+
auto props = props_;
54+
if (tim_message_expiry_) {
55+
auto d =
56+
std::chrono::duration_cast<std::chrono::seconds>(
57+
tim_message_expiry_->expiry() - std::chrono::steady_clock::now()
58+
).count();
59+
if (d < 0) d = 0;
60+
set_property<v5::property::message_expiry_interval>(
61+
props,
62+
v5::property::message_expiry_interval(
63+
static_cast<uint32_t>(d)
64+
)
65+
);
66+
}
67+
68+
ep.publish(topic_, contents_, pubopts_, force_move(props));
69+
}
70+
71+
private:
72+
friend class offline_messages;
73+
74+
buffer topic_;
75+
buffer contents_;
76+
v5::properties props_;
77+
publish_options pubopts_;
78+
std::shared_ptr<as::steady_timer> tim_message_expiry_;
79+
};
80+
81+
class offline_messages {
82+
public:
83+
void send_all_messages(endpoint_t& ep) {
84+
try {
85+
auto& idx = messages_.get<tag_seq>();
86+
while (!idx.empty()) {
87+
idx.modify(
88+
idx.begin(),
89+
[&](auto& e) {
90+
e.send(ep);
91+
}
92+
);
93+
idx.pop_front();
94+
}
95+
}
96+
catch (packet_id_exhausted_error const& e) {
97+
MQTT_LOG("mqtt_broker", warning)
98+
<< MQTT_ADD_VALUE(address, &ep)
99+
<< e.what();
100+
}
101+
for (auto const& oflm : messages_) {
102+
oflm.send(ep);
103+
}
104+
}
105+
106+
void clear() {
107+
messages_.clear();
108+
}
109+
110+
template <typename Tag>
111+
decltype(auto) get() {
112+
return messages_.get<Tag>();
113+
}
114+
115+
template <typename Tag>
116+
decltype(auto) get() const {
117+
return messages_.get<Tag>();
118+
}
119+
120+
private:
121+
using mi_offline_message = mi::multi_index_container<
122+
offline_message,
123+
mi::indexed_by<
124+
mi::sequenced<
125+
mi::tag<tag_seq>
126+
>,
127+
mi::ordered_non_unique<
128+
mi::tag<tag_tim>,
129+
BOOST_MULTI_INDEX_MEMBER(offline_message, std::shared_ptr<as::steady_timer>, tim_message_expiry_)
130+
>
131+
>
132+
>;
133+
134+
mi_offline_message messages_;
135+
};
136+
137+
MQTT_BROKER_NS_END
138+
139+
#endif // MQTT_BROKER_OFFLINE_MESSAGE_HPP

include/mqtt/broker/property_util.hpp

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright Takatoshi Kondo 2020
2+
//
3+
// Distributed under the Boost Software License, Version 1.0.
4+
// (See accompanying file LICENSE_1_0.txt or copy at
5+
// http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#if !defined(MQTT_BROKER_PROPERTY_UTIL_HPP)
8+
#define MQTT_BROKER_PROPERTY_UTIL_HPP
9+
10+
#include <mqtt/config.hpp>
11+
12+
#include <mqtt/broker/broker_namespace.hpp>
13+
#include <mqtt/optional.hpp>
14+
#include <mqtt/property_variant.hpp>
15+
#include <mqtt/visitor_util.hpp>
16+
17+
MQTT_BROKER_NS_BEGIN
18+
19+
template <typename T>
20+
inline optional<T> get_property(v5::properties const& props) {
21+
optional<T> result;
22+
23+
auto visitor = make_lambda_visitor(
24+
[&result](T const& t) { result = t; },
25+
[](auto const&) { }
26+
);
27+
28+
for (auto const& p : props) {
29+
MQTT_NS::visit(visitor, p);
30+
}
31+
32+
return result;
33+
}
34+
35+
template <typename T>
36+
inline void set_property(v5::properties& props, T&& v) {
37+
auto visitor = make_lambda_visitor(
38+
[&v](T& t) mutable { t = std::forward<T>(v); },
39+
[](auto&) { }
40+
);
41+
42+
for (auto& p : props) {
43+
MQTT_NS::visit(visitor, p);
44+
}
45+
}
46+
47+
MQTT_BROKER_NS_END
48+
49+
#endif // MQTT_BROKER_PROPERTY_UTIL_HPP

0 commit comments

Comments
 (0)