From 94a00f057121d2d6658cac092026a92b69002245 Mon Sep 17 00:00:00 2001 From: Jesse Suen Date: Sat, 19 Oct 2019 18:19:41 -0700 Subject: [PATCH] Add analysis informers and listers to experiment controller --- controller/controller.go | 2 + experiments/controller.go | 52 ++++--- experiments/controller_test.go | 20 ++- experiments/experiment.go | 128 +++++------------- experiments/experiment_test.go | 10 +- .../e2e/functional/analysis-template-job.yaml | 19 +++ .../functional/experiment-with-analysis.yaml | 22 +++ 7 files changed, 129 insertions(+), 124 deletions(-) create mode 100644 test/e2e/functional/analysis-template-job.yaml create mode 100644 test/e2e/functional/experiment-with-analysis.yaml diff --git a/controller/controller.go b/controller/controller.go index 0e393d518e..e229d6e4bb 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -129,6 +129,8 @@ func NewManager( replicaSetInformer, rolloutsInformer, experimentsInformer, + analysisRunInformer, + analysisTemplateInformer, resyncPeriod, rolloutWorkqueue, experimentWorkqueue, diff --git a/experiments/controller.go b/experiments/controller.go index ae39b9e0fd..43a8e3e01f 100644 --- a/experiments/controller.go +++ b/experiments/controller.go @@ -38,13 +38,17 @@ type ExperimentController struct { // rsControl is used for adopting/releasing replica sets. replicaSetControl controller.RSControlInterface - replicaSetLister appslisters.ReplicaSetLister - rolloutsLister listers.RolloutLister - experimentsLister listers.ExperimentLister + replicaSetLister appslisters.ReplicaSetLister + rolloutsLister listers.RolloutLister + experimentsLister listers.ExperimentLister + analysisTemplateLister listers.AnalysisTemplateLister + analysisRunLister listers.AnalysisRunLister - replicaSetSynced cache.InformerSynced - experimentSynced cache.InformerSynced - rolloutSynced cache.InformerSynced + replicaSetSynced cache.InformerSynced + experimentSynced cache.InformerSynced + rolloutSynced cache.InformerSynced + analysisRunSynced cache.InformerSynced + analysisTemplateSynced cache.InformerSynced metricsServer *metrics.MetricsServer @@ -72,6 +76,8 @@ func NewExperimentController( replicaSetInformer appsinformers.ReplicaSetInformer, rolloutsInformer informers.RolloutInformer, experimentsInformer informers.ExperimentInformer, + analysisRunInformer informers.AnalysisRunInformer, + analysisTemplateInformer informers.AnalysisTemplateInformer, resyncPeriod time.Duration, rolloutWorkQueue workqueue.RateLimitingInterface, experimentWorkQueue workqueue.RateLimitingInterface, @@ -84,21 +90,25 @@ func NewExperimentController( } controller := &ExperimentController{ - kubeclientset: kubeclientset, - argoProjClientset: argoProjClientset, - replicaSetControl: replicaSetControl, - replicaSetLister: replicaSetInformer.Lister(), - rolloutsLister: rolloutsInformer.Lister(), - experimentsLister: experimentsInformer.Lister(), - metricsServer: metricsServer, - rolloutWorkqueue: rolloutWorkQueue, - experimentWorkqueue: experimentWorkQueue, + kubeclientset: kubeclientset, + argoProjClientset: argoProjClientset, + replicaSetControl: replicaSetControl, + replicaSetLister: replicaSetInformer.Lister(), + rolloutsLister: rolloutsInformer.Lister(), + experimentsLister: experimentsInformer.Lister(), + analysisTemplateLister: analysisTemplateInformer.Lister(), + analysisRunLister: analysisRunInformer.Lister(), + metricsServer: metricsServer, + rolloutWorkqueue: rolloutWorkQueue, + experimentWorkqueue: experimentWorkQueue, - replicaSetSynced: replicaSetInformer.Informer().HasSynced, - experimentSynced: experimentsInformer.Informer().HasSynced, - rolloutSynced: rolloutsInformer.Informer().HasSynced, - recorder: recorder, - resyncPeriod: resyncPeriod, + replicaSetSynced: replicaSetInformer.Informer().HasSynced, + experimentSynced: experimentsInformer.Informer().HasSynced, + rolloutSynced: rolloutsInformer.Informer().HasSynced, + analysisRunSynced: analysisRunInformer.Informer().HasSynced, + analysisTemplateSynced: analysisTemplateInformer.Informer().HasSynced, + recorder: recorder, + resyncPeriod: resyncPeriod, } controller.enqueueExperiment = func(obj interface{}) { @@ -237,6 +247,8 @@ func (ec *ExperimentController) syncHandler(key string) error { ec.kubeclientset, ec.argoProjClientset, ec.replicaSetLister, + ec.analysisTemplateLister, + ec.analysisRunLister, ec.recorder, ec.enqueueExperimentAfter, ) diff --git a/experiments/controller_test.go b/experiments/controller_test.go index 4d2a21391c..2354dff404 100644 --- a/experiments/controller_test.go +++ b/experiments/controller_test.go @@ -306,6 +306,8 @@ func (f *fixture) newController(resync resyncFunc) (*ExperimentController, infor k8sI.Apps().V1().ReplicaSets(), i.Argoproj().V1alpha1().Rollouts(), i.Argoproj().V1alpha1().Experiments(), + i.Argoproj().V1alpha1().AnalysisRuns(), + i.Argoproj().V1alpha1().AnalysisTemplates(), resync(), rolloutWorkqueue, experimentWorkqueue, @@ -358,7 +360,7 @@ func (f *fixture) runController(experimentName string, startInformers bool, expe i.Start(stopCh) k8sI.Start(stopCh) - assert.True(f.t, cache.WaitForCacheSync(stopCh, c.replicaSetSynced, c.rolloutSynced, c.experimentSynced)) + assert.True(f.t, cache.WaitForCacheSync(stopCh, c.replicaSetSynced, c.rolloutSynced, c.experimentSynced, c.analysisRunSynced, c.analysisTemplateSynced)) } err := c.syncHandler(experimentName) @@ -425,7 +427,7 @@ func checkAction(expected, actual core.Action, t *testing.T) { // filterInformerActions filters list, and watch actions for testing resources. // Since list, and watch don't change resource state we can filter it to lower -// nose level in our tests. +// noise level in our tests. func filterInformerActions(actions []core.Action) []core.Action { ret := []core.Action{} for _, action := range actions { @@ -434,7 +436,11 @@ func filterInformerActions(actions []core.Action) []core.Action { action.Matches("list", "replicaSets") || action.Matches("watch", "replicaSets") || action.Matches("list", "experiments") || - action.Matches("watch", "experiments") { + action.Matches("watch", "experiments") || + action.Matches("list", "analysistemplates") || + action.Matches("watch", "analysistemplates") || + action.Matches("list", "analysisruns") || + action.Matches("watch", "analysisruns") { continue } ret = append(ret, action) @@ -563,18 +569,18 @@ const ( NoChange availableAtResults = "NoChange" ) -func validatePatch(t *testing.T, patch string, running *bool, availableleAt availableAtResults, templateStatuses []v1alpha1.TemplateStatus, conditions []v1alpha1.ExperimentCondition) { +func validatePatch(t *testing.T, patch string, running *bool, availableAt availableAtResults, templateStatuses []v1alpha1.TemplateStatus, conditions []v1alpha1.ExperimentCondition) { e := v1alpha1.Experiment{} err := json.Unmarshal([]byte(patch), &e) if err != nil { panic(err) } actualStatus := e.Status - if availableleAt == Set { + if availableAt == Set { assert.NotNil(t, actualStatus.AvailableAt) - } else if availableleAt == Nulled { + } else if availableAt == Nulled { assert.Contains(t, patch, `"availableAt": null`) - } else if availableleAt == NoChange { + } else if availableAt == NoChange { assert.Nil(t, actualStatus.AvailableAt) } assert.Equal(t, e.Status.Running, running) diff --git a/experiments/experiment.go b/experiments/experiment.go index 8dc47105d6..1e6c9a7e26 100644 --- a/experiments/experiment.go +++ b/experiments/experiment.go @@ -49,6 +49,8 @@ func newExperimentContext( kubeclientset kubernetes.Interface, argoProjClientset clientset.Interface, replicaSetLister appslisters.ReplicaSetLister, + analysisTemplateLister rolloutslisters.AnalysisTemplateLister, + analysisRunLister rolloutslisters.AnalysisRunLister, recorder record.EventRecorder, enqueueExperimentAfter func(obj interface{}, duration time.Duration), ) *experimentContext { @@ -59,6 +61,8 @@ func newExperimentContext( kubeclientset: kubeclientset, argoProjClientset: argoProjClientset, replicaSetLister: replicaSetLister, + analysisTemplateLister: analysisTemplateLister, + analysisRunLister: analysisRunLister, recorder: recorder, enqueueExperimentAfter: enqueueExperimentAfter, @@ -88,94 +92,39 @@ func (ec *experimentContext) reconcile() *v1alpha1.ExperimentStatus { return ec.calculateStatus() } -func (ec *experimentContext) reconcileTemplates() error { - var origErr error - statuses := experimentutil.GetTemplateStatusMapping(ec.ex.Status) - for _, template := range ec.ex.Spec.Templates { - templateReady, err := ec.reconcileTemplateOld(template, statuses[template.Name]) - if err != nil && origErr == nil { - origErr = err - } - if !templateReady { - ec.log.Infof("Not finished reconciling template %s", template.Name) - } - } - return origErr -} - // reconcileTemplate reconciles a template to a ReplicaSet. Creates or scales them down as necessary func (ec *experimentContext) reconcileTemplate(template v1alpha1.TemplateSpec) { ec.log.Infof("Reconciling template %s", template.Name) templateStatus := experimentutil.GetTemplateStatus(ec.ex.Status, template.Name) - var collisionCount *int32 - if templateStatus != nil { - collisionCount = templateStatus.CollisionCount + if templateStatus == nil { + templateStatus = &v1alpha1.TemplateStatus{ + Name: template.Name, + } } existingTemplateRS, replicaSetExists := ec.templateRSs[template.Name] if !replicaSetExists { if ec.isTerminating { ec.log.Warnf("Skipping ReplicaSet creation for template %s: experiment is terminating", template.Name) - return - } - newRS, err := ec.createReplicaSet(template, collisionCount) - if err != nil { - ec.log.Warnf("Failed to create ReplicaSet: %v", err) - return - } - if newRS != nil { - ec.templateRSs[template.Name] = newRS - } - return - } - - // If we get here, replicaset exists. We need to ensure it's scaled properly based on - // termination, or changed replica count - var templateReplicaCount int32 = 0 - if !ec.isTerminating { - templateReplicaCount = experimentutil.CalculateTemplateReplicasCount(ec.ex, template) - } - if *existingTemplateRS.Spec.Replicas != templateReplicaCount { - ec.scaleReplicaSetAndRecordEvent(existingTemplateRS, templateReplicaCount) - } -} - -// reconcileTemplate reconciles a template to a ReplicaSet. Creates or deletes them as necessary -func (ec *experimentContext) reconcileTemplateOld(template v1alpha1.TemplateSpec, templateStatus v1alpha1.TemplateStatus) (bool, error) { - ec.log.Infof("Reconciling template %s", template.Name) - existingTemplateRS, ok := ec.templateRSs[template.Name] - if !ok { - if ec.isTerminating { - ec.log.Warnf("Skipping ReplicaSet creation for template %s since experiment is terminating", template.Name) - return false, nil } else { newRS, err := ec.createReplicaSet(template, templateStatus.CollisionCount) if err != nil { - return false, err + ec.log.Warnf("Failed to create ReplicaSet: %v", err) + } + if newRS != nil { + ec.templateRSs[template.Name] = newRS } - ec.templateRSs[template.Name] = newRS - return false, nil } - } - - // If we get here, replicaset exists. We need to ensure it's scaled properly based on - // termination, or new replica count - var templateReplicaCount int32 = 0 - if !ec.isTerminating { - templateReplicaCount = experimentutil.CalculateTemplateReplicasCount(ec.ex, template) - } - if *existingTemplateRS.Spec.Replicas != templateReplicaCount { - scaled, _, err := ec.scaleReplicaSetAndRecordEvent(existingTemplateRS, templateReplicaCount) - if err != nil { - return false, err + } else { + // If we get here, replicaset exists. We need to ensure it's scaled properly based on + // termination, or changed replica count + var templateReplicaCount int32 = 0 + if !ec.isTerminating { + templateReplicaCount = experimentutil.CalculateTemplateReplicasCount(ec.ex, template) } - if scaled { - return false, nil + if *existingTemplateRS.Spec.Replicas != templateReplicaCount { + ec.scaleReplicaSetAndRecordEvent(existingTemplateRS, templateReplicaCount) } } - if templateReplicaCount != replicasetutil.GetAvailableReplicaCountForReplicaSets([]*appsv1.ReplicaSet{existingTemplateRS}) { - return false, nil - } - return true, nil } // enqueueAfterDuration enqueues the experiment at the appropriate duration time after status.availableAt @@ -293,33 +242,25 @@ func (ec *experimentContext) createAnalysisRun(analysis v1alpha1.ExperimentAnaly } func (ec *experimentContext) calculateStatus() *v1alpha1.ExperimentStatus { - newStatus := v1alpha1.ExperimentStatus{ - Conditions: ec.ex.Status.Conditions, - } - - newStatus.Running = ec.ex.Status.Running if !experimentutil.HasStarted(ec.ex) { - newStatus.Running = pointer.BoolPtr(true) + ec.newStatus.Running = pointer.BoolPtr(true) } if passed, _ := experimentutil.PassedDurations(ec.ex); passed { - newStatus.Running = pointer.BoolPtr(false) + ec.newStatus.Running = pointer.BoolPtr(false) } - previousTemplateStatus := experimentutil.GetTemplateStatusMapping(ec.ex.Status) - allAvailable := true - for i := range ec.ex.Spec.Templates { - template := ec.ex.Spec.Templates[i] - templateStatus := v1alpha1.TemplateStatus{ - Name: template.Name, - } - if previousStatus, ok := previousTemplateStatus[template.Name]; ok { - templateStatus.CollisionCount = previousStatus.CollisionCount + for _, template := range ec.ex.Spec.Templates { + templateStatus := experimentutil.GetTemplateStatus(*ec.newStatus, template.Name) + if templateStatus == nil { + allAvailable = false + templateStatus = &v1alpha1.TemplateStatus{ + Name: template.Name, + } } - rs, ok := ec.templateRSs[template.Name] - if ok { + if rs, ok := ec.templateRSs[template.Name]; ok { replicaCount := defaults.GetExperimentTemplateReplicasOrDefault(template) templateStatus.Replicas = replicasetutil.GetActualReplicaCountForReplicaSets([]*appsv1.ReplicaSet{rs}) templateStatus.UpdatedReplicas = replicasetutil.GetActualReplicaCountForReplicaSets([]*appsv1.ReplicaSet{rs}) @@ -331,15 +272,14 @@ func (ec *experimentContext) calculateStatus() *v1alpha1.ExperimentStatus { } else { allAvailable = false } - newStatus.TemplateStatuses = append(newStatus.TemplateStatuses, templateStatus) + experimentutil.SetTemplateStatus(ec.newStatus, *templateStatus) } - newStatus.AvailableAt = ec.ex.Status.AvailableAt - if allAvailable && ec.ex.Status.AvailableAt == nil { + if allAvailable && ec.newStatus.AvailableAt == nil { now := metav1.Now() - newStatus.AvailableAt = &now + ec.newStatus.AvailableAt = &now } - return calculateExperimentConditions(ec.ex, newStatus) + return calculateExperimentConditions(ec.ex, *ec.newStatus) } // newAnalysisRun generates an AnalysisRun from the experiment and template diff --git a/experiments/experiment_test.go b/experiments/experiment_test.go index 2b517a1c53..185eceea61 100644 --- a/experiments/experiment_test.go +++ b/experiments/experiment_test.go @@ -20,7 +20,7 @@ import ( "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake" - //informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions" + informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions" "github.com/argoproj/argo-rollouts/utils/conditions" ) @@ -40,8 +40,9 @@ func newTestContext(ex *v1alpha1.Experiment, objects ...runtime.Object) *experim k8sI := kubeinformers.NewSharedInformerFactory(kubeclient, noResyncPeriodFunc()) rsLister := k8sI.Apps().V1().ReplicaSets().Lister() - //rolloutsI := informers.NewSharedInformerFactory(f.client, resync()) - //analysisRunLister := rolloutsI.Argoproj().V1alpha1().AnalysisRuns().Lister() + rolloutsI := informers.NewSharedInformerFactory(rolloutclient, noResyncPeriodFunc()) + analysisRunLister := rolloutsI.Argoproj().V1alpha1().AnalysisRuns().Lister() + analysisTemplateLister := rolloutsI.Argoproj().V1alpha1().AnalysisTemplates().Lister() return newExperimentContext( ex, @@ -49,6 +50,8 @@ func newTestContext(ex *v1alpha1.Experiment, objects ...runtime.Object) *experim kubeclient, rolloutclient, rsLister, + analysisTemplateLister, + analysisRunLister, &record.FakeRecorder{}, func(obj interface{}, duration time.Duration) {}, ) @@ -226,4 +229,5 @@ func TestFailReplicaSetCreation(t *testing.T) { return true, nil, errors.New("intentional error") }) exCtx.reconcile() + // TODO: check that we set condition } diff --git a/test/e2e/functional/analysis-template-job.yaml b/test/e2e/functional/analysis-template-job.yaml new file mode 100644 index 0000000000..39c02ba961 --- /dev/null +++ b/test/e2e/functional/analysis-template-job.yaml @@ -0,0 +1,19 @@ + +kind: AnalysisTemplate +apiVersion: argoproj.io/v1alpha1 +metadata: + name: job +spec: + metrics: + - name: test + provider: + job: + spec: + template: + spec: + containers: + - name: sleep + image: alpine:3.8 + command: [sleep, "30"] + restartPolicy: Never + backoffLimit: 0 diff --git a/test/e2e/functional/experiment-with-analysis.yaml b/test/e2e/functional/experiment-with-analysis.yaml new file mode 100644 index 0000000000..642b3f07b4 --- /dev/null +++ b/test/e2e/functional/experiment-with-analysis.yaml @@ -0,0 +1,22 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Experiment +metadata: + generateName: experiment-with-analysis- +spec: + duration: 3600 + templates: + - name: baseline + selector: + matchLabels: + app: rollouts-demo + template: + metadata: + labels: + app: rollouts-demo + spec: + containers: + - name: rollouts-demo + image: argoproj/rollouts-demo:blue + analyses: + - name: job + templateName: job