Skip to content

Commit a897c81

Browse files
authored
Merge pull request #685 from redboltz/new_wc2
Added basic part of new wildcard mechanism.
2 parents 67b1953 + d035c5b commit a897c81

File tree

4 files changed

+790
-1
lines changed

4 files changed

+790
-1
lines changed

test/retained_topic_map.hpp

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
// Copyright wkl04 2019
2+
//
3+
// Distributed under the Boost Software License, Version 1.0.
4+
// (See accompanying file LICENSE_1_0.txt or copy at
5+
// http://www.boost.org/LICENSE_1_0.txt)
6+
7+
#if !defined(MQTT_RETAINED_TOPIC_MAP_HPP)
8+
#define MQTT_RETAINED_TOPIC_MAP_HPP
9+
10+
#include <mqtt/string_view.hpp>
11+
#include <map>
12+
#include <boost/optional.hpp>
13+
#include "topic_filter_tokenizer.hpp"
14+
15+
template<typename Value>
16+
class retained_topic_map {
17+
using node_id_t = size_t;
18+
using path_entry_key = std::pair<node_id_t, MQTT_NS::buffer>;
19+
20+
static constexpr node_id_t root_node_id = 0;
21+
22+
struct path_entry {
23+
node_id_t id;
24+
size_t count = 1;
25+
26+
static constexpr size_t max_count = std::numeric_limits<size_t>::max();
27+
28+
boost::optional<Value> value;
29+
30+
path_entry(node_id_t _id)
31+
: id(_id)
32+
{ }
33+
};
34+
35+
using map_type = std::map< path_entry_key, path_entry >;
36+
using map_type_iterator = typename map_type::iterator;
37+
using map_type_const_iterator = typename map_type::const_iterator;
38+
39+
map_type map;
40+
map_type_iterator root;
41+
node_id_t next_node_id;
42+
43+
map_type_iterator create_topic(MQTT_NS::string_view topic) {
44+
map_type_iterator parent = root;
45+
46+
// Check on root entry if we can still add an entry
47+
if (parent->second.count == path_entry::max_count) {
48+
throw std::overflow_error("Maximum number of topics reached");
49+
}
50+
51+
topic_filter_tokenizer(
52+
topic,
53+
[this, &parent](MQTT_NS::string_view t) {
54+
if (t == "+" || t == "#") {
55+
throw std::runtime_error("No wildcards allowed in retained topic name");
56+
}
57+
58+
node_id_t parent_id = parent->second.id;
59+
map_type_iterator entry = map.find(path_entry_key(parent_id, t));
60+
61+
if (entry == map.end()) {
62+
entry = map.emplace(path_entry_key(parent_id, MQTT_NS::allocate_buffer(t)), path_entry(next_node_id++)).first;
63+
if (next_node_id == std::numeric_limits<typeof(next_node_id)>::max()) {
64+
throw std::overflow_error("Maximum number of topics reached");
65+
}
66+
}
67+
else {
68+
entry->second.count++;
69+
}
70+
71+
parent = entry;
72+
return true;
73+
}
74+
);
75+
76+
return parent;
77+
}
78+
79+
std::vector<map_type_iterator> find_topic(MQTT_NS::string_view topic) {
80+
std::vector<map_type_iterator> path;
81+
map_type_iterator parent = root;
82+
83+
topic_filter_tokenizer(
84+
topic,
85+
[this, &parent, &path](MQTT_NS::string_view t) {
86+
map_type_iterator entry = map.find(path_entry_key(parent->second.id, t));
87+
88+
if (entry == map.end()) {
89+
path = std::vector<map_type_iterator>();
90+
return false;
91+
}
92+
93+
path.push_back(entry);
94+
parent = entry;
95+
return true;
96+
}
97+
);
98+
99+
return path;
100+
}
101+
102+
// Match all underlying topics when a hash entry is matched
103+
// perform a breadth-first iteration over all items in the tree below
104+
template<typename Output>
105+
void match_hash_entries(node_id_t parent, Output callback, bool ignore_system) const {
106+
std::deque<node_id_t> entries;
107+
entries.push_back(parent);
108+
std::deque<node_id_t> new_entries;
109+
110+
while (!entries.empty()) {
111+
new_entries.resize(0);
112+
113+
for (auto root : entries) {
114+
// Find all entries below this node
115+
for (map_type_const_iterator i = map.lower_bound(path_entry_key(root, MQTT_NS::string_view(""))); i != map.end(); ++i) {
116+
if(i->first.first != root) {
117+
break;
118+
}
119+
120+
// Should we ignore system matches
121+
if (!ignore_system || (i->first.second.empty() ? true : i->first.second[0] != '$')) {
122+
if (i->second.value) {
123+
callback(*i->second.value);
124+
}
125+
126+
new_entries.push_back(i->second.id);
127+
}
128+
}
129+
}
130+
131+
// Ignore system only on first level
132+
ignore_system = false;
133+
std::swap(entries, new_entries);
134+
}
135+
136+
}
137+
138+
// Find all topics that match the specified subscription
139+
template<typename Output>
140+
void find_match(MQTT_NS::string_view subscription, Output callback) const {
141+
std::deque<map_type_const_iterator> entries;
142+
entries.push_back(root);
143+
144+
std::deque<map_type_const_iterator> new_entries;
145+
topic_filter_tokenizer(
146+
subscription,
147+
[this, &entries, &new_entries, &callback](MQTT_NS::string_view t) {
148+
new_entries.resize(0);
149+
150+
for (auto const& entry : entries) {
151+
node_id_t parent = entry->second.id;
152+
153+
if (t == MQTT_NS::string_view("+")) {
154+
for (map_type_const_iterator i = map.lower_bound(path_entry_key(parent, MQTT_NS::string_view("")));
155+
i != map.end();
156+
++i) {
157+
if (i->first.first == parent &&
158+
(parent != root_node_id || (i->first.second.empty() ? true : i->first.second[0] != '$'))
159+
) {
160+
new_entries.push_back(i);
161+
}
162+
else {
163+
break;
164+
}
165+
}
166+
}
167+
else if (t == MQTT_NS::string_view("#")) {
168+
match_hash_entries(parent, callback, parent == root_node_id);
169+
return false;
170+
}
171+
else {
172+
map_type_const_iterator i = map.find(path_entry_key(parent, t));
173+
if (i != map.end()) {
174+
new_entries.push_back(i);
175+
}
176+
}
177+
}
178+
179+
std::swap(new_entries, entries);
180+
return !entries.empty();
181+
}
182+
);
183+
184+
for (auto const& entry : entries) {
185+
if (entry->second.value) {
186+
callback(*entry->second.value);
187+
}
188+
}
189+
}
190+
191+
// Remove a value at the specified subscription path
192+
bool remove_topic(MQTT_NS::string_view topic) {
193+
auto path = find_topic(topic);
194+
if (path.empty()) {
195+
return false;
196+
}
197+
198+
std::vector<path_entry_key> remove_keys;
199+
remove_keys.reserve(path.size());
200+
201+
for (auto entry : path) {
202+
--entry->second.count;
203+
if (entry->second.count == 0) {
204+
remove_keys.push_back(entry->first);
205+
}
206+
}
207+
208+
for(auto i : remove_keys) {
209+
map.erase(i);
210+
}
211+
212+
return true;
213+
}
214+
215+
public:
216+
retained_topic_map()
217+
: next_node_id(root_node_id)
218+
{
219+
// Create the root node
220+
root =
221+
map.emplace(
222+
path_entry_key(
223+
std::numeric_limits<node_id_t>::max(),
224+
MQTT_NS::allocate_buffer("")
225+
),
226+
path_entry(root_node_id)
227+
).first;
228+
++next_node_id;
229+
}
230+
231+
// Insert a value at the specified subscription path
232+
void insert_or_update(MQTT_NS::string_view topic, Value const& value) {
233+
auto path = find_topic(topic);
234+
if (path.empty()) {
235+
this->create_topic(topic)->second.value = value;
236+
}
237+
else {
238+
path.back()->second.value = value;
239+
}
240+
}
241+
242+
// Find all stored topics that math the specified subscription
243+
void find(MQTT_NS::string_view subscription, std::function< void (Value const&) > const& callback) const {
244+
find_match(subscription, callback);
245+
}
246+
247+
// Remove a stored value at the specified topic
248+
void remove(MQTT_NS::string_view topic) {
249+
remove_topic(topic);
250+
}
251+
};
252+
253+
#endif // MQTT_RETAINED_TOPIC_MAP_HPP

0 commit comments

Comments
 (0)