@@ -36,7 +36,7 @@ use std::collections::BTreeMap;
3636use std:: ops:: Deref ;
3737use std:: sync:: Arc ;
3838use tokio:: { sync:: RwLock , time:: Duration } ;
39- use tracing:: { self , warn} ;
39+ use tracing:: { self , info , warn} ;
4040
4141/// State shared between the controller and the web server
4242#[ derive( Clone ) ]
@@ -349,8 +349,23 @@ pub async fn run_cluster_controller(state: State) {
349349 . default_backoff ( )
350350 . for_each ( |_| futures:: future:: ready ( ( ) ) ) ;
351351
352- let cluster_ns_controller = Controller :: new (
353- Api :: < Cluster > :: all ( client. clone ( ) ) , Default :: default ( ) )
352+ let capi_clusters = metadata_watcher (
353+ Api :: < Cluster > :: all ( client. clone ( ) ) ,
354+ Config :: default ( ) . any_semantic ( ) ,
355+ )
356+ . default_handling ( ) ;
357+
358+ let ( sub, reader) = state. dispatcher . subscribe ( ) ;
359+ let cluster_ns_controller = Controller :: for_shared_stream ( sub, reader. clone ( ) )
360+ . watches_stream ( capi_clusters, move |_cluster| {
361+ reader
362+ . state ( )
363+ . into_iter ( )
364+ . filter_map ( move |c : Arc < Cluster > | {
365+ info ! ( "Reconcile CAPI cluster: {}" , c. name_any( ) ) ;
366+ Some ( ObjectRef :: from_obj ( & * c) )
367+ } )
368+ } )
354369 . shutdown_on_signal ( )
355370 . run (
356371 Cluster :: reconcile_fleet_annotation_in_capi_ns,
0 commit comments