Skip to content

Commit b28c6a9

Browse files
Implement default handling as trait extension
Signed-off-by: Danil-Grigorev <danil.grigorev@suse.com>
1 parent 6decc81 commit b28c6a9

File tree

2 files changed

+49
-42
lines changed

2 files changed

+49
-42
lines changed

src/controller.rs

Lines changed: 49 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -110,28 +110,36 @@ impl State {
110110
}
111111
}
112112

113-
fn default_handling<K: Resource<DynamicType = ()> + 'static>(
114-
stream: impl Send + WatchStreamExt<Item = Result<watcher::Event<K>, watcher::Error>>,
115-
) -> impl Send + WatchStreamExt<Item = Result<K, watcher::Error>> {
116-
stream
117-
.modify(|g| g.managed_fields_mut().clear())
118-
.touched_objects()
119-
.predicate_filter(predicates::resource_version)
120-
.default_backoff()
121-
}
113+
trait ControllerDefault: WatchStreamExt {
114+
fn default_handling<K>(self) -> impl WatchStreamExt<Item = Result<K, watcher::Error>>
115+
where
116+
K: Resource<DynamicType = ()> + 'static,
117+
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
118+
{
119+
self.modify(|g| g.managed_fields_mut().clear())
120+
.touched_objects()
121+
.predicate_filter(predicates::resource_version)
122+
.default_backoff()
123+
}
122124

123-
fn default_with_reflect<K: Resource<DynamicType = ()> + Clone + 'static>(
124-
writer: Writer<K>,
125-
stream: impl Send + WatchStreamExt<Item = Result<watcher::Event<K>, watcher::Error>>,
126-
) -> impl WatchStreamExt<Item = Result<K, watcher::Error>> {
127-
stream
128-
.modify(|g| g.managed_fields_mut().clear())
129-
.reflect(writer)
130-
.touched_objects()
131-
.predicate_filter(predicates::resource_version)
132-
.default_backoff()
125+
fn default_with_reflect<K>(
126+
self,
127+
writer: Writer<K>,
128+
) -> impl WatchStreamExt<Item = Result<K, watcher::Error>>
129+
where
130+
K: Resource<DynamicType = ()> + Clone + 'static,
131+
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
132+
{
133+
self.modify(|g| g.managed_fields_mut().clear())
134+
.reflect(writer)
135+
.touched_objects()
136+
.predicate_filter(predicates::resource_version)
137+
.default_backoff()
138+
}
133139
}
134140

141+
impl<St: ?Sized> ControllerDefault for St where St: Stream {}
142+
135143
pub async fn run_fleet_addon_config_controller(state: State) {
136144
let client = Client::try_default()
137145
.await
@@ -192,13 +200,11 @@ pub async fn run_fleet_helm_controller(state: State) {
192200
.await
193201
.expect("failed to create kube Client");
194202
let (reader, writer) = reflector::store();
195-
let fleet_addon_config = default_with_reflect(
196-
writer,
197-
watcher(
198-
Api::<FleetAddonConfig>::all(client.clone()),
199-
Config::default().any_semantic(),
200-
),
203+
let fleet_addon_config = watcher(
204+
Api::<FleetAddonConfig>::all(client.clone()),
205+
Config::default().any_semantic(),
201206
)
207+
.default_with_reflect(writer)
202208
.predicate_filter(predicates::generation);
203209

204210
let fleet_addon_config_controller = Controller::for_stream(fleet_addon_config, reader)
@@ -288,22 +294,25 @@ pub async fn run_cluster_controller(state: State) {
288294
.default_backoff()
289295
.for_each(|_| futures::future::ready(()));
290296

291-
let fleet = default_handling(metadata_watcher(
297+
let fleet = metadata_watcher(
292298
Api::<fleet_cluster::Cluster>::all(client.clone()),
293299
Config::default().any_semantic(),
294-
));
300+
)
301+
.default_handling();
295302

296-
let groups = default_handling(metadata_watcher(
303+
let groups = metadata_watcher(
297304
Api::<ClusterGroup>::all(client.clone()),
298305
Config::default()
299306
.labels_from(&ClusterGroup::group_selector())
300307
.any_semantic(),
301-
));
308+
)
309+
.default_handling();
302310

303-
let mappings = default_handling(metadata_watcher(
311+
let mappings = metadata_watcher(
304312
Api::<BundleNamespaceMapping>::all(client.clone()),
305313
Config::default().any_semantic(),
306-
));
314+
)
315+
.default_handling();
307316

308317
let (sub, reader) = state.dispatcher.subscribe();
309318
let clusters = Controller::for_shared_stream(sub, reader.clone())
@@ -353,20 +362,19 @@ pub async fn run_cluster_class_controller(state: State) {
353362
.for_each(|_| futures::future::ready(()));
354363

355364
let (reader, writer) = reflector::store();
356-
let cluster_classes = default_with_reflect(
357-
writer,
358-
watcher(
359-
Api::<ClusterClass>::all(client.clone()),
360-
Config::default().any_semantic(),
361-
),
362-
);
363-
364-
let groups = default_handling(metadata_watcher(
365+
let cluster_classes = watcher(
366+
Api::<ClusterClass>::all(client.clone()),
367+
Config::default().any_semantic(),
368+
)
369+
.default_with_reflect(writer);
370+
371+
let groups = metadata_watcher(
365372
Api::<ClusterGroup>::all(client.clone()),
366373
Config::default()
367374
.labels_from(&ClusterGroup::group_selector())
368375
.any_semantic(),
369-
));
376+
)
377+
.default_handling();
370378

371379
let cluster_class_controller = Controller::for_stream(cluster_classes, reader)
372380
.owns_stream(groups)

src/controllers/cluster.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use k8s_openapi::api::core::v1::Namespace;
1313
use kube::api::{ApiResource, ListParams, Object, PatchParams};
1414

1515
use kube::client::scope;
16-
use kube::core::SelectorExt as _;
1716
use kube::runtime::watcher::{self, Config};
1817
use kube::{api::ResourceExt, runtime::controller::Action, Resource};
1918
use kube::{Api, Client};

0 commit comments

Comments
 (0)