From 79fbb6075bdf8698eb5dc53a6e6fe097a0bdf5ca Mon Sep 17 00:00:00 2001 From: Zach Aller Date: Mon, 4 Dec 2023 09:32:53 -0600 Subject: [PATCH] fix: rollouts getting stuck due to bad rs informer updates (#3200) * fix: for rollouts getting stuck due to bad rs informer updates Signed-off-by: Zach Aller * fix error msg Signed-off-by: Zach Aller * change logic Signed-off-by: Zach Aller * error fmt Signed-off-by: Zach Aller * change if logic Signed-off-by: Zach Aller * add test Signed-off-by: Zach Aller * cleanup test Signed-off-by: Zach Aller * cleanup test Signed-off-by: Zach Aller * do not double call Signed-off-by: Zach Aller --------- Signed-off-by: Zach Aller --- rollout/replicaset.go | 20 +++++++++++----- rollout/replicaset_test.go | 47 ++++++++++++++++++++++++++++++++++---- rollout/sync.go | 18 ++++++++++++--- 3 files changed, 72 insertions(+), 13 deletions(-) diff --git a/rollout/replicaset.go b/rollout/replicaset.go index 7d9a71f62a..401e1b579d 100644 --- a/rollout/replicaset.go +++ b/rollout/replicaset.go @@ -35,9 +35,13 @@ func (c *rolloutContext) removeScaleDownDelay(rs *appsv1.ReplicaSet) error { } patch := fmt.Sprintf(removeScaleDownAtAnnotationsPatch, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey) _, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.JSONPatchType, []byte(patch), metav1.PatchOptions{}) - if err == nil { - c.log.Infof("Removed '%s' annotation from RS '%s'", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name) - c.replicaSetInformer.GetIndexer().Update(rs) + if err != nil { + return fmt.Errorf("error removing scale-down-deadline annotation from RS '%s': %w", rs.Name, err) + } + c.log.Infof("Removed '%s' annotation from RS '%s'", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name) + err = c.replicaSetInformer.GetIndexer().Update(rs) + if err != nil { + return fmt.Errorf("error updating replicaset informer in removeScaleDownDelay: %w", err) } return err } @@ -60,9 +64,13 @@ func (c *rolloutContext) addScaleDownDelay(rs *appsv1.ReplicaSet, scaleDownDelay deadline := timeutil.MetaNow().Add(scaleDownDelaySeconds).UTC().Format(time.RFC3339) patch := fmt.Sprintf(addScaleDownAtAnnotationsPatch, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, deadline) rs, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.JSONPatchType, []byte(patch), metav1.PatchOptions{}) - if err == nil { - c.log.Infof("Set '%s' annotation on '%s' to %s (%s)", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name, deadline, scaleDownDelaySeconds) - c.replicaSetInformer.GetIndexer().Update(rs) + if err != nil { + return fmt.Errorf("error adding scale-down-deadline annotation to RS '%s': %w", rs.Name, err) + } + c.log.Infof("Set '%s' annotation on '%s' to %s (%s)", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name, deadline, scaleDownDelaySeconds) + err = c.replicaSetInformer.GetIndexer().Update(rs) + if err != nil { + return fmt.Errorf("error updating replicaset informer in addScaleDownDelay: %w", err) } return err } diff --git a/rollout/replicaset_test.go b/rollout/replicaset_test.go index 262bd23dd7..d93fa7c546 100644 --- a/rollout/replicaset_test.go +++ b/rollout/replicaset_test.go @@ -1,6 +1,7 @@ package rollout import ( + "fmt" "strconv" "testing" "time" @@ -134,8 +135,9 @@ func TestReconcileNewReplicaSet(t *testing.T) { abortScaleDownAnnotated bool abortScaleDownDelayPassed bool expectedNewReplicas int + failRSUpdate bool + abort bool }{ - { name: "New Replica Set matches rollout replica: No scale", rolloutReplicas: 10, @@ -163,6 +165,7 @@ func TestReconcileNewReplicaSet(t *testing.T) { newReplicas: 10, // ScaleDownOnAbort: true, abortScaleDownDelaySeconds: 5, + abort: true, abortScaleDownAnnotated: true, abortScaleDownDelayPassed: true, scaleExpected: true, @@ -174,6 +177,7 @@ func TestReconcileNewReplicaSet(t *testing.T) { newReplicas: 8, // ScaleDownOnAbort: true, abortScaleDownDelaySeconds: 5, + abort: true, abortScaleDownAnnotated: true, abortScaleDownDelayPassed: false, scaleExpected: false, @@ -184,10 +188,20 @@ func TestReconcileNewReplicaSet(t *testing.T) { rolloutReplicas: 10, newReplicas: 10, abortScaleDownDelaySeconds: 5, + abort: true, abortScaleDownAnnotated: false, scaleExpected: false, expectedNewReplicas: 0, }, + { + name: "Fail to update RS: No scale and add default annotation", + rolloutReplicas: 10, + newReplicas: 10, + scaleExpected: false, + failRSUpdate: true, + abort: true, + abortScaleDownDelaySeconds: -1, + }, } for i := range tests { test := tests[i] @@ -199,6 +213,12 @@ func TestReconcileNewReplicaSet(t *testing.T) { fake := fake.Clientset{} k8sfake := k8sfake.Clientset{} + if test.failRSUpdate { + k8sfake.PrependReactor("patch", "replicasets", func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, &appsv1.ReplicaSet{}, fmt.Errorf("should not patch replica set") + }) + } + f := newFixture(t) defer f.Close() f.objects = append(f.objects, rollout) @@ -226,15 +246,22 @@ func TestReconcileNewReplicaSet(t *testing.T) { rollout: rollout, }, } - roCtx.enqueueRolloutAfter = func(obj interface{}, duration time.Duration) {} + roCtx.enqueueRolloutAfter = func(obj any, duration time.Duration) {} + + rollout.Status.Abort = test.abort + roCtx.stableRS.Status.AvailableReplicas = int32(test.rolloutReplicas) + rollout.Spec.Strategy = v1alpha1.RolloutStrategy{ + BlueGreen: &v1alpha1.BlueGreenStrategy{ + AbortScaleDownDelaySeconds: &test.abortScaleDownDelaySeconds, + }, + } + if test.abortScaleDownDelaySeconds > 0 { - rollout.Status.Abort = true rollout.Spec.Strategy = v1alpha1.RolloutStrategy{ BlueGreen: &v1alpha1.BlueGreenStrategy{ AbortScaleDownDelaySeconds: &test.abortScaleDownDelaySeconds, }, } - if test.abortScaleDownAnnotated { var deadline string if test.abortScaleDownDelayPassed { @@ -246,7 +273,19 @@ func TestReconcileNewReplicaSet(t *testing.T) { } } + if test.abortScaleDownDelaySeconds < 0 { + rollout.Spec.Strategy = v1alpha1.RolloutStrategy{ + BlueGreen: &v1alpha1.BlueGreenStrategy{ + AbortScaleDownDelaySeconds: nil, + }, + } + } + scaled, err := roCtx.reconcileNewReplicaSet() + if test.failRSUpdate { + assert.Error(t, err) + return + } if err != nil { t.Errorf("unexpected error: %v", err) return diff --git a/rollout/sync.go b/rollout/sync.go index 25c0c14813..2baeac1698 100644 --- a/rollout/sync.go +++ b/rollout/sync.go @@ -91,7 +91,10 @@ func (c *rolloutContext) syncReplicaSetRevision() (*appsv1.ReplicaSet, error) { return nil, fmt.Errorf("error updating replicaset revision: %v", err) } c.log.Infof("Synced revision on ReplicaSet '%s' to '%s'", rs.Name, newRevision) - c.replicaSetInformer.GetIndexer().Update(rs) + err = c.replicaSetInformer.GetIndexer().Update(rs) + if err != nil { + return nil, fmt.Errorf("error updating replicaset informer in syncReplicaSetRevision: %w", err) + } return rs, nil } @@ -372,12 +375,21 @@ func (c *rolloutContext) scaleReplicaSet(rs *appsv1.ReplicaSet, newScale int32, if fullScaleDown && !c.shouldDelayScaleDownOnAbort() { delete(rsCopy.Annotations, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey) } + rs, err = c.kubeclientset.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{}) - if err == nil && sizeNeedsUpdate { + if err != nil { + return scaled, rs, fmt.Errorf("error updating replicaset %s: %w", rs.Name, err) + } + err = c.replicaSetInformer.GetIndexer().Update(rs) + if err != nil { + err = fmt.Errorf("error updating replicaset informer in scaleReplicaSet: %w", err) + return scaled, rs, err + } + + if sizeNeedsUpdate { scaled = true revision, _ := replicasetutil.Revision(rs) c.recorder.Eventf(rollout, record.EventOptions{EventReason: conditions.ScalingReplicaSetReason}, conditions.ScalingReplicaSetMessage, scalingOperation, rs.Name, revision, oldScale, newScale) - c.replicaSetInformer.GetIndexer().Update(rs) } } return scaled, rs, err