Skip to content

Commit

Permalink
Implements PendingJobCount to fix #1323 (#1391)
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Lamure <thomas.lamure@eridanis.com>
  • Loading branch information
thomas-lamure authored Dec 11, 2020
1 parent 181fc41 commit 85e90e8
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- Bug fix in aws_iam_authorization to utilize correct secret from env key name ([PR #1332](https://github.com/kedacore/keda/pull/1332))
- Add metricName field to postgres scaler and auto generate if not defined. ([PR #1381](https://github.com/kedacore/keda/pull/1381))
- Mask password in postgres scaler auto generated metricName. ([PR #1381](https://github.com/kedacore/keda/pull/1381))
- Bug fix for pending jobs in ScaledJob's accurateScalingStrategy . ([#1323](https://github.com/kedacore/keda/issues/1323))

### Breaking Changes

Expand Down
61 changes: 55 additions & 6 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al
logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace)

runningJobCount := e.getRunningJobCount(scaledJob)
pendingJobCount := e.getPendingJobCount(scaledJob)
logger.Info("Scaling Jobs", "Number of running Jobs", runningJobCount)
logger.Info("Scaling Jobs", "Number of pending Jobs ", pendingJobCount)

effectiveMaxScale := NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount, scaledJob.MaxReplicaCount())
effectiveMaxScale := NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, scaledJob.MaxReplicaCount())

if effectiveMaxScale < 0 {
effectiveMaxScale = 0
Expand Down Expand Up @@ -142,6 +144,53 @@ func (e *scaleExecutor) getRunningJobCount(scaledJob *kedav1alpha1.ScaledJob) in
return runningJobs
}

func (e *scaleExecutor) isAnyPodRunningOrCompleted(j *batchv1.Job) bool {
opts := []client.ListOption{
client.InNamespace(j.GetNamespace()),
client.MatchingLabels(map[string]string{"job-name": j.GetName()}),
}

pods := &corev1.PodList{}
err := e.client.List(context.TODO(), pods, opts...)

if err != nil {
return false
}

for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodRunning {
return true
}
}

return false
}

func (e *scaleExecutor) getPendingJobCount(scaledJob *kedav1alpha1.ScaledJob) int64 {
var pendingJobs int64

opts := []client.ListOption{
client.InNamespace(scaledJob.GetNamespace()),
client.MatchingLabels(map[string]string{"scaledjob": scaledJob.GetName()}),
}

jobs := &batchv1.JobList{}
err := e.client.List(context.TODO(), jobs, opts...)

if err != nil {
return 0
}

for _, job := range jobs.Items {
job := job
if !e.isJobFinished(&job) && !e.isAnyPodRunningOrCompleted(&job) {
pendingJobs++
}
}

return pendingJobs
}

// Clean up will delete the jobs that is exceed historyLimit
func (e *scaleExecutor) cleanUp(scaledJob *kedav1alpha1.ScaledJob) error {
logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace)
Expand Down Expand Up @@ -261,13 +310,13 @@ func NewScalingStrategy(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) S

// ScalingStrategy is an interface for switching scaling algorithm
type ScalingStrategy interface {
GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64
GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64
}

type defaultScalingStrategy struct {
}

func (s defaultScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 {
func (s defaultScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64 {
return maxScale - runningJobCount
}

Expand All @@ -276,18 +325,18 @@ type customScalingStrategy struct {
CustomScalingRunningJobPercentage *float64
}

func (s customScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 {
func (s customScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64 {
return min(maxScale-int64(*s.CustomScalingQueueLengthDeduction)-int64(float64(runningJobCount)*(*s.CustomScalingRunningJobPercentage)), maxReplicaCount)
}

type accurateScalingStrategy struct {
}

func (s accurateScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, maxReplicaCount int64) int64 {
func (s accurateScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64 {
if (maxScale + runningJobCount) > maxReplicaCount {
return maxReplicaCount - runningJobCount
}
return maxScale
return maxScale - pendingJobCount
}

func min(x, y int64) int64 {
Expand Down
26 changes: 16 additions & 10 deletions pkg/scaling/executor/scale_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ func TestDefaultScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithDefaultStrategy("default"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 5))
// pendingJobCount isn't relevant on this scenario
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 0, 5))
}

func TestCustomScalingStrategy(t *testing.T) {
Expand All @@ -75,13 +76,14 @@ func TestCustomScalingStrategy(t *testing.T) {
customScalingRunningJobPercentage := "0.5"
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(9), strategy.GetEffectiveMaxScale(10, 0, 10))
// pendingJobCount isn't relevant on this scenario
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(9), strategy.GetEffectiveMaxScale(10, 0, 0, 10))
strategy = NewScalingStrategy(logger, getMockScaledJobWithCustomStrategyWithNilParameter("custom", "custom"))

// If you don't set the two parameters is the same behavior as DefaultStrategy
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 5))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 0, 5))

// Empty String will be DefaultStrategy
customScalingQueueLengthDeduction = int32(1)
Expand All @@ -93,21 +95,25 @@ func TestCustomScalingStrategy(t *testing.T) {
customScalingQueueLengthDeduction = int32(2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))

// Exceed the MaxReplicaCount
customScalingQueueLengthDeduction = int32(-2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(4), strategy.GetEffectiveMaxScale(3, 2, 4))
assert.Equal(t, int64(4), strategy.GetEffectiveMaxScale(3, 2, 0, 4))
}

func TestAccurateScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("accurate", "accurate", 0, "0"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(3, 2, 5))
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(5, 2, 5))
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(5, 2, 0, 5))

// Test with 2 pending jobs
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 4, 2, 10))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(5, 4, 2, 5))
}

func TestCleanUpMixedCaseWithSortByTime(t *testing.T) {
Expand Down

0 comments on commit 85e90e8

Please sign in to comment.