Integrate analysis controller with provider interfaces (argoproj#171)
jessesuen authored Sep 30, 2019
1 parent fbb05e1 commit c4690dd
Showing 24 changed files with 725 additions and 166 deletions.
14 changes: 13 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default.

6 changes: 5 additions & 1 deletion Makefile
@@ -45,7 +45,7 @@ endif
 all: controller image
 
 .PHONY: codegen
-codegen:
+codegen: mocks
 	./hack/update-codegen.sh
 	./hack/update-openapigen.sh
 	go run ./hack/gen-crd-spec/main.go
@@ -77,6 +77,10 @@ lint:
 test:
 	go test -v -covermode=count -coverprofile=coverage.out `go list ./...`
 
+.PHONY: mocks
+mocks:
+	mockery -dir ./providers -name Provider -output ./providers/mocks
+
 .PHONY: manifests
 manifests:
 	./hack/update-manifests.sh
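The new `mocks` target exists so controller tests can stub out metric providers: mockery generates a testify-based fake of the `Provider` interface into `./providers/mocks`. A minimal sketch of how such a generated mock might be used, assuming mockery's standard output and the `Run` signature seen in `analysis/analysis.go` below; the test name and argument values are illustrative, not part of this commit:

```go
package analysis

import (
	"testing"

	"github.com/stretchr/testify/mock"

	"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
	"github.com/argoproj/argo-rollouts/providers/mocks"
)

func TestProviderMockSketch(t *testing.T) {
	p := &mocks.Provider{}
	// Stub Run to report success regardless of the metric or arguments passed.
	p.On("Run", mock.Anything, mock.Anything).Return(v1alpha1.Measurement{
		Status: v1alpha1.AnalysisStatusSuccessful,
	}, nil)

	measurement, err := p.Run(v1alpha1.Metric{Name: "success-rate"}, nil)
	if err != nil || measurement.Status != v1alpha1.AnalysisStatusSuccessful {
		t.Fatalf("unexpected measurement: %+v, err: %v", measurement, err)
	}
	p.AssertExpectations(t)
}
```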
180 changes: 146 additions & 34 deletions analysis/analysis.go
@@ -1,10 +1,13 @@
 package analysis
 
 import (
+	"fmt"
 	"sync"
 	"time"
 
 	log "github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
 	analysisutil "github.com/argoproj/argo-rollouts/utils/analysis"
@@ -15,6 +18,15 @@ const (
 	// DefaultMaxConsecutiveErrors is the default number of times a metric can error in sequence before
 	// erroring the entire metric.
 	DefaultMaxConsecutiveErrors = 4
+	// DefaultErrorRetryInterval is the default interval to retry a measurement upon error, in the
+	// event an interval was not specified
+	DefaultErrorRetryInterval int32 = 10
+)
+
+// Event reasons for analysis events
+const (
+	EventReasonStatusFailed    = "Failed"
+	EventReasonStatusCompleted = "Complete"
 )
 
 // metricTask holds the metric which needs to be measured during this reconciliation along with
@@ -24,32 +36,56 @@ type metricTask struct {
 	incompleteMeasurement *v1alpha1.Measurement
 }
 
-func (c *AnalysisController) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) error {
-	if origRun.Status.Status.Completed() {
-		return nil
+func (c *AnalysisController) reconcileAnalysisRun(origRun *v1alpha1.AnalysisRun) *v1alpha1.AnalysisRun {
+	if origRun.Status != nil && origRun.Status.Status.Completed() {
+		return origRun
 	}
 	log := logutil.WithAnalysisRun(origRun)
 	run := origRun.DeepCopy()
-	if run.Status.Status == "" {
+
+	if run.Status == nil {
+		run.Status = &v1alpha1.AnalysisRunStatus{
+			MetricResults: make([]v1alpha1.MetricResult, 0),
+		}
 		err := analysisutil.ValidateAnalysisTemplateSpec(run.Spec.AnalysisSpec)
 		if err != nil {
-			log.Warnf("analysis spec invalid: %v", err)
+			message := fmt.Sprintf("analysis spec invalid: %v", err)
+			log.Warn(message)
 			run.Status.Status = v1alpha1.AnalysisStatusError
+			run.Status.Message = message
+			c.recorder.Eventf(run, corev1.EventTypeWarning, EventReasonStatusFailed, "analysis completed %s", run.Status.Status)
+			return run
 		}
-		return nil
 	}
 	tasks := generateMetricTasks(run)
 	log.Infof("taking %d measurements", len(tasks))
-	runMeasurements(run, tasks)
-	asssessRunStatus(run)
+
+	c.runMeasurements(run, tasks)
+
+	newStatus := c.asssessRunStatus(run)
+	if newStatus != run.Status.Status {
+		message := fmt.Sprintf("analysis transitioned from %s -> %s", run.Status.Status, newStatus)
+		if newStatus.Completed() {
+			switch newStatus {
+			case v1alpha1.AnalysisStatusError, v1alpha1.AnalysisStatusFailed:
+				c.recorder.Eventf(run, corev1.EventTypeWarning, EventReasonStatusFailed, "analysis completed %s", newStatus)
+			default:
+				c.recorder.Eventf(run, corev1.EventTypeNormal, EventReasonStatusCompleted, "analysis completed %s", newStatus)
+			}
+		}
+		log.Info(message)
+		run.Status.Status = newStatus
+	}
+
 	nextReconcileTime := calculateNextReconcileTime(run)
 	if nextReconcileTime != nil {
 		enqueueSeconds := nextReconcileTime.Sub(time.Now())
 		if enqueueSeconds < 0 {
 			enqueueSeconds = 0
 		}
 		log.Infof("enqueuing analysis after %v", enqueueSeconds)
 		c.enqueueAnalysisAfter(run, enqueueSeconds)
 	}
-	return nil
+	return run
 }
 
 // generateMetricTasks generates a list of metric tasks needed to be measured as part of this
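Note the signature change above: reconcileAnalysisRun now returns the updated run instead of an error, which moves persistence out to the caller. A sketch of what that caller might look like, assuming a conventional informer-driven syncHandler; the lister field and the persistAnalysisRunStatus helper are hypothetical and do not appear in this diff:

```go
// Hypothetical caller (not part of this commit): reconcile, then persist
// whatever status the reconcile pass produced.
func (c *AnalysisController) syncHandler(key string) error {
	// SplitMetaNamespaceKey is from k8s.io/client-go/tools/cache.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	origRun, err := c.analysisRunLister.AnalysisRuns(namespace).Get(name)
	if err != nil {
		return err
	}
	newRun := c.reconcileAnalysisRun(origRun)
	// A helper like this would diff old vs. new status and only issue an
	// update call to the API server when something actually changed.
	return c.persistAnalysisRunStatus(origRun, newRun.Status)
}
```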
@@ -84,12 +120,20 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
 			log.WithField("metric", metric.Name).Infof("running initial measurement")
 			continue
 		}
-		if metric.Interval == nil {
-			// a measurement was already taken, and recurrence was not desired
+		metricResult := analysisutil.GetResult(run, metric.Name)
+		effectiveCount := metric.EffectiveCount()
+		if effectiveCount != nil && metricResult.Count >= *effectiveCount {
+			// we have reached desired count
 			continue
 		}
-		if time.Now().After(lastMeasurement.FinishedAt.Add(time.Duration(*metric.Interval) * time.Second)) {
-			// we are due for a measurement
+		// if we get here, we know we need to take a measurement (eventually). check last measurement
+		// to decide if it should be taken now. metric.Interval can be null because we may be
+		// retrying a metric due to error.
+		interval := DefaultErrorRetryInterval
+		if metric.Interval != nil {
+			interval = *metric.Interval
+		}
+		if time.Now().After(lastMeasurement.FinishedAt.Add(time.Duration(interval) * time.Second)) {
 			tasks = append(tasks, metricTask{metric: metric})
 			log.WithField("metric", metric.Name).Infof("running overdue measurement")
 			continue
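This hunk and the requeue logic further down both lean on metric.EffectiveCount(), which the diff never shows. A plausible reading of its contract, inferred from how its result is used here and from the metric.Count > 0 check it replaces in assessMetricStatus; this is an inference, not the committed implementation:

```go
// Inferred sketch of EffectiveCount's contract (assumption, not the
// committed code): nil means "measure indefinitely"; otherwise it is the
// number of measurements after which the metric is considered complete.
func effectiveCount(count int32, interval *int32) *int32 {
	if count == 0 {
		if interval == nil {
			one := int32(1)
			return &one // no count and no interval: take a single measurement
		}
		return nil // interval without count: run until externally terminated
	}
	return &count
}
```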
@@ -99,26 +143,49 @@ func generateMetricTasks(run *v1alpha1.AnalysisRun) []metricTask {
 }
 
 // runMeasurements iterates a list of metric tasks, and runs or resumes measurements
-func runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTask) {
+func (c *AnalysisController) runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTask) {
 	var wg sync.WaitGroup
+	// resultsLock should be held whenever we are accessing or setting status.metricResults since
+	// we are performing queries in parallel
+	var resultsLock sync.Mutex
 	for _, task := range tasks {
 		wg.Add(1)
-		//var provider provider.MetricProvider
-		//provider = provider.NewProvider(task.metric)
-
-		//go func(p provider.Provider, t metricTask) {
-		go func(p interface{}, t metricTask) {
+		go func(t metricTask) {
 			defer wg.Done()
+
+			log := logutil.WithAnalysisRun(run).WithField("metric", t.metric.Name)
+
+			resultsLock.Lock()
+			metricResult := analysisutil.GetResult(run, t.metric.Name)
+			resultsLock.Unlock()
+			if metricResult == nil {
+				metricResult = &v1alpha1.MetricResult{}
+			}
+
 			var newMeasurement v1alpha1.Measurement
-			metricResult := run.Status.MetricResults[t.metric.Name]
-			if t.incompleteMeasurement == nil {
-				// newMeasurement = p.Run(metric)
-				metricResult.Measurements = append(metricResult.Measurements, newMeasurement)
+			provider, err := c.newProvider(*log, t.metric)
+			if err != nil {
+				if t.incompleteMeasurement != nil {
+					newMeasurement = *t.incompleteMeasurement
+				} else {
+					startedAt := metav1.Now()
+					newMeasurement.StartedAt = &startedAt
+				}
+				newMeasurement.Status = v1alpha1.AnalysisStatusError
 			} else {
-				// newMeasurement = p.Resume(metric, measurement)
-				metricResult.Measurements[len(metricResult.Measurements)-1] = newMeasurement
+				if t.incompleteMeasurement == nil {
+					newMeasurement, err = provider.Run(t.metric, run.Spec.Arguments)
+				} else {
+					newMeasurement, err = provider.Resume(t.metric, run.Spec.Arguments, *t.incompleteMeasurement)
+				}
 			}
+
 			if newMeasurement.Status.Completed() {
+				if newMeasurement.FinishedAt == nil {
+					finishedAt := metav1.Now()
+					newMeasurement.FinishedAt = &finishedAt
+				}
 				switch newMeasurement.Status {
 				case v1alpha1.AnalysisStatusSuccessful:
 					metricResult.Successful++
@@ -133,21 +200,50 @@ func runMeasurements(run *v1alpha1.AnalysisRun, tasks []metricTask) {
 					metricResult.Error++
 				}
 			}
-		}(nil, task)
-		//}(provider, task)
+			if t.incompleteMeasurement == nil {
+				metricResult.Measurements = append(metricResult.Measurements, newMeasurement)
+			} else {
+				metricResult.Measurements[len(metricResult.Measurements)-1] = newMeasurement
+			}
+			if err != nil {
+				metricResult.Message = err.Error()
+			} else {
+				metricResult.Message = ""
+			}
+			metricResult.Name = t.metric.Name
+			resultsLock.Lock()
+			analysisutil.SetResult(run, *metricResult)
+			resultsLock.Unlock()
+
+		}(task)
 	}
 	wg.Wait()
 }

 // asssessRunStatus assesses the overall status of this AnalysisRun
 // If any metric is not yet completed, the AnalysisRun is still considered Running
 // Once all metrics are complete, the worst status is used as the overall AnalysisRun status
-func asssessRunStatus(run *v1alpha1.AnalysisRun) v1alpha1.AnalysisStatus {
+func (c *AnalysisController) asssessRunStatus(run *v1alpha1.AnalysisRun) v1alpha1.AnalysisStatus {
 	var worstStatus v1alpha1.AnalysisStatus
 	terminating := analysisutil.IsTerminating(run)
+
 	for _, metric := range run.Spec.AnalysisSpec.Metrics {
-		if result, ok := run.Status.MetricResults[metric.Name]; ok {
-			metricStatus := assessMetricStatus(metric, result, terminating)
+		if result := analysisutil.GetResult(run, metric.Name); result != nil {
+			log := logutil.WithAnalysisRun(run).WithField("metric", metric.Name)
+			metricStatus := assessMetricStatus(metric, *result, terminating)
+			if result.Status != metricStatus {
+				log.Infof("metric transitioned from %s -> %s", result.Status, metricStatus)
+				if metricStatus.Completed() {
+					switch metricStatus {
+					case v1alpha1.AnalysisStatusError, v1alpha1.AnalysisStatusFailed:
+						c.recorder.Eventf(run, corev1.EventTypeWarning, EventReasonStatusFailed, "metric '%s' completed %s", metric.Name, metricStatus)
+					default:
+						c.recorder.Eventf(run, corev1.EventTypeNormal, EventReasonStatusCompleted, "metric '%s' completed %s", metric.Name, metricStatus)
+					}
+				}
+				result.Status = metricStatus
+				analysisutil.SetResult(run, *result)
+			}
 			if !metricStatus.Completed() {
 				// if any metric is not completed, then entire analysis run is considered running
 				return v1alpha1.AnalysisStatusRunning
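The commented-out placeholders from the scaffolding (// newMeasurement = p.Run(metric)) are now real calls through the Provider interface, the same interface the Makefile's mocks target generates fakes for. The interface itself lives under ./providers and is not part of this diff; from the two call sites in runMeasurements it must look roughly like the following, where the argument and return types are inferred from usage:

```go
// Inferred shape of the providers.Provider interface (an assumption based
// on the call sites above). Run starts a fresh measurement; Resume picks
// an in-flight measurement back up.
type Provider interface {
	Run(metric v1alpha1.Metric, args []v1alpha1.Argument) (v1alpha1.Measurement, error)
	Resume(metric v1alpha1.Metric, args []v1alpha1.Argument, measurement v1alpha1.Measurement) (v1alpha1.Measurement, error)
}
```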
@@ -204,7 +300,8 @@ func assessMetricStatus(metric v1alpha1.Metric, result v1alpha1.MetricResult, te
 	// Error and Failed counters are ignored because those checks have already been taken into
 	// consideration above, and we do not want to fail the metric if failures < maxFailures.
 	// TODO(jessesuen): may need to tweak this logic
-	if metric.Count > 0 && result.Count >= metric.Count {
+	effectiveCount := metric.EffectiveCount()
+	if effectiveCount != nil && result.Count >= *effectiveCount {
 		var status v1alpha1.AnalysisStatus
 		if result.Successful > result.Inconclusive {
 			status = v1alpha1.AnalysisStatusSuccessful
@@ -244,12 +341,27 @@ func calculateNextReconcileTime(run *v1alpha1.AnalysisRun) *time.Time {
 			// TODO(jessesuen) perhaps ask provider for an appropriate time to poll?
 			continue
 		}
-		if metric.Interval == nil {
-			// a measurement was already taken, and recurrence was not desired
+		metricResult := analysisutil.GetResult(run, metric.Name)
+		effectiveCount := metric.EffectiveCount()
+		if effectiveCount != nil && metricResult.Count >= *effectiveCount {
+			// we have reached desired count
 			continue
 		}
+		var interval int32
+		if metric.Interval != nil {
+			interval = *metric.Interval
+		} else if lastMeasurement.Status == v1alpha1.AnalysisStatusError {
+			interval = DefaultErrorRetryInterval
+		} else {
+			// if we get here, an interval was not set (meaning recurrence was not desired), and
+			// there was no error (meaning we don't need to retry). no need to requeue this metric.
+			// NOTE: we shouldn't ever get here since it means we are not doing proper bookkeeping
+			// of count.
+			log.WithField("metric", metric.Name).Warnf("skipping requeue. no interval or error (count: %d, effectiveCount: %d)", metricResult.Count, metric.EffectiveCount())
+			continue
+		}
 		// Take the earliest time of all metrics
-		metricReconcileTime := lastMeasurement.FinishedAt.Add(time.Duration(*metric.Interval) * time.Second)
+		metricReconcileTime := lastMeasurement.FinishedAt.Add(time.Duration(interval) * time.Second)
 		if reconcileTime == nil || reconcileTime.After(metricReconcileTime) {
 			reconcileTime = &metricReconcileTime
 		}
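Putting the requeue pieces together: each incomplete metric contributes a candidate time of FinishedAt plus its interval (falling back to the 10-second error-retry default), and calculateNextReconcileTime keeps the earliest. A self-contained worked example with illustrative numbers:

```go
package main

import (
	"fmt"
	"time"
)

// Mirrors the constant introduced in analysis.go above.
const DefaultErrorRetryInterval int32 = 10

func main() {
	finishedAt := time.Now().Add(-5 * time.Second) // both metrics finished 5s ago

	// Metric with a 60s interval: next measurement due ~55s from now.
	latency := finishedAt.Add(60 * time.Second)
	// Errored metric without an interval: retried after the 10s default, due in ~5s.
	errored := finishedAt.Add(time.Duration(DefaultErrorRetryInterval) * time.Second)

	// The earliest candidate wins, so the run is re-enqueued in ~5s to
	// retry the errored metric first.
	next := latency
	if errored.Before(next) {
		next = errored
	}
	fmt.Printf("requeue in %v\n", time.Until(next))
}
```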
