Skip to content

Commit 867ce28

Browse files
committed
Add staged qos to prevent heap allocation in test broker
1 parent d949027 commit 867ce28

File tree

1 file changed

+58
-42
lines changed

1 file changed

+58
-42
lines changed

test/test_broker.hpp

Lines changed: 58 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -531,16 +531,16 @@ class test_broker {
531531
// If we have a saved session, we can transfer the state from it
532532
// to the active_session container.
533533
if(non_act_sess_it == non_act_sess_idx.end()) {
534-
auto const& ret = active_sessions_.emplace(client_id, spep, std::move(will), std::move(session_expiry_interval));
534+
auto const& ret = active_sessions_.emplace(std::make_shared<session_state>(client_id, spep, std::move(will), std::move(session_expiry_interval)));
535535
act_sess_it = ret.first;
536536
BOOST_ASSERT(ret.second);
537537
BOOST_ASSERT(act_sess_it->client_id == client_id);
538538
BOOST_ASSERT(act_sess_it == act_sess_idx.find(client_id));
539539
}
540540
else {
541-
session_state state;
542-
non_act_sess_idx.modify(non_act_sess_it, [&](session_state & val) { state = val; });
543-
state.con = spep;
541+
session_state_sp_t state;
542+
non_act_sess_idx.modify(non_act_sess_it, [&](session_state_sp_t & val) { state = val; });
543+
state->con = spep;
544544
non_act_sess_idx.erase(non_act_sess_it);
545545
BOOST_ASSERT(non_act_sess_idx.end() == non_act_sess_idx.find(client_id));
546546

@@ -557,7 +557,7 @@ class test_broker {
557557

558558
// Force disconnect the client.
559559
// This shuts down the socket directly.
560-
act_sess_it->con->force_disconnect();
560+
(*act_sess_it)->con->force_disconnect();
561561

562562
// Replace it with the new connection
563563
// Nothing more should need to be done
@@ -568,7 +568,7 @@ class test_broker {
568568
{
569569
auto & subs_idx = subs_.get<tag_con>();
570570
{
571-
auto const& range = boost::make_iterator_range(subs_idx.equal_range(act_sess_it->con));
571+
auto const& range = boost::make_iterator_range(subs_idx.equal_range((*act_sess_it)->con));
572572
for(auto it = range.begin(); it != range.end(); std::advance(it, 1)) {
573573
subs_idx.modify_key(it,
574574
[&](con_sp_t & val) { val = spep; },
@@ -606,7 +606,7 @@ class test_broker {
606606
*(d.props)
607607
);
608608
}
609-
auto handle = subs_map.insert_or_update(item.topic, connection_subscription_t(spep, item.qos_value));
609+
auto handle = subs_map.insert_or_update(item.topic, connection_session_t(*act_sess_it, item.qos_value));
610610
subs_.emplace(item.topic, spep, item.qos_value, handle);
611611
}
612612
idx.erase(range.begin(), range.end());
@@ -711,7 +711,7 @@ class test_broker {
711711
// TODO: This doesn't handle situations where we receive a new subscription for the same topic.
712712
// MQTT 3.1.1 - 3.8.4 Response - paragraph 3.
713713

714-
auto handle = subs_map.insert_or_update(topic, connection_subscription_t(spep, qos_value));
714+
auto handle = subs_map.insert_or_update(topic, connection_session_t(*session, qos_value));
715715
subs_.emplace(std::move(topic), spep, qos_value, handle);
716716
}
717717
// Acknowledge the subscriptions, and the registered QOS settings
@@ -728,7 +728,7 @@ class test_broker {
728728
res.emplace_back(MQTT_NS::v5::qos_to_suback_reason_code(qos_value)); // converts to granted_qos_x
729729
// TODO: This doesn't handle situations where we receive a new subscription for the same topic.
730730
// MQTT 3.1.1 - 3.8.4 Response - paragraph 3.
731-
auto handle = subs_map.insert_or_update(topic, connection_subscription_t(spep, qos_value));
731+
auto handle = subs_map.insert_or_update(topic, connection_session_t(*session, qos_value));
732732
subs_.emplace(std::move(topic), spep, qos_value, handle);
733733
}
734734
if (h_subscribe_props_) h_subscribe_props_(props);
@@ -765,6 +765,10 @@ class test_broker {
765765

766766
auto& ep = *spep;
767767

768+
auto & act_sess_idx = active_sessions_.get<tag_con>();
769+
auto act_sess_it = act_sess_idx.find(spep);
770+
session_state_sp_t state = *act_sess_it;
771+
768772
// For each subscription that this connection has
769773
// Compare against the list of topics, and remove
770774
// the subscription if the topic is in the list.
@@ -786,7 +790,7 @@ class test_broker {
786790
* iterator because we might not have a match, and
787791
* thus would infinitely loop on the current iterator.
788792
*/
789-
subs_map.remove(it->handle, connection_subscription_t(spep));
793+
subs_map.remove(it->handle, connection_session_t(state));
790794
it = idx.erase(it);
791795
match = true;
792796
break;
@@ -833,26 +837,33 @@ class test_broker {
833837
MQTT_NS::publish_options pubopts,
834838
MQTT_NS::v5::properties props) {
835839

836-
std::map< con_sp_t, MQTT_NS::qos > subscribers;
837-
838-
subs_map.find(topic, [&subscribers]( connection_subscription_t const &r){
839-
subscribers[r.con] = std::max(r.qos, subscribers[r.con]);
840+
subs_map.find(topic, []( connection_session_t const &r){
841+
if(r.session->staged_qos)
842+
r.session->staged_qos = std::max(*r.session->staged_qos, r.qos);
843+
else
844+
r.session->staged_qos = r.qos;
840845
});
841846

842-
for(auto const &i: subscribers) {
847+
auto & act_sess_idx = active_sessions_.get<tag_con>();
848+
for(auto const &session: act_sess_idx) {
843849
// publish the message to subscribers.
844850
// TODO: Probably this should be switched to async_publish?
845851
// Given the async_client / sync_client seperation
846852
// and the way they have different function names,
847853
// it wouldn't be possible for test_broker.hpp to be
848854
// used with some hypothetical "async_server" in the future.
849855

850-
i.first->publish(
851-
topic,
852-
contents,
853-
std::min(i.second, pubopts.get_qos()) | MQTT_NS::retain::no,
854-
props // TODO: Copying the properties vector for each subscription.
855-
);
856+
if(session->staged_qos) {
857+
session->con->publish(
858+
topic,
859+
contents,
860+
std::min(*session->staged_qos, pubopts.get_qos()) | MQTT_NS::retain::no,
861+
props // TODO: Copying the properties vector for each subscription.
862+
);
863+
864+
session->staged_qos.reset();
865+
}
866+
856867
}
857868

858869
{
@@ -945,6 +956,7 @@ class test_broker {
945956

946957
auto & act_sess_idx = active_sessions_.get<tag_con>();
947958
auto act_sess_it = act_sess_idx.find(spep);
959+
session_state_sp_t state = *act_sess_it;
948960

949961
// act_sess_it == act_sess_idx.end() could happen if broker accepts
950962
// the session from client but the client closes the session before sending
@@ -954,10 +966,10 @@ class test_broker {
954966

955967
MQTT_NS::buffer client_id;
956968
MQTT_NS::optional<MQTT_NS::will> will;
957-
bool session_clear = !act_sess_it->session_expiry_interval;
969+
bool session_clear = !(*act_sess_it)->session_expiry_interval;
958970
if (ep.clean_session() && session_clear) {
959-
client_id = std::move(act_sess_it->client_id);
960-
will = std::move(act_sess_it->will);
971+
client_id = std::move((*act_sess_it)->client_id);
972+
will = std::move((*act_sess_it)->will);
961973
act_sess_idx.erase(act_sess_it);
962974

963975
BOOST_ASSERT(active_sessions_.get<tag_client_id>().count(client_id) == 0);
@@ -969,13 +981,12 @@ class test_broker {
969981
BOOST_ASSERT(non_active_sessions_.get<tag_client_id>().count(client_id) == 0);
970982
BOOST_ASSERT(non_active_sessions_.get<tag_client_id>().find(client_id) == non_active_sessions_.get<tag_client_id>().end());
971983
}
972-
else {
973-
session_state state = std::move(*act_sess_it);
974-
client_id = state.client_id;
975-
will = std::move(state.will);
984+
else {
985+
client_id =state->client_id;
986+
will = std::move(state->will);
976987

977988
// TODO: Should yank out the messages from this connection object and store it in the session_state object??
978-
state.con.reset(); // clear the shared pointer, so it doesn't stay alive after this funciton ends.
989+
state->con.reset(); // clear the shared pointer, so it doesn't stay alive after this funciton ends.
979990
act_sess_idx.erase(act_sess_it);
980991
BOOST_ASSERT(active_sessions_.get<tag_client_id>().count(client_id) == 0);
981992
BOOST_ASSERT(active_sessions_.get<tag_client_id>().find(client_id) == active_sessions_.get<tag_client_id>().end());
@@ -1001,15 +1012,15 @@ class test_broker {
10011012
// In v3_1_1, sessin_expiry_interval is not set. So clean on close.
10021013
if (ep.clean_session() && session_clear) {
10031014
// Remove all subscriptions for this clientid
1004-
for(auto it: range) {
1005-
subs_map.remove(it.topic, connection_subscription_t(spep));
1015+
for(auto const &item: range) {
1016+
subs_map.remove(item.handle, connection_session_t(state));
10061017
}
10071018
idx.erase(range.begin(), range.end());
10081019
}
10091020
else if( ! range.empty()) {
10101021
// Save all the subscriptions for this clientid for later.
10111022
for(auto const& item : range) {
1012-
subs_map.remove(item.topic, connection_subscription_t(spep));
1023+
subs_map.remove(item.handle, connection_session_t(state));
10131024
auto const& ret = saved_subs_.emplace(client_id,
10141025
item.topic,
10151026
item.qos_value);
@@ -1084,12 +1095,17 @@ class test_broker {
10841095
MQTT_NS::optional<MQTT_NS::will> will;
10851096
MQTT_NS::optional<boost::posix_time::time_duration> will_delay;
10861097
MQTT_NS::optional<boost::posix_time::time_duration> session_expiry_interval;
1098+
1099+
// The staged Qos is used to calculate the max qos when a message is published
1100+
MQTT_NS::optional<MQTT_NS::qos> staged_qos;
10871101
};
10881102

1103+
using session_state_sp_t = std::shared_ptr<session_state>;
1104+
10891105
// The mi_active_sessions container holds the relevant data about an active connection with the broker.
10901106
// It can be queried either with the clientid, or with the shared pointer to the mqtt endpoint object
10911107
using mi_active_sessions = mi::multi_index_container<
1092-
session_state,
1108+
session_state_sp_t,
10931109
mi::indexed_by<
10941110
mi::ordered_unique<
10951111
mi::tag<tag_client_id>,
@@ -1105,7 +1121,7 @@ class test_broker {
11051121
// The mi_active_sessions container holds the relevant data about an active connection with the broker.
11061122
// It can be queried either with the clientid, or with the shared pointer to the mqtt endpoint object
11071123
using mi_non_active_sessions = mi::multi_index_container<
1108-
session_state,
1124+
session_state_sp_t,
11091125
mi::indexed_by<
11101126
mi::ordered_unique<
11111127
mi::tag<tag_client_id>,
@@ -1115,25 +1131,25 @@ class test_broker {
11151131
>;
11161132

11171133
// Subscription map tracks subscription -> connection mapping and allows searching with wildcards
1118-
struct connection_subscription_t
1134+
struct connection_session_t
11191135
{
1120-
con_sp_t con;
1136+
session_state_sp_t session;
11211137
MQTT_NS::qos qos;
11221138

1123-
connection_subscription_t(con_sp_t con, MQTT_NS::qos qos = MQTT_NS::qos())
1124-
: con(con), qos(qos)
1139+
connection_session_t(session_state_sp_t session, MQTT_NS::qos qos = MQTT_NS::qos())
1140+
: session(session), qos(qos)
11251141
{ }
11261142
};
11271143

1128-
struct connection_subscription_comp_t
1144+
struct connection_session_comp_t
11291145
{
1130-
inline bool operator()(connection_subscription_t const &a, connection_subscription_t const &b) const
1146+
inline bool operator()(connection_session_t const &a, connection_session_t const &b) const
11311147
{
1132-
return a.con < b.con;
1148+
return a.session.get() < b.session.get();
11331149
}
11341150
};
11351151

1136-
using subscription_map_t = multiple_subscription_map< connection_subscription_t, std::set<connection_subscription_t, connection_subscription_comp_t, std::allocator<connection_subscription_t> > >;
1152+
using subscription_map_t = multiple_subscription_map< connection_session_t, std::set<connection_session_t, connection_session_comp_t, std::allocator<connection_session_t> > >;
11371153

11381154
// Mapping between connection object and subscription topics
11391155
struct sub_con {

0 commit comments

Comments
 (0)