Skip to content

Commit

Permalink
Add conditions and duration to experiments
Browse files Browse the repository at this point in the history
  • Loading branch information
dthomson25 committed Aug 23, 2019
1 parent 4205f99 commit 98f1bdb
Show file tree
Hide file tree
Showing 14 changed files with 1,087 additions and 123 deletions.
54 changes: 54 additions & 0 deletions experiments/conditions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package experiments

import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/conditions"
)

func (ec *ExperimentController) calculateExperimentConditions(experiment *v1alpha1.Experiment, newStatus v1alpha1.ExperimentStatus, templateRSs map[string]*appsv1.ReplicaSet) v1alpha1.ExperimentStatus {
switch {
case conditions.ExperimentCompleted(newStatus):
msg := fmt.Sprintf(conditions.ExperimentCompletedMessage, experiment.Name)
condition := conditions.NewExperimentConditions(v1alpha1.ExperimentProgressing, corev1.ConditionFalse, conditions.ExperimentCompleteReason, msg)
conditions.SetExperimentCondition(&newStatus, *condition)
case conditions.ExperimentProgressing(experiment, newStatus):
currentCond := conditions.GetExperimentCondition(experiment.Status, v1alpha1.ExperimentProgressing)
// If there is any progress made, continue by not checking if the experiment failed. This
// behavior emulates the rolling updater progressDeadline check.
msg := fmt.Sprintf(conditions.ExperimentProgressingMessage, experiment.Name)
condition := conditions.NewExperimentConditions(v1alpha1.ExperimentProgressing, corev1.ConditionTrue, conditions.ReplicaSetUpdatedReason, msg)
// Update the current Progressing condition or add a new one if it doesn't exist.
// If a Progressing condition with status=true already exists, we should update
// everything but lastTransitionTime. SetExperimentCondition already does that but
// it also is not updating conditions when the reason of the new condition is the
// same as the old. The Progressing condition is a special case because we want to
// update with the same reason and change just lastUpdateTime iff we notice any
// progress. That's why we handle it here.
if currentCond != nil {
if currentCond.Status == corev1.ConditionTrue {
condition.LastTransitionTime = currentCond.LastTransitionTime
}
conditions.RemoveExperimentCondition(&newStatus, v1alpha1.ExperimentProgressing)
}
conditions.SetExperimentCondition(&newStatus, *condition)
case conditions.ExperimentRunning(experiment):
// Update the experiment conditions with a message for the new replica sets that
// was successfully deployed and is running for the timed duration from the
// `spec.duration` field. If the condition already exists, we ignore this update.
msg := fmt.Sprintf(conditions.ExperimentRunningMessage, experiment.Name)
condition := conditions.NewExperimentConditions(v1alpha1.ExperimentProgressing, corev1.ConditionTrue, conditions.NewRSAvailableReason, msg)
conditions.SetExperimentCondition(&newStatus, *condition)
case conditions.ExperimentTimeOut(experiment, newStatus):
// Update the experiments with a timeout condition. If the condition already exists,
// we ignore this update.
msg := fmt.Sprintf(conditions.ExperimentTimeOutMessage, experiment.Name)
condition := conditions.NewExperimentConditions(v1alpha1.ExperimentProgressing, corev1.ConditionFalse, conditions.TimedOutReason, msg)
conditions.SetExperimentCondition(&newStatus, *condition)
}
return newStatus
}
82 changes: 82 additions & 0 deletions experiments/conditions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package experiments

import (
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/conditions"
)

func TestUpdateProgressingLastUpdateTime(t *testing.T) {
f := newFixture(t)
defer f.Close()

templates := generateTemplates("bar")
templates[0].Replicas = pointer.Int32Ptr(2)
e := newExperiment("foo", templates, 0, pointer.BoolPtr(true))
e.Status.TemplateStatuses = []v1alpha1.TemplateStatus{{
Name: "bar",
}}
prevCond := newCondition(conditions.ReplicaSetUpdatedReason, e)
prevTime := metav1.NewTime(metav1.Now().Add(-10 * time.Second))
prevCond.LastUpdateTime = prevTime
prevCond.LastTransitionTime = prevTime
e.Status.Conditions = []v1alpha1.ExperimentCondition{
*prevCond,
}

f.experimentLister = append(f.experimentLister, e)
f.objects = append(f.objects, e)
rs := templateToRS(e, templates[0], 1)
f.replicaSetLister = append(f.replicaSetLister, rs)
f.kubeobjects = append(f.kubeobjects, rs)

patchIndex := f.expectPatchExperimentAction(e)

f.run(getKey(e, t))

patch := f.getPatchedExperiment(patchIndex)
cond := []v1alpha1.ExperimentCondition{*newCondition(conditions.ReplicaSetUpdatedReason, e)}
cond[0].LastTransitionTime = prevTime.Rfc3339Copy()
templateStatuses := []v1alpha1.TemplateStatus{
generateTemplatesStatus("bar", 1, 1),
}
validatePatch(t, patch, nil, NoChange, templateStatuses, cond)
}

func TestEnterTimeoutDegradedState(t *testing.T) {
f := newFixture(t)
defer f.Close()

templates := generateTemplates("bar")
e := newExperiment("foo", templates, 0, pointer.BoolPtr(true))
e.Status.TemplateStatuses = []v1alpha1.TemplateStatus{{
Name: "bar",
}}
e.Spec.ProgressDeadlineSeconds = pointer.Int32Ptr(30)
prevCond := newCondition(conditions.ReplicaSetUpdatedReason, e)
prevTime := metav1.NewTime(metav1.Now().Add(-1 * time.Minute))
prevCond.LastUpdateTime = prevTime
prevCond.LastTransitionTime = prevTime
e.Status.Conditions = []v1alpha1.ExperimentCondition{
*prevCond,
}

f.experimentLister = append(f.experimentLister, e)
f.objects = append(f.objects, e)
rs := templateToRS(e, templates[0], 0)
f.replicaSetLister = append(f.replicaSetLister, rs)
f.kubeobjects = append(f.kubeobjects, rs)

patchIndex := f.expectPatchExperimentAction(e)

f.run(getKey(e, t))

patch := f.getPatchedExperiment(patchIndex)
cond := []v1alpha1.ExperimentCondition{*newCondition(conditions.TimedOutReason, e)}
validatePatch(t, patch, nil, NoChange, nil, cond)
}
98 changes: 95 additions & 3 deletions experiments/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake"
informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions"
"github.com/argoproj/argo-rollouts/utils/conditions"
)

var (
Expand Down Expand Up @@ -138,6 +139,51 @@ func newExperiment(name string, templates []v1alpha1.TemplateSpec, duration int3
return ex
}

func newCondition(reason string, experiment *v1alpha1.Experiment) *v1alpha1.ExperimentCondition {
if reason == conditions.ReplicaSetUpdatedReason {
return &v1alpha1.ExperimentCondition{
Type: v1alpha1.ExperimentProgressing,
Status: corev1.ConditionTrue,
LastUpdateTime: metav1.Now().Rfc3339Copy(),
LastTransitionTime: metav1.Now().Rfc3339Copy(),
Reason: reason,
Message: fmt.Sprintf(conditions.ExperimentProgressingMessage, experiment.Name),
}
}
if reason == conditions.ExperimentCompleteReason {
return &v1alpha1.ExperimentCondition{
Type: v1alpha1.ExperimentProgressing,
Status: corev1.ConditionFalse,
LastUpdateTime: metav1.Now().Rfc3339Copy(),
LastTransitionTime: metav1.Now().Rfc3339Copy(),
Reason: reason,
Message: fmt.Sprintf(conditions.ExperimentCompletedMessage, experiment.Name),
}
}
if reason == conditions.ReplicaSetUpdatedReason {
return &v1alpha1.ExperimentCondition{
Type: v1alpha1.ExperimentProgressing,
Status: corev1.ConditionFalse,
LastUpdateTime: metav1.Now().Rfc3339Copy(),
LastTransitionTime: metav1.Now().Rfc3339Copy(),
Reason: reason,
Message: fmt.Sprintf(conditions.ExperimentRunningMessage, experiment.Name),
}
}
if reason == conditions.TimedOutReason {
return &v1alpha1.ExperimentCondition{
Type: v1alpha1.ExperimentProgressing,
Status: corev1.ConditionFalse,
LastUpdateTime: metav1.Now().Rfc3339Copy(),
LastTransitionTime: metav1.Now().Rfc3339Copy(),
Reason: reason,
Message: fmt.Sprintf(conditions.ExperimentTimeOutMessage, experiment.Name),
}
}

return nil
}

func templateToRS(ex *v1alpha1.Experiment, template v1alpha1.TemplateSpec, availableReplicas int32) *appsv1.ReplicaSet {
newRSTemplate := *template.Template.DeepCopy()
podHash := controller.ComputeHash(&newRSTemplate, nil)
Expand Down Expand Up @@ -173,15 +219,22 @@ func generateRSName(ex *v1alpha1.Experiment, template v1alpha1.TemplateSpec) str
return fmt.Sprintf("%s-%s-%s", ex.Name, template.Name, controller.ComputeHash(&template.Template, nil))
}

func calculatePatch(ex *v1alpha1.Experiment, patch string, templates []v1alpha1.TemplateStatus) string {
func calculatePatch(ex *v1alpha1.Experiment, patch string, templates []v1alpha1.TemplateStatus, condition *v1alpha1.ExperimentCondition) string {
patchMap := make(map[string]interface{})
err := json.Unmarshal([]byte(patch), &patchMap)
if err != nil {
panic(err)
}
newStatus := patchMap["status"].(map[string]interface{})
newStatus["templateStatuses"] = templates
patchMap["status"] = newStatus
if templates != nil {
newStatus["templateStatuses"] = templates
patchMap["status"] = newStatus
}
if condition != nil {
newStatus["conditions"] = []v1alpha1.ExperimentCondition{*condition}
patchMap["status"] = newStatus
}

patchBytes, err := json.Marshal(patchMap)
if err != nil {
panic(err)
Expand Down Expand Up @@ -390,6 +443,12 @@ func (f *fixture) expectUpdateExperimentAction(experiment *v1alpha1.Experiment)
return len
}

func (f *fixture) expectGetReplicaSetAction(r *appsv1.ReplicaSet) int {
len := len(f.kubeactions)
f.kubeactions = append(f.kubeactions, core.NewGetAction(schema.GroupVersionResource{Resource: "replicasets"}, r.Namespace, r.Name))
return len
}

func (f *fixture) expectPatchReplicaSetAction(r *appsv1.ReplicaSet) int {
len := len(f.kubeactions)
f.kubeactions = append(f.kubeactions, core.NewPatchAction(schema.GroupVersionResource{Resource: "replicasets"}, r.Namespace, r.Name, types.MergePatchType, nil))
Expand Down Expand Up @@ -470,3 +529,36 @@ func TestNoReconcileForDeletedExperiment(t *testing.T) {

f.run(getKey(e, t))
}

type availableAtResults string

const (
Set availableAtResults = "Set"
Nulled availableAtResults = "NulledOut"
NoChange availableAtResults = "NoChange"
)

func validatePatch(t *testing.T, patch string, running *bool, availableleAt availableAtResults, templateStatuses []v1alpha1.TemplateStatus, conditions []v1alpha1.ExperimentCondition) {
e := v1alpha1.Experiment{}
err := json.Unmarshal([]byte(patch), &e)
if err != nil {
panic(err)
}
actualStatus := e.Status
if availableleAt == Set {
assert.NotNil(t, actualStatus.AvailableAt)
} else if availableleAt == Nulled {
assert.Contains(t, patch, `"availableAt": null`)
} else if availableleAt == NoChange {
assert.Nil(t, actualStatus.AvailableAt)
}
assert.Equal(t, e.Status.Running, running)
assert.Len(t, actualStatus.TemplateStatuses, len(templateStatuses))
for i := range templateStatuses {
assert.Contains(t, actualStatus.TemplateStatuses, templateStatuses[i])
}
assert.Len(t, actualStatus.Conditions, len(conditions))
for i := range conditions {
assert.Contains(t, actualStatus.Conditions, conditions[i])
}
}
46 changes: 41 additions & 5 deletions experiments/experiment.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package experiments

import (
"time"

appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
patchtypes "k8s.io/apimachinery/pkg/types"
Expand All @@ -22,10 +24,15 @@ func (ec *ExperimentController) reconcileExperiment(experiment *v1alpha1.Experim
return ec.syncExperimentStatus(experiment, templateRSs)
}

if experiment.Status.AvailableAt != nil && !experimentutil.PassedDurations(experiment) {
ec.checkEnqueueExperimentDuringRun(experiment)
}

statuses := experimentutil.GetTemplateStatusMapping(experiment.Status)
for i := range experiment.Spec.Templates {
template := experiment.Spec.Templates[i]
logCtx.Infof("Reconciling template %s", template.Name)
templateReady, err := ec.reconcileTemplate(experiment, template, templateRSs)
templateReady, err := ec.reconcileTemplate(experiment, template, statuses[template.Name], templateRSs)
if err != nil {
return err
}
Expand All @@ -37,11 +44,11 @@ func (ec *ExperimentController) reconcileExperiment(experiment *v1alpha1.Experim
return ec.syncExperimentStatus(experiment, templateRSs)
}

func (ec *ExperimentController) reconcileTemplate(experiment *v1alpha1.Experiment, template v1alpha1.TemplateSpec, templateRSs map[string]*appsv1.ReplicaSet) (bool, error) {
func (ec *ExperimentController) reconcileTemplate(experiment *v1alpha1.Experiment, template v1alpha1.TemplateSpec, templateStatus v1alpha1.TemplateStatus, templateRSs map[string]*appsv1.ReplicaSet) (bool, error) {
name := template.Name
existingTemplateRS, ok := templateRSs[name]
if !ok {
newRS, err := ec.reconcileReplicaSet(experiment, template)
newRS, err := ec.reconcileReplicaSet(experiment, template, templateStatus)
if err != nil {
return false, err
}
Expand All @@ -64,21 +71,49 @@ func (ec *ExperimentController) reconcileTemplate(experiment *v1alpha1.Experimen
return true, nil
}

func (ec *ExperimentController) checkEnqueueExperimentDuringRun(experiment *v1alpha1.Experiment) {
if experiment.Status.AvailableAt == nil || experiment.Spec.Duration == nil {
return
}
logCtx := logutil.WithExperiment(experiment)
now := metav1.Now()
startTime := experiment.Status.AvailableAt
expiredTime := startTime.Add(time.Duration(*experiment.Spec.Duration) * time.Second)
nextResync := now.Add(ec.resyncPeriod)
if nextResync.After(expiredTime) && expiredTime.After(now.Time) {
timeRemaining := expiredTime.Sub(now.Time)
logCtx.Infof("Enqueueing Experiment in %s seconds", timeRemaining.String())
ec.enqueueExperimentAfter(experiment, timeRemaining)
}
}

func (ec *ExperimentController) syncExperimentStatus(experiment *v1alpha1.Experiment, templateRSs map[string]*appsv1.ReplicaSet) error {
newStatus := v1alpha1.ExperimentStatus{}
newStatus := v1alpha1.ExperimentStatus{
Conditions: experiment.Status.Conditions,
}

newStatus.Running = experiment.Status.Running
if !experimentutil.HasStarted(experiment) {
newStatus.Running = pointer.BoolPtr(true)
}

if experimentutil.PassedDurations(experiment) {
newStatus.Running = pointer.BoolPtr(false)
}

previousTemplateStatus := experimentutil.GetTemplateStatusMapping(experiment.Status)

allAvailable := true
for i := range experiment.Spec.Templates {
template := experiment.Spec.Templates[i]
rs, ok := templateRSs[template.Name]
templateStatus := v1alpha1.TemplateStatus{
Name: template.Name,
}
if previousStatus, ok := previousTemplateStatus[template.Name]; ok {
templateStatus.CollisionCount = previousStatus.CollisionCount
}

rs, ok := templateRSs[template.Name]
if ok {
replicaCount := defaults.GetExperimentTemplateReplicasOrDefault(template)
templateStatus.Replicas = replicasetutil.GetActualReplicaCountForReplicaSets([]*appsv1.ReplicaSet{rs})
Expand All @@ -100,6 +135,7 @@ func (ec *ExperimentController) syncExperimentStatus(experiment *v1alpha1.Experi
newStatus.AvailableAt = &now
}

newStatus = ec.calculateExperimentConditions(experiment, newStatus, templateRSs)
return ec.persistExperimentStatus(experiment, &newStatus)
}

Expand Down
Loading

0 comments on commit 98f1bdb

Please sign in to comment.