From c4690ddcac771edfa761936763b01af4ba01f5c8 Mon Sep 17 00:00:00 2001 From: Jesse Suen Date: Mon, 30 Sep 2019 16:07:33 -0700 Subject: [PATCH] Integrate analysis controller with provider interfaces (#171) --- Gopkg.lock | 14 +- Makefile | 6 +- analysis/analysis.go | 180 +++++++++++--- analysis/analysis_test.go | 231 +++++++++++++++--- analysis/controller.go | 25 +- analysis/controller_test.go | 9 + analysis/sync.go | 36 +++ cmd/rollouts-controller/main.go | 5 +- controller/controller.go | 39 ++- docs/CONTRIBUTING.md | 13 +- experiments/controller.go | 6 +- experiments/experiment.go | 2 +- experiments/replicaset.go | 4 +- manifests/crds/analysis-run-crd.yaml | 7 +- manifests/install.yaml | 7 +- manifests/namespace-install.yaml | 7 +- pkg/apis/rollouts/v1alpha1/analysis_types.go | 27 +- .../rollouts/v1alpha1/openapi_generated.go | 27 +- .../v1alpha1/zz_generated.deepcopy.go | 6 +- providers/mocks/Provider.go | 68 ++++++ providers/provider.go | 2 +- test/e2e/analysis-run.yaml | 11 + utils/analysis/analysis.go | 56 +++-- utils/analysis/analysis_test.go | 103 ++++++-- 24 files changed, 725 insertions(+), 166 deletions(-) create mode 100644 analysis/sync.go create mode 100644 providers/mocks/Provider.go create mode 100644 test/e2e/analysis-run.yaml diff --git a/Gopkg.lock b/Gopkg.lock index cfb182e478..18d90997d8 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -398,10 +398,21 @@ revision = "298182f68c66c05229eb03ac171abe6e309ee79a" version = "v1.0.3" +[[projects]] + digest = "1:711eebe744c0151a9d09af2315f0bb729b2ec7637ef4c410fa90a18ef74b65b6" + name = "github.com/stretchr/objx" + packages = ["."] + pruneopts = "" + revision = "477a77ecc69700c7cdeb1fa9e129548e1c1c393c" + version = "v0.1.1" + [[projects]] digest = "1:c587772fb8ad29ad4db67575dad25ba17a51f072ff18a22b4f0257a4d9c24f75" name = "github.com/stretchr/testify" - packages = ["assert"] + packages = [ + "assert", + "mock", + ] pruneopts = "" revision = "f35b8ab0b5a2cef36673838d662e249dd9c94686" version = "v1.2.2" @@ -1068,6 +1079,7 @@ "github.com/sirupsen/logrus", "github.com/spf13/cobra", "github.com/stretchr/testify/assert", + "github.com/stretchr/testify/mock", "github.com/valyala/fasttemplate", "k8s.io/api/apps/v1", "k8s.io/api/core/v1", diff --git a/Makefile b/Makefile index 5e625a1419..a7cf9f737c 100644 --- a/Makefile +++ b/Makefile @@ -45,7 +45,7 @@ endif all: controller image .PHONY: codegen -codegen: +codegen: mocks ./hack/update-codegen.sh ./hack/update-openapigen.sh go run ./hack/gen-crd-spec/main.go @@ -77,6 +77,10 @@ lint: test: go test -v -covermode=count -coverprofile=coverage.out `go list ./...` +.PHONY: mocks +mocks: + mockery -dir ./providers -name Provider -output ./providers/mocks + .PHONY: manifests manifests: ./hack/update-manifests.sh diff --git a/analysis/analysis.go b/analysis/analysis.go index c82a23b1c4..0bf7dd527d 100644 --- a/analysis/analysis.go +++ b/analysis/analysis.go @@ -1,10 +1,13 @@ package analysis import ( + "fmt" "sync" "time" log "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" analysisutil "github.com/argoproj/argo-rollouts/utils/analysis" @@ -15,6 +18,15 @@ const ( // DefaultMaxConsecutiveErrors is the default number times a metric can error in sequence before // erroring the entire metric. 
DefaultMaxConsecutiveErrors = 4 + // DefaultErrorRetryInterval is the default interval to retry a measurement upon error, in the + // event an interval was not specified + DefaultErrorRetryInterval int32 = 10 +) + +// Event reasons for analysis events +const ( + EventReasonStatusFailed = "Failed" + EventReasonStatusCompleted = "Complete" ) // metricTask holds the metric which need to be measured during this reconciliation along with @@ -24,32 +36,56 @@ type metricTask struct { incompleteMeasurement *v1alpha1.Measurement } -func (c *AnalysisController) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) error { - if origRun.Status.Status.Completed() { - return nil +func (c *AnalysisController) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alpha1.AnalysisRun { + if origRun.Status != nil && origRun.Status.Status.Completed() { + return origRun } log := logutil.WithAnalysisRun(origRun) run := origRun.DeepCopy() - if run.Status.Status == "" { + + if run.Status == nil { + run.Status = &v1alpha1.AnalysisRunStatus{ + MetricResults: make([]v1alpha1.MetricResult, 0), + } err := analysisutil.ValidateAnalysisTemplateSpec(run.Spec.AnalysisSpec) if err != nil { - log.Warnf("analysis spec invalid: %v", err) + message := fmt.Sprintf("analysis spec invalid: %v", err) + log.Warn(message) run.Status.Status = v1alpha1.AnalysisStatusError + run.Status.Message = message + c.recorder.Eventf(run, corev1.EventTypeWarning, EventReasonStatusFailed, "analysis completed %s", run.Status.Status) + return run } - return nil } tasks := generateMetricTasks(run) + log.Infof("taking %d measurements", len(tasks)) + c.runMeasurements(run, tasks) - runMeasurements(run, tasks) - - asssessRunStatus(run) + newStatus := c.asssessRunStatus(run) + if newStatus != run.Status.Status { + message := fmt.Sprintf("analysis transitioned from %s -> %s", run.Status.Status, newStatus) + if newStatus.Completed() { + switch newStatus { + case v1alpha1.AnalysisStatusError, v1alpha1.AnalysisStatusFailed: + c.recorder.Eventf(run, corev1.EventTypeWarning, EventReasonStatusFailed, "analysis completed %s", newStatus) + default: + c.recorder.Eventf(run, corev1.EventTypeNormal, EventReasonStatusCompleted, "analysis completed %s", newStatus) + } + } + log.Info(message) + run.Status.Status = newStatus + } nextReconcileTime := calculateNextReconcileTime(run) if nextReconcileTime != nil { enqueueSeconds := nextReconcileTime.Sub(time.Now()) + if enqueueSeconds < 0 { + enqueueSeconds = 0 + } + log.Infof("enqueuing analysis after %v", enqueueSeconds) c.enqueueAnalysisAfter(run, enqueueSeconds) } - return nil + return run } // generateMetricTasks generates a list of metrics tasks needed to be measured as part of this @@ -84,12 +120,20 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask { log.WithField("metric", metric.Name).Infof("running initial measurement") continue } - if metric.Interval == nil { - // a measurement was already taken, and reoccurrence was not desired + metricResult := analysisutil.GetResult(run, metric.Name) + effectiveCount := metric.EffectiveCount() + if effectiveCount != nil && metricResult.Count >= *effectiveCount { + // we have reached desired count continue } - if time.Now().After(lastMeasurement.FinishedAt.Add(time.Duration(*metric.Interval) * time.Second)) { - // we are due for a measurement + // if we get here, we know we need to take a measurement (eventually). check last measurement + // to decide if it should be taken now. metric.Interval can be null because we may be + // retrying a metric due to error. 
+ interval := DefaultErrorRetryInterval + if metric.Interval != nil { + interval = *metric.Interval + } + if time.Now().After(lastMeasurement.FinishedAt.Add(time.Duration(interval) * time.Second)) { tasks = append(tasks, metricTask{metric: metric}) log.WithField("metric", metric.Name).Infof("running overdue measurement") continue @@ -99,26 +143,49 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask { } // runMeasurements iterates a list of metric tasks, and runs or resumes measurements -func runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTask) { +func (c *AnalysisController) runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTask) { var wg sync.WaitGroup + // resultsLock should be held whenever we are accessing or setting status.metricResults since + // we are performing queries in parallel + var resultsLock sync.Mutex for _, task := range tasks { wg.Add(1) - //var provider provider.MetricProvider - //provider = provider.NewProvider(task.metric) - //go func(p provider.Provider, t metricTask) { - go func(p interface{}, t metricTask) { + go func(t metricTask) { defer wg.Done() + + log := logutil.WithAnalysisRun(run).WithField("metric", t.metric.Name) + + resultsLock.Lock() + metricResult := analysisutil.GetResult(run, t.metric.Name) + resultsLock.Unlock() + if metricResult == nil { + metricResult = &v1alpha1.MetricResult{} + } + var newMeasurement v1alpha1.Measurement - metricResult := run.Status.MetricResults[t.metric.Name] - if t.incompleteMeasurement == nil { - // newMeasurement = p.Run(metric) - metricResult.Measurements = append(metricResult.Measurements, newMeasurement) + provider, err := c.newProvider(*log, t.metric) + if err != nil { + if t.incompleteMeasurement != nil { + newMeasurement = *t.incompleteMeasurement + } else { + startedAt := metav1.Now() + newMeasurement.StartedAt = &startedAt + } + newMeasurement.Status = v1alpha1.AnalysisStatusError } else { - // newMeasurement = p.Resume(metric, measurement) - metricResult.Measurements[len(metricResult.Measurements)-1] = newMeasurement + if t.incompleteMeasurement == nil { + newMeasurement, err = provider.Run(t.metric, run.Spec.Arguments) + } else { + newMeasurement, err = provider.Resume(t.metric, run.Spec.Arguments, *t.incompleteMeasurement) + } } + if newMeasurement.Status.Completed() { + if newMeasurement.FinishedAt == nil { + finishedAt := metav1.Now() + newMeasurement.FinishedAt = &finishedAt + } switch newMeasurement.Status { case v1alpha1.AnalysisStatusSuccessful: metricResult.Successful++ @@ -133,8 +200,22 @@ func runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTask) { metricResult.Error++ } } - }(nil, task) - //}(provider, task) + if t.incompleteMeasurement == nil { + metricResult.Measurements = append(metricResult.Measurements, newMeasurement) + } else { + metricResult.Measurements[len(metricResult.Measurements)-1] = newMeasurement + } + if err != nil { + metricResult.Message = err.Error() + } else { + metricResult.Message = "" + } + metricResult.Name = t.metric.Name + resultsLock.Lock() + analysisutil.SetResult(run, *metricResult) + resultsLock.Unlock() + + }(task) } wg.Wait() } @@ -142,12 +223,27 @@ func runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTask) { // asssessRunStatus assesses the overall status of this AnalysisRun // If any metric is not yet completed, the AnalysisRun is still considered Running // Once all metrics are complete, the worst status is used as the overall AnalysisRun status -func asssessRunStatus(run *v1alpha1.AnalysisRun) v1alpha1.AnalysisStatus 
{ +func (c *AnalysisController) asssessRunStatus(run *v1alpha1.AnalysisRun) v1alpha1.AnalysisStatus { var worstStatus v1alpha1.AnalysisStatus terminating := analysisutil.IsTerminating(run) + for _, metric := range run.Spec.AnalysisSpec.Metrics { - if result, ok := run.Status.MetricResults[metric.Name]; ok { - metricStatus := assessMetricStatus(metric, result, terminating) + if result := analysisutil.GetResult(run, metric.Name); result != nil { + log := logutil.WithAnalysisRun(run).WithField("metric", metric.Name) + metricStatus := assessMetricStatus(metric, *result, terminating) + if result.Status != metricStatus { + log.Infof("metric transitioned from %s -> %s", result.Status, metricStatus) + if metricStatus.Completed() { + switch metricStatus { + case v1alpha1.AnalysisStatusError, v1alpha1.AnalysisStatusFailed: + c.recorder.Eventf(run, corev1.EventTypeWarning, EventReasonStatusFailed, "metric '%s' completed %s", metric.Name, metricStatus) + default: + c.recorder.Eventf(run, corev1.EventTypeNormal, EventReasonStatusCompleted, "metric '%s' completed %s", metric.Name, metricStatus) + } + } + result.Status = metricStatus + analysisutil.SetResult(run, *result) + } if !metricStatus.Completed() { // if any metric is not completed, then entire analysis run is considered running return v1alpha1.AnalysisStatusRunning @@ -204,7 +300,8 @@ func assessMetricStatus(metric v1alpha1.Metric, result v1alpha1.MetricResult, te // Error and Failed counters are ignored because those checks have already been taken into // consideration above, and we do not want to fail the metric if failures < maxFailures. // TODO(jessesuen): may need to tweak this logic - if metric.Count > 0 && result.Count >= metric.Count { + effectiveCount := metric.EffectiveCount() + if effectiveCount != nil && result.Count >= *effectiveCount { var status v1alpha1.AnalysisStatus if result.Successful > result.Inconclusive { status = v1alpha1.AnalysisStatusSuccessful @@ -244,12 +341,27 @@ func calculateNextReconcileTime(run *v1alpha1.AnalysisRun) *time.Time { // TODO(jessesuen) perhaps ask provider for an appropriate time to poll? continue } - if metric.Interval == nil { - // a measurement was already taken, and reoccurrence was not desired + metricResult := analysisutil.GetResult(run, metric.Name) + effectiveCount := metric.EffectiveCount() + if effectiveCount != nil && metricResult.Count >= *effectiveCount { + // we have reached desired count + continue + } + var interval int32 + if metric.Interval != nil { + interval = *metric.Interval + } else if lastMeasurement.Status == v1alpha1.AnalysisStatusError { + interval = DefaultErrorRetryInterval + } else { + // if we get here, an interval was not set (meaning reoccurrence was not desired), and + // there was no error (meaning we don't need to retry). no need to requeue this metric. + // NOTE: we shouldn't ever get here since it means we are not doing proper bookkeeping + // of count. + log.WithField("metric", metric.Name).Warnf("skipping requeue. 
no interval or error (count: %d, effectiveCount: %d)", metricResult.Count, *effectiveCount)
+			continue
+		}
 		// Take the earliest time of all metrics
-		metricReconcileTime := lastMeasurement.FinishedAt.Add(time.Duration(*metric.Interval) * time.Second)
+		metricReconcileTime := lastMeasurement.FinishedAt.Add(time.Duration(interval) * time.Second)
 		if reconcileTime == nil || reconcileTime.After(metricReconcileTime) {
 			reconcileTime = &metricReconcileTime
 		}
diff --git a/analysis/analysis_test.go b/analysis/analysis_test.go
index 876909bebd..dea9643a85 100644
--- a/analysis/analysis_test.go
+++ b/analysis/analysis_test.go
@@ -5,6 +5,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/utils/pointer"
 
@@ -15,6 +16,16 @@ func timePtr(t metav1.Time) *metav1.Time {
 	return &t
 }
 
+func newMeasurement(status v1alpha1.AnalysisStatus) v1alpha1.Measurement {
+	now := metav1.Now()
+	return v1alpha1.Measurement{
+		Status:     status,
+		Value:      "100",
+		StartedAt:  &now,
+		FinishedAt: &now,
+	}
+}
+
 func TestGenerateMetricTasksInterval(t *testing.T) {
 	run := &v1alpha1.AnalysisRun{
 		Spec: v1alpha1.AnalysisRunSpec{
 			AnalysisSpec: v1alpha1.AnalysisTemplateSpec{
 				Metrics: []v1alpha1.Metric{
 					{
 						Name:     "success-rate",
 						Interval: pointer.Int32Ptr(60),
 					},
 				},
 			},
 		},
 		Status: &v1alpha1.AnalysisRunStatus{
 			Status: v1alpha1.AnalysisStatusRunning,
-			MetricResults: map[string]v1alpha1.MetricResult{
-				"success-rate": {
+			MetricResults: []v1alpha1.MetricResult{
+				{
+					Name:   "success-rate",
 					Status: v1alpha1.AnalysisStatusRunning,
 					Measurements: []v1alpha1.Measurement{
 						{
@@ -51,10 +63,10 @@ func TestGenerateMetricTasksInterval(t *testing.T) {
 	}
 	{
 		// ensure we do take measurements when outside interval
-		successRate := run.Status.MetricResults["success-rate"]
+		successRate := run.Status.MetricResults[0]
 		successRate.Measurements[0].StartedAt = timePtr(metav1.NewTime(time.Now().Add(-61 * time.Second)))
 		successRate.Measurements[0].FinishedAt = timePtr(metav1.NewTime(time.Now().Add(-61 * time.Second)))
-		run.Status.MetricResults["success-rate"] = successRate
+		run.Status.MetricResults[0] = successRate
 		tasks := generateMetricTasks(run)
 		assert.Equal(t, 1, len(tasks))
 	}
@@ -76,8 +88,9 @@ func TestGenerateMetricTasksFailing(t *testing.T) {
 		},
 		Status: &v1alpha1.AnalysisRunStatus{
 			Status: v1alpha1.AnalysisStatusRunning,
-			MetricResults: map[string]v1alpha1.MetricResult{
-				"latency": {
+			MetricResults: []v1alpha1.MetricResult{
+				{
+					Name:   "latency",
 					Status: v1alpha1.AnalysisStatusFailed,
 				},
 			},
@@ -92,7 +105,7 @@ func TestGenerateMetricTasksFailing(t *testing.T) {
 	assert.Equal(t, 2, len(tasks))
 }
 
-func TestGenerateMetricTasksNoInterval(t *testing.T) {
+func TestGenerateMetricTasksNoIntervalOrCount(t *testing.T) {
 	run := &v1alpha1.AnalysisRun{
 		Spec: v1alpha1.AnalysisRunSpec{
 			AnalysisSpec: v1alpha1.AnalysisTemplateSpec{
 				Metrics: []v1alpha1.Metric{
 					{
 						Name: "success-rate",
 					},
 				},
 			},
 		},
 		Status: &v1alpha1.AnalysisRunStatus{
 			Status: v1alpha1.AnalysisStatusRunning,
-			MetricResults: map[string]v1alpha1.MetricResult{
-				"success-rate": {
-					Status: v1alpha1.AnalysisStatusRunning,
+			MetricResults: []v1alpha1.MetricResult{
+				{
+					Name:  "success-rate",
+					Count: 1,
 					Measurements: []v1alpha1.Measurement{
 						{
 							Value: "99",
 							Status: v1alpha1.AnalysisStatusSuccessful,
 							StartedAt: timePtr(metav1.NewTime(time.Now().Add(-50 * time.Second))),
 							FinishedAt: timePtr(metav1.NewTime(time.Now().Add(-50 * time.Second))),
 						},
 					},
 				},
 			},
 		},
 	}
 	{
-	// ensure we don't take measurement when interval is not specified and we already took measurement
+	// ensure we don't take measurement when result count indicates we completed
 	tasks := generateMetricTasks(run)
 	assert.Equal(t, 0, 
len(tasks))
 	}
 	{
-		// ensure we do take measurements when measurment has not been taken
-		successRate := run.Status.MetricResults["success-rate"]
+		// ensure we do take measurements when measurement has not been taken
+		successRate := run.Status.MetricResults[0]
 		successRate.Measurements = nil
+		successRate.Count = 0
-		run.Status.MetricResults["success-rate"] = successRate
+		run.Status.MetricResults[0] = successRate
 		tasks := generateMetricTasks(run)
 		assert.Equal(t, 1, len(tasks))
 	}
 }
@@ -148,8 +163,9 @@ func TestGenerateMetricTasksIncomplete(t *testing.T) {
 		},
 		Status: &v1alpha1.AnalysisRunStatus{
 			Status: v1alpha1.AnalysisStatusRunning,
-			MetricResults: map[string]v1alpha1.MetricResult{
-				"success-rate": {
+			MetricResults: []v1alpha1.MetricResult{
+				{
+					Name:   "success-rate",
 					Status: v1alpha1.AnalysisStatusRunning,
 					Measurements: []v1alpha1.Measurement{
 						{
@@ -170,7 +186,44 @@ func TestGenerateMetricTasksIncomplete(t *testing.T) {
 	}
 }
 
+func TestGenerateMetricTasksError(t *testing.T) {
+	run := &v1alpha1.AnalysisRun{
+		Spec: v1alpha1.AnalysisRunSpec{
+			AnalysisSpec: v1alpha1.AnalysisTemplateSpec{
+				Metrics: []v1alpha1.Metric{
+					{
+						Name: "success-rate",
+					},
+				},
+			},
+		},
+		Status: &v1alpha1.AnalysisRunStatus{
+			Status: v1alpha1.AnalysisStatusRunning,
+			MetricResults: []v1alpha1.MetricResult{
+				{
+					Name:   "success-rate",
+					Status: v1alpha1.AnalysisStatusRunning,
+					Error:  1,
+					Measurements: []v1alpha1.Measurement{
+						{
+							Status:     v1alpha1.AnalysisStatusError,
+							StartedAt:  timePtr(metav1.NewTime(time.Now().Add(-120 * time.Second))),
+							FinishedAt: timePtr(metav1.NewTime(time.Now().Add(-120 * time.Second))),
+						},
+					},
+				},
+			},
+		},
+	}
+	// ensure we generate a task when we have a measurement which errored
+	tasks := generateMetricTasks(run)
+	assert.Equal(t, 1, len(tasks))
+}
+
 func TestAssessRunStatus(t *testing.T) {
+	f := newFixture(t)
+	defer f.Close()
+	c, _, _ := f.newController(noResyncPeriodFunc)
 	run := &v1alpha1.AnalysisRun{
 		Spec: v1alpha1.AnalysisRunSpec{
 			AnalysisSpec: v1alpha1.AnalysisTemplateSpec{
 				Metrics: []v1alpha1.Metric{
 					{
 						Name: "latency",
 					},
 					{
 						Name: "success-rate",
 					},
 				},
 			},
 		},
 	}
 	{
 		// ensure if one metric is still running, entire run is still running
 		run.Status = &v1alpha1.AnalysisRunStatus{
 			Status: v1alpha1.AnalysisStatusRunning,
-			MetricResults: map[string]v1alpha1.MetricResult{
-				"latency": {
+			MetricResults: []v1alpha1.MetricResult{
+				{
+					Name:   "latency",
 					Status: v1alpha1.AnalysisStatusSuccessful,
 				},
-				"success-rate": {
+				{
+					Name:   "success-rate",
 					Status: v1alpha1.AnalysisStatusRunning,
 				},
 			},
 		}
-		assert.Equal(t, v1alpha1.AnalysisStatusRunning, asssessRunStatus(run))
+		assert.Equal(t, v1alpha1.AnalysisStatusRunning, c.asssessRunStatus(run))
 	}
 	{
 		// ensure we take the worst of the completed metrics
 		run.Status = &v1alpha1.AnalysisRunStatus{
 			Status: v1alpha1.AnalysisStatusRunning,
-			MetricResults: map[string]v1alpha1.MetricResult{
-				"latency": {
+			MetricResults: []v1alpha1.MetricResult{
+				{
+					Name:   "latency",
 					Status: v1alpha1.AnalysisStatusSuccessful,
 				},
-				"success-rate": {
+				{
+					Name:   "success-rate",
 					Status: v1alpha1.AnalysisStatusFailed,
 				},
 			},
 		}
-		assert.Equal(t, v1alpha1.AnalysisStatusFailed, asssessRunStatus(run))
+		assert.Equal(t, v1alpha1.AnalysisStatusFailed, c.asssessRunStatus(run))
 	}
 }
 
@@ -353,8 +410,9 @@ func TestCalculateNextReconcileTimeInterval(t *testing.T) {
 		},
 		Status: &v1alpha1.AnalysisRunStatus{
 			Status: v1alpha1.AnalysisStatusRunning,
-			MetricResults: map[string]v1alpha1.MetricResult{
-				"success-rate": {
+			MetricResults: []v1alpha1.MetricResult{
+				{
+					Name:   "success-rate",
 					Status: 
v1alpha1.AnalysisStatusRunning, Measurements: []v1alpha1.Measurement{ { @@ -371,12 +429,12 @@ func TestCalculateNextReconcileTimeInterval(t *testing.T) { // ensure we requeue at correct interval assert.Equal(t, now.Add(time.Second*30), *calculateNextReconcileTime(run)) // when in-flight is not set, we do not requeue - run.Status.MetricResults["success-rate"].Measurements[0].FinishedAt = nil - run.Status.MetricResults["success-rate"].Measurements[0].Status = v1alpha1.AnalysisStatusRunning + run.Status.MetricResults[0].Measurements[0].FinishedAt = nil + run.Status.MetricResults[0].Measurements[0].Status = v1alpha1.AnalysisStatusRunning assert.Nil(t, calculateNextReconcileTime(run)) // do not queue completed metrics nowMinus120 := metav1.NewTime(now.Add(time.Second * -120)) - run.Status.MetricResults["success-rate"] = v1alpha1.MetricResult{ + run.Status.MetricResults[0] = v1alpha1.MetricResult{ Status: v1alpha1.AnalysisStatusSuccessful, Measurements: []v1alpha1.Measurement{ { @@ -405,8 +463,9 @@ func TestCalculateNextReconcileTimeNoInterval(t *testing.T) { }, Status: &v1alpha1.AnalysisRunStatus{ Status: v1alpha1.AnalysisStatusRunning, - MetricResults: map[string]v1alpha1.MetricResult{ - "success-rate": { + MetricResults: []v1alpha1.MetricResult{ + { + Name: "success-rate", Status: v1alpha1.AnalysisStatusSuccessful, Measurements: []v1alpha1.Measurement{ { @@ -444,8 +503,9 @@ func TestCalculateNextReconcileEarliestMetric(t *testing.T) { }, Status: &v1alpha1.AnalysisRunStatus{ Status: v1alpha1.AnalysisStatusRunning, - MetricResults: map[string]v1alpha1.MetricResult{ - "success-rate": { + MetricResults: []v1alpha1.MetricResult{ + { + Name: "success-rate", Status: v1alpha1.AnalysisStatusRunning, Measurements: []v1alpha1.Measurement{ { @@ -456,7 +516,8 @@ func TestCalculateNextReconcileEarliestMetric(t *testing.T) { }, }, }, - "latency": { + { + Name: "latency", Status: v1alpha1.AnalysisStatusRunning, Measurements: []v1alpha1.Measurement{ { @@ -473,3 +534,105 @@ func TestCalculateNextReconcileEarliestMetric(t *testing.T) { // ensure we requeue at correct interval assert.Equal(t, now.Add(time.Second*10), *calculateNextReconcileTime(run)) } + +func TestCalculateNextReconcileUponError(t *testing.T) { + now := metav1.Now() + run := &v1alpha1.AnalysisRun{ + Spec: v1alpha1.AnalysisRunSpec{ + AnalysisSpec: v1alpha1.AnalysisTemplateSpec{ + Metrics: []v1alpha1.Metric{ + { + Name: "success-rate", + }, + }, + }, + }, + Status: &v1alpha1.AnalysisRunStatus{ + Status: v1alpha1.AnalysisStatusRunning, + MetricResults: []v1alpha1.MetricResult{ + { + Name: "success-rate", + Status: v1alpha1.AnalysisStatusRunning, + Error: 1, + Measurements: []v1alpha1.Measurement{ + { + Value: "99", + Status: v1alpha1.AnalysisStatusError, + StartedAt: &now, + FinishedAt: &now, + }, + }, + }, + }, + }, + } + // ensure we requeue at correct interval + assert.Equal(t, now.Add(time.Second*time.Duration(DefaultErrorRetryInterval)), *calculateNextReconcileTime(run)) +} + +func TestReconcileAnalysisRunInitial(t *testing.T) { + f := newFixture(t) + defer f.Close() + c, _, _ := f.newController(noResyncPeriodFunc) + run := &v1alpha1.AnalysisRun{ + Spec: v1alpha1.AnalysisRunSpec{ + AnalysisSpec: v1alpha1.AnalysisTemplateSpec{ + Metrics: []v1alpha1.Metric{ + { + Name: "success-rate", + Interval: pointer.Int32Ptr(60), + Provider: v1alpha1.AnalysisProvider{ + Prometheus: &v1alpha1.PrometheusMetric{}, + }, + }, + }, + }, + }, + } + f.provider.On("Run", mock.Anything, mock.Anything).Return(newMeasurement(v1alpha1.AnalysisStatusSuccessful), nil) + { 
+ newRun := c.reconcileAnalysisRun(run) + assert.Equal(t, v1alpha1.AnalysisStatusRunning, newRun.Status.MetricResults[0].Status) + assert.Equal(t, v1alpha1.AnalysisStatusRunning, newRun.Status.Status) + assert.Equal(t, 1, len(newRun.Status.MetricResults[0].Measurements)) + assert.Equal(t, v1alpha1.AnalysisStatusSuccessful, newRun.Status.MetricResults[0].Measurements[0].Status) + } + { + // now set count to one and run should be completed immediately + run.Spec.AnalysisSpec.Metrics[0].Count = 1 + newRun := c.reconcileAnalysisRun(run) + assert.Equal(t, v1alpha1.AnalysisStatusSuccessful, newRun.Status.MetricResults[0].Status) + assert.Equal(t, v1alpha1.AnalysisStatusSuccessful, newRun.Status.Status) + assert.Equal(t, 1, len(newRun.Status.MetricResults[0].Measurements)) + assert.Equal(t, v1alpha1.AnalysisStatusSuccessful, newRun.Status.MetricResults[0].Measurements[0].Status) + } + { + // run should complete immediately if both count and interval are omitted + run.Spec.AnalysisSpec.Metrics[0].Count = 0 + run.Spec.AnalysisSpec.Metrics[0].Interval = nil + newRun := c.reconcileAnalysisRun(run) + assert.Equal(t, v1alpha1.AnalysisStatusSuccessful, newRun.Status.MetricResults[0].Status) + assert.Equal(t, v1alpha1.AnalysisStatusSuccessful, newRun.Status.Status) + assert.Equal(t, 1, len(newRun.Status.MetricResults[0].Measurements)) + assert.Equal(t, v1alpha1.AnalysisStatusSuccessful, newRun.Status.MetricResults[0].Measurements[0].Status) + } +} + +func TestReconcileAnalysisRunInvalid(t *testing.T) { + f := newFixture(t) + defer f.Close() + c, _, _ := f.newController(noResyncPeriodFunc) + run := &v1alpha1.AnalysisRun{ + Spec: v1alpha1.AnalysisRunSpec{ + AnalysisSpec: v1alpha1.AnalysisTemplateSpec{ + Metrics: []v1alpha1.Metric{ + { + Name: "success-rate", + }, + }, + }, + }, + } + newRun := c.reconcileAnalysisRun(run) + assert.Equal(t, v1alpha1.AnalysisStatusError, newRun.Status.Status) +} diff --git a/analysis/controller.go b/analysis/controller.go index 0f2926f6c0..b08d8810ec 100644 --- a/analysis/controller.go +++ b/analysis/controller.go @@ -12,9 +12,11 @@ import ( "k8s.io/client-go/util/workqueue" "github.com/argoproj/argo-rollouts/controller/metrics" + "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" clientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned" informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions/rollouts/v1alpha1" listers "github.com/argoproj/argo-rollouts/pkg/client/listers/rollouts/v1alpha1" + "github.com/argoproj/argo-rollouts/providers" controllerutil "github.com/argoproj/argo-rollouts/utils/controller" logutil "github.com/argoproj/argo-rollouts/utils/log" ) @@ -24,7 +26,7 @@ type AnalysisController struct { // kubeclientset is a standard kubernetes clientset kubeclientset kubernetes.Interface // analysisclientset is a clientset for our own API group - arogProjClientset clientset.Interface + argoProjClientset clientset.Interface analysisRunLister listers.AnalysisRunLister @@ -32,6 +34,8 @@ type AnalysisController struct { metricsServer *metrics.MetricsServer + newProvider func(logCtx log.Entry, metric v1alpha1.Metric) (providers.Provider, error) + // used for unit testing enqueueAnalysis func(obj interface{}) enqueueAnalysisAfter func(obj interface{}, duration time.Duration) @@ -51,7 +55,7 @@ type AnalysisController struct { // NewAnalysisController returns a new analysis controller func NewAnalysisController( kubeclientset kubernetes.Interface, - arogProjClientset clientset.Interface, + argoProjClientset 
clientset.Interface, analysisRunInformer informers.AnalysisRunInformer, resyncPeriod time.Duration, analysisRunWorkQueue workqueue.RateLimitingInterface, @@ -60,7 +64,7 @@ func NewAnalysisController( controller := &AnalysisController{ kubeclientset: kubeclientset, - arogProjClientset: arogProjClientset, + argoProjClientset: argoProjClientset, analysisRunLister: analysisRunInformer.Lister(), metricsServer: metricsServer, analysisRunWorkQueue: analysisRunWorkQueue, @@ -77,6 +81,8 @@ func NewAnalysisController( controllerutil.EnqueueAfter(obj, duration, analysisRunWorkQueue) } + controller.newProvider = providers.NewProvider + log.Info("Setting up analysis event handlers") // Set up an event handler for when analysis resources change analysisRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -110,7 +116,7 @@ func (c *AnalysisController) syncHandler(key string) error { return err } log.WithField(logutil.AnalysisRunKey, name).WithField(logutil.NamespaceKey, namespace).Infof("Started syncing Analysis at (%v)", startTime) - ar, err := c.analysisRunLister.AnalysisRuns(namespace).Get(name) + run, err := c.analysisRunLister.AnalysisRuns(namespace).Get(name) if k8serrors.IsNotFound(err) { log.WithField(logutil.AnalysisRunKey, name).WithField(logutil.NamespaceKey, namespace).Info("Analysis has been deleted") return nil @@ -121,16 +127,17 @@ func (c *AnalysisController) syncHandler(key string) error { defer func() { duration := time.Since(startTime) - //TODO(jesseseun) Add metrics for analysis + //TODO(jessesuen) Add metrics for analysis //arc.metricsServer.IncReconcile(r, duration) - logCtx := logutil.WithAnalysisRun(ar).WithField("time_ms", duration.Seconds()*1e3) + logCtx := logutil.WithAnalysisRun(run).WithField("time_ms", duration.Seconds()*1e3) logCtx.Info("Reconciliation completed") }() - if ar.DeletionTimestamp != nil { - logutil.WithAnalysisRun(ar).Info("No reconciliation as analysis marked for deletion") + if run.DeletionTimestamp != nil { + logutil.WithAnalysisRun(run).Info("No reconciliation as analysis marked for deletion") return nil } - return c.reconcileAnalysisRun(ar) + newRun := c.reconcileAnalysisRun(run) + return c.persistAnalysisRunStatus(run, newRun.Status) } diff --git a/analysis/controller_test.go b/analysis/controller_test.go index 8abf6eb9f2..e153627d89 100644 --- a/analysis/controller_test.go +++ b/analysis/controller_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/bouk/monkey" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -24,6 +25,8 @@ import ( "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake" informers "github.com/argoproj/argo-rollouts/pkg/client/informers/externalversions" + "github.com/argoproj/argo-rollouts/providers" + "github.com/argoproj/argo-rollouts/providers/mocks" ) var ( @@ -43,6 +46,8 @@ type fixture struct { objects []runtime.Object enqueuedObjects map[string]int unfreezeTime func() + // fake provider + provider *mocks.Provider } func newFixture(t *testing.T) *fixture { @@ -107,6 +112,10 @@ func (f *fixture) newController(resync resyncFunc) (*AnalysisController, informe c.enqueueAnalysisAfter = func(obj interface{}, duration time.Duration) { c.enqueueAnalysis(obj) } + f.provider = &mocks.Provider{} + c.newProvider = func(logCtx log.Entry, metric v1alpha1.Metric) (providers.Provider, error) { + return f.provider, nil + } for _, ar := 
range f.analysisRunLister {
 		i.Argoproj().V1alpha1().AnalysisRuns().Informer().GetIndexer().Add(ar)
diff --git a/analysis/sync.go b/analysis/sync.go
new file mode 100644
index 0000000000..2d67ee0275
--- /dev/null
+++ b/analysis/sync.go
@@ -0,0 +1,36 @@
+package analysis
+
+import (
+	patchtypes "k8s.io/apimachinery/pkg/types"
+
+	"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
+	"github.com/argoproj/argo-rollouts/utils/diff"
+	logutil "github.com/argoproj/argo-rollouts/utils/log"
+)
+
+func (c *AnalysisController) persistAnalysisRunStatus(orig *v1alpha1.AnalysisRun, newStatus *v1alpha1.AnalysisRunStatus) error {
+	logCtx := logutil.WithAnalysisRun(orig)
+	patch, modified, err := diff.CreateTwoWayMergePatch(
+		&v1alpha1.AnalysisRun{
+			Status: orig.Status,
+		},
+		&v1alpha1.AnalysisRun{
+			Status: newStatus,
+		}, v1alpha1.AnalysisRun{})
+	if err != nil {
+		logCtx.Errorf("Error constructing AnalysisRun status patch: %v", err)
+		return err
+	}
+	if !modified {
+		logCtx.Info("No status changes. Skipping patch")
+		return nil
+	}
+	logCtx.Debugf("AnalysisRun Patch: %s", patch)
+	_, err = c.argoProjClientset.ArgoprojV1alpha1().AnalysisRuns(orig.Namespace).Patch(orig.Name, patchtypes.MergePatchType, patch)
+	if err != nil {
+		logCtx.Warningf("Error updating analysisRun: %v", err)
+		return err
+	}
+	logCtx.Info("Patched status successfully")
+	return nil
+}
diff --git a/cmd/rollouts-controller/main.go b/cmd/rollouts-controller/main.go
index c1bb044e75..dd3d9ed03c 100644
--- a/cmd/rollouts-controller/main.go
+++ b/cmd/rollouts-controller/main.go
@@ -40,6 +40,7 @@ func newCommand() *cobra.Command {
 		metricsPort        int
 		rolloutThreads     int
 		experimentThreads  int
+		analysisThreads    int
 		serviceThreads     int
 	)
 	var command = cobra.Command{
@@ -85,6 +86,7 @@ func newCommand() *cobra.Command {
 				kubeInformerFactory.Core().V1().Services(),
 				argoRolloutsInformerFactory.Argoproj().V1alpha1().Rollouts(),
 				argoRolloutsInformerFactory.Argoproj().V1alpha1().Experiments(),
+				argoRolloutsInformerFactory.Argoproj().V1alpha1().AnalysisRuns(),
 				resyncDuration,
 				metricsPort)
 
@@ -93,7 +95,7 @@ func newCommand() *cobra.Command {
 			kubeInformerFactory.Start(stopCh)
 			argoRolloutsInformerFactory.Start(stopCh)
 
-			if err = cm.Run(rolloutThreads, serviceThreads, experimentThreads, stopCh); err != nil {
+			if err = cm.Run(rolloutThreads, serviceThreads, experimentThreads, analysisThreads, stopCh); err != nil {
 				glog.Fatalf("Error running controller: %s", err.Error())
 			}
 			return nil
@@ -106,6 +108,7 @@ func newCommand() *cobra.Command {
 	command.Flags().IntVar(&metricsPort, "metricsport", controller.DefaultMetricsPort, "Set the port the metrics endpoint should be exposed over")
 	command.Flags().IntVar(&rolloutThreads, "rollout-threads", controller.DefaultRolloutThreads, "Set the number of worker threads for the Rollout controller")
 	command.Flags().IntVar(&experimentThreads, "experiment-threads", controller.DefaultExperimentThreads, "Set the number of worker threads for the Experiment controller")
+	command.Flags().IntVar(&analysisThreads, "analysis-threads", controller.DefaultAnalysisThreads, "Set the number of worker threads for the Analysis controller")
 	command.Flags().IntVar(&serviceThreads, "service-threads", controller.DefaultServiceThreads, "Set the number of worker threads for the Service controller")
 	return &command
 }
diff --git a/controller/controller.go b/controller/controller.go
index 9c58dd6ce1..f3169300df 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -19,6 +19,7 @@ import (
 	"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue" + "github.com/argoproj/argo-rollouts/analysis" "github.com/argoproj/argo-rollouts/controller/metrics" "github.com/argoproj/argo-rollouts/experiments" clientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned" @@ -37,13 +38,16 @@ const ( // DefaultMetricsPort Default port to expose the metrics endpoint DefaultMetricsPort = 8090 - // DefaultRolloutThreads Default number of worker threads to start with the controller + // DefaultRolloutThreads Default number of rollout worker threads to start with the controller DefaultRolloutThreads = 10 - // DefaultExperimentThreads Default number of worker threads to start with the controller + // DefaultExperimentThreads Default number of experiment worker threads to start with the controller DefaultExperimentThreads = 10 - // DefaultServiceThreads Default number of worker threads to start with the controller + // DefaultAnalysisThreads Default number of analysis worker threads to start with the controller + DefaultAnalysisThreads = 30 + + // DefaultServiceThreads Default number of service worker threads to start with the controller DefaultServiceThreads = 10 ) @@ -52,16 +56,19 @@ type Manager struct { metricsServer *metrics.MetricsServer rolloutController *rollout.RolloutController experimentController *experiments.ExperimentController + analysisController *analysis.AnalysisController serviceController *service.ServiceController rolloutSynced cache.InformerSynced experimentSynced cache.InformerSynced + analysisRunSynced cache.InformerSynced serviceSynced cache.InformerSynced replicasSetSynced cache.InformerSynced - rolloutWorkqueue workqueue.RateLimitingInterface - serviceWorkqueue workqueue.RateLimitingInterface - experimentWorkqueue workqueue.RateLimitingInterface + rolloutWorkqueue workqueue.RateLimitingInterface + serviceWorkqueue workqueue.RateLimitingInterface + experimentWorkqueue workqueue.RateLimitingInterface + analysisRunWorkqueue workqueue.RateLimitingInterface } // NewManager returns a new manager to manage all the controllers @@ -72,6 +79,7 @@ func NewManager( servicesInformer coreinformers.ServiceInformer, rolloutsInformer informers.RolloutInformer, experimentsInformer informers.ExperimentInformer, + analysisRunInformer informers.AnalysisRunInformer, resyncPeriod time.Duration, metricsPort int) *Manager { @@ -89,6 +97,7 @@ func NewManager( rolloutWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Rollouts") experimentWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Experiments") + analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AnalysisRuns") serviceWorkqueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Services") metricsServer := metrics.NewMetricsServer(metricsAddr, rolloutsInformer.Lister()) @@ -110,13 +119,20 @@ func NewManager( replicaSetInformer, rolloutsInformer, experimentsInformer, - resyncPeriod, rolloutWorkqueue, experimentWorkqueue, metricsServer, recorder) + analysisController := analysis.NewAnalysisController(kubeclientset, + argoprojclientset, + analysisRunInformer, + resyncPeriod, + analysisRunWorkqueue, + metricsServer, + recorder) + serviceController := service.NewServiceController( kubeclientset, servicesInformer, @@ -131,13 +147,16 @@ func NewManager( rolloutSynced: rolloutsInformer.Informer().HasSynced, serviceSynced: servicesInformer.Informer().HasSynced, experimentSynced: 
experimentsInformer.Informer().HasSynced,
+		analysisRunSynced:    analysisRunInformer.Informer().HasSynced,
 		replicasSetSynced:    replicaSetInformer.Informer().HasSynced,
 		rolloutWorkqueue:     rolloutWorkqueue,
 		experimentWorkqueue:  experimentWorkqueue,
+		analysisRunWorkqueue: analysisRunWorkqueue,
 		serviceWorkqueue:     serviceWorkqueue,
 		rolloutController:    rolloutController,
 		serviceController:    serviceController,
 		experimentController: experimentController,
+		analysisController:   analysisController,
 	}
 
 	return cm
@@ -147,15 +166,16 @@ func NewManager(
 // as syncing informer caches and starting workers. It will block until stopCh
 // is closed, at which point it will shutdown the workqueue and wait for
 // workers to finish processing their current work items.
-func (c *Manager) Run(rolloutThreadiness, serviceThreadiness, experimentThreadiness int, stopCh <-chan struct{}) error {
+func (c *Manager) Run(rolloutThreadiness, serviceThreadiness, experimentThreadiness, analysisThreadiness int, stopCh <-chan struct{}) error {
 	defer runtime.HandleCrash()
 	defer c.serviceWorkqueue.ShutDown()
 	defer c.rolloutWorkqueue.ShutDown()
 	defer c.experimentWorkqueue.ShutDown()
+	defer c.analysisRunWorkqueue.ShutDown()
 
 	// Wait for the caches to be synced before starting workers
 	log.Info("Waiting for controller's informer caches to sync")
-	if ok := cache.WaitForCacheSync(stopCh, c.serviceSynced, c.rolloutSynced, c.experimentSynced, c.replicasSetSynced); !ok {
+	if ok := cache.WaitForCacheSync(stopCh, c.serviceSynced, c.rolloutSynced, c.experimentSynced, c.analysisRunSynced, c.replicasSetSynced); !ok {
 		return fmt.Errorf("failed to wait for caches to sync")
 	}
 
@@ -164,6 +184,7 @@ func (c *Manager) Run(rolloutThreadiness, serviceThreadiness, experimentThreadin
 	go wait.Until(func() { c.rolloutController.Run(rolloutThreadiness, stopCh) }, time.Second, stopCh)
 	go wait.Until(func() { c.serviceController.Run(serviceThreadiness, stopCh) }, time.Second, stopCh)
 	go wait.Until(func() { c.experimentController.Run(experimentThreadiness, stopCh) }, time.Second, stopCh)
+	go wait.Until(func() { c.analysisController.Run(analysisThreadiness, stopCh) }, time.Second, stopCh)
 
 	log.Info("Started controller")
 	go func() {
diff --git a/docs/CONTRIBUTING.md b/docs/CONTRIBUTING.md
index bad4e4041a..5f1217cf92 100644
--- a/docs/CONTRIBUTING.md
+++ b/docs/CONTRIBUTING.md
@@ -12,10 +12,17 @@ Install:
 * [kustomize](https://github.com/kubernetes-sigs/kustomize/releases)
 * [minikube](https://kubernetes.io/docs/setup/minikube/) or Docker for Desktop
 
-Argo Rollout uses the `controller-gen` binary in order to auto-generate the crd manifest and golangci-lint to lint the project. Please run the following commands to install them:
+Argo Rollout additionally uses:
+* the `controller-gen` binary to auto-generate the CRD manifest
+* `golangci-lint` to lint the project
+* `mockery` to generate mock objects
-* `go get -u github.com/kubernetes-sigs/controller-tools/cmd/controller-gen` -* `go get -u github.com/golangci/golangci-lint/cmd/golangci-lint` +Run the following commands to install them: +```bash +go get -u github.com/kubernetes-sigs/controller-tools/cmd/controller-gen +go get -u github.com/golangci/golangci-lint/cmd/golangci-lint +go get -u github.com/vektra/mockery/.../ +``` Brew users can quickly install the lot: diff --git a/experiments/controller.go b/experiments/controller.go index ec1c2fe3c3..2895478796 100644 --- a/experiments/controller.go +++ b/experiments/controller.go @@ -31,7 +31,7 @@ type ExperimentController struct { // kubeclientset is a standard kubernetes clientset kubeclientset kubernetes.Interface // experimentsclientset is a clientset for our own API group - arogProjClientset clientset.Interface + argoProjClientset clientset.Interface // rsControl is used for adopting/releasing replica sets. replicaSetControl controller.RSControlInterface @@ -66,7 +66,7 @@ type ExperimentController struct { // NewExperimentController returns a new experiment controller func NewExperimentController( kubeclientset kubernetes.Interface, - arogProjClientset clientset.Interface, + argoProjClientset clientset.Interface, replicaSetInformer appsinformers.ReplicaSetInformer, rolloutsInformer informers.RolloutInformer, experimentsInformer informers.ExperimentInformer, @@ -83,7 +83,7 @@ func NewExperimentController( controller := &ExperimentController{ kubeclientset: kubeclientset, - arogProjClientset: arogProjClientset, + argoProjClientset: argoProjClientset, replicaSetControl: replicaSetControl, replicaSetLister: replicaSetInformer.Lister(), rolloutsLister: rolloutsInformer.Lister(), diff --git a/experiments/experiment.go b/experiments/experiment.go index add164f9d7..1aef757378 100644 --- a/experiments/experiment.go +++ b/experiments/experiment.go @@ -148,7 +148,7 @@ func (ec *ExperimentController) persistExperimentStatus(orig *v1alpha1.Experimen return nil } logCtx.Debugf("Experiment Patch: %s", patch) - _, err = ec.arogProjClientset.ArgoprojV1alpha1().Experiments(orig.Namespace).Patch(orig.Name, patchtypes.MergePatchType, patch) + _, err = ec.argoProjClientset.ArgoprojV1alpha1().Experiments(orig.Namespace).Patch(orig.Name, patchtypes.MergePatchType, patch) if err != nil { logCtx.Warningf("Error updating experiment: %v", err) return err diff --git a/experiments/replicaset.go b/experiments/replicaset.go index fa1d690525..0f3526d6fd 100644 --- a/experiments/replicaset.go +++ b/experiments/replicaset.go @@ -40,7 +40,7 @@ func (c *ExperimentController) getReplicaSetsForExperiment(experiment *v1alpha1. // If any adoptions are attempted, we should first recheck for deletion with // an uncached quorum read sometime after listing ReplicaSets (see #42639). 
canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { - fresh, err := c.arogProjClientset.ArgoprojV1alpha1().Experiments(experiment.Namespace).Get(experiment.Name, metav1.GetOptions{}) + fresh, err := c.argoProjClientset.ArgoprojV1alpha1().Experiments(experiment.Namespace).Get(experiment.Name, metav1.GetOptions{}) if err != nil { return nil, err } @@ -149,7 +149,7 @@ func (ec *ExperimentController) reconcileReplicaSet(experiment *v1alpha1.Experim } patch := fmt.Sprintf(CollisionCountPatch, string(templateStatusBytes)) - _, patchErr := ec.arogProjClientset.ArgoprojV1alpha1().Experiments(experiment.Namespace).Patch(experiment.Name, patchtypes.MergePatchType, []byte(patch)) + _, patchErr := ec.argoProjClientset.ArgoprojV1alpha1().Experiments(experiment.Namespace).Patch(experiment.Name, patchtypes.MergePatchType, []byte(patch)) logCtx.WithField("patch", patch).Debug("Applied Patch") if patchErr != nil { logCtx.Errorf("Error patching service %s", err.Error()) diff --git a/manifests/crds/analysis-run-crd.yaml b/manifests/crds/analysis-run-crd.yaml index d5c973df45..a6488c6e18 100644 --- a/manifests/crds/analysis-run-crd.yaml +++ b/manifests/crds/analysis-run-crd.yaml @@ -87,8 +87,10 @@ spec: type: object status: properties: + message: + type: string metricResults: - additionalProperties: + items: properties: count: format: int32 @@ -136,11 +138,10 @@ spec: - name - status type: object - type: object + type: array status: type: string required: - - metricResults - status type: object required: diff --git a/manifests/install.yaml b/manifests/install.yaml index 0fef477026..bc48383989 100644 --- a/manifests/install.yaml +++ b/manifests/install.yaml @@ -88,8 +88,10 @@ spec: type: object status: properties: + message: + type: string metricResults: - additionalProperties: + items: properties: count: format: int32 @@ -137,11 +139,10 @@ spec: - name - status type: object - type: object + type: array status: type: string required: - - metricResults - status type: object required: diff --git a/manifests/namespace-install.yaml b/manifests/namespace-install.yaml index f4fe801718..394a3367f4 100644 --- a/manifests/namespace-install.yaml +++ b/manifests/namespace-install.yaml @@ -88,8 +88,10 @@ spec: type: object status: properties: + message: + type: string metricResults: - additionalProperties: + items: properties: count: format: int32 @@ -137,11 +139,10 @@ spec: - name - status type: object - type: object + type: array status: type: string required: - - metricResults - status type: object required: diff --git a/pkg/apis/rollouts/v1alpha1/analysis_types.go b/pkg/apis/rollouts/v1alpha1/analysis_types.go index ef171358e4..2843a891d4 100644 --- a/pkg/apis/rollouts/v1alpha1/analysis_types.go +++ b/pkg/apis/rollouts/v1alpha1/analysis_types.go @@ -35,6 +35,10 @@ type Metric struct { // Interval defines the interval in seconds between each metric analysis // If omitted, will perform the metric analysis only once Interval *int32 `json:"interval,omitempty"` + // Count is the number of times to run measurement. If both interval and count are omitted, + // the effective count is 1. If only interval is specified, metric runs indefinitely. + // A count > 1 must specify an interval. + Count int32 `json:"count,omitempty"` // SuccessCondition is an expression which determines if a measurement is considered successful // Expression is a goevaluate expression. The keyword `result` is a variable reference to the // value of measurement. Results can be both structured data or primitive. 
@@ -47,8 +51,6 @@ type Metric struct {
 	// If both success and failure conditions are specified, and the measurement does not fall into
 	// either condition, the measurement is considered Inconclusive
 	FailureCondition string `json:"failureCondition,omitempty"`
-	// Count is the number of times to run measurement. If omitted, runs indefinitely
-	Count int32 `json:"count,omitempty"`
 	// MaxFailures is the maximum number of times the measurement is allowed to fail, before the
 	// entire metric is considered failed (default: 0)
 	MaxFailures int32 `json:"maxFailures,omitempty"`
@@ -61,6 +63,21 @@ type Metric struct {
 	Provider AnalysisProvider `json:"provider"`
 }
 
+// EffectiveCount is the effective count based on whether or not count/interval is specified
+// If neither count nor interval is specified, the effective count is 1
+// If only interval is specified, metric runs indefinitely and there is no effective count (nil)
+// Otherwise, it is the user-specified value
+func (m *Metric) EffectiveCount() *int32 {
+	if m.Count == 0 {
+		if m.Interval == nil {
+			one := int32(1)
+			return &one
+		}
+		return nil
+	}
+	return &m.Count
+}
+
 // AnalysisProvider which external system to use to verify the analysis
 // Only one of the fields in this struct should be non-nil
 type AnalysisProvider struct {
@@ -140,8 +157,10 @@ type Argument struct {
 type AnalysisRunStatus struct {
 	// Status is the status of the analysis run
 	Status AnalysisStatus `json:"status"`
-	// Metrics contains the metrics collected during the run
-	MetricResults map[string]MetricResult `json:"metricResults"`
+	// Message is a message explaining current status
+	Message string `json:"message,omitempty"`
+	// MetricResults contains the metrics collected during the run
+	MetricResults []MetricResult `json:"metricResults,omitempty"`
 }
 
 // MetricResult contain a list of the most recent measurements for a single metric along with
diff --git a/pkg/apis/rollouts/v1alpha1/openapi_generated.go b/pkg/apis/rollouts/v1alpha1/openapi_generated.go
index 9e180b0e79..785cb0a4f8 100644
--- a/pkg/apis/rollouts/v1alpha1/openapi_generated.go
+++ b/pkg/apis/rollouts/v1alpha1/openapi_generated.go
@@ -245,11 +245,18 @@ func schema_pkg_apis_rollouts_v1alpha1_AnalysisRunStatus(ref common.ReferenceCal
 						Format:      "",
 					},
 				},
+				"message": {
+					SchemaProps: spec.SchemaProps{
+						Description: "Message is a message explaining current status",
+						Type:        []string{"string"},
+						Format:      "",
+					},
+				},
 				"metricResults": {
 					SchemaProps: spec.SchemaProps{
 						Description: "Metrics contains the metrics collected during the run",
-						Type:        []string{"object"},
-						AdditionalProperties: &spec.SchemaOrBool{
+						Type:        []string{"array"},
+						Items: &spec.SchemaOrArray{
 							Schema: &spec.Schema{
 								SchemaProps: spec.SchemaProps{
 									Ref: ref("github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1.MetricResult"),
@@ -259,7 +266,7 @@ func schema_pkg_apis_rollouts_v1alpha1_AnalysisRunStatus(ref common.ReferenceCal
 					},
 				},
 			},
-			Required: []string{"status", "metricResults"},
+			Required: []string{"status"},
 		},
 	},
 	Dependencies: []string{
@@ -946,6 +953,13 @@ func schema_pkg_apis_rollouts_v1alpha1_Metric(ref common.ReferenceCallback) comm
 					Format: "int32",
 				},
 			},
+			"count": {
+				SchemaProps: spec.SchemaProps{
+					Description: "Count is the number of times to run measurement. If both interval and count are omitted, the effective count is 1. If only interval is specified, metric runs indefinitely. 
A count > 1 must specify an interval.", + Type: []string{"integer"}, + Format: "int32", + }, + }, "successCondition": { SchemaProps: spec.SchemaProps{ Description: "SuccessCondition is an expression which determines if a measurement is considered successful Expression is a goevaluate expression. The keyword `result` is a variable reference to the value of measurement. Results can be both structured data or primitive. Examples:\n result > 10\n (result.requests_made * result.requests_succeeded / 100) >= 90\n result IN (red, yellow)", @@ -960,13 +974,6 @@ func schema_pkg_apis_rollouts_v1alpha1_Metric(ref common.ReferenceCallback) comm Format: "", }, }, - "count": { - SchemaProps: spec.SchemaProps{ - Description: "Count is the number of times to run measurement. If omitted, runs indefinitely", - Type: []string{"integer"}, - Format: "int32", - }, - }, "maxFailures": { SchemaProps: spec.SchemaProps{ Description: "MaxFailures is the maximum number of times the measurement is allowed to fail, before the entire metric is considered failed (default: 0)", diff --git a/pkg/apis/rollouts/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/rollouts/v1alpha1/zz_generated.deepcopy.go index 89a36ec97b..f25cefeab2 100644 --- a/pkg/apis/rollouts/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/rollouts/v1alpha1/zz_generated.deepcopy.go @@ -144,9 +144,9 @@ func (in *AnalysisRunStatus) DeepCopyInto(out *AnalysisRunStatus) { *out = *in if in.MetricResults != nil { in, out := &in.MetricResults, &out.MetricResults - *out = make(map[string]MetricResult, len(*in)) - for key, val := range *in { - (*out)[key] = *val.DeepCopy() + *out = make([]MetricResult, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) } } return diff --git a/providers/mocks/Provider.go b/providers/mocks/Provider.go new file mode 100644 index 0000000000..e4aec05943 --- /dev/null +++ b/providers/mocks/Provider.go @@ -0,0 +1,68 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. 
+ +package mocks + +import mock "github.com/stretchr/testify/mock" + +import v1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" + +// Provider is an autogenerated mock type for the Provider type +type Provider struct { + mock.Mock +} + +// Resume provides a mock function with given fields: _a0, _a1, _a2 +func (_m *Provider) Resume(_a0 v1alpha1.Metric, _a1 []v1alpha1.Argument, _a2 v1alpha1.Measurement) (v1alpha1.Measurement, error) { + ret := _m.Called(_a0, _a1, _a2) + + var r0 v1alpha1.Measurement + if rf, ok := ret.Get(0).(func(v1alpha1.Metric, []v1alpha1.Argument, v1alpha1.Measurement) v1alpha1.Measurement); ok { + r0 = rf(_a0, _a1, _a2) + } else { + r0 = ret.Get(0).(v1alpha1.Measurement) + } + + var r1 error + if rf, ok := ret.Get(1).(func(v1alpha1.Metric, []v1alpha1.Argument, v1alpha1.Measurement) error); ok { + r1 = rf(_a0, _a1, _a2) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Run provides a mock function with given fields: _a0, _a1 +func (_m *Provider) Run(_a0 v1alpha1.Metric, _a1 []v1alpha1.Argument) (v1alpha1.Measurement, error) { + ret := _m.Called(_a0, _a1) + + var r0 v1alpha1.Measurement + if rf, ok := ret.Get(0).(func(v1alpha1.Metric, []v1alpha1.Argument) v1alpha1.Measurement); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Get(0).(v1alpha1.Measurement) + } + + var r1 error + if rf, ok := ret.Get(1).(func(v1alpha1.Metric, []v1alpha1.Argument) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Type provides a mock function with given fields: +func (_m *Provider) Type() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} diff --git a/providers/provider.go b/providers/provider.go index 774a18e61c..ba0527e269 100644 --- a/providers/provider.go +++ b/providers/provider.go @@ -14,7 +14,7 @@ type Provider interface { // Run start a new external system call for a measurement //idempotent and do nothing if a call has been started Run(v1alpha1.Metric, []v1alpha1.Argument) (v1alpha1.Measurement, error) - // Checks if the external system call is finished and returns the current measuremtn + // Checks if the external system call is finished and returns the current measurement Resume(v1alpha1.Metric, []v1alpha1.Argument, v1alpha1.Measurement) (v1alpha1.Measurement, error) // Type gets the provider type Type() string diff --git a/test/e2e/analysis-run.yaml b/test/e2e/analysis-run.yaml new file mode 100644 index 0000000000..6b6e6acd62 --- /dev/null +++ b/test/e2e/analysis-run.yaml @@ -0,0 +1,11 @@ +kind: AnalysisRun +apiVersion: argoproj.io/v1alpha1 +metadata: + generateName: sample-run- +spec: + analysisSpec: + metrics: + - name: success-rate + provider: + prometheus: + server: http://prometheus-operator-prometheus.prometheus-operator:9090 diff --git a/utils/analysis/analysis.go b/utils/analysis/analysis.go index b4c84a571c..57edf207f4 100644 --- a/utils/analysis/analysis.go +++ b/utils/analysis/analysis.go @@ -38,6 +38,16 @@ func ValidateMetric(metric v1alpha1.Metric) error { if metric.MaxConsecutiveErrors != nil && *metric.MaxConsecutiveErrors < 0 { return fmt.Errorf("maxConsecutiveErrors must be >= 0") } + numProviders := 0 + if metric.Provider.Prometheus != nil { + numProviders++ + } + if numProviders == 0 { + return fmt.Errorf("no provider specified") + } + if numProviders > 1 { + return fmt.Errorf("multiple providers specified") + } return nil } @@ -85,35 +95,45 @@ func IsFailing(run 
*v1alpha1.AnalysisRun) bool { return false } -// MetricResult returns the metric result by name -func MetricResult(run *v1alpha1.AnalysisRun, metricName string) *v1alpha1.MetricResult { - metricResult, ok := run.Status.MetricResults[metricName] - if !ok { - return nil +// GetResult returns the metric result by name +func GetResult(run *v1alpha1.AnalysisRun, metricName string) *v1alpha1.MetricResult { + for _, result := range run.Status.MetricResults { + if result.Name == metricName { + return &result + } } - return &metricResult + return nil +} + +// SetResult updates the metric result +func SetResult(run *v1alpha1.AnalysisRun, result v1alpha1.MetricResult) { + for i, r := range run.Status.MetricResults { + if r.Name == result.Name { + run.Status.MetricResults[i] = result + return + } + } + run.Status.MetricResults = append(run.Status.MetricResults, result) } // MetricCompleted returns whether or not a metric was completed or not func MetricCompleted(run *v1alpha1.AnalysisRun, metricName string) bool { - metricResult, ok := run.Status.MetricResults[metricName] - if !ok { - return false + if result := GetResult(run, metricName); result != nil { + return result.Status.Completed() } - return metricResult.Status.Completed() + return false } // LastMeasurement returns the last measurement started or completed for a specific metric func LastMeasurement(run *v1alpha1.AnalysisRun, metricName string) *v1alpha1.Measurement { - result, ok := run.Status.MetricResults[metricName] - if !ok { - return nil - } - totalMeasurements := len(result.Measurements) - if totalMeasurements == 0 { - return nil + if result := GetResult(run, metricName); result != nil { + totalMeasurements := len(result.Measurements) + if totalMeasurements == 0 { + return nil + } + return &result.Measurements[totalMeasurements-1] } - return &result.Measurements[totalMeasurements-1] + return nil } // ConsecutiveErrors returns number of most recent consecutive errors diff --git a/utils/analysis/analysis_test.go b/utils/analysis/analysis_test.go index 953a4725dc..3301d2dbf8 100644 --- a/utils/analysis/analysis_test.go +++ b/utils/analysis/analysis_test.go @@ -17,6 +17,9 @@ func TestValidateMetrics(t *testing.T) { Name: "success-rate", Count: 1, MaxFailures: 2, + Provider: v1alpha1.AnalysisProvider{ + Prometheus: &v1alpha1.PrometheusMetric{}, + }, }, }, } @@ -31,6 +34,9 @@ func TestValidateMetrics(t *testing.T) { Count: 2, Interval: pointer.Int32Ptr(60), MaxFailures: 2, + Provider: v1alpha1.AnalysisProvider{ + Prometheus: &v1alpha1.PrometheusMetric{}, + }, }, }, } @@ -50,6 +56,9 @@ func TestValidateMetrics(t *testing.T) { { Name: "success-rate", Count: 2, + Provider: v1alpha1.AnalysisProvider{ + Prometheus: &v1alpha1.PrometheusMetric{}, + }, }, }, } @@ -61,9 +70,15 @@ func TestValidateMetrics(t *testing.T) { Metrics: []v1alpha1.Metric{ { Name: "success-rate", + Provider: v1alpha1.AnalysisProvider{ + Prometheus: &v1alpha1.PrometheusMetric{}, + }, }, { Name: "success-rate", + Provider: v1alpha1.AnalysisProvider{ + Prometheus: &v1alpha1.PrometheusMetric{}, + }, }, }, } @@ -76,6 +91,9 @@ func TestValidateMetrics(t *testing.T) { { Name: "success-rate", MaxFailures: -1, + Provider: v1alpha1.AnalysisProvider{ + Prometheus: &v1alpha1.PrometheusMetric{}, + }, }, }, } @@ -88,12 +106,27 @@ func TestValidateMetrics(t *testing.T) { { Name: "success-rate", MaxConsecutiveErrors: pointer.Int32Ptr(-1), + Provider: v1alpha1.AnalysisProvider{ + Prometheus: &v1alpha1.PrometheusMetric{}, + }, }, }, } err := ValidateAnalysisTemplateSpec(spec) 
assert.EqualError(t, err, "metrics[0]: maxConsecutiveErrors must be >= 0") } + { + spec := v1alpha1.AnalysisTemplateSpec{ + Metrics: []v1alpha1.Metric{ + { + Name: "success-rate", + Count: 1, + }, + }, + } + err := ValidateAnalysisTemplateSpec(spec) + assert.EqualError(t, err, "metrics[0]: no provider specified") + } } func TestIsWorst(t *testing.T) { @@ -122,47 +155,67 @@ func TestIsFailing(t *testing.T) { run := &v1alpha1.AnalysisRun{ Status: &v1alpha1.AnalysisRunStatus{ Status: v1alpha1.AnalysisStatusRunning, - MetricResults: map[string]v1alpha1.MetricResult{ - "other-metric": { + MetricResults: []v1alpha1.MetricResult{ + { + Name: "other-metric", Status: v1alpha1.AnalysisStatusRunning, }, - "success-rate": { + { + Name: "success-rate", Status: v1alpha1.AnalysisStatusRunning, }, }, }, } - successRate := run.Status.MetricResults["success-rate"] + successRate := run.Status.MetricResults[1] assert.False(t, IsFailing(run)) successRate.Status = v1alpha1.AnalysisStatusError - run.Status.MetricResults["success-rate"] = successRate + run.Status.MetricResults[1] = successRate assert.True(t, IsFailing(run)) successRate.Status = v1alpha1.AnalysisStatusFailed - run.Status.MetricResults["success-rate"] = successRate + run.Status.MetricResults[1] = successRate assert.True(t, IsFailing(run)) } -func TestMetricResult(t *testing.T) { +func TestGetResult(t *testing.T) { run := &v1alpha1.AnalysisRun{ Status: &v1alpha1.AnalysisRunStatus{ Status: v1alpha1.AnalysisStatusRunning, - MetricResults: map[string]v1alpha1.MetricResult{ - "success-rate": { + MetricResults: []v1alpha1.MetricResult{ + { + Name: "success-rate", Status: v1alpha1.AnalysisStatusRunning, }, }, }, } - assert.Nil(t, MetricResult(run, "non-existent")) - assert.Equal(t, run.Status.MetricResults["success-rate"], *MetricResult(run, "success-rate")) + assert.Nil(t, GetResult(run, "non-existent")) + assert.Equal(t, run.Status.MetricResults[0], *GetResult(run, "success-rate")) +} + +func TestSetResult(t *testing.T) { + run := &v1alpha1.AnalysisRun{ + Status: &v1alpha1.AnalysisRunStatus{}, + } + res := v1alpha1.MetricResult{ + Name: "success-rate", + Status: v1alpha1.AnalysisStatusRunning, + } + + SetResult(run, res) + assert.Equal(t, res, run.Status.MetricResults[0]) + res.Status = v1alpha1.AnalysisStatusFailed + SetResult(run, res) + assert.Equal(t, res, run.Status.MetricResults[0]) } func TestMetricCompleted(t *testing.T) { run := &v1alpha1.AnalysisRun{ Status: &v1alpha1.AnalysisRunStatus{ Status: v1alpha1.AnalysisStatusRunning, - MetricResults: map[string]v1alpha1.MetricResult{ - "success-rate": { + MetricResults: []v1alpha1.MetricResult{ + { + Name: "success-rate", Status: v1alpha1.AnalysisStatusRunning, }, }, @@ -171,7 +224,8 @@ func TestMetricCompleted(t *testing.T) { assert.False(t, MetricCompleted(run, "non-existent")) assert.False(t, MetricCompleted(run, "success-rate")) - run.Status.MetricResults["success-rate"] = v1alpha1.MetricResult{ + run.Status.MetricResults[0] = v1alpha1.MetricResult{ + Name: "success-rate", Status: v1alpha1.AnalysisStatusError, } assert.True(t, MetricCompleted(run, "success-rate")) @@ -189,8 +243,9 @@ func TestLastMeasurement(t *testing.T) { run := &v1alpha1.AnalysisRun{ Status: &v1alpha1.AnalysisRunStatus{ Status: v1alpha1.AnalysisStatusRunning, - MetricResults: map[string]v1alpha1.MetricResult{ - "success-rate": { + MetricResults: []v1alpha1.MetricResult{ + { + Name: "success-rate", Status: v1alpha1.AnalysisStatusRunning, Measurements: []v1alpha1.Measurement{m1, m2}, }, @@ -199,9 +254,9 @@ func 
TestLastMeasurement(t *testing.T) { } assert.Nil(t, LastMeasurement(run, "non-existent")) assert.Equal(t, m2, *LastMeasurement(run, "success-rate")) - successRate := run.Status.MetricResults["success-rate"] + successRate := run.Status.MetricResults[0] successRate.Measurements = []v1alpha1.Measurement{} - run.Status.MetricResults["success-rate"] = successRate + run.Status.MetricResults[0] = successRate assert.Nil(t, LastMeasurement(run, "success-rate")) } @@ -209,11 +264,13 @@ func TestIsTerminating(t *testing.T) { run := &v1alpha1.AnalysisRun{ Status: &v1alpha1.AnalysisRunStatus{ Status: v1alpha1.AnalysisStatusRunning, - MetricResults: map[string]v1alpha1.MetricResult{ - "other-metric": { + MetricResults: []v1alpha1.MetricResult{ + { + Name: "other-metric", Status: v1alpha1.AnalysisStatusRunning, }, - "success-rate": { + { + Name: "success-rate", Status: v1alpha1.AnalysisStatusRunning, }, }, @@ -223,9 +280,9 @@ func TestIsTerminating(t *testing.T) { run.Spec.Terminate = true assert.True(t, IsTerminating(run)) run.Spec.Terminate = false - successRate := run.Status.MetricResults["success-rate"] + successRate := run.Status.MetricResults[1] successRate.Status = v1alpha1.AnalysisStatusError - run.Status.MetricResults["success-rate"] = successRate + run.Status.MetricResults[1] = successRate assert.True(t, IsTerminating(run)) }
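
Note on testing with the generated mock: the sketch below illustrates how the new
providers/mocks.Provider type can stand in for a real provider in a unit test. It
is a minimal, hypothetical example: the test name, metric name, and returned
measurement are invented for illustration and are not part of this patch; only
mocks.Provider and the testify mock API (On/Return/AssertExpectations) come from
the changes above.

	package analysis_test

	import (
		"testing"

		"github.com/stretchr/testify/assert"
		"github.com/stretchr/testify/mock"

		"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
		"github.com/argoproj/argo-rollouts/providers/mocks"
	)

	// TestProviderMockSketch is a hypothetical test showing the generated mock in use.
	func TestProviderMockSketch(t *testing.T) {
		p := &mocks.Provider{}
		// Stub Run to report a successful measurement for any metric/argument combination.
		p.On("Run", mock.Anything, mock.Anything).Return(
			v1alpha1.Measurement{Status: v1alpha1.AnalysisStatusSuccessful}, nil)

		measurement, err := p.Run(v1alpha1.Metric{Name: "success-rate"}, nil)
		assert.NoError(t, err)
		assert.Equal(t, v1alpha1.AnalysisStatusSuccessful, measurement.Status)
		// Verify the stubbed call was actually made.
		p.AssertExpectations(t)
	}

Because the analysis controller consumes the Provider interface rather than a
concrete implementation, controller-level tests can substitute this mock for the
Prometheus provider and drive reconciliation without a live metrics server.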