Skip to content

Commit

Permalink
Implements PendingJobCount to fix kedacore#1323
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Lamure <thomas.lamure@eridanis.com>
  • Loading branch information
thomas-lamure committed Dec 2, 2020
1 parent af2b36f commit c7311aa
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 16 deletions.
61 changes: 55 additions & 6 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al
logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace)

runningJobCount := e.getRunningJobCount(scaledJob)
pendingJobCount := e.getPendingJobCount(scaledJob)
logger.Info("Scaling Jobs", "Number of running Jobs", runningJobCount)
logger.Info("Scaling Jobs", "Number of pending Jobs ", pendingJobCount)

effectiveMaxScale := NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount, scaledJob.MaxReplicaCount())
effectiveMaxScale := NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, scaledJob.MaxReplicaCount())

if effectiveMaxScale < 0 {
effectiveMaxScale = 0
Expand Down Expand Up @@ -139,6 +141,53 @@ func (e *scaleExecutor) getRunningJobCount(scaledJob *kedav1alpha1.ScaledJob) in
return runningJobs
}

func (e *scaleExecutor) isAnyPodRunningOrCompleted(j *batchv1.Job) bool {
opts := []client.ListOption{
client.InNamespace(j.GetNamespace()),
client.MatchingLabels(map[string]string{"job-name": j.GetName()}),
}

pods := &corev1.PodList{}
err := e.client.List(context.TODO(), pods, opts...)

if err != nil {
return false
}

for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodRunning {
return true
}
}

return false
}

func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) int64 {
var pendingJobs int64

opts := []client.ListOption{
client.InNamespace(scaledJob.GetNamespace()),
client.MatchingLabels(map[string]string{"scaledjob": scaledJob.GetName()}),
}

jobs := &batchv1.JobList{}
err := e.client.List(context.TODO(), jobs, opts...)

if err != nil {
return 0
}

for _, job := range jobs.Items {
job := job
if !e.isJobFinished(&job) && !e.isAnyPodRunningOrCompleted(&job) {
pendingJobs++
}
}

return pendingJobs
}

// Clean up will delete the jobs that is exceed historyLimit
func (e *scaleExecutor) cleanUp(scaledJob *kedav1alpha1.ScaledJob) error {
logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace)
Expand Down Expand Up @@ -258,13 +307,13 @@ func NewScalingStrategy(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) S

// ScalingStrategy is an interface for switching scaling algorithm
type ScalingStrategy interface {
GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64
GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64
}

type defaultScalingStrategy struct {
}

func (s defaultScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 {
func (s defaultScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64 {
return maxScale - runningJobCount
}

Expand All @@ -273,18 +322,18 @@ type customScalingStrategy struct {
CustomScalingRunningJobPercentage *float64
}

func (s customScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 {
func (s customScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64 {
return min(maxScale-int64(*s.CustomScalingQueueLengthDeduction)-int64(float64(runningJobCount)*(*s.CustomScalingRunningJobPercentage)), maxReplicaCount)
}

type accurateScalingStrategy struct {
}

func (s accurateScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 {
func (s accurateScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64 {
if (maxScale + runningJobCount) > maxReplicaCount {
return maxReplicaCount - runningJobCount
}
return maxScale
return maxScale - pendingJobCount
}

func min(x, y int64) int64 {
Expand Down
26 changes: 16 additions & 10 deletions pkg/scaling/executor/scale_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ func TestDefaultScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithDefaultStrategy("default"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 5))
// pendingJobCount isn't relevant on this scenario
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 0, 5))
}

func TestCustomScalingStrategy(t *testing.T) {
Expand All @@ -71,13 +72,14 @@ func TestCustomScalingStrategy(t *testing.T) {
customScalingRunningJobPercentage := "0.5"
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(9), strategy.GetEffectiveMaxScale(10, 0, 10))
// pendingJobCount isn't relevant on this scenario
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(9), strategy.GetEffectiveMaxScale(10, 0, 0, 10))
strategy = NewScalingStrategy(logger, getMockScaledJobWithCustomStrategyWithNilParameter("custom", "custom"))

// If you don't set the two parameters is the same behavior as DefaultStrategy
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 5))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 0, 5))

// Empty String will be DefaultStrategy
customScalingQueueLengthDeduction = int32(1)
Expand All @@ -89,21 +91,25 @@ func TestCustomScalingStrategy(t *testing.T) {
customScalingQueueLengthDeduction = int32(2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))

// Exceed the MaxReplicaCount
customScalingQueueLengthDeduction = int32(-2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(4), strategy.GetEffectiveMaxScale(3, 2, 4))
assert.Equal(t, int64(4), strategy.GetEffectiveMaxScale(3, 2, 0, 4))
}

func TestAccurateScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("accurate", "accurate", 0, "0"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(5, 2, 5))
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(5, 2, 0, 5))

// Test with 2 pending jobs
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 4, 2, 10))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(5, 4, 2, 5))
}

func TestCleanUpMixedCaseWithSortByTime(t *testing.T) {
Expand Down

0 comments on commit c7311aa

Please sign in to comment.