Skip to content

Commit

Permalink
Fix enqueue logic (argoproj#239)
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesuen authored and dthomson25 committed Oct 30, 2019
1 parent 37bc1f7 commit c25c23f
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 12 deletions.
24 changes: 14 additions & 10 deletions experiments/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,12 @@ func (ec *experimentContext) reconcile() *v1alpha1.ExperimentStatus {
ec.reconcileAnalysisRun(analysis)
}

if duration := calculateEnqueueDuration(ec.ex); duration != nil {
newStatus := ec.calculateStatus()
if duration := calculateEnqueueDuration(ec.ex, newStatus); duration != nil {
ec.log.Infof("Enqueueing Experiment in %s seconds", duration.String())
ec.enqueueExperimentAfter(ec.ex, *duration)
}
return ec.calculateStatus()
return newStatus
}

// reconcileTemplate reconciles a template to a ReplicaSet. Creates or scales them down as necessary
Expand Down Expand Up @@ -205,29 +206,32 @@ func (ec *experimentContext) reconcileTemplate(template v1alpha1.TemplateSpec) {
// * status.availableAt + spec.duration
// * status.templateStatuses[].lastTransitionTime + spec.progressDeadlineSeconds
// Returns nil if there is no need to requeue
func calculateEnqueueDuration(ex *v1alpha1.Experiment) *time.Duration {
if !experimentutil.HasStarted(ex) {
return nil
}
func calculateEnqueueDuration(ex *v1alpha1.Experiment, newStatus *v1alpha1.ExperimentStatus) *time.Duration {
ex = ex.DeepCopy()
ex.Status = *(newStatus.DeepCopy())
if experimentutil.IsTerminating(ex) {
return nil
}
var candidateDuration *time.Duration
if ex.Status.AvailableAt != nil && ex.Spec.Duration != nil {
// Set candidate duration to status.availableAt + duration
passedDuration, timeRemaining := experimentutil.PassedDurations(ex)
if passedDuration {
if !passedDuration {
candidateDuration = &timeRemaining
}
}
deadlineSeconds := defaults.GetExperimentProgressDeadlineSecondsOrDefault(ex)
now := time.Now()
for _, ts := range ex.Status.TemplateStatuses {
for _, template := range ex.Spec.Templates {
// Set candidate to the earliest of LastTransitionTime + progressDeadlineSeconds
if ts.Status != v1alpha1.TemplateStatusProgressing && ts.Status != v1alpha1.TemplateStatusRunning {
ts := experimentutil.GetTemplateStatus(ex.Status, template.Name)
if ts == nil || (ts.Status != v1alpha1.TemplateStatusProgressing && ts.Status != v1alpha1.TemplateStatusRunning) {
continue
}
if ts.LastTransitionTime != nil {
desiredReplicaCount := experimentutil.CalculateTemplateReplicasCount(ex, template)
// only requeue if we are not meeting our desired replicas, since if we are at our desired
// replicas, then there's nothing to check on
if ts.AvailableReplicas != desiredReplicaCount && ts.LastTransitionTime != nil {
progressDeadlineDuration := ts.LastTransitionTime.Add(time.Second * time.Duration(deadlineSeconds)).Sub(now)
if candidateDuration == nil || progressDeadlineDuration < *candidateDuration {
candidateDuration = &progressDeadlineDuration
Expand Down
69 changes: 67 additions & 2 deletions experiments/experiment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package experiments
import (
"errors"
"fmt"
"math"
"testing"
"time"

Expand Down Expand Up @@ -203,18 +204,82 @@ func TestSuccessAfterDurationPasses(t *testing.T) {
assert.Equal(t, expectedPatch, patch)
}

// TestDontRequeueWithoutDuration verifies we don't requeue if an experiment does not have
// spec.duration set and is running properly, since requeuing in that state would cause a
// hot loop.
func TestDontRequeueWithoutDuration(t *testing.T) {
templates := generateTemplates("bar")
// nil third argument: presumably leaves spec.duration unset — confirm against newExperiment.
ex := newExperiment("foo", templates, nil)
// Mark the experiment as having become available 10 seconds ago.
ex.Status.AvailableAt = &metav1.Time{Time: metav1.Now().Add(-10 * time.Second)}
// The single "bar" template is reported with status Running.
ex.Status.TemplateStatuses = []v1alpha1.TemplateStatus{
generateTemplatesStatus("bar", 1, 1, v1alpha1.TemplateStatusRunning, now()),
}
exCtx := newTestContext(ex)
rs1 := templateToRS(ex, ex.Spec.Templates[0], 1)
exCtx.templateRSs = map[string]*appsv1.ReplicaSet{
"bar": rs1,
}
// Register the ReplicaSet with the fake clientset's object tracker as well.
fakeClient := exCtx.kubeclientset.(*k8sfake.Clientset)
fakeClient.Tracker().Add(rs1)
// Replace the enqueue hook with a spy that only records whether it was invoked.
enqueueCalled := false
exCtx.enqueueExperimentAfter = func(obj interface{}, duration time.Duration) {
enqueueCalled = true
}
// NOTE(review): reconcile is invoked twice here and the first result is discarded; this
// looks like old and new diff lines shown together — confirm against the real file.
exCtx.reconcile()
newStatus := exCtx.reconcile()
// No requeue must have been requested, and the returned status must be Running.
assert.False(t, enqueueCalled)
assert.Equal(t, v1alpha1.AnalysisStatusRunning, newStatus.Status)
}

// TestRequeueAfterDuration verifies we requeue after an appropriate status.availableAt + spec.duration
func TestRequeueAfterDuration(t *testing.T) {
templates := generateTemplates("bar")
ex := newExperiment("foo", templates, nil)
ex.Spec.Duration = pointer.Int32Ptr(30)
ex.Status.AvailableAt = &metav1.Time{Time: metav1.Now().Add(-10 * time.Second)}
ex.Status.TemplateStatuses = []v1alpha1.TemplateStatus{
generateTemplatesStatus("bar", 1, 1, v1alpha1.TemplateStatusRunning, now()),
}
exCtx := newTestContext(ex)
rs1 := templateToRS(ex, ex.Spec.Templates[0], 1)
exCtx.templateRSs = map[string]*appsv1.ReplicaSet{
"bar": rs1,
}
enqueueCalled := false
exCtx.enqueueExperimentAfter = func(obj interface{}, duration time.Duration) {
enqueueCalled = true
// ensures we are enqueued around ~20 seconds
twentySeconds := time.Second * time.Duration(20)
delta := math.Abs(float64(twentySeconds - duration))
assert.True(t, delta < float64(100*time.Millisecond), "")
}
exCtx.reconcile()
assert.True(t, enqueueCalled)
}

// TestRequeueAfterProgressDeadlineSeconds verifies we requeue at an appropriate
// lastTransitionTime + spec.progressDeadlineSeconds
func TestRequeueAfterProgressDeadlineSeconds(t *testing.T) {
templates := generateTemplates("bar")
ex := newExperiment("foo", templates, nil)
ex.Status.TemplateStatuses = []v1alpha1.TemplateStatus{
generateTemplatesStatus("bar", 0, 0, v1alpha1.TemplateStatusProgressing, now()),
}
now := metav1.Now()
ex.Status.TemplateStatuses[0].LastTransitionTime = &now
exCtx := newTestContext(ex)
rs1 := templateToRS(ex, ex.Spec.Templates[0], 0)
exCtx.templateRSs = map[string]*appsv1.ReplicaSet{
"bar": rs1,
}
enqueueCalled := false
exCtx.enqueueExperimentAfter = func(obj interface{}, duration time.Duration) {
enqueueCalled = true
// ensures we are enqueued around 10 minutes
tenMinutes := time.Second * time.Duration(600)
delta := math.Abs(float64(tenMinutes - duration))
assert.True(t, delta < float64(100*time.Millisecond))
}
exCtx.reconcile()
assert.True(t, enqueueCalled)
}

func TestFailReplicaSetCreation(t *testing.T) {
Expand Down

0 comments on commit c25c23f

Please sign in to comment.