96 changes: 96 additions & 0 deletions test/path_tokenizer.hpp
@@ -0,0 +1,96 @@
// Copyright wkl04 2019
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#if !defined(MQTT_PATH_TOKENIZER_HPP)
#define MQTT_PATH_TOKENIZER_HPP

#include <algorithm>
#include <cstddef>
#include <iterator>

#include <mqtt/namespace.hpp>
#include <mqtt/string_view.hpp>

static constexpr char mqtt_level_separator = '/';

template<typename Iterator, typename Output>
inline void mqtt_path_tokenizer(Iterator first, Iterator last, Output write) {
auto pos = std::find(first, last, mqtt_level_separator);
while (write(first, pos) && pos != last) {
first = std::next(pos);
pos = std::find(first, last, mqtt_level_separator);
}
}


template<typename Output>
inline void mqtt_path_tokenizer(MQTT_NS::string_view str, Output write) {
mqtt_path_tokenizer(
std::begin(str),
std::end(str),
[&write](MQTT_NS::string_view::const_iterator token_begin, MQTT_NS::string_view::const_iterator token_end) {
return write(MQTT_NS::string_view(token_begin, std::distance(token_begin, token_end)));
}
);
}

// See if the topic is valid (any '+' or '#' wildcard must be in a permitted position)
inline bool mqtt_valid_topic(MQTT_NS::string_view topic) {
/*
* Confirm the topic pattern is valid before using it.
* Use rules from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106
*/
for (size_t idx = topic.find_first_of("+#");
MQTT_NS::string_view::npos != idx;
idx = topic.find_first_of("+#", idx+1)) {
Contributor

I strongly recommend using the new version of this function from #672

It includes several fixes for bugs that I found when I added new tests.

Owner Author

Sorry? Do you mean that the current (merged) wildcard support has bugs and that you fixed them in #672?
I thought that #672 was for shared subscriptions, not wildcards. But it seems that #672 also contains a wildcard bug fix.

I have no plan to merge #672. So it seems that I need to separate out the wildcard fix, create a new PR for it, and then merge that PR.

I don't know much about path_tokenizer.hpp. Hopefully @kleunen or I can apply the new PR's fix to path_tokenizer.hpp someday before merging #676.

Owner Author

I added #681. It is extracted from #672. I will merge it once all tests pass, which should be soon.

Then I will apply the fix to path_tokenizer.hpp. That is lower priority for me.

Owner Author

#681 merged.

Contributor

Dear redboltz. Thank you for your effort in merging. I will have a look at the questions you have.

Owner Author

@kleunen, thank you for the reply.

I think that there are two things to do.

The first one is simple and relates to this thread. It seems that @jonesmz refined his version of the wildcard mechanism and fixed some bugs in it, and I believe that I already merged that fix via #681. I guess that the same problem exists in path_tokenizer.hpp, and I expect that you can fix it. To do that, you can open a PR whose target branch is new_wc.

The second one is bigger. I updated your code on new_wc, but test_broker.hpp is not yet updated because I need to understand your design concept. I guess that you could do it better.
Thanks to @jonesmz's comment, I understand that the current wildcard approach is based on a linear search, whereas your approach is based on a trie, so your algorithm is more efficient. I want to replace the current algorithm with yours.

NOTE: I'm implementing another MQTT v5 feature now. I might force push to new_wc. If that disrupts your work, please let me know.

Contributor

redboltz, I looked at fix #681, but it is not completely clear to me what you fixed. From reading the code, I think you added a compare_filter, and subscriptions get merged when two subscriptions match: one has a wildcard and the other does not, but they both resolve to the same topic. Is that correct?

Or is it that these rules from the specification were not followed correctly?
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901247

I also see this comment in the code, which is important for the matching:
// TODO: The Server MUST NOT match Topic Filters starting with a wildcard character (# or +) with Topic Names beginning with a $ character
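
The rule quoted in that TODO can be written as a small predicate. This is only a sketch: the function name `blocked_by_system_topic_rule` is hypothetical, and `std::string_view` stands in for `MQTT_NS::string_view` so that the snippet compiles on its own with C++17.

```cpp
#include <string_view>

// Hypothetical helper, not part of mqtt_cpp.
// Returns true when a match must be rejected because the topic filter
// starts with a wildcard ('#' or '+') and the topic name starts with '$'.
inline bool blocked_by_system_topic_rule(std::string_view topic_filter,
                                         std::string_view topic_name) {
    bool const system_topic =
        !topic_name.empty() && topic_name.front() == '$';
    bool const wildcard_first =
        !topic_filter.empty() &&
        (topic_filter.front() == '#' || topic_filter.front() == '+');
    return system_topic && wildcard_first;
}
```

A matcher would call this check before comparing levels, so that a subscription to `#` does not receive `$SYS/...` messages while a subscription to `$SYS/#` still does.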

Contributor

Regarding your second question: yes, the data structure is like a trie, or a multi-dimensional (multi-level) associative array. The subscription is split into the components of its path (l1/l2/l3 is split into l1, l2, l3), and the value is stored at the corresponding key.
Key for dimension (level) 1 is: l1
Key for dimension 2 is: l2
Key for dimension 3 is: l3
So: map[l1][l2][l3] = value

The only difference is that the lookup operation of the map allows for matching wildcards, so:
map[l1][l2][l3] returns -> value

But in case the key is a wildcard:
map[l1][l2][*] returns -> wildcard_value

Several lookups will return this value:
map[l1][l2][something] returns -> wildcard_value
map[l1][l2][l3][l4][l5] returns -> wildcard_value

map[something][l2][l3] -> does not return wildcard_value
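
The multi-level map described above could be sketched roughly as follows. This is not the code from the PR: `level_node`, `split_levels`, `trie_insert`, `collect`, and `trie_match` are hypothetical names, values are plain `int`, and the `*` in the description is read here as MQTT's multi-level wildcard `#`, which is an assumption. The `$` rule and literal wildcard characters inside published topic names are not handled.

```cpp
#include <cstddef>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical node type: one node per topic level.
struct level_node {
    std::optional<int> value;  // set if a stored filter ends at this node
    std::map<std::string, std::unique_ptr<level_node>> children;
};

// Split "l1/l2/l3" into {"l1", "l2", "l3"}; empty levels are kept.
inline std::vector<std::string> split_levels(std::string_view path) {
    std::vector<std::string> levels;
    std::size_t first = 0;
    for (;;) {
        std::size_t const pos = path.find('/', first);
        if (pos == std::string_view::npos) {
            levels.emplace_back(path.substr(first));
            return levels;
        }
        levels.emplace_back(path.substr(first, pos - first));
        first = pos + 1;
    }
}

// map[l1][l2][l3] = value
inline void trie_insert(level_node& root, std::string_view filter, int value) {
    level_node* node = &root;
    for (std::string const& level : split_levels(filter)) {
        std::unique_ptr<level_node>& child = node->children[level];
        if (!child) child = std::make_unique<level_node>();
        node = child.get();
    }
    node->value = value;
}

// Walk the trie along the topic, following exact, "+" and "#" children.
inline void collect(level_node const& node, std::vector<std::string> const& levels,
                    std::size_t depth, std::vector<int>& out) {
    // A "#" child matches all remaining levels, including none at all.
    auto const hash = node.children.find("#");
    if (hash != node.children.end() && hash->second->value) {
        out.push_back(*hash->second->value);
    }
    if (depth == levels.size()) {
        if (node.value) out.push_back(*node.value);
        return;
    }
    auto const exact = node.children.find(levels[depth]);
    if (exact != node.children.end()) collect(*exact->second, levels, depth + 1, out);
    // A "+" child matches exactly one level, whatever its name.
    auto const plus = node.children.find("+");
    if (plus != node.children.end()) collect(*plus->second, levels, depth + 1, out);
}

// Return the values of all stored filters that match the topic name.
inline std::vector<int> trie_match(level_node const& root, std::string_view topic) {
    std::vector<int> out;
    collect(root, split_levels(topic), 0, out);
    return out;
}
```

With `l1/l2/l3`, `l1/l2/#`, and `a/+/b` stored, a lookup of `l1/l2/l3/l4/l5` finds only the `#` entry, `something/l2/l3` finds nothing, and `a/x/b` finds the `a/+/b` entry. The cost of a lookup depends on the depth of the topic and the number of wildcard branches rather than on the total number of subscriptions.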

Owner Author

> redboltz. I looked at fix #681, it is not completely clear to me I understand what you fixed. I think if I read the code, you added a compare_filter and subscriptions get merged when two subscriptions match: one has wildcard, but other does not, but they both resolve to the same topic, is that correct ?

I'm not sure about the details of the fix. I guess that the original temporary wildcard support contained some bugs and that they are fixed by #681.

compare_topic_filter() is a comparison function for the linear search. Please see the places where it is called.

compare_topic_filter() is called when delivering a publish packet, and when delivering retained publishes during subscribe processing.
The first parameter, topic_filter, is the subscription's topic filter, which might contain wildcards. The second parameter, topic_name, comes from the publish packet. If they match, the published packet is delivered.

I think that compare_topic_filter() will be replaced with the trie-based algorithm when you modify test_broker.hpp in the future.
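
For illustration, a level-by-level comparison of one topic filter against one topic name might look like the sketch below. It is not the actual compare_topic_filter() from the repository: `filter_matches` is a hypothetical name, `std::string_view` replaces `MQTT_NS::string_view`, the filter is assumed to be already validated, and the `$` prefix rule is deliberately left out.

```cpp
#include <cstddef>
#include <string_view>

// Hypothetical stand-in for a linear-search comparison function.
// Returns true if topic_name matches topic_filter ('+' and '#' allowed
// in the filter only).
inline bool filter_matches(std::string_view filter, std::string_view topic) {
    for (;;) {
        // Peel off the next level of the filter.
        std::size_t const fe = filter.find('/');
        std::string_view const flevel = filter.substr(0, fe);
        if (flevel == "#") return true;  // matches everything that remains

        // Peel off the next level of the topic name.
        std::size_t const te = topic.find('/');
        std::string_view const tlevel = topic.substr(0, te);
        if (flevel != "+" && flevel != tlevel) return false;

        bool const filter_done = fe == std::string_view::npos;
        bool const topic_done = te == std::string_view::npos;
        if (filter_done || topic_done) {
            if (filter_done && topic_done) return true;
            // "sport/#" also matches the parent level "sport".
            return topic_done && filter.substr(fe + 1) == "#";
        }
        filter.remove_prefix(fe + 1);
        topic.remove_prefix(te + 1);
    }
}
```

A broker using this approach calls the function once per subscription for every published message, which is the linear search that the trie is meant to replace.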

> Or is it, the these rules from specification were not followed correctly ?
> https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901247

I don't understand what you mean by the above.

> I also see this comment in the code which is important for the matching:
> // TODO: The Server MUST NOT match Topic Filters starting with a wildcard character (# or +) with Topic Names beginning with a $ character

So far, the '$' prefix (reserved for the broker) is not handled. It should be handled in the new (trie) version.

I'm not sure I answered all of your questions. Please let me know if you have further questions.

Owner Author

> Regarding your second question, yes, the datastructure is like a trie or a multi-dimensional (or multi-level) associative array. The subscription is split into the respective components of the path ( l1/l2/l3 is split into l1, l2, l3) and the value is stored at the respective key.
> Key for dimension or level 1 is: l1
> Key for dimension 2 is: l2
> Key for dimension 3 is: l3
> So: map[l1][l2][l3] = value
>
> Only thing is the lookup operation of the map, allows for matching wildcards, so:
> map[l1][l2][l3] returns -> value
>
> But i case the key is a wildcard:
> map[l1][l2][*] return -> wildcard_value
>
> Several lookups will return this value:
> map[l1][l2][something] returns -> wildcard_value
> map[l1][l2][l3][l4][l5] returns -> wildcard_value
>
> map[something][l2][l3] -> Does not return wildcard_value

Does this work with the a/+/b case?

if ('+' == topic[idx]) {
/*
* Either must be the first character,
* or be preceded by a topic separator.
*/
if ((0 != idx) && (mqtt_level_separator != topic[idx-1])) {
return false;
}
/*
* Either must be the last character,
* or be followed by a topic separator.
*/
if ((topic.size()-1 != idx) && (mqtt_level_separator != topic[idx+1])) {
return false;
}
}
else { // multilevel wildcard
/*
* Must be absolute last character.
* Must only be one multi level wild card.
*/
if (idx != topic.size()-1) {
return false;
}
/*
* If not the first character, then the
* immediately preceding character must
* be a topic level separator.
*/
if ((0 != idx) && (mqtt_level_separator != topic[idx-1])) {
return false;
}
}
}

return !topic.empty();
}

// See if the subscription is valid
inline bool mqtt_valid_subscription(MQTT_NS::string_view subscription) {
if (subscription.empty()) {
return false;
}

// If hash position is found, it should be the last character
const size_t hash_position = subscription.find('#');
return (hash_position == MQTT_NS::string_view::npos) || (hash_position == subscription.size() - 1);
}

#endif // !defined(MQTT_PATH_TOKENIZER_HPP)
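
The callback contract of the tokenizer in this file (one call per level, stop as soon as the callback returns false) can be tried out in isolation with the sketch below. It re-states the idea with `std::string_view` instead of `MQTT_NS::string_view` so that it builds without the mqtt_cpp headers; `tokenize_levels` and `collect_levels` are hypothetical names used only for this illustration.

```cpp
#include <cstddef>
#include <string_view>
#include <vector>

// Call write(level) for each '/'-separated level of path.
// Tokenizing stops early when write returns false.
template <typename Output>
void tokenize_levels(std::string_view path, Output write) {
    std::size_t first = 0;
    for (;;) {
        std::size_t const pos = path.find('/', first);
        bool const last = pos == std::string_view::npos;
        std::string_view const level =
            path.substr(first, last ? std::string_view::npos : pos - first);
        if (!write(level) || last) return;
        first = pos + 1;
    }
}

// Collect at most max_levels levels, using the early-stop return value.
inline std::vector<std::string_view> collect_levels(std::string_view path,
                                                    std::size_t max_levels) {
    std::vector<std::string_view> levels;
    tokenize_levels(path, [&levels, max_levels](std::string_view level) {
        levels.push_back(level);
        return levels.size() < max_levels;
    });
    return levels;
}
```

As with the tokenizer above, an empty path produces a single empty level, and `a//b` produces an empty level between `a` and `b`. Callers that treat empty levels specially have to check for them in the callback.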
253 changes: 253 additions & 0 deletions test/retained_topic_map.hpp
@@ -0,0 +1,253 @@
// Copyright wkl04 2019
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#if !defined(MQTT_RETAINED_TOPIC_MAP_HPP)
#define MQTT_RETAINED_TOPIC_MAP_HPP

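#include <cstddef>
#include <deque>
#include <functional>
#include <limits>
#include <map>
#include <stdexcept>
#include <utility>
#include <vector>

#include <boost/optional.hpp>

#include <mqtt/buffer.hpp>
#include <mqtt/string_view.hpp>

#include "path_tokenizer.hpp"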

template<typename Value>
class retained_topic_map {
using node_id_t = size_t;
using path_entry_key = std::pair<node_id_t, MQTT_NS::buffer>;

static constexpr node_id_t root_node_id = 0;

struct path_entry {
node_id_t id;
size_t count = 1;

static constexpr size_t max_count = std::numeric_limits<size_t>::max();

boost::optional<Value> value;

path_entry(node_id_t _id)
: id(_id)
{ }
};

using map_type = std::map< path_entry_key, path_entry >;
using map_type_iterator = typename map_type::iterator;
using map_type_const_iterator = typename map_type::const_iterator;

map_type map;
map_type_iterator root;
node_id_t next_node_id;

map_type_iterator create_topic(MQTT_NS::string_view topic) {
map_type_iterator parent = root;

// Check on root entry if we can still add an entry
if (parent->second.count == path_entry::max_count) {
throw std::overflow_error("Maximum number of topics reached");
}

mqtt_path_tokenizer(
topic,
[this, &parent](MQTT_NS::string_view t) {
if (t == "+" || t == "#") {
throw std::runtime_error("No wildcards allowed in retained topic name");
}

node_id_t parent_id = parent->second.id;
map_type_iterator entry = map.find(path_entry_key(parent_id, t));

if (entry == map.end()) {
entry = map.emplace(path_entry_key(parent_id, MQTT_NS::allocate_buffer(t)), path_entry(next_node_id++)).first;
if (next_node_id == std::numeric_limits<node_id_t>::max()) {
throw std::overflow_error("Maximum number of topics reached");
}
}
else {
entry->second.count++;
}

parent = entry;
return true;
}
);

return parent;
}

std::vector<map_type_iterator> find_topic(MQTT_NS::string_view topic) {
std::vector<map_type_iterator> path;
map_type_iterator parent = root;

mqtt_path_tokenizer(
topic,
[this, &parent, &path](MQTT_NS::string_view t) {
map_type_iterator entry = map.find(path_entry_key(parent->second.id, t));

if (entry == map.end()) {
path = std::vector<map_type_iterator>();
return false;
}

path.push_back(entry);
parent = entry;
return true;
}
);

return path;
}

// Match all underlying topics when a hash entry is matched
// perform a breadth-first iteration over all items in the tree below
template<typename Output>
void match_hash_entries(node_id_t parent, Output callback, bool ignore_system) const {
std::deque<node_id_t> entries;
entries.push_back(parent);
std::deque<node_id_t> new_entries;

while (!entries.empty()) {
new_entries.resize(0);

for (auto node : entries) {
// Find all entries below this node
for (map_type_const_iterator i = map.lower_bound(path_entry_key(node, MQTT_NS::string_view(""))); i != map.end(); ++i) {
if(i->first.first != node) {
break;
}

// Should we ignore system matches
if (!ignore_system || (i->first.second.empty() ? true : i->first.second[0] != '$')) {
if (i->second.value) {
callback(*i->second.value);
}

new_entries.push_back(i->second.id);
}
}
}

// Ignore system only on first level
ignore_system = false;
std::swap(entries, new_entries);
}

}

// Find all topics that match the specified subscription
template<typename Output>
void find_match(MQTT_NS::string_view subscription, Output callback) const {
std::deque<map_type_const_iterator> entries;
entries.push_back(root);

std::deque<map_type_const_iterator> new_entries;
mqtt_path_tokenizer(
subscription,
[this, &entries, &new_entries, &callback](MQTT_NS::string_view t) {
new_entries.resize(0);

for (auto const& entry : entries) {
node_id_t parent = entry->second.id;

if (t == MQTT_NS::string_view("+")) {
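for (map_type_const_iterator i = map.lower_bound(path_entry_key(parent, MQTT_NS::string_view("")));
i != map.end() && i->first.first == parent;
++i) {
// At the root level, '+' must not match topics that begin with '$'.
// Skip such entries instead of ending the scan: '$' sorts before
// most other characters, so breaking here would hide the siblings.
if (parent != root_node_id || i->first.second.empty() || i->first.second[0] != '$') {
new_entries.push_back(i);
}
}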
}
else if (t == MQTT_NS::string_view("#")) {
match_hash_entries(parent, callback, parent == root_node_id);
return false;
}
else {
map_type_const_iterator i = map.find(path_entry_key(parent, t));
if (i != map.end()) {
new_entries.push_back(i);
}
}
}

std::swap(new_entries, entries);
return !entries.empty();
}
);

for (auto const& entry : entries) {
if (entry->second.value) {
callback(*entry->second.value);
}
}
}

// Remove the value stored at the specified topic path
bool remove_topic(MQTT_NS::string_view topic) {
auto path = find_topic(topic);
if (path.empty()) {
return false;
}

std::vector<path_entry_key> remove_keys;
remove_keys.reserve(path.size());

for (auto entry : path) {
--entry->second.count;
if (entry->second.count == 0) {
remove_keys.push_back(entry->first);
}
}

for(auto i : remove_keys) {
map.erase(i);
}

return true;
}

public:
retained_topic_map()
: next_node_id(root_node_id)
{
// Create the root node
root =
map.emplace(
path_entry_key(
std::numeric_limits<node_id_t>::max(),
MQTT_NS::allocate_buffer("")
),
path_entry(root_node_id)
).first;
++next_node_id;
}

// Insert a value at the specified topic path
void insert_or_update(MQTT_NS::string_view topic, Value const& value) {
auto path = find_topic(topic);
if (path.empty()) {
this->create_topic(topic)->second.value = value;
}
else {
path.back()->second.value = value;
}
}

// Find all stored topics that match the specified subscription
void find(MQTT_NS::string_view subscription, std::function< void (Value const&) > const& callback) const {
find_match(subscription, callback);
}

// Remove a stored value at the specified topic
void remove(MQTT_NS::string_view topic) {
remove_topic(topic);
}
};

#endif // !defined(MQTT_RETAINED_TOPIC_MAP_HPP)
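
The map in this file stores the tree flat, keyed by (parent node id, level name), and finds the children of a node with a `lower_bound` range scan. The sketch below isolates that technique. It is a simplified illustration, not the class above: `edge_map`, `edge_key`, and `children_of` are hypothetical names, and `std::string` replaces `MQTT_NS::buffer`.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

using node_id = std::size_t;
using edge_key = std::pair<node_id, std::string>;  // (parent id, level name)
using edge_map = std::map<edge_key, node_id>;      // mapped value: child node id

// Return the level names of all direct children of parent.
// Keys are ordered by parent id first, so the children of one node form a
// contiguous range that starts at (parent, "").
inline std::vector<std::string> children_of(edge_map const& edges,
                                            node_id parent,
                                            bool skip_system) {
    std::vector<std::string> names;
    for (auto it = edges.lower_bound(edge_key(parent, ""));
         it != edges.end() && it->first.first == parent;
         ++it) {
        std::string const& name = it->first.second;
        // Skip '$'-prefixed levels, but keep scanning the remaining siblings.
        if (skip_system && !name.empty() && name[0] == '$') continue;
        names.push_back(name);
    }
    return names;
}
```

Because `$` sorts before letters and digits, a `$SYS` child is usually the first entry in the range of its parent. A scan that needs to exclude it therefore has to skip that entry and continue rather than stop.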