-
Notifications
You must be signed in to change notification settings - Fork 113
Added basic part of new wildcard mechanism. #676
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| // Copyright wkl04 2019 | ||
| // | ||
| // Distributed under the Boost Software License, Version 1.0. | ||
| // (See accompanying file LICENSE_1_0.txt or copy at | ||
| // http://www.boost.org/LICENSE_1_0.txt) | ||
|
|
||
| #if !defined(MQTT_PATH_TOKENIZER_HPP) | ||
| #define MQTT_PATH_TOKENIZER_HPP | ||
|
|
||
| #include <algorithm> | ||
|
|
||
| #include <mqtt/namespace.hpp> | ||
| #include <mqtt/string_view.hpp> | ||
|
|
||
| static constexpr char mqtt_level_separator = '/'; | ||
|
|
||
| template<typename Iterator, typename Output> | ||
| inline void mqtt_path_tokenizer(Iterator first, Iterator last, Output write) { | ||
| auto pos = std::find(first, last, mqtt_level_separator); | ||
| while (write(first, pos) && pos != last) { | ||
| first = std::next(pos); | ||
| pos = std::find(first, last, mqtt_level_separator); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| template<typename Output> | ||
| inline void mqtt_path_tokenizer(MQTT_NS::string_view str, Output write) { | ||
| mqtt_path_tokenizer( | ||
| std::begin(str), | ||
| std::end(str), | ||
| [&write](MQTT_NS::string_view::const_iterator token_begin, MQTT_NS::string_view::const_iterator token_end) { | ||
| return write(MQTT_NS::string_view(token_begin, std::distance(token_begin, token_end))); | ||
| } | ||
| ); | ||
| } | ||
|
|
||
| // See if the topic is valid (does not contain # or +) | ||
| inline bool mqtt_valid_topic(MQTT_NS::string_view topic) { | ||
| /* | ||
| * Confirm the topic pattern is valid before using it. | ||
| * Use rules from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106 | ||
| */ | ||
| for (size_t idx = topic.find_first_of("+#"); | ||
| MQTT_NS::string_view::npos != idx; | ||
| idx = topic.find_first_of("+#", idx+1)) { | ||
| if ('+' == topic[idx]) { | ||
| /* | ||
| * Either must be the first character, | ||
| * or be preceeded by a topic seperator. | ||
| */ | ||
| if ((0 != idx) && (mqtt_level_separator != topic[idx-1])) { | ||
| return false; | ||
| } | ||
| /* | ||
| * Either must be the last character, | ||
| * or be followed by a topic seperator. | ||
| */ | ||
| if ((topic.size()-1 != idx) && (mqtt_level_separator != topic[idx+1])) { | ||
| return false; | ||
| } | ||
| } | ||
| else { // multilevel wildcard | ||
| /* | ||
| * Must be absolute last character. | ||
| * Must only be one multi level wild card. | ||
| */ | ||
| if (idx != topic.size()-1) { | ||
| return false; | ||
| } | ||
| /* | ||
| * If not the first character, then the | ||
| * immediately preceeding character must | ||
| * be a topic level separator. | ||
| */ | ||
| if ((0 != idx) && (mqtt_level_separator != topic[idx-1])) { | ||
| return false; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return !topic.empty(); | ||
| } | ||
|
|
||
| // See if the subscription is valid | ||
| inline bool mqtt_valid_subscription(MQTT_NS::string_view subscription) { | ||
| if (subscription.empty()) { | ||
| return false; | ||
| } | ||
|
|
||
| // If hash position is found, it should be the last character | ||
| const size_t hash_position = subscription.find('#'); | ||
| return (hash_position == MQTT_NS::string_view::npos) || (hash_position == subscription.size() - 1); | ||
| } | ||
|
|
||
| #endif // !defined(MQTT_PATH_TOKENIZER_HPP) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,253 @@ | ||
| // Copyright wkl04 2019 | ||
| // | ||
| // Distributed under the Boost Software License, Version 1.0. | ||
| // (See accompanying file LICENSE_1_0.txt or copy at | ||
| // http://www.boost.org/LICENSE_1_0.txt) | ||
|
|
||
| #if !defined(MQTT_RETAINED_TOPIC_MAP_HPP) | ||
| #define MQTT_RETAINED_TOPIC_MAP_HPP | ||
|
|
||
| #include <mqtt/string_view.hpp> | ||
| #include <map> | ||
| #include <boost/optional.hpp> | ||
| #include "path_tokenizer.h" | ||
|
|
||
| template<typename Value> | ||
| class retained_topic_map { | ||
| using node_id_t = size_t; | ||
| using path_entry_key = std::pair<node_id_t, MQTT_NS::buffer>; | ||
|
|
||
| static constexpr node_id_t root_node_id = 0; | ||
|
|
||
| struct path_entry { | ||
| node_id_t id; | ||
| size_t count = 1; | ||
|
|
||
| static constexpr size_t max_count = std::numeric_limits<size_t>::max(); | ||
|
|
||
| boost::optional<Value> value; | ||
|
|
||
| path_entry(node_id_t _id) | ||
| : id(_id) | ||
| { } | ||
| }; | ||
|
|
||
| using map_type = std::map< path_entry_key, path_entry >; | ||
| using map_type_iterator = typename map_type::iterator; | ||
| using map_type_const_iterator = typename map_type::const_iterator; | ||
|
|
||
| map_type map; | ||
| map_type_iterator root; | ||
| node_id_t next_node_id; | ||
|
|
||
| map_type_iterator create_topic(MQTT_NS::string_view topic) { | ||
| map_type_iterator parent = root; | ||
|
|
||
| // Check on root entry if we can still add an entry | ||
| if (parent->second.count == path_entry::max_count) { | ||
| throw std::overflow_error("Maximum number of topics reached"); | ||
| } | ||
|
|
||
| mqtt_path_tokenizer( | ||
| topic, | ||
| [this, &parent](MQTT_NS::string_view t) { | ||
| if (t == "+" || t == "#") { | ||
| throw std::runtime_error("No wildcards allowed in retained topic name"); | ||
| } | ||
|
|
||
| node_id_t parent_id = parent->second.id; | ||
| map_type_iterator entry = map.find(path_entry_key(parent_id, t)); | ||
|
|
||
| if (entry == map.end()) { | ||
| entry = map.emplace(path_entry_key(parent_id, MQTT_NS::allocate_buffer(t)), path_entry(next_node_id++)).first; | ||
| if (next_node_id == std::numeric_limits<typeof(next_node_id)>::max()) { | ||
| throw std::overflow_error("Maximum number of topics reached"); | ||
| } | ||
| } | ||
| else { | ||
| entry->second.count++; | ||
| } | ||
|
|
||
| parent = entry; | ||
| return true; | ||
| } | ||
| ); | ||
|
|
||
| return parent; | ||
| } | ||
|
|
||
| std::vector<map_type_iterator> find_topic(MQTT_NS::string_view topic) { | ||
| std::vector<map_type_iterator> path; | ||
| map_type_iterator parent = root; | ||
|
|
||
| mqtt_path_tokenizer( | ||
| topic, | ||
| [this, &parent, &path](MQTT_NS::string_view t) { | ||
| map_type_iterator entry = map.find(path_entry_key(parent->second.id, t)); | ||
|
|
||
| if (entry == map.end()) { | ||
| path = std::vector<map_type_iterator>(); | ||
| return false; | ||
| } | ||
|
|
||
| path.push_back(entry); | ||
| parent = entry; | ||
| return true; | ||
| } | ||
| ); | ||
|
|
||
| return path; | ||
| } | ||
|
|
||
| // Match all underlying topics when a hash entry is matched | ||
| // perform a breadth-first iteration over all items in the tree below | ||
| template<typename Output> | ||
| void match_hash_entries(node_id_t parent, Output callback, bool ignore_system) const { | ||
| std::deque<node_id_t> entries; | ||
| entries.push_back(parent); | ||
| std::deque<node_id_t> new_entries; | ||
|
|
||
| while (!entries.empty()) { | ||
| new_entries.resize(0); | ||
|
|
||
| for (auto root : entries) { | ||
| // Find all entries below this node | ||
| for (map_type_const_iterator i = map.lower_bound(path_entry_key(root, MQTT_NS::string_view(""))); i != map.end(); ++i) { | ||
| if(i->first.first != root) { | ||
| break; | ||
| } | ||
|
|
||
| // Should we ignore system matches | ||
| if (!ignore_system || (i->first.second.empty() ? true : i->first.second[0] != '$')) { | ||
| if (i->second.value) { | ||
| callback(*i->second.value); | ||
| } | ||
|
|
||
| new_entries.push_back(i->second.id); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Ignore system only on first level | ||
| ignore_system = false; | ||
| std::swap(entries, new_entries); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| // Find all topics that match the specified subscription | ||
| template<typename Output> | ||
| void find_match(MQTT_NS::string_view subscription, Output callback) const { | ||
| std::deque<map_type_const_iterator> entries; | ||
| entries.push_back(root); | ||
|
|
||
| std::deque<map_type_const_iterator> new_entries; | ||
| mqtt_path_tokenizer( | ||
| subscription, | ||
| [this, &entries, &new_entries, &callback](MQTT_NS::string_view t) { | ||
| new_entries.resize(0); | ||
|
|
||
| for (auto const& entry : entries) { | ||
| node_id_t parent = entry->second.id; | ||
|
|
||
| if (t == MQTT_NS::string_view("+")) { | ||
| for (map_type_const_iterator i = map.lower_bound(path_entry_key(parent, MQTT_NS::string_view(""))); | ||
| i != map.end(); | ||
| ++i) { | ||
| if (i->first.first == parent && | ||
| (parent != root_node_id || (i->first.second.empty() ? true : i->first.second[0] != '$')) | ||
| ) { | ||
| new_entries.push_back(i); | ||
| } | ||
| else { | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| else if (t == MQTT_NS::string_view("#")) { | ||
| match_hash_entries(parent, callback, parent == root_node_id); | ||
| return false; | ||
| } | ||
| else { | ||
| map_type_const_iterator i = map.find(path_entry_key(parent, t)); | ||
| if (i != map.end()) { | ||
| new_entries.push_back(i); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| std::swap(new_entries, entries); | ||
| return !entries.empty(); | ||
| } | ||
| ); | ||
|
|
||
| for (auto const& entry : entries) { | ||
| if (entry->second.value) { | ||
| callback(*entry->second.value); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Remove a value at the specified subscription path | ||
| bool remove_topic(MQTT_NS::string_view topic) { | ||
| auto path = find_topic(topic); | ||
| if (path.empty()) { | ||
| return false; | ||
| } | ||
|
|
||
| std::vector<path_entry_key> remove_keys; | ||
| remove_keys.reserve(path.size()); | ||
|
|
||
| for (auto entry : path) { | ||
| --entry->second.count; | ||
| if (entry->second.count == 0) { | ||
| remove_keys.push_back(entry->first); | ||
| } | ||
| } | ||
|
|
||
| for(auto i : remove_keys) { | ||
| map.erase(i); | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| public: | ||
| retained_topic_map() | ||
| : next_node_id(root_node_id) | ||
| { | ||
| // Create the root node | ||
| root = | ||
| map.emplace( | ||
| path_entry_key( | ||
| std::numeric_limits<node_id_t>::max(), | ||
| MQTT_NS::allocate_buffer("") | ||
| ), | ||
| path_entry(root_node_id) | ||
| ).first; | ||
| ++next_node_id; | ||
| } | ||
|
|
||
| // Insert a value at the specified subscription path | ||
| void insert_or_update(MQTT_NS::string_view topic, Value const& value) { | ||
| auto path = find_topic(topic); | ||
| if (path.empty()) { | ||
| this->create_topic(topic)->second.value = value; | ||
| } | ||
| else { | ||
| path.back()->second.value = value; | ||
| } | ||
| } | ||
|
|
||
| // Find all stored topics that math the specified subscription | ||
| void find(MQTT_NS::string_view subscription, std::function< void (Value const&) > const& callback) const { | ||
| find_match(subscription, callback); | ||
| } | ||
|
|
||
| // Remove a stored value at the specified topic | ||
| void remove(MQTT_NS::string_view topic) { | ||
| remove_topic(topic); | ||
| } | ||
| }; | ||
|
|
||
| #endif // !defined(MQTT_RETAINED_TOPIC_MAP_HPP) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I strongly recommend using the new version of this function from #672
It includes several fixes for bugs that I found when I added new tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry ? Do you mean the current (merged) wildcard support has bugs and you fixed it in #672 ?
I thought that #672 is for shared subscriptions, not wildcard. But it seems that #672 contains wildcard bug fix.
I have no plan to merge #672. So it seems that I need to separate wildcard fixed part and create the new PR to fix wildcard support. And then merge the new PR.
I don't now much about
path_tokenizer.hpp. Hopefully @kleunen , or I reflect the new PR to path_tokenizer.hpp someday before merging #676There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added #681 . It is from #672 . I will merge it after all tests passed. I will do soon.
Then reflect the fix to path_tokenizer.hpp. It is lower priority for me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#681 merged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dear redboltz. Thank you for your effort in merging. I will have a look at the questions you have.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kleunen , thank you for the reply.
I think that there are two topics to do.
The first one is simple. It relates to this thread. It seems that @jonesmz refined and fixed some bugs on his version of wildcard mechanism. And I believe that I already merged the fix by #681. I guess that the same problem is in path_tokenizer.hpp. I expect that you would fix it. In order to do that, you can write a PR and its target branch is
new_wc.The second one is bigger. I updated your code on
new_wc. But test_broker.hpp is not yet updated because I need to understand the your design concept. I guess that you could do better.Thanks to @jonesmz comment, I understand that the current wildcard approach is based on linear search algorithm. But your approach is based on trie algorithm. So your algorithm is more efficient. I want to replace the algorithm.
NOTE: I'm implementing other MQTT v5 feature now. I might force push to new_wc. If it avoids your work, please let me know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
redboltz. I looked at fix #681, it is not completely clear to me I understand what you fixed. I think if I read the code, you added a compare_filter and subscriptions get merged when two subscriptions match: one has wildcard, but other does not, but they both resolve to the same topic, is that correct ?
Or is it, the these rules from specification were not followed correctly ?
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901247
I also see this comment in the code which is important for the matching:
// TODO: The Server MUST NOT match Topic Filters starting with a wildcard character (# or +) with Topic Names beginning with a $ character
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding your second question, yes, the datastructure is like a trie or a multi-dimensional (or multi-level) associative array. The subscription is split into the respective components of the path ( l1/l2/l3 is split into l1, l2, l3) and the value is stored at the respective key.
Key for dimension or level 1 is: l1
Key for dimension 2 is: l2
Key for dimension 3 is: l3
So: map[l1][l2][l3] = value
Only thing is the lookup operation of the map, allows for matching wildcards, so:
map[l1][l2][l3] returns -> value
But i case the key is a wildcard:
map[l1][l2][*] return -> wildcard_value
Several lookups will return this value:
map[l1][l2][something] returns -> wildcard_value
map[l1][l2][l3][l4][l5] returns -> wildcard_value
map[something][l2][l3] -> Does not return wildcard_value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure about the fix. I guess that original temporary wildcard support contains some bugs and they are fixed by #681.
compare_topic_filter() is a comparison function for linear search. Please see calling locations.
compare_topic_filter() is called when publish packet delivering and when retained publish delivering on subscribe process.
The first parameter topic_filter is subscription's topic filter. It might contain wildcard. The second parameter topic_name is from publish packet. If it is matched, then published packet is delivered.
I think that compare_topic_filter() is replaced with trie version algorithm that you would modify test_broker.hpp in the future.
I don't understand what do you mean at the above.
So far, '$' (preserved for broker) prefix is not cared. It should be cared the new (trie) version.
I'm not sure I answered all of your question. Please let me know if you have further questions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this work with
a/+/bcase ?