Skip to content

Commit 873b2af

Browse files
authored
Merge pull request #671 from jonesmz/subscription-wildcards
Add wildcard handling to topics in the test broker
2 parents 67839f5 + fcd479f commit 873b2af

File tree

1 file changed

+151
-43
lines changed

1 file changed

+151
-43
lines changed

test/test_broker.hpp

+151-43
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,103 @@ using con_sp_t = std::shared_ptr<endpoint_t>;
3232
using con_wp_t = std::weak_ptr<endpoint_t>;
3333
using packet_id_t = endpoint_t::packet_id_t;
3434

35+
inline bool validate_topic_pattern(MQTT_NS::string_view topicPattern)
36+
{
37+
/*
38+
* Confirm the topic pattern is valid before registering it.
39+
* Use rules from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106
40+
*/
41+
for(size_t idx = topicPattern.find_first_of("+#");
42+
MQTT_NS::string_view::npos != idx;
43+
idx = topicPattern.find_first_of("+#", idx+1)) {
44+
BOOST_ASSERT( ('+' == topicPattern[idx])
45+
|| ('#' == topicPattern[idx]));
46+
if('+' == topicPattern[idx]) {
47+
/*
48+
* Either must be the first character,
49+
* or be preceeded by a topic seperator.
50+
*/
51+
if((0 != idx) && ('/' != topicPattern[idx-1])) {
52+
return false;
53+
}
54+
55+
/*
56+
* Either must be the last character,
57+
* or be followed by a topic seperator.
58+
*/
59+
if((topicPattern.size()-1 != idx) && ('/' != topicPattern[idx+1])) {
60+
return false;
61+
}
62+
}
63+
// multilevel wildcard
64+
else {
65+
/*
66+
* Must be absolute last character.
67+
* Must only be one multi level wild card.
68+
*/
69+
if(idx != topicPattern.size()-1) {
70+
return false;
71+
}
72+
73+
/*
74+
* If not the first character, then the
75+
* immediately preceeding character must
76+
* be a topic level separator.
77+
*/
78+
if((0 != idx) && ('/' != topicPattern[idx-1])) {
79+
return false;
80+
}
81+
}
82+
}
83+
return true;
84+
}
85+
86+
inline bool compare_topic_pattern(MQTT_NS::string_view topicPattern, MQTT_NS::string_view topic)
87+
{
88+
BOOST_ASSERT(validate_topic_pattern(topicPattern));
89+
for(size_t idx = topicPattern.find_first_of("+#");
90+
MQTT_NS::string_view::npos != idx;
91+
idx = topicPattern.find_first_of("+#")) {
92+
BOOST_ASSERT( ('+' == topicPattern[idx])
93+
|| ('#' == topicPattern[idx]));
94+
if('+' == topicPattern[idx]) {
95+
// Compare everything up to the first +
96+
if(topicPattern.substr(0, idx) == topic.substr(0, idx)) {
97+
/*
98+
* We already know thanks to the topic pattern being validated
99+
* that the + symbol is directly touching '/'s on both sides
100+
* (if not the first or last character), so we don't need to
101+
* double check that.
102+
*
103+
* By simply removing the prefix that we've compared and letting
104+
* the loop continue, we get the proper comparison of the '/'s
105+
* automatically when the loop continues.
106+
*/
107+
topicPattern.remove_prefix(idx+1);
108+
/*
109+
* It's a bit more complicated for the incoming topic though
110+
* as we need to remove everything up to the next seperator.
111+
*/
112+
topic.remove_prefix(topic.find('/', idx));
113+
}
114+
else {
115+
return false;
116+
}
117+
}
118+
// multilevel wildcard
119+
else {
120+
/*
121+
* Compare up to where the multilevel wild card is found
122+
* and then anything after that matches the wildcard.
123+
*/
124+
return topicPattern.substr(0, idx) == topic.substr(0, idx);
125+
}
126+
}
127+
128+
// No + or # found in the remaining topic pattern. Just do a string compare.
129+
return topicPattern == topic;
130+
}
131+
35132
class test_broker {
36133
public:
37134
test_broker(as::io_context& ioc)
@@ -787,19 +884,20 @@ class test_broker {
787884
break;
788885
}
789886

887+
// Publish any retained messages that match the newly subscribed topic.
790888
for (auto const& e : entries) {
791-
MQTT_NS::buffer const& topic = std::get<0>(e);
792-
MQTT_NS::subscribe_options options = std::get<1>(e);
793-
// Publish any retained messages that match the newly subscribed topic.
794-
auto it = retains_.find(topic);
795-
if (it != retains_.end()) {
796-
ep.publish(
797-
as::buffer(it->topic),
798-
as::buffer(it->contents),
799-
std::min(it->qos_value, options.get_qos()) | MQTT_NS::retain::yes,
800-
it->props,
801-
std::make_pair(it->topic, it->contents)
802-
);
889+
for(auto const& retain : retains_) {
890+
MQTT_NS::buffer const& topic = std::get<0>(e);
891+
MQTT_NS::subscribe_options options = std::get<1>(e);
892+
if(compare_topic_pattern(topic, retain.topic)) {
893+
ep.publish(
894+
as::buffer(retain.topic),
895+
as::buffer(retain.contents),
896+
std::min(retain.qos_value, options.get_qos()) | MQTT_NS::retain::yes,
897+
retain.props,
898+
std::make_pair(retain.topic, retain.contents)
899+
);
900+
}
803901
}
804902
}
805903
return true;
@@ -884,49 +982,59 @@ class test_broker {
884982
MQTT_NS::publish_options pubopts,
885983
MQTT_NS::v5::properties props) {
886984
// For each active subscription registered for this topic
887-
for(auto const& sub : boost::make_iterator_range(subs_.get<tag_topic>().equal_range(topic))) {
888-
// publish the message to subscribers.
889-
// TODO: Probably this should be switched to async_publish?
890-
// Given the async_client / sync_client seperation
891-
// and the way they have different function names,
892-
// it wouldn't be possible for test_broker.hpp to be
893-
// used with some hypothetical "async_server" in the future.
894-
895-
// retain is delivered as the original only if rap_value is rap::retain.
896-
// On MQTT v3.1.1, rap_value is always rap::dont.
897-
auto retain =
898-
[&] {
899-
if (sub.rap_value == MQTT_NS::rap::retain) {
900-
return pubopts.get_retain();
901-
}
902-
return MQTT_NS::retain::no;
903-
} ();
904-
sub.con->publish(
905-
topic,
906-
contents,
907-
std::min(sub.qos_value, pubopts.get_qos()) | retain,
908-
props // TODO: Copying the properties vector for each subscription.
909-
);
985+
for(auto const& sub : subs_.get<tag_topic>()) {
986+
if(compare_topic_pattern(sub.topic, topic)) {
987+
// publish the message to subscribers.
988+
// TODO: Probably this should be switched to async_publish?
989+
// Given the async_client / sync_client seperation
990+
// and the way they have different function names,
991+
// it wouldn't be possible for test_broker.hpp to be
992+
// used with some hypothetical "async_server" in the future.
993+
994+
// retain is delivered as the original only if rap_value is rap::retain.
995+
// On MQTT v3.1.1, rap_value is always rap::dont.
996+
auto retain =
997+
[&] {
998+
if (sub.rap_value == MQTT_NS::rap::retain) {
999+
return pubopts.get_retain();
1000+
}
1001+
return MQTT_NS::retain::no;
1002+
} ();
1003+
sub.con->publish(
1004+
topic,
1005+
contents,
1006+
std::min(sub.qos_value, pubopts.get_qos()) | retain,
1007+
props // TODO: Copying the properties vector for each subscription.
1008+
);
1009+
}
9101010
}
9111011

9121012
{
9131013
// For each saved subscription, add this message to
9141014
// the list to be sent out when a connection resumes
9151015
// a lost session.
916-
//
917-
// TODO: This does not properly handle wildcards!
9181016
auto & idx = saved_subs_.get<tag_topic>();
919-
auto range = boost::make_iterator_range(idx.equal_range(topic));
920-
if( ! range.empty()) {
921-
auto sp_props = std::make_shared<MQTT_NS::v5::properties>(props);
922-
for(auto it = range.begin(); it != range.end(); std::advance(it, 1)) {
923-
idx.modify(it,
1017+
// Note: Only allocated if used.
1018+
std::shared_ptr<MQTT_NS::v5::properties> sp_props;
1019+
for(auto const& item : idx) {
1020+
if(compare_topic_pattern(item.topic, topic)) {
1021+
if(!sp_props) {
1022+
sp_props = std::make_shared<MQTT_NS::v5::properties>(props);
1023+
}
1024+
1025+
idx.modify(idx.iterator_to(item),
9241026
[&](session_subscription & val)
9251027
{
1028+
// Note: The description of session state here:
1029+
// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901230
1030+
// does not say that only one message is saved per topic in the state.
1031+
// That behavior is apparently only applicable to the 'retain' message
1032+
// storage, which is a global thing, and not a per-session thing.
1033+
// So it is correct that all messages should be stored in the session state.
9261034
val.messages.emplace_back(
9271035
contents,
9281036
sp_props,
929-
std::min(it->qos_value, pubopts.get_qos()));
1037+
std::min(val.qos_value, pubopts.get_qos()));
9301038
},
9311039
[](session_subscription&) { BOOST_ASSERT(false); });
9321040
}

0 commit comments

Comments
 (0)