
Fix race condition when unpausing MCP at the end of a noop-for-pool maintenance (#120)

The previous version could get into an invalid state: the upgrade job was updated twice ([1][1], [2][2]) at the end of an update, and the k8s client cache does not guarantee read-after-write consistency [3][3]. This led to the MCPs being paused again right after they were force-unpaused (a sketch of the stale-read scenario follows the references below).

[1]: https://github.com/appuio/openshift-upgrade-controller/blob/0c1b407fd99a17ada4242a6454303911938377cd/controllers/upgradejob_controller.go#L939
[2]: https://github.com/appuio/openshift-upgrade-controller/blob/0c1b407fd99a17ada4242a6454303911938377cd/controllers/upgradejob_controller.go#L334
[3]: https://pkg.go.dev/sigs.k8s.io/controller-runtime#hdr-Clients_and_Caches
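
For context, a minimal sketch of the failure mode with a cached controller-runtime client. This is not part of the commit; the import path of the API package and the `reconcileSketch` helper are assumptions for illustration only.

```go
// Hypothetical sketch of the stale-read scenario described above.
package sketch

import (
	"context"

	apimeta "k8s.io/apimachinery/pkg/api/meta"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	// Assumed import path for the UpgradeJob API types.
	managedupgradev1beta1 "github.com/appuio/openshift-upgrade-controller/api/v1beta1"
)

// reconcileSketch is a stripped-down stand-in for the reconcile loop.
func reconcileSketch(ctx context.Context, c client.Client, req ctrl.Request) error {
	var uj managedupgradev1beta1.UpgradeJob
	// Reconcile N wrote the Succeeded condition and then updated the job a
	// second time. Reconcile N+1 reads from the informer cache, which may
	// still serve the object from before those writes.
	if err := c.Get(ctx, req.NamespacedName, &uj); err != nil {
		return client.IgnoreNotFound(err)
	}
	if !apimeta.IsStatusConditionTrue(uj.Status.Conditions, managedupgradev1beta1.UpgradeJobConditionSucceeded) {
		// With a stale read the job still looks in progress, so the
		// delay-based pause logic runs again and re-pauses pools that were
		// just force-unpaused at the end of the upgrade.
	}
	return nil
}
```

The commit closes this window by tagging paused pools with the job-lock annotation and unpausing them in the terminal cleanup path (`cleanupMachineConfigPools`) instead of relying on a fresh read of the job during the running reconcile.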
bastjan authored Nov 15, 2024
1 parent 8ee8214 commit a28b8d4
Showing 2 changed files with 59 additions and 27 deletions.
70 changes: 51 additions & 19 deletions controllers/upgradejob_controller.go
@@ -47,7 +47,7 @@ type UpgradeJobReconciler struct {
ManagedUpstreamClusterVersionName string
}

var ClusterVersionLockAnnotation = managedupgradev1beta1.GroupVersion.Group + "/upgrade-job"
var JobLockAnnotation = managedupgradev1beta1.GroupVersion.Group + "/upgrade-job"

const (
UpgradeJobHookJobTrackerFinalizer = "upgradejobs.managedupgrade.appuio.io/hook-job-tracker"
@@ -96,15 +96,21 @@ func (r *UpgradeJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Don't execute hooks created after the job was finished.
_, eserr := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventSuccess, sc.Reason, sc.LastTransitionTime.Time)
_, eferr := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventFinish, sc.Reason, sc.LastTransitionTime.Time)
return ctrl.Result{}, multierr.Combine(eserr, eferr, r.cleanupLock(ctx, &uj))
return ctrl.Result{}, multierr.Combine(
eserr,
eferr,
// Prevent pools that did not have any changes/updates from being paused indefinitely.
r.cleanupMachineConfigPools(ctx, uj),
r.cleanupLock(ctx, uj),
)
}
fc := apimeta.FindStatusCondition(uj.Status.Conditions, managedupgradev1beta1.UpgradeJobConditionFailed)
if fc != nil && fc.Status == metav1.ConditionTrue {
// Ignore hooks status, they can't influence the upgrade anymore.
// Don't execute hooks created after the job was finished.
_, efaerr := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventFailure, fc.Reason, fc.LastTransitionTime.Time)
_, efierr := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventFinish, fc.Reason, fc.LastTransitionTime.Time)
return ctrl.Result{}, multierr.Combine(efaerr, efierr, r.cleanupLock(ctx, &uj))
return ctrl.Result{}, multierr.Combine(efaerr, efierr, r.cleanupLock(ctx, uj))
}

cont, err := r.executeHooks(ctx, &uj, managedupgradev1beta1.EventCreate, "", time.Time{})
@@ -195,7 +201,7 @@ func (r *UpgradeJobReconciler) reconcileStartedJob(ctx context.Context, uj *mana
return ctrl.Result{}, fmt.Errorf("failed to lock cluster version: %w", err)
}

if err := r.pauseUnpauseMachineConfigPools(ctx, uj, false); err != nil {
if err := r.pauseUnpauseMachineConfigPools(ctx, uj); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to pause machine config pools: %w", err)
}

@@ -319,11 +325,6 @@ func (r *UpgradeJobReconciler) reconcileStartedJob(ctx context.Context, uj *mana
return ctrl.Result{}, nil
}

// Ensure pools that were paused but did not need an upgrade are unpaused
if err := r.pauseUnpauseMachineConfigPools(ctx, uj, true); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to ensure machine config pools are unpaused: %w", err)
}

// Set the upgrade as successful
r.setStatusCondition(&uj.Status.Conditions, metav1.Condition{
Type: managedupgradev1beta1.UpgradeJobConditionSucceeded,
@@ -373,7 +374,7 @@ func JobFromClusterVersionMapper(c client.Reader, cvName string) handler.MapFunc
// upgradeJobNameFromLockedClusterVersion returns the upgrade job name from the locked cluster version.
// If the cluster version is not locked, it returns false.
func upgradeJobNameFromLockedClusterVersion(cv configv1.ClusterVersion) (ok bool, nn types.NamespacedName) {
job := cv.GetAnnotations()[ClusterVersionLockAnnotation]
job := cv.GetAnnotations()[JobLockAnnotation]
if job == "" {
return false, types.NamespacedName{}
}
@@ -441,17 +442,17 @@ func (r *UpgradeJobReconciler) runHealthCheck(
return true, r.Status().Update(ctx, uj)
}

func (r *UpgradeJobReconciler) cleanupLock(ctx context.Context, uj *managedupgradev1beta1.UpgradeJob) error {
func (r *UpgradeJobReconciler) cleanupLock(ctx context.Context, uj managedupgradev1beta1.UpgradeJob) error {
var version configv1.ClusterVersion
if err := r.Get(ctx, types.NamespacedName{
Name: r.ManagedUpstreamClusterVersionName,
}, &version); err != nil {
return fmt.Errorf("failed to get cluster version: %w", err)
}

lockingJob, hasLockingJob := version.Annotations[ClusterVersionLockAnnotation]
lockingJob, hasLockingJob := version.Annotations[JobLockAnnotation]
if hasLockingJob && lockingJob == uj.Namespace+"/"+uj.Name {
delete(version.Annotations, ClusterVersionLockAnnotation)
delete(version.Annotations, JobLockAnnotation)
if err := r.Update(ctx, &version); err != nil {
return fmt.Errorf("failed to unlock cluster version: %w", err)
}
@@ -466,11 +467,11 @@ func (r *UpgradeJobReconciler) tryLockClusterVersion(ctx context.Context, versio
version.Annotations = map[string]string{}
}

lockingJob, hasLockingJob := version.Annotations[ClusterVersionLockAnnotation]
lockingJob, hasLockingJob := version.Annotations[JobLockAnnotation]
if hasLockingJob && lockingJob != lockVal {
return fmt.Errorf("cluster version is locked by %s", lockingJob)
} else if !hasLockingJob {
version.Annotations[ClusterVersionLockAnnotation] = lockVal
version.Annotations[JobLockAnnotation] = lockVal
// There is no race condition between the Get and Update calls because the server will reject the update with a Conflict error if the resource has been modified since the Get call.
if err := r.Client.Update(ctx, version); err != nil {
return fmt.Errorf("failed to lock cluster version: %w", err)
@@ -873,10 +874,9 @@ func findTrackedHookJob(ujhookName, event string, uj managedupgradev1beta1.Upgra

// pauseUnpauseMachineConfigPools pauses or unpauses the machine config pools that match the given selectors in .Spec.MachineConfigPools and have a delay set.
// The decision to pause or unpause is based on `pool.DelayUpgrade.DelayMin` relative to the startAfter time of the upgrade job.
// If ensureUnpause is true, it will unpause the pools even if the delay has not expired.
// It sets a timeout condition and returns an error if the delay is expired.
// It also returns an error if the machine config pools cannot be listed or updated.
func (r *UpgradeJobReconciler) pauseUnpauseMachineConfigPools(ctx context.Context, uj *managedupgradev1beta1.UpgradeJob, ensureUnpause bool) error {
func (r *UpgradeJobReconciler) pauseUnpauseMachineConfigPools(ctx context.Context, uj *managedupgradev1beta1.UpgradeJob) error {
l := log.FromContext(ctx).WithName("UpgradeJobReconciler.pauseUnpauseMachineConfigPools")

var controllerManagesPools bool
@@ -887,8 +887,8 @@ func (r *UpgradeJobReconciler) pauseUnpauseMachineConfigPools(ctx context.Contex
}
timeSinceStart := r.timeSinceStartAfter(uj)
beforeMinDelay := timeSinceStart < pool.DelayUpgrade.DelayMin.Duration
shouldPause := !ensureUnpause && beforeMinDelay
l = l.WithValues("poolconfig_matchLabels", pool.MatchLabels, "shouldPause", shouldPause, "beforeMinDelay", beforeMinDelay, "ensureUnpause", ensureUnpause, "timeSinceStart", timeSinceStart)
shouldPause := beforeMinDelay
l = l.WithValues("poolconfig_matchLabels", pool.MatchLabels, "shouldPause", shouldPause, "beforeMinDelay", beforeMinDelay, "timeSinceStart", timeSinceStart)

sel, err := metav1.LabelSelectorAsSelector(pool.MatchLabels)
if err != nil {
@@ -919,6 +919,11 @@ func (r *UpgradeJobReconciler) pauseUnpauseMachineConfigPools(ctx context.Contex
}
if mcp.Spec.Paused != shouldPause {
l.Info("Updating machine config pools pause field", "from", mcp.Spec.Paused, "to", shouldPause)
if mcp.Annotations == nil {
mcp.Annotations = map[string]string{}
}
// Mark the MCP as managed by the upgrade job for later cleanup
mcp.Annotations[JobLockAnnotation] = uj.Namespace + "/" + uj.Name
mcp.Spec.Paused = shouldPause
if err := r.Update(ctx, &mcp); err != nil {
return fmt.Errorf("failed to pause/unpause machine config pool %q: %w", mcp.Name, err)
@@ -1040,3 +1045,30 @@ func (r *UpgradeJobReconciler) checkAndMarkSkipped(ctx context.Context, uj manag
}
return false, nil
}

// cleanupMachineConfigPools removes the JobLockAnnotation from all machine config pools that have it set to the upgrade job and unpauses annotated pools if they are paused.
func (r *UpgradeJobReconciler) cleanupMachineConfigPools(ctx context.Context, uj managedupgradev1beta1.UpgradeJob) error {
l := log.FromContext(ctx).WithName("UpgradeJobReconciler.cleanupMachineConfigPools")

var mcpl machineconfigurationv1.MachineConfigPoolList
if err := r.List(ctx, &mcpl); err != nil {
return fmt.Errorf("failed to list machine config pools: %w", err)
}

errs := make([]error, 0, len(mcpl.Items))
for _, mcp := range mcpl.Items {
if mcp.Annotations[JobLockAnnotation] != uj.Namespace+"/"+uj.Name {
continue
}
delete(mcp.Annotations, JobLockAnnotation)
if mcp.Spec.Paused {
l.Info("unpausing machine config pool", "pool", mcp.Name)
mcp.Spec.Paused = false
}
if err := r.Update(ctx, &mcp); err != nil {
errs = append(errs, fmt.Errorf("failed to cleanup machine config pool %q: %w", mcp.Name, err))
}
}

return multierr.Combine(errs...)
}
16 changes: 8 additions & 8 deletions controllers/upgradejob_controller_test.go
@@ -168,7 +168,7 @@ func Test_UpgradeJobReconciler_Reconcile_E2E_Upgrade(t *testing.T) {
_, err := subject.Reconcile(ctx, requestForObject(upgradeJob))
require.NoError(t, err)
require.NoError(t, c.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
lock, ok := ucv.Annotations[ClusterVersionLockAnnotation]
lock, ok := ucv.Annotations[JobLockAnnotation]
require.True(t, ok, "lock annotation must be set")
require.Equal(t, upgradeJob.Namespace+"/"+upgradeJob.Name, lock, "lock annotation must contain upgrade job reference")
})
@@ -295,7 +295,7 @@ func Test_UpgradeJobReconciler_Reconcile_E2E_Upgrade(t *testing.T) {
_, err = subject.Reconcile(ctx, requestForObject(upgradeJob))
require.NoError(t, err)
require.NoError(t, c.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
_, err = subject.Reconcile(ctx, requestForObject(upgradeJob))
require.NoError(t, err, "should ignore requests if cluster version is not locked")
})
@@ -425,7 +425,7 @@ func Test_UpgradeJobReconciler_Reconcile_Skipped_Job(t *testing.T) {

require.NoError(t, c.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
require.Empty(t, ucv.Spec.DesiredUpdate, "cluster version should not be updated")
require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "cluster version should not be locked")
require.Empty(t, ucv.Annotations[JobLockAnnotation], "cluster version should not be locked")
})

step(t, "`Success` and `Finish` hooks", func(t *testing.T) {
@@ -969,7 +969,7 @@ func Test_UpgradeJobReconciler_Reconcile_UpgradeWithdrawn(t *testing.T) {
require.NotNil(t, failedCond, "should set failed condition")
require.Equal(t, managedupgradev1beta1.UpgradeJobReasonUpgradeWithdrawn, failedCond.Reason)
require.NoError(t, client.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
}

func Test_UpgradeJobReconciler_Reconcile_Timeout(t *testing.T) {
@@ -1024,7 +1024,7 @@ func Test_UpgradeJobReconciler_Reconcile_Timeout(t *testing.T) {
require.NotNil(t, failedCond, "should set failed condition")
require.Equal(t, managedupgradev1beta1.UpgradeJobReasonTimedOut, failedCond.Reason)
require.NoError(t, client.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
}

func Test_UpgradeJobReconciler_Reconcile_PreHealthCheckTimeout(t *testing.T) {
@@ -1088,7 +1088,7 @@ func Test_UpgradeJobReconciler_Reconcile_PreHealthCheckTimeout(t *testing.T) {
require.NotNil(t, failedCond, "should set failed condition")
require.Equal(t, managedupgradev1beta1.UpgradeJobReasonPreHealthCheckFailed, failedCond.Reason)
require.NoError(t, client.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
}

func Test_UpgradeJobReconciler_Reconcile_PostHealthCheckTimeout(t *testing.T) {
@@ -1161,7 +1161,7 @@ func Test_UpgradeJobReconciler_Reconcile_PostHealthCheckTimeout(t *testing.T) {
require.NotNil(t, failedCond, "should set failed condition")
require.Equal(t, managedupgradev1beta1.UpgradeJobReasonPostHealthCheckFailed, failedCond.Reason)
require.NoError(t, client.Get(ctx, requestForObject(ucv).NamespacedName, ucv))
require.Empty(t, ucv.Annotations[ClusterVersionLockAnnotation], "should clear lock annotation")
require.Empty(t, ucv.Annotations[JobLockAnnotation], "should clear lock annotation")
}

func Test_UpgradeJobReconciler_Reconcile_PausedMachineConfigPools(t *testing.T) {
@@ -1544,7 +1544,7 @@ func Test_JobFromClusterVersionHandler(t *testing.T) {
require.Len(t, subject(context.Background(), nil), 0, "should not return a reconcile request if clusterversion is not locked")

ucv.Annotations = map[string]string{
ClusterVersionLockAnnotation: "ns/upgrade-1234-4-5-13",
JobLockAnnotation: "ns/upgrade-1234-4-5-13",
}
require.NoError(t, client.Update(context.Background(), ucv))

