Skip to content

Commit 7c63f56

Browse files
authored
Merge pull request #1836 from doxxx93/fix/predicate-filter-uid-tracking
fix(predicate): track resource UID to handle recreated resources
2 parents 98bfbe3 + 8b1b7c7 commit 7c63f56

File tree

1 file changed

+82
-14
lines changed

1 file changed

+82
-14
lines changed

kube-runtime/src/utils/predicate.rs

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
use crate::{reflector::ObjectRef, watcher::Error};
1+
use crate::watcher::Error;
22
use core::{
33
pin::Pin,
44
task::{ready, Context, Poll},
55
};
66
use futures::Stream;
7-
use kube_client::Resource;
7+
use kube_client::{api::ObjectMeta, Resource};
88
use pin_project::pin_project;
99
use std::{
1010
collections::{hash_map::DefaultHasher, HashMap},
1111
hash::{Hash, Hasher},
12+
marker::PhantomData,
1213
};
1314

1415
fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
@@ -17,6 +18,24 @@ fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
1718
hasher.finish()
1819
}
1920

21+
/// Private cache key that includes UID in equality for predicate filtering
22+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
23+
struct PredicateCacheKey {
24+
name: String,
25+
namespace: Option<String>,
26+
uid: Option<String>,
27+
}
28+
29+
impl From<&ObjectMeta> for PredicateCacheKey {
30+
fn from(meta: &ObjectMeta) -> Self {
31+
Self {
32+
name: meta.name.clone().unwrap_or_default(),
33+
namespace: meta.namespace.clone(),
34+
uid: meta.uid.clone(),
35+
}
36+
}
37+
}
38+
2039
/// A predicate is a hasher of Kubernetes objects stream filtering
2140
pub trait Predicate<K> {
2241
/// A predicate only needs to implement optional hashing when keys exist
@@ -103,7 +122,9 @@ pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
103122
#[pin]
104123
stream: St,
105124
predicate: P,
106-
cache: HashMap<ObjectRef<K>, u64>,
125+
cache: HashMap<PredicateCacheKey, u64>,
126+
// K: Resource necessary to get .meta() of an object during polling
127+
_phantom: PhantomData<K>,
107128
}
108129
impl<St, K, P> PredicateFilter<St, K, P>
109130
where
@@ -116,6 +137,7 @@ where
116137
stream,
117138
predicate,
118139
cache: HashMap::new(),
140+
_phantom: PhantomData,
119141
}
120142
}
121143
}
@@ -134,17 +156,9 @@ where
134156
break match ready!(me.stream.as_mut().poll_next(cx)) {
135157
Some(Ok(obj)) => {
136158
if let Some(val) = me.predicate.hash_property(&obj) {
137-
let key = ObjectRef::from_obj(&obj);
138-
let changed = if let Some(old) = me.cache.get(&key) {
139-
*old != val
140-
} else {
141-
true
142-
};
143-
if let Some(old) = me.cache.get_mut(&key) {
144-
*old = val;
145-
} else {
146-
me.cache.insert(key, val);
147-
}
159+
let key = PredicateCacheKey::from(obj.meta());
160+
let changed = me.cache.get(&key) != Some(&val);
161+
me.cache.insert(key, val);
148162
if changed {
149163
Some(Ok(obj))
150164
} else {
@@ -251,4 +265,58 @@ pub(crate) mod tests {
251265
assert_eq!(second.meta().generation, Some(2));
252266
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
253267
}
268+
269+
#[tokio::test]
270+
async fn predicate_filtering_should_handle_recreated_resources_with_same_generation() {
271+
use k8s_openapi::api::core::v1::Pod;
272+
273+
let mkobj = |g: i32, uid: &str| {
274+
let p: Pod = serde_json::from_value(json!({
275+
"apiVersion": "v1",
276+
"kind": "Pod",
277+
"metadata": {
278+
"name": "blog",
279+
"namespace": "default",
280+
"generation": Some(g),
281+
"uid": uid,
282+
},
283+
"spec": {
284+
"containers": [{
285+
"name": "blog",
286+
"image": "clux/blog:0.1.0"
287+
}],
288+
}
289+
}))
290+
.unwrap();
291+
p
292+
};
293+
294+
// Simulate: create (gen=1, uid=1) -> update (gen=1, uid=1) -> delete ->
295+
// create (gen=1, uid=2) -> delete -> create (gen=2, uid=3)
296+
let data = stream::iter([
297+
Ok(mkobj(1, "uid-1")),
298+
Ok(mkobj(1, "uid-1")),
299+
Ok(mkobj(1, "uid-2")),
300+
Ok(mkobj(2, "uid-3")),
301+
]);
302+
let mut rx = pin!(PredicateFilter::new(data, predicates::generation));
303+
304+
// mkobj(1, uid-1) passed through
305+
let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
306+
assert_eq!(first.meta().generation, Some(1));
307+
assert_eq!(first.meta().uid.as_deref(), Some("uid-1"));
308+
309+
// (no repeat mkobj(1, uid-1) - same UID and generation)
310+
// mkobj(1, uid-2) next - different UID detected
311+
let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
312+
assert_eq!(second.meta().generation, Some(1));
313+
assert_eq!(second.meta().uid.as_deref(), Some("uid-2"));
314+
315+
// mkobj(2, uid-3) next
316+
let third = rx.next().now_or_never().unwrap().unwrap().unwrap();
317+
assert_eq!(third.meta().generation, Some(2));
318+
assert_eq!(third.meta().uid.as_deref(), Some("uid-3"));
319+
320+
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
321+
}
254322
}

0 commit comments

Comments
 (0)