Skip to content

Commit

Permalink
[BUG-5656] Annotate Jobs with parent ScaledJob generation (#5876)
Browse files Browse the repository at this point in the history
* Annotate Jobs with parent ScaledJob generation

Signed-off-by: Josef Karasek <josef@kedify.io>

* fix tests

Signed-off-by: Josef Karasek <josef@kedify.io>

* fix lint

Signed-off-by: Josef Karasek <josef@kedify.io>

* fix log message

Signed-off-by: Josef Karasek <josef@kedify.io>

* update changelog

Signed-off-by: Josef Karasek <josef@kedify.io>

* update changelog

Signed-off-by: Josef Karasek <josef@kedify.io>

* update changelog

Signed-off-by: Josef Karasek <josef@kedify.io>

---------

Signed-off-by: Josef Karasek <josef@kedify.io>
Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
Co-authored-by: Zbynek Roubalik <zroubalik@gmail.com>
  • Loading branch information
josefkarasek and zroubalik authored Jul 30, 2024
1 parent 1d51361 commit 7ca6708
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 16 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ Here is an overview of all new **experimental** features:
- **Datadog Scaler**: Add support to use the Cluster Agent as source of metrics ([#5355](https://github.com/kedacore/keda/issues/5355))

### Improvements

- **General**: Added `eagerScalingStrategy` for `ScaledJob` ([#5114](https://github.com/kedacore/keda/issues/5114))
- **General**: Do not delete running Jobs on KEDA restart ([#5656](https://github.com/kedacore/keda/issues/5656))
- **Azure queue scaler**: Added new configuration option 'queueLengthStrategy' ([#4478](https://github.com/kedacore/keda/issues/4478))
- **Cassandra Scaler**: Add TLS support for cassandra scaler ([#5802](https://github.com/kedacore/keda/issues/5802))
- **GCP Pub/Sub**: Add optional valueIfNull to allow a default scaling value and prevent errors when GCP metric returns no value. ([#5896](https://github.com/kedacore/keda/issues/5896))
Expand Down
38 changes: 26 additions & 12 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,22 +279,36 @@ func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context
return "Cannot get list of Jobs owned by this scaledJob", err
}

if len(jobs.Items) > 0 {
logger.Info("RolloutStrategy: immediate, Deleting jobs owned by the previous version of the scaledJob", "numJobsToDelete", len(jobs.Items))
jobIndexes := make([]int, 0, len(jobs.Items))
scaledJobGeneration := strconv.FormatInt(scaledJob.Generation, 10)
for i, job := range jobs.Items {
if jobGen, ok := job.Annotations["scaledjob.keda.sh/generation"]; !ok {
// delete Jobs that don't have the generation annotation
jobIndexes = append(jobIndexes, i)
} else if jobGen != scaledJobGeneration {
// delete Jobs that have a different generation annotation
jobIndexes = append(jobIndexes, i)
}
}
for _, job := range jobs.Items {
job := job

propagationPolicy := metav1.DeletePropagationBackground
if scaledJob.Spec.Rollout.PropagationPolicy == "foreground" {
propagationPolicy = metav1.DeletePropagationForeground
}
err = r.Client.Delete(ctx, &job, client.PropagationPolicy(propagationPolicy))
if err != nil {
return "Not able to delete job: " + job.Name, err
if len(jobIndexes) == 0 {
logger.Info("RolloutStrategy: immediate, No jobs owned by the previous version of the scaledJob")
} else {
logger.Info("RolloutStrategy: immediate, Deleting jobs owned by the previous version of the scaledJob", "numJobsToDelete", len(jobIndexes))
for _, index := range jobIndexes {
job := jobs.Items[index]

propagationPolicy := metav1.DeletePropagationBackground
if scaledJob.Spec.Rollout.PropagationPolicy == "foreground" {
propagationPolicy = metav1.DeletePropagationForeground
}
err = r.Client.Delete(ctx, &job, client.PropagationPolicy(propagationPolicy))
if err != nil {
return "Not able to delete job: " + job.Name, err
}
}
return fmt.Sprintf("RolloutStrategy: immediate, deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", len(jobIndexes)), nil
}
return fmt.Sprintf("RolloutStrategy: immediate, deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", len(jobs.Items)), nil
}
return fmt.Sprintf("RolloutStrategy: %s", scaledJob.Spec.RolloutStrategy), nil
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func (e *scaleExecutor) getScalingDecision(scaledJob *kedav1alpha1.ScaledJob, ru
}

func (e *scaleExecutor) createJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, scaleTo int64, maxScale int64) {
if maxScale <= 0 {
logger.Info("No need to create jobs - all requested jobs already exist", "jobs", maxScale)
return
}
logger.Info("Creating jobs", "Effective number of max jobs", maxScale)
if scaleTo > maxScale {
scaleTo = maxScale
Expand Down Expand Up @@ -137,14 +141,21 @@ func (e *scaleExecutor) generateJobs(logger logr.Logger, scaledJob *kedav1alpha1
labels[key] = value
}

annotations := map[string]string{
"scaledjob.keda.sh/generation": strconv.FormatInt(scaledJob.Generation, 10),
}
for key, value := range scaledJob.ObjectMeta.Annotations {
annotations[key] = value
}

jobs := make([]*batchv1.Job, int(scaleTo))
for i := 0; i < int(scaleTo); i++ {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
GenerateName: scaledJob.GetName() + "-",
Namespace: scaledJob.GetNamespace(),
Labels: labels,
Annotations: scaledJob.ObjectMeta.Annotations,
Annotations: annotations,
},
Spec: *scaledJob.Spec.JobTargetRef.DeepCopy(),
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/scaling/executor/scale_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,11 @@ func TestCreateJobs(t *testing.T) {

func TestGenerateJobs(t *testing.T) {
var (
expectedAnnotations = map[string]string{"test": "test"}
expectedLabels = map[string]string{
expectedAnnotations = map[string]string{
"test": "test",
"scaledjob.keda.sh/generation": "0",
}
expectedLabels = map[string]string{
"app.kubernetes.io/managed-by": "keda-operator",
"app.kubernetes.io/name": "test",
"app.kubernetes.io/part-of": "test",
Expand Down

0 comments on commit 7ca6708

Please sign in to comment.