Skip to content

Commit 1cb99ea

Browse files
committed
Start work on a retained message tree
1 parent 30a8c99 commit 1cb99ea

File tree

2 files changed

+153
-0
lines changed

2 files changed

+153
-0
lines changed

mqtt-v5-broker/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use tokio_util::codec::Framed;
2222

2323
mod broker;
2424
mod client;
25+
mod retained;
2526
mod tree;
2627

2728
async fn client_handler(stream: TcpStream, broker_tx: Sender<BrokerMessage>) {

mqtt-v5-broker/src/retained.rs

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
use mqtt_v5::topic::{Topic, TopicFilter, TopicLevel};
2+
use std::collections::{hash_map::Entry, HashMap};
3+
4+
#[derive(Debug)]
5+
pub struct RetainedMessageTreeNode<T> {
6+
retained_data: Option<T>,
7+
concrete_topic_levels: HashMap<String, RetainedMessageTreeNode<T>>,
8+
}
9+
10+
#[derive(Debug)]
11+
pub struct RetainedMessageTree<T> {
12+
root: RetainedMessageTreeNode<T>,
13+
}
14+
15+
impl<T: std::fmt::Debug> RetainedMessageTree<T> {
16+
pub fn new() -> Self {
17+
Self { root: RetainedMessageTreeNode::new() }
18+
}
19+
20+
pub fn insert(&mut self, topic: &Topic, retained_data: T) {
21+
self.root.insert(topic, retained_data);
22+
}
23+
24+
/// Get the retained messages which match a given topic filter.
25+
pub fn retained_messages(&self, topic_filter: &TopicFilter) -> impl Iterator<Item = &T> {
26+
self.root.retained_messages(topic_filter)
27+
}
28+
29+
pub fn remove(&mut self, topic: &Topic) -> Option<T> {
30+
self.root.remove(topic)
31+
}
32+
33+
#[allow(dead_code)]
34+
fn is_empty(&self) -> bool {
35+
self.root.is_empty()
36+
}
37+
}
38+
39+
impl<T: std::fmt::Debug> RetainedMessageTreeNode<T> {
40+
fn new() -> Self {
41+
Self { retained_data: None, concrete_topic_levels: HashMap::new() }
42+
}
43+
44+
fn is_empty(&self) -> bool {
45+
self.retained_data.is_none() && self.concrete_topic_levels.is_empty()
46+
}
47+
48+
fn insert(&mut self, topic: &Topic, retained_data: T) {
49+
let mut current_tree = self;
50+
51+
for level in topic.levels() {
52+
match level {
53+
TopicLevel::SingleLevelWildcard | TopicLevel::MultiLevelWildcard => {
54+
unreachable!("Publish topics only contain concrete levels");
55+
},
56+
TopicLevel::Concrete(concrete_topic_level) => {
57+
if !current_tree.concrete_topic_levels.contains_key(concrete_topic_level) {
58+
current_tree.concrete_topic_levels.insert(
59+
concrete_topic_level.to_string(),
60+
RetainedMessageTreeNode::new(),
61+
);
62+
}
63+
64+
// TODO - Do this without another hash lookup
65+
current_tree =
66+
current_tree.concrete_topic_levels.get_mut(concrete_topic_level).unwrap();
67+
},
68+
}
69+
}
70+
71+
current_tree.retained_data = Some(retained_data);
72+
}
73+
74+
fn remove(&mut self, topic: &Topic) -> Option<T> {
75+
let mut current_tree = self;
76+
let mut stack: Vec<(*mut RetainedMessageTreeNode<T>, usize)> = vec![];
77+
78+
let levels: Vec<TopicLevel> = topic.levels().collect();
79+
let mut level_index = 0;
80+
81+
for level in &levels {
82+
match level {
83+
TopicLevel::SingleLevelWildcard | TopicLevel::MultiLevelWildcard => {
84+
unreachable!("Publish topics only contain concrete levels");
85+
},
86+
TopicLevel::Concrete(concrete_topic_level) => {
87+
if current_tree.concrete_topic_levels.contains_key(*concrete_topic_level) {
88+
stack.push((&mut *current_tree, level_index));
89+
level_index += 1;
90+
91+
current_tree = current_tree
92+
.concrete_topic_levels
93+
.get_mut(*concrete_topic_level)
94+
.unwrap();
95+
} else {
96+
return None;
97+
}
98+
},
99+
}
100+
}
101+
102+
let return_val = current_tree.retained_data.take();
103+
104+
// Go up the stack, cleaning up empty nodes
105+
while let Some((stack_val, level_index)) = stack.pop() {
106+
let tree = unsafe { &mut *stack_val };
107+
108+
let level = &levels[level_index];
109+
110+
match level {
111+
TopicLevel::SingleLevelWildcard | TopicLevel::MultiLevelWildcard => {
112+
unreachable!("Publish topics only contain concrete levels");
113+
},
114+
TopicLevel::Concrete(concrete_topic_level) => {
115+
if let Entry::Occupied(o) =
116+
tree.concrete_topic_levels.entry((*concrete_topic_level).to_string())
117+
{
118+
if o.get().is_empty() {
119+
o.remove_entry();
120+
}
121+
}
122+
},
123+
}
124+
}
125+
126+
return_val
127+
}
128+
129+
pub fn retained_messages(&self, topic_filter: &TopicFilter) -> impl Iterator<Item = &T> {
130+
// let mut subscriptions = Vec::new();
131+
let mut tree_stack = vec![(self, 0)];
132+
let levels: Vec<TopicLevel> = topic_filter.levels().collect();
133+
134+
vec![].into_iter()
135+
}
136+
}
137+
138+
#[cfg(test)]
139+
mod tests {
140+
use crate::retained::RetainedMessageTree;
141+
142+
#[test]
143+
fn test_insert() {
144+
let mut sub_tree = RetainedMessageTree::new();
145+
sub_tree.insert(&"home/kitchen/temperature".parse().unwrap(), 1);
146+
sub_tree.insert(&"home/kitchen".parse().unwrap(), 7);
147+
148+
assert_eq!(sub_tree.remove(&"home/kitchen/temperature".parse().unwrap()), Some(1));
149+
assert_eq!(sub_tree.remove(&"home/kitchen".parse().unwrap()), Some(7));
150+
assert_eq!(sub_tree.remove(&"home/kitchen".parse().unwrap()), None);
151+
}
152+
}

0 commit comments

Comments
 (0)