@@ -12,6 +12,7 @@ import (
1212	"github.com/spf13/pflag" 
1313	appsv1 "k8s.io/api/apps/v1" 
1414	corev1 "k8s.io/api/core/v1" 
15+ 	apierrors "k8s.io/apimachinery/pkg/api/errors" 
1516	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 
1617	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" 
1718	"sigs.k8s.io/cluster-api/controllers/remote" 
@@ -228,6 +229,7 @@ func (c *CiliumCNI) apply(
228229			c .client ,
229230			helmChart ,
230231		).
232+ 			WithValueTemplater (templateValues ).
231233			WithDefaultWaiter ()
232234	case  "" :
233235		resp .SetStatus (runtimehooksv1 .ResponseStatusFailure )
@@ -259,65 +261,145 @@ func runApply(
259261		return  err 
260262	}
261263
264+ 	// It is possible to disable kube-proxy and migrate to Cilium's kube-proxy replacement feature in a running cluster. 
265+ 	// In this case, we need to wait for Cilium to be restarted with the new configuration and then clean up kube-proxy.
266+ 
262267	// If skip kube-proxy is not set, return early. 
263- 	// Otherwise, wait for Cilium to be rolled out and then cleanup kube-proxy if installed. 
264268	if  ! capiutils .ShouldSkipKubeProxy (cluster ) {
265269		return  nil 
266270	}
267271
272+ 	remoteClient , err  :=  remote .NewClusterClient (
273+ 		ctx ,
274+ 		"" ,
275+ 		client ,
276+ 		ctrlclient .ObjectKeyFromObject (cluster ),
277+ 	)
278+ 	if  err  !=  nil  {
279+ 		return  fmt .Errorf ("error creating remote cluster client: %w" , err )
280+ 	}
281+ 
282+ 	// If kube-proxy is not installed, 
283+ 	// assume that the one-time migration of kube-proxy is complete and return early. 
284+ 	isKubeProxyInstalled , err  :=  isKubeProxyInstalled (ctx , remoteClient )
285+ 	if  err  !=  nil  {
286+ 		return  fmt .Errorf ("failed to check if kube-proxy is installed: %w" , err )
287+ 	}
288+ 	if  ! isKubeProxyInstalled  {
289+ 		return  nil 
290+ 	}
291+ 
292+ 	log .Info (
293+ 		fmt .Sprintf (
294+ 			"Waiting for Cilium ConfigMap to be updated with new configuration for cluster %s" ,
295+ 			ctrlclient .ObjectKeyFromObject (cluster ),
296+ 		),
297+ 	)
298+ 	if  err  :=  waitForCiliumConfigMapToBeUpdatedWithKubeProxyReplacement (ctx , remoteClient ); err  !=  nil  {
299+ 		return  fmt .Errorf ("failed to wait for Cilium ConfigMap to be updated: %w" , err )
300+ 	}
301+ 
268302	log .Info (
269- 		fmt .Sprintf ("Waiting for Cilium to be ready for cluster %s" , ctrlclient .ObjectKeyFromObject (cluster )),
303+ 		fmt .Sprintf (
304+ 			"Trigger a rollout of Cilium DaemonSet Pods for cluster %s" ,
305+ 			ctrlclient .ObjectKeyFromObject (cluster ),
306+ 		),
270307	)
271- 	if  err  :=  waitForCiliumToBeReady (ctx , client ,  cluster ); err  !=  nil  {
272- 		return  fmt .Errorf ("failed to wait for Cilium to be ready : %w" , err )
308+ 	if  err  :=  forceCiliumRollout (ctx , remoteClient ); err  !=  nil  {
309+ 		return  fmt .Errorf ("failed to force trigger a rollout of Cilium DaemonSet Pods : %w" , err )
273310	}
274311
275312	log .Info (
276313		fmt .Sprintf ("Cleaning up kube-proxy for cluster %s" , ctrlclient .ObjectKeyFromObject (cluster )),
277314	)
278- 	if  err  :=  cleanupKubeProxy (ctx , client ,  cluster ); err  !=  nil  {
315+ 	if  err  :=  cleanupKubeProxy (ctx , remoteClient ); err  !=  nil  {
279316		return  fmt .Errorf ("failed to cleanup kube-proxy: %w" , err )
280317	}
281318
282319	return  nil 
283320}
284321
285322const  (
323+ 	kubeProxyReplacementConfigKey  =  "kube-proxy-replacement" 
324+ 	ciliumConfigMapName            =  "cilium-config" 
325+ 
326+ 	restartedAtAnnotation  =  "caren.nutanix.com/restartedAt" 
327+ 
286328	kubeProxyName       =  "kube-proxy" 
287329	kubeProxyNamespace  =  "kube-system" 
288330)
289331
290- func  waitForCiliumToBeReady (
291- 	ctx  context.Context ,
292- 	c  ctrlclient.Client ,
293- 	cluster  * clusterv1.Cluster ,
294- ) error  {
295- 	remoteClient , err  :=  remote .NewClusterClient (
332+ // Use vars to override in integration tests. 
333+ var  (
334+ 	waitInterval  =  1  *  time .Second 
335+ 	waitTimeout   =  30  *  time .Second 
336+ )
337+ 
338+ func  waitForCiliumConfigMapToBeUpdatedWithKubeProxyReplacement (ctx  context.Context , c  ctrlclient.Client ) error  {
339+ 	cm  :=  & corev1.ConfigMap {
340+ 		ObjectMeta : metav1.ObjectMeta {
341+ 			Name :      ciliumConfigMapName ,
342+ 			Namespace : defaultCiliumNamespace ,
343+ 		},
344+ 	}
345+ 	if  err  :=  wait .ForObject (
296346		ctx ,
297- 		"" ,
298- 		c ,
299- 		ctrlclient .ObjectKeyFromObject (cluster ),
300- 	)
301- 	if  err  !=  nil  {
302- 		return  fmt .Errorf ("error creating remote cluster client: %w" , err )
347+ 		wait.ForObjectInput [* corev1.ConfigMap ]{
348+ 			Reader : c ,
349+ 			Target : cm .DeepCopy (),
350+ 			Check : func (_  context.Context , obj  * corev1.ConfigMap ) (bool , error ) {
351+ 				return  obj .Data [kubeProxyReplacementConfigKey ] ==  "true" , nil 
352+ 			},
353+ 			Interval : waitInterval ,
354+ 			Timeout :  waitTimeout ,
355+ 		},
356+ 	); err  !=  nil  {
357+ 		return  fmt .Errorf ("failed to wait for ConfigMap %s to be updated: %w" , ctrlclient .ObjectKeyFromObject (cm ), err )
303358	}
359+ 	return  nil 
360+ }
304361
362+ func  forceCiliumRollout (ctx  context.Context , c  ctrlclient.Client ) error  {
305363	ds  :=  & appsv1.DaemonSet {
306364		ObjectMeta : metav1.ObjectMeta {
307365			Name :      defaultCiliumReleaseName ,
308366			Namespace : defaultCiliumNamespace ,
309367		},
310368	}
369+ 	if  err  :=  c .Get (ctx , ctrlclient .ObjectKeyFromObject (ds ), ds ); err  !=  nil  {
370+ 		return  fmt .Errorf ("failed to get cilium daemon set: %w" , err )
371+ 	}
372+ 
373+ 	// Update the DaemonSet to force a rollout. 
374+ 	annotations  :=  ds .Spec .Template .Annotations 
375+ 	if  annotations  ==  nil  {
376+ 		annotations  =  make (map [string ]string )
377+ 	}
378+ 	if  _ , ok  :=  annotations [restartedAtAnnotation ]; ! ok  {
379+ 		// Only set the annotation once to avoid a race condition where rollouts are triggered repeatedly.
380+ 		annotations [restartedAtAnnotation ] =  time .Now ().UTC ().Format (time .RFC3339Nano )
381+ 	}
382+ 	ds .Spec .Template .Annotations  =  annotations 
383+ 	if  err  :=  c .Update (ctx , ds ); err  !=  nil  {
384+ 		return  fmt .Errorf ("failed to update cilium daemon set: %w" , err )
385+ 	}
386+ 
311387	if  err  :=  wait .ForObject (
312388		ctx ,
313389		wait.ForObjectInput [* appsv1.DaemonSet ]{
314- 			Reader : remoteClient ,
390+ 			Reader : c ,
315391			Target : ds .DeepCopy (),
316392			Check : func (_  context.Context , obj  * appsv1.DaemonSet ) (bool , error ) {
317- 				return  obj .Status .NumberAvailable  ==  obj .Status .DesiredNumberScheduled  &&  obj .Status .NumberUnavailable  ==  0 , nil 
393+ 				if  obj .Generation  !=  obj .Status .ObservedGeneration  {
394+ 					return  false , nil 
395+ 				}
396+ 				isUpdated  :=  obj .Status .NumberAvailable  ==  obj .Status .DesiredNumberScheduled  && 
397+ 					// We're forcing a rollout so we expect the UpdatedNumberScheduled to be always set. 
398+ 					obj .Status .UpdatedNumberScheduled  ==  obj .Status .DesiredNumberScheduled 
399+ 				return  isUpdated , nil 
318400			},
319- 			Interval : 1   *   time . Second ,
320- 			Timeout :  30   *   time . Second ,
401+ 			Interval : waitInterval ,
402+ 			Timeout :  waitTimeout ,
321403		},
322404	); err  !=  nil  {
323405		return  fmt .Errorf (
@@ -331,17 +413,7 @@ func waitForCiliumToBeReady(
331413}
332414
333415// cleanupKubeProxy cleans up kube-proxy DaemonSet and ConfigMap on the remote cluster when kube-proxy is disabled. 
334- func  cleanupKubeProxy (ctx  context.Context , c  ctrlclient.Client , cluster  * clusterv1.Cluster ) error  {
335- 	remoteClient , err  :=  remote .NewClusterClient (
336- 		ctx ,
337- 		"" ,
338- 		c ,
339- 		ctrlclient .ObjectKeyFromObject (cluster ),
340- 	)
341- 	if  err  !=  nil  {
342- 		return  fmt .Errorf ("error creating remote cluster client: %w" , err )
343- 	}
344- 
416+ func  cleanupKubeProxy (ctx  context.Context , c  ctrlclient.Client ) error  {
345417	objs  :=  []ctrlclient.Object {
346418		& appsv1.DaemonSet {
347419			ObjectMeta : metav1.ObjectMeta {
@@ -357,10 +429,27 @@ func cleanupKubeProxy(ctx context.Context, c ctrlclient.Client, cluster *cluster
357429		},
358430	}
359431	for  _ , obj  :=  range  objs  {
360- 		if  err  :=  ctrlclient .IgnoreNotFound (remoteClient .Delete (ctx , obj )); err  !=  nil  {
432+ 		if  err  :=  ctrlclient .IgnoreNotFound (c .Delete (ctx , obj )); err  !=  nil  {
361433			return  fmt .Errorf ("failed to delete %s/%s: %w" , obj .GetNamespace (), obj .GetName (), err )
362434		}
363435	}
364436
365437	return  nil 
366438}
439+ 
440+ func  isKubeProxyInstalled (ctx  context.Context , c  ctrlclient.Client ) (bool , error ) {
441+ 	ds  :=  & appsv1.DaemonSet {
442+ 		ObjectMeta : metav1.ObjectMeta {
443+ 			Name :      kubeProxyName ,
444+ 			Namespace : kubeProxyNamespace ,
445+ 		},
446+ 	}
447+ 	err  :=  c .Get (ctx , ctrlclient .ObjectKeyFromObject (ds ), ds )
448+ 	if  err  !=  nil  {
449+ 		if  apierrors .IsNotFound (err ) {
450+ 			return  false , nil 
451+ 		}
452+ 		return  false , err 
453+ 	}
454+ 	return  true , nil 
455+ }
0 commit comments