Skip to content

Commit

Permalink
Add analysis informers and listers to experiment controller
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesuen committed Oct 20, 2019
1 parent 3648c50 commit 94a00f0
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 124 deletions.
2 changes: 2 additions & 0 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func NewManager(
replicaSetInformer,
rolloutsInformer,
experimentsInformer,
analysisRunInformer,
analysisTemplateInformer,
resyncPeriod,
rolloutWorkqueue,
experimentWorkqueue,
Expand Down
52 changes: 32 additions & 20 deletions experiments/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ type ExperimentController struct {
// rsControl is used for adopting/releasing replica sets.
replicaSetControl controller.RSControlInterface

replicaSetLister appslisters.ReplicaSetLister
rolloutsLister listers.RolloutLister
experimentsLister listers.ExperimentLister
replicaSetLister appslisters.ReplicaSetLister
rolloutsLister listers.RolloutLister
experimentsLister listers.ExperimentLister
analysisTemplateLister listers.AnalysisTemplateLister
analysisRunLister listers.AnalysisRunLister

replicaSetSynced cache.InformerSynced
experimentSynced cache.InformerSynced
rolloutSynced cache.InformerSynced
replicaSetSynced cache.InformerSynced
experimentSynced cache.InformerSynced
rolloutSynced cache.InformerSynced
analysisRunSynced cache.InformerSynced
analysisTemplateSynced cache.InformerSynced

metricsServer *metrics.MetricsServer

Expand Down Expand Up @@ -72,6 +76,8 @@ func NewExperimentController(
replicaSetInformer appsinformers.ReplicaSetInformer,
rolloutsInformer informers.RolloutInformer,
experimentsInformer informers.ExperimentInformer,
analysisRunInformer informers.AnalysisRunInformer,
analysisTemplateInformer informers.AnalysisTemplateInformer,
resyncPeriod time.Duration,
rolloutWorkQueue workqueue.RateLimitingInterface,
experimentWorkQueue workqueue.RateLimitingInterface,
Expand All @@ -84,21 +90,25 @@ func NewExperimentController(
}

controller := &ExperimentController{
kubeclientset: kubeclientset,
argoProjClientset: argoProjClientset,
replicaSetControl: replicaSetControl,
replicaSetLister: replicaSetInformer.Lister(),
rolloutsLister: rolloutsInformer.Lister(),
experimentsLister: experimentsInformer.Lister(),
metricsServer: metricsServer,
rolloutWorkqueue: rolloutWorkQueue,
experimentWorkqueue: experimentWorkQueue,
kubeclientset: kubeclientset,
argoProjClientset: argoProjClientset,
replicaSetControl: replicaSetControl,
replicaSetLister: replicaSetInformer.Lister(),
rolloutsLister: rolloutsInformer.Lister(),
experimentsLister: experimentsInformer.Lister(),
analysisTemplateLister: analysisTemplateInformer.Lister(),
analysisRunLister: analysisRunInformer.Lister(),
metricsServer: metricsServer,
rolloutWorkqueue: rolloutWorkQueue,
experimentWorkqueue: experimentWorkQueue,

replicaSetSynced: replicaSetInformer.Informer().HasSynced,
experimentSynced: experimentsInformer.Informer().HasSynced,
rolloutSynced: rolloutsInformer.Informer().HasSynced,
recorder: recorder,
resyncPeriod: resyncPeriod,
replicaSetSynced: replicaSetInformer.Informer().HasSynced,
experimentSynced: experimentsInformer.Informer().HasSynced,
rolloutSynced: rolloutsInformer.Informer().HasSynced,
analysisRunSynced: analysisRunInformer.Informer().HasSynced,
analysisTemplateSynced: analysisTemplateInformer.Informer().HasSynced,
recorder: recorder,
resyncPeriod: resyncPeriod,
}

controller.enqueueExperiment = func(obj interface{}) {
Expand Down Expand Up @@ -237,6 +247,8 @@ func (ec *ExperimentController) syncHandler(key string) error {
ec.kubeclientset,
ec.argoProjClientset,
ec.replicaSetLister,
ec.analysisTemplateLister,
ec.analysisRunLister,
ec.recorder,
ec.enqueueExperimentAfter,
)
Expand Down
20 changes: 13 additions & 7 deletions experiments/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ func (f *fixture) newController(resync resyncFunc) (*ExperimentController, infor
k8sI.Apps().V1().ReplicaSets(),
i.Argoproj().V1alpha1().Rollouts(),
i.Argoproj().V1alpha1().Experiments(),
i.Argoproj().V1alpha1().AnalysisRuns(),
i.Argoproj().V1alpha1().AnalysisTemplates(),
resync(),
rolloutWorkqueue,
experimentWorkqueue,
Expand Down Expand Up @@ -358,7 +360,7 @@ func (f *fixture) runController(experimentName string, startInformers bool, expe
i.Start(stopCh)
k8sI.Start(stopCh)

assert.True(f.t, cache.WaitForCacheSync(stopCh, c.replicaSetSynced, c.rolloutSynced, c.experimentSynced))
assert.True(f.t, cache.WaitForCacheSync(stopCh, c.replicaSetSynced, c.rolloutSynced, c.experimentSynced, c.analysisRunSynced, c.analysisTemplateSynced))
}

err := c.syncHandler(experimentName)
Expand Down Expand Up @@ -425,7 +427,7 @@ func checkAction(expected, actual core.Action, t *testing.T) {

// filterInformerActions filters list, and watch actions for testing resources.
// Since list, and watch don't change resource state we can filter it to lower
// nose level in our tests.
// noise level in our tests.
func filterInformerActions(actions []core.Action) []core.Action {
ret := []core.Action{}
for _, action := range actions {
Expand All @@ -434,7 +436,11 @@ func filterInformerActions(actions []core.Action) []core.Action {
action.Matches("list", "replicaSets") ||
action.Matches("watch", "replicaSets") ||
action.Matches("list", "experiments") ||
action.Matches("watch", "experiments") {
action.Matches("watch", "experiments") ||
action.Matches("list", "analysistemplates") ||
action.Matches("watch", "analysistemplates") ||
action.Matches("list", "analysisruns") ||
action.Matches("watch", "analysisruns") {
continue
}
ret = append(ret, action)
Expand Down Expand Up @@ -563,18 +569,18 @@ const (
NoChange availableAtResults = "NoChange"
)

func validatePatch(t *testing.T, patch string, running *bool, availableleAt availableAtResults, templateStatuses []v1alpha1.TemplateStatus, conditions []v1alpha1.ExperimentCondition) {
func validatePatch(t *testing.T, patch string, running *bool, availableAt availableAtResults, templateStatuses []v1alpha1.TemplateStatus, conditions []v1alpha1.ExperimentCondition) {
e := v1alpha1.Experiment{}
err := json.Unmarshal([]byte(patch), &e)
if err != nil {
panic(err)
}
actualStatus := e.Status
if availableleAt == Set {
if availableAt == Set {
assert.NotNil(t, actualStatus.AvailableAt)
} else if availableleAt == Nulled {
} else if availableAt == Nulled {
assert.Contains(t, patch, `"availableAt": null`)
} else if availableleAt == NoChange {
} else if availableAt == NoChange {
assert.Nil(t, actualStatus.AvailableAt)
}
assert.Equal(t, e.Status.Running, running)
Expand Down
128 changes: 34 additions & 94 deletions experiments/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func newExperimentContext(
kubeclientset kubernetes.Interface,
argoProjClientset clientset.Interface,
replicaSetLister appslisters.ReplicaSetLister,
analysisTemplateLister rolloutslisters.AnalysisTemplateLister,
analysisRunLister rolloutslisters.AnalysisRunLister,
recorder record.EventRecorder,
enqueueExperimentAfter func(obj interface{}, duration time.Duration),
) *experimentContext {
Expand All @@ -59,6 +61,8 @@ func newExperimentContext(
kubeclientset: kubeclientset,
argoProjClientset: argoProjClientset,
replicaSetLister: replicaSetLister,
analysisTemplateLister: analysisTemplateLister,
analysisRunLister: analysisRunLister,
recorder: recorder,
enqueueExperimentAfter: enqueueExperimentAfter,

Expand Down Expand Up @@ -88,94 +92,39 @@ func (ec *experimentContext) reconcile() *v1alpha1.ExperimentStatus {
return ec.calculateStatus()
}

func (ec *experimentContext) reconcileTemplates() error {
var origErr error
statuses := experimentutil.GetTemplateStatusMapping(ec.ex.Status)
for _, template := range ec.ex.Spec.Templates {
templateReady, err := ec.reconcileTemplateOld(template, statuses[template.Name])
if err != nil && origErr == nil {
origErr = err
}
if !templateReady {
ec.log.Infof("Not finished reconciling template %s", template.Name)
}
}
return origErr
}

// reconcileTemplate reconciles a template to a ReplicaSet. Creates or scales them down as necessary
func (ec *experimentContext) reconcileTemplate(template v1alpha1.TemplateSpec) {
ec.log.Infof("Reconciling template %s", template.Name)
templateStatus := experimentutil.GetTemplateStatus(ec.ex.Status, template.Name)
var collisionCount *int32
if templateStatus != nil {
collisionCount = templateStatus.CollisionCount
if templateStatus == nil {
templateStatus = &v1alpha1.TemplateStatus{
Name: template.Name,
}
}
existingTemplateRS, replicaSetExists := ec.templateRSs[template.Name]
if !replicaSetExists {
if ec.isTerminating {
ec.log.Warnf("Skipping ReplicaSet creation for template %s: experiment is terminating", template.Name)
return
}
newRS, err := ec.createReplicaSet(template, collisionCount)
if err != nil {
ec.log.Warnf("Failed to create ReplicaSet: %v", err)
return
}
if newRS != nil {
ec.templateRSs[template.Name] = newRS
}
return
}

// If we get here, replicaset exists. We need to ensure it's scaled properly based on
// termination, or changed replica count
var templateReplicaCount int32 = 0
if !ec.isTerminating {
templateReplicaCount = experimentutil.CalculateTemplateReplicasCount(ec.ex, template)
}
if *existingTemplateRS.Spec.Replicas != templateReplicaCount {
ec.scaleReplicaSetAndRecordEvent(existingTemplateRS, templateReplicaCount)
}
}

// reconcileTemplate reconciles a template to a ReplicaSet. Creates or deletes them as necessary
func (ec *experimentContext) reconcileTemplateOld(template v1alpha1.TemplateSpec, templateStatus v1alpha1.TemplateStatus) (bool, error) {
ec.log.Infof("Reconciling template %s", template.Name)
existingTemplateRS, ok := ec.templateRSs[template.Name]
if !ok {
if ec.isTerminating {
ec.log.Warnf("Skipping ReplicaSet creation for template %s since experiment is terminating", template.Name)
return false, nil
} else {
newRS, err := ec.createReplicaSet(template, templateStatus.CollisionCount)
if err != nil {
return false, err
ec.log.Warnf("Failed to create ReplicaSet: %v", err)
}
if newRS != nil {
ec.templateRSs[template.Name] = newRS
}
ec.templateRSs[template.Name] = newRS
return false, nil
}
}

// If we get here, replicaset exists. We need to ensure it's scaled properly based on
// termination, or new replica count
var templateReplicaCount int32 = 0
if !ec.isTerminating {
templateReplicaCount = experimentutil.CalculateTemplateReplicasCount(ec.ex, template)
}
if *existingTemplateRS.Spec.Replicas != templateReplicaCount {
scaled, _, err := ec.scaleReplicaSetAndRecordEvent(existingTemplateRS, templateReplicaCount)
if err != nil {
return false, err
} else {
// If we get here, replicaset exists. We need to ensure it's scaled properly based on
// termination, or changed replica count
var templateReplicaCount int32 = 0
if !ec.isTerminating {
templateReplicaCount = experimentutil.CalculateTemplateReplicasCount(ec.ex, template)
}
if scaled {
return false, nil
if *existingTemplateRS.Spec.Replicas != templateReplicaCount {
ec.scaleReplicaSetAndRecordEvent(existingTemplateRS, templateReplicaCount)
}
}
if templateReplicaCount != replicasetutil.GetAvailableReplicaCountForReplicaSets([]*appsv1.ReplicaSet{existingTemplateRS}) {
return false, nil
}
return true, nil
}

// enqueueAfterDuration enqueues the experiment at the appropriate duration time after status.availableAt
Expand Down Expand Up @@ -293,33 +242,25 @@ func (ec *experimentContext) createAnalysisRun(analysis v1alpha1.ExperimentAnaly
}

func (ec *experimentContext) calculateStatus() *v1alpha1.ExperimentStatus {
newStatus := v1alpha1.ExperimentStatus{
Conditions: ec.ex.Status.Conditions,
}

newStatus.Running = ec.ex.Status.Running
if !experimentutil.HasStarted(ec.ex) {
newStatus.Running = pointer.BoolPtr(true)
ec.newStatus.Running = pointer.BoolPtr(true)
}

if passed, _ := experimentutil.PassedDurations(ec.ex); passed {
newStatus.Running = pointer.BoolPtr(false)
ec.newStatus.Running = pointer.BoolPtr(false)
}

previousTemplateStatus := experimentutil.GetTemplateStatusMapping(ec.ex.Status)

allAvailable := true
for i := range ec.ex.Spec.Templates {
template := ec.ex.Spec.Templates[i]
templateStatus := v1alpha1.TemplateStatus{
Name: template.Name,
}
if previousStatus, ok := previousTemplateStatus[template.Name]; ok {
templateStatus.CollisionCount = previousStatus.CollisionCount
for _, template := range ec.ex.Spec.Templates {
templateStatus := experimentutil.GetTemplateStatus(*ec.newStatus, template.Name)
if templateStatus == nil {
allAvailable = false
templateStatus = &v1alpha1.TemplateStatus{
Name: template.Name,
}
}

rs, ok := ec.templateRSs[template.Name]
if ok {
if rs, ok := ec.templateRSs[template.Name]; ok {
replicaCount := defaults.GetExperimentTemplateReplicasOrDefault(template)
templateStatus.Replicas = replicasetutil.GetActualReplicaCountForReplicaSets([]*appsv1.ReplicaSet{rs})
templateStatus.UpdatedReplicas = replicasetutil.GetActualReplicaCountForReplicaSets([]*appsv1.ReplicaSet{rs})
Expand All @@ -331,15 +272,14 @@ func (ec *experimentContext) calculateStatus() *v1alpha1.ExperimentStatus {
} else {
allAvailable = false
}
newStatus.TemplateStatuses = append(newStatus.TemplateStatuses, templateStatus)
experimentutil.SetTemplateStatus(ec.newStatus, *templateStatus)
}

newStatus.AvailableAt = ec.ex.Status.AvailableAt
if allAvailable && ec.ex.Status.AvailableAt == nil {
if allAvailable && ec.newStatus.AvailableAt == nil {
now := metav1.Now()
newStatus.AvailableAt = &now
ec.newStatus.AvailableAt = &now
}
return calculateExperimentConditions(ec.ex, newStatus)
return calculateExperimentConditions(ec.ex, *ec.newStatus)
}

// newAnalysisRun generates an AnalysisRun from the experiment and template
Expand Down
10 changes: 7 additions & 3 deletions experiments/experiment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake"

//informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions"
informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions"
"github.com/argoproj/argo-rollouts/utils/conditions"
)

Expand All @@ -40,15 +40,18 @@ func newTestContext(ex *v1alpha1.Experiment, objects ...runtime.Object) *experim

k8sI := kubeinformers.NewSharedInformerFactory(kubeclient, noResyncPeriodFunc())
rsLister := k8sI.Apps().V1().ReplicaSets().Lister()
//rolloutsI := informers.NewSharedInformerFactory(f.client, resync())
//analysisRunLister := rolloutsI.Argoproj().V1alpha1().AnalysisRuns().Lister()
rolloutsI := informers.NewSharedInformerFactory(rolloutclient, noResyncPeriodFunc())
analysisRunLister := rolloutsI.Argoproj().V1alpha1().AnalysisRuns().Lister()
analysisTemplateLister := rolloutsI.Argoproj().V1alpha1().AnalysisTemplates().Lister()

return newExperimentContext(
ex,
make(map[string]*appsv1.ReplicaSet),
kubeclient,
rolloutclient,
rsLister,
analysisTemplateLister,
analysisRunLister,
&record.FakeRecorder{},
func(obj interface{}, duration time.Duration) {},
)
Expand Down Expand Up @@ -226,4 +229,5 @@ func TestFailReplicaSetCreation(t *testing.T) {
return true, nil, errors.New("intentional error")
})
exCtx.reconcile()
// TODO: check that we set condition
}
Loading

0 comments on commit 94a00f0

Please sign in to comment.