diff --git a/controllers/upgradejob_controller.go b/controllers/upgradejob_controller.go
index 90f7f94..6afd09e 100644
--- a/controllers/upgradejob_controller.go
+++ b/controllers/upgradejob_controller.go
@@ -47,7 +47,7 @@ type UpgradeJobReconciler struct {
 	ManagedUpstreamClusterVersionName string
 }
 
-var ClusterVersionLockAnnotation = managedupgradev1beta1.GroupVersion.Group + "/upgrade-job"
+var JobLockAnnotation = managedupgradev1beta1.GroupVersion.Group + "/upgrade-job"
 
 const (
 	UpgradeJobHookJobTrackerFinalizer = "upgradejobs.managedupgrade.appuio.io/hook-job-tracker"
@@ -96,7 +96,13 @@ func (r *UpgradeJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		// Don't execute hooks created after the job was finished.
 		_, eserr := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventSuccess, sc.Reason, sc.LastTransitionTime.Time)
 		_, eferr := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventFinish, sc.Reason, sc.LastTransitionTime.Time)
-		return ctrl.Result{}, multierr.Combine(eserr, eferr, r.cleanupLock(ctx, &uj))
+		return ctrl.Result{}, multierr.Combine(
+			eserr,
+			eferr,
+			// Prevent pools that did not have any changes/updates from being paused indefinitely.
+			r.cleanupMachineConfigPools(ctx, uj),
+			r.cleanupLock(ctx, uj),
+		)
 	}
 	fc := apimeta.FindStatusCondition(uj.Status.Conditions, managedupgradev1beta1.UpgradeJobConditionFailed)
 	if fc != nil && fc.Status == metav1.ConditionTrue {
@@ -104,7 +110,7 @@ func (r *UpgradeJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		// Don't execute hooks created after the job was finished.
 		_, efaerr := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventFailure, fc.Reason, fc.LastTransitionTime.Time)
 		_, efierr := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventFinish, fc.Reason, fc.LastTransitionTime.Time)
-		return ctrl.Result{}, multierr.Combine(efaerr, efierr, r.cleanupLock(ctx, &uj))
+		return ctrl.Result{}, multierr.Combine(efaerr, efierr, r.cleanupLock(ctx, uj))
 	}
 
 	cont, err := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventCreate, "", time.Time{})
@@ -195,7 +201,7 @@ func (r *UpgradeJobReconciler) reconcileStartedJob(ctx context.Context, uj *mana
 		return ctrl.Result{}, fmt.Errorf("failed to lock cluster version: %w", err)
 	}
 
-	if err := r.pauseUnpauseMachineConfigPools(ctx, uj, false); err != nil {
+	if err := r.pauseUnpauseMachineConfigPools(ctx, uj); err != nil {
 		return ctrl.Result{}, fmt.Errorf("failed to pause machine config pools: %w", err)
 	}
 
@@ -319,11 +325,6 @@ func (r *UpgradeJobReconciler) reconcileStartedJob(ctx context.Context, uj *mana
 		return ctrl.Result{}, nil
 	}
 
-	// Ensure pools that were paused but did not need an upgrade are unpaused
-	if err := r.pauseUnpauseMachineConfigPools(ctx, uj, true); err != nil {
-		return ctrl.Result{}, fmt.Errorf("failed to ensure machine config pools are unpaused: %w", err)
-	}
-
 	// Set the upgrade as successful
 	r.setStatusCondition(&uj.Status.Conditions, metav1.Condition{
 		Type: managedupgradev1beta1.UpgradeJobConditionSucceeded,
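Note on the success path above: `multierr.Combine` drops nil errors, so a clean run still returns a nil combined error, and every cleanup step's result is reported even when one of them fails. A minimal, standalone sketch of that semantics (names invented for illustration):

```go
package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

func main() {
	// Every cleanup succeeded: combining nils yields nil.
	fmt.Println(multierr.Combine(nil, nil, nil)) // <nil>

	// One cleanup failed: the combined error keeps it, nils are dropped.
	err := multierr.Combine(nil, errors.New("failed to unlock cluster version"), nil)
	fmt.Println(multierr.Errors(err)) // [failed to unlock cluster version]
}
```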
@@ -373,7 +374,7 @@ func JobFromClusterVersionMapper(c client.Reader, cvName string) handler.MapFunc
 // upgradeJobNameFromLockedClusterVersion returns the upgrade job name from the locked cluster version.
 // If the cluster version is not locked, it returns false.
 func upgradeJobNameFromLockedClusterVersion(cv configv1.ClusterVersion) (ok bool, nn types.NamespacedName) {
-	job := cv.GetAnnotations()[ClusterVersionLockAnnotation]
+	job := cv.GetAnnotations()[JobLockAnnotation]
 	if job == "" {
 		return false, types.NamespacedName{}
 	}
@@ -441,7 +442,7 @@ func (r *UpgradeJobReconciler) runHealthCheck(
 	return true, r.Status().Update(ctx, uj)
 }
 
-func (r *UpgradeJobReconciler) cleanupLock(ctx context.Context, uj *managedupgradev1beta1.UpgradeJob) error {
+func (r *UpgradeJobReconciler) cleanupLock(ctx context.Context, uj managedupgradev1beta1.UpgradeJob) error {
 	var version configv1.ClusterVersion
 	if err := r.Get(ctx, types.NamespacedName{
 		Name: r.ManagedUpstreamClusterVersionName,
@@ -449,9 +450,9 @@ func (r *UpgradeJobReconciler) cleanupLock(ctx context.Context, uj *managedupgra
 		return fmt.Errorf("failed to get cluster version: %w", err)
 	}
 
-	lockingJob, hasLockingJob := version.Annotations[ClusterVersionLockAnnotation]
+	lockingJob, hasLockingJob := version.Annotations[JobLockAnnotation]
 	if hasLockingJob && lockingJob == uj.Namespace+"/"+uj.Name {
-		delete(version.Annotations, ClusterVersionLockAnnotation)
+		delete(version.Annotations, JobLockAnnotation)
 		if err := r.Update(ctx, &version); err != nil {
 			return fmt.Errorf("failed to unlock cluster version: %w", err)
 		}
@@ -466,11 +467,11 @@ func (r *UpgradeJobReconciler) tryLockClusterVersion(ctx context.Context, versio
 		version.Annotations = map[string]string{}
 	}
 
-	lockingJob, hasLockingJob := version.Annotations[ClusterVersionLockAnnotation]
+	lockingJob, hasLockingJob := version.Annotations[JobLockAnnotation]
 	if hasLockingJob && lockingJob != lockVal {
 		return fmt.Errorf("cluster version is locked by %s", lockingJob)
 	} else if !hasLockingJob {
-		version.Annotations[ClusterVersionLockAnnotation] = lockVal
+		version.Annotations[JobLockAnnotation] = lockVal
 		// There is no race condition between the Get and Update calls because the server will reject the update with a Conflict error if the resource has been modified since the Get call.
 		if err := r.Client.Update(ctx, version); err != nil {
 			return fmt.Errorf("failed to lock cluster version: %w", err)
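For context on the rename: the "lock" is nothing more than this annotation on the ClusterVersion, and the safety argument is the optimistic-concurrency note in `tryLockClusterVersion` above. A condensed sketch of the pattern, assuming a controller-runtime client; the helper and the annotation value are illustrative, not the controller's exact code:

```go
package lock

import (
	"context"
	"fmt"

	configv1 "github.com/openshift/api/config/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// lockAnnotation stands in for JobLockAnnotation in this sketch.
const lockAnnotation = "managedupgrade.appuio.io/upgrade-job"

// tryLock claims the ClusterVersion for holder ("namespace/name") or reports
// who holds it. cv must come from a fresh Get so its resourceVersion is current.
func tryLock(ctx context.Context, c client.Client, cv *configv1.ClusterVersion, holder string) error {
	if cv.Annotations == nil {
		cv.Annotations = map[string]string{}
	}
	if cur, ok := cv.Annotations[lockAnnotation]; ok {
		if cur != holder {
			return fmt.Errorf("cluster version is locked by %s", cur)
		}
		return nil // already ours, nothing to write
	}
	cv.Annotations[lockAnnotation] = holder
	// The update carries the resourceVersion from the earlier Get; if anything
	// modified the object in between, the API server rejects it with a Conflict.
	if err := c.Update(ctx, cv); err != nil {
		if apierrors.IsConflict(err) {
			return fmt.Errorf("lost the lock race, requeue and retry: %w", err)
		}
		return err
	}
	return nil
}
```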
@@ -873,10 +874,9 @@ func findTrackedHookJob(ujhookName, event string, uj managedupgradev1beta1.Upgra
 
 // pauseUnpauseMachineConfigPools pauses or unpauses the machine config pools that match the given selectors in .Spec.MachineConfigPools and have a delay set.
 // The decision to pause or unpause is based on `pool.DelayUpgrade.DelayMin` relative to the startAfter time of the upgrade job.
-// If ensureUnpause is true, it will unpause the pools even if the delay has not expired.
 // It sets a timeout condition and returns an error if the delay is expired.
 // It also returns an error if the machine config pools cannot be listed or updated.
-func (r *UpgradeJobReconciler) pauseUnpauseMachineConfigPools(ctx context.Context, uj *managedupgradev1beta1.UpgradeJob, ensureUnpause bool) error {
+func (r *UpgradeJobReconciler) pauseUnpauseMachineConfigPools(ctx context.Context, uj *managedupgradev1beta1.UpgradeJob) error {
 	l := log.FromContext(ctx).WithName("UpgradeJobReconciler.pauseUnpauseMachineConfigPools")
 
 	var controllerManagesPools bool
@@ -887,8 +887,8 @@ func (r *UpgradeJobReconciler) pauseUnpauseMachineConfigPools(ctx context.Contex
 		}
 		timeSinceStart := r.timeSinceStartAfter(uj)
 		beforeMinDelay := timeSinceStart < pool.DelayUpgrade.DelayMin.Duration
-		shouldPause := !ensureUnpause && beforeMinDelay
-		l = l.WithValues("poolconfig_matchLabels", pool.MatchLabels, "shouldPause", shouldPause, "beforeMinDelay", beforeMinDelay, "ensureUnpause", ensureUnpause, "timeSinceStart", timeSinceStart)
+		shouldPause := beforeMinDelay
+		l = l.WithValues("poolconfig_matchLabels", pool.MatchLabels, "shouldPause", shouldPause, "beforeMinDelay", beforeMinDelay, "timeSinceStart", timeSinceStart)
 
 		sel, err := metav1.LabelSelectorAsSelector(pool.MatchLabels)
 		if err != nil {
@@ -919,6 +919,11 @@ func (r *UpgradeJobReconciler) pauseUnpauseMachineConfigPools(ctx context.Contex
 		}
 		if mcp.Spec.Paused != shouldPause {
 			l.Info("Updating machine config pools pause field", "from", mcp.Spec.Paused, "to", shouldPause)
+			if mcp.Annotations == nil {
+				mcp.Annotations = map[string]string{}
+			}
+			// Mark the MCP as managed by the upgrade job for later cleanup
+			mcp.Annotations[JobLockAnnotation] = uj.Namespace + "/" + uj.Name
 			mcp.Spec.Paused = shouldPause
 			if err := r.Update(ctx, &mcp); err != nil {
 				return fmt.Errorf("failed to pause/unpause machine config pool %q: %w", mcp.Name, err)
@@ -1040,3 +1045,30 @@ func (r *UpgradeJobReconciler) checkAndMarkSkipped(ctx context.Context, uj manag
 	}
 	return false, nil
 }
+
+// cleanupMachineConfigPools removes the JobLockAnnotation from every machine config pool that this upgrade job marked, and unpauses those pools if they are still paused.
+func (r *UpgradeJobReconciler) cleanupMachineConfigPools(ctx context.Context, uj managedupgradev1beta1.UpgradeJob) error {
+	l := log.FromContext(ctx).WithName("UpgradeJobReconciler.cleanupMachineConfigPools")
+
+	var mcpl machineconfigurationv1.MachineConfigPoolList
+	if err := r.List(ctx, &mcpl); err != nil {
+		return fmt.Errorf("failed to list machine config pools: %w", err)
+	}
+
+	errs := make([]error, 0, len(mcpl.Items))
+	for _, mcp := range mcpl.Items {
+		if mcp.Annotations[JobLockAnnotation] != uj.Namespace+"/"+uj.Name {
+			continue
+		}
+		delete(mcp.Annotations, JobLockAnnotation)
+		if mcp.Spec.Paused {
+			l.Info("unpausing machine config pool", "pool", mcp.Name)
+			mcp.Spec.Paused = false
+		}
+		if err := r.Update(ctx, &mcp); err != nil {
+			errs = append(errs, fmt.Errorf("failed to cleanup machine config pool %q: %w", mcp.Name, err))
+		}
+	}
+
+	return multierr.Combine(errs...)
+}
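The pause bookkeeping above is effectively mark-and-sweep: `pauseUnpauseMachineConfigPools` marks every pool it touches with the job reference, and `cleanupMachineConfigPools` sweeps only pools carrying that exact mark, so pools paused by another job or by an operator are left alone. A toy model of that invariant, with plain structs standing in for the real API types:

```go
package main

import "fmt"

type pool struct {
	name        string
	paused      bool
	annotations map[string]string
}

// Assumed annotation value; the real one is derived from the API group.
const jobLockAnnotation = "managedupgrade.appuio.io/upgrade-job"

func pause(p *pool, jobKey string) {
	if p.annotations == nil {
		p.annotations = map[string]string{}
	}
	p.annotations[jobLockAnnotation] = jobKey // mark for later cleanup
	p.paused = true
}

func cleanup(pools []*pool, jobKey string) {
	for _, p := range pools {
		if p.annotations[jobLockAnnotation] != jobKey {
			continue // not ours: never touch pools some other job marked
		}
		delete(p.annotations, jobLockAnnotation)
		p.paused = false // unpause even if the pool never saw an update
	}
}

func main() {
	delayed := &pool{name: "worker-delayed"}
	other := &pool{name: "infra", paused: true,
		annotations: map[string]string{jobLockAnnotation: "ns/other-job"}}

	pause(delayed, "ns/upgrade-1")
	cleanup([]*pool{delayed, other}, "ns/upgrade-1")
	fmt.Println(delayed.paused, other.paused) // false true
}
```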
diff --git a/controllers/upgradejob_controller_test.go b/controllers/upgradejob_controller_test.go
index 933be6a..0ee0650 100644
--- a/controllers/upgradejob_controller_test.go
+++ b/controllers/upgradejob_controller_test.go
@@ -168,7 +168,7 @@ func Test_UpgradeJobReconciler_Reconcile_E2E_Upgrade(t *testing.T) {
 		_, err := subject.Reconcile(ctx, requestForObject(upgradeJob))
 		require.NoError(t, err)
 		require.NoError(t, c.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
-		lock, ok := ucv.Annotations[ClusterVersionLockAnnotation]
+		lock, ok := ucv.Annotations[JobLockAnnotation]
 		require.True(t, ok, "lock annotation must be set")
 		require.Equal(t, upgradeJob.Namespace+"/"+upgradeJob.Name, lock, "lock annotation must contain upgrade job reference")
 	})
@@ -295,7 +295,7 @@ func Test_UpgradeJobReconciler_Reconcile_E2E_Upgrade(t *testing.T) {
 		_, err = subject.Reconcile(ctx, requestForObject(upgradeJob))
 		require.NoError(t, err)
 		require.NoError(t, c.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
-		require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
+		require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
 		_, err = subject.Reconcile(ctx, requestForObject(upgradeJob))
 		require.NoError(t, err, "should ignore requests if cluster version is not locked")
 	})
@@ -425,7 +425,7 @@ func Test_UpgradeJobReconciler_Reconcile_Skipped_Job(t *testing.T) {
 
 	require.NoError(t, c.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
 	require.Empty(t, ucv.Spec.DesiredUpdate, "cluster version should not be updated")
-	require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "cluster version should not be locked")
+	require.Empty(t, ucv.Annotations[JobLockAnnotation], "cluster version should not be locked")
 	})
 
 	step(t, "`Success` and `Finish` hooks", func(t *testing.T) {
@@ -969,7 +969,7 @@ func Test_UpgradeJobReconciler_Reconcile_UpgradeWithdrawn(t *testing.T) {
 	require.NotNil(t, failedCond, "should set failed condition")
 	require.Equal(t, managedupgradev1beta1.UpgradeJobReasonUpgradeWithdrawn, failedCond.Reason)
 	require.NoError(t, client.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
-	require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
+	require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
 }
 
 func Test_UpgradeJobReconciler_Reconcile_Timeout(t *testing.T) {
@@ -1024,7 +1024,7 @@ func Test_UpgradeJobReconciler_Reconcile_Timeout(t *testing.T) {
 	require.NotNil(t, failedCond, "should set failed condition")
 	require.Equal(t, managedupgradev1beta1.UpgradeJobReasonTimedOut, failedCond.Reason)
 	require.NoError(t, client.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
-	require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
+	require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
 }
 
 func Test_UpgradeJobReconciler_Reconcile_PreHealthCheckTimeout(t *testing.T) {
@@ -1088,7 +1088,7 @@ func Test_UpgradeJobReconciler_Reconcile_PreHealthCheckTimeout(t *testing.T) {
 	require.NotNil(t, failedCond, "should set failed condition")
 	require.Equal(t, managedupgradev1beta1.UpgradeJobReasonPreHealthCheckFailed, failedCond.Reason)
 	require.NoError(t, client.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
-	require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
+	require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
 }
 
 func Test_UpgradeJobReconciler_Reconcile_PostHealthCheckTimeout(t *testing.T) {
@@ -1161,7 +1161,7 @@ func Test_UpgradeJobReconciler_Reconcile_PostHealthCheckTimeout(t *testing.T) {
 	require.NotNil(t, failedCond, "should set failed condition")
 	require.Equal(t, managedupgradev1beta1.UpgradeJobReasonPostHealthCheckFailed, failedCond.Reason)
 	require.NoError(t, client.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
-	require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
+	require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
 }
 
 func Test_UpgradeJobReconciler_Reconcile_PausedMachineConfigPools(t *testing.T) {
@@ -1544,7 +1544,7 @@ func Test_JobFromClusterVersionHandler(t *testing.T) {
 	require.Len(t, subject(context.Background(), nil), 0, "should not return a reconcile request if clusterversion is not locked")
 
 	ucv.Annotations = map[string]string{
-		ClusterVersionLockAnnotation: "ns/upgrade-1234-4-5-13",
+		JobLockAnnotation: "ns/upgrade-1234-4-5-13",
 	}
 	require.NoError(t, client.Update(context.Background(), ucv))
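One assertion the `PausedMachineConfigPools` E2E could gain from this change, sketched as a fragment for the existing suite. It reuses the suite's `step`, `subject`, `c`, and `ctx` helpers; the pool name "worker" is an assumption, not taken from the test fixtures:

```go
step(t, "cleanup unpauses marked pools", func(t *testing.T) {
	// Reconcile once after the job finished so the cleanup path runs.
	_, err := subject.Reconcile(ctx, requestForObject(upgradeJob))
	require.NoError(t, err)

	var mcp machineconfigurationv1.MachineConfigPool
	require.NoError(t, c.Get(ctx, types.NamespacedName{Name: "worker"}, &mcp))
	require.False(t, mcp.Spec.Paused, "a finished job must not leave pools paused")
	require.Empty(t, mcp.Annotations[JobLockAnnotation], "cleanup should remove the job mark")
})
```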