Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG-5656] Annotate Jobs with parent ScaledJob generation #5876

Merged
merged 8 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Here is an overview of all new **experimental** features:

### Improvements

- **General**: Do not delete running Jobs on KEDA restart ([#5656](https://github.com/kedacore/keda/issues/5656))
- **Cassandra Scaler**: Add TLS support for cassandra scaler ([#5802](https://github.com/kedacore/keda/issues/5802))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
Expand Down
38 changes: 26 additions & 12 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,22 +279,36 @@ func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context
return "Cannot get list of Jobs owned by this scaledJob", err
}

if len(jobs.Items) > 0 {
logger.Info("RolloutStrategy: immediate, Deleting jobs owned by the previous version of the scaledJob", "numJobsToDelete", len(jobs.Items))
jobIndexes := make([]int, 0, len(jobs.Items))
scaledJobGeneration := strconv.FormatInt(scaledJob.Generation, 10)
for i, job := range jobs.Items {
if jobGen, ok := job.Annotations["scaledjob.keda.sh/generation"]; !ok {
// delete Jobs that don't have the generation annotation
jobIndexes = append(jobIndexes, i)
} else if jobGen != scaledJobGeneration {
// delete Jobs that have a different generation annotation
jobIndexes = append(jobIndexes, i)
}
}
for _, job := range jobs.Items {
job := job

propagationPolicy := metav1.DeletePropagationBackground
if scaledJob.Spec.Rollout.PropagationPolicy == "foreground" {
propagationPolicy = metav1.DeletePropagationForeground
}
err = r.Client.Delete(ctx, &job, client.PropagationPolicy(propagationPolicy))
if err != nil {
return "Not able to delete job: " + job.Name, err
if len(jobIndexes) == 0 {
logger.Info("RolloutStrategy: immediate, No jobs owned by the previous version of the scaledJob")
} else {
logger.Info("RolloutStrategy: immediate, Deleting jobs owned by the previous version of the scaledJob", "numJobsToDelete", len(jobIndexes))
for _, index := range jobIndexes {
job := jobs.Items[index]

propagationPolicy := metav1.DeletePropagationBackground
if scaledJob.Spec.Rollout.PropagationPolicy == "foreground" {
propagationPolicy = metav1.DeletePropagationForeground
}
err = r.Client.Delete(ctx, &job, client.PropagationPolicy(propagationPolicy))
if err != nil {
return "Not able to delete job: " + job.Name, err
}
}
return fmt.Sprintf("RolloutStrategy: immediate, deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", len(jobIndexes)), nil
}
return fmt.Sprintf("RolloutStrategy: immediate, deleted jobs owned by the previous version of the scaleJob: %d jobs deleted", len(jobs.Items)), nil
}
return fmt.Sprintf("RolloutStrategy: %s", scaledJob.Spec.RolloutStrategy), nil
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func (e *scaleExecutor) getScalingDecision(scaledJob *kedav1alpha1.ScaledJob, ru
}

func (e *scaleExecutor) createJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, scaleTo int64, maxScale int64) {
if maxScale <= 0 {
logger.Info("No need to create jobs - all requested jobs already exist", "jobs", maxScale)
return
}
logger.Info("Creating jobs", "Effective number of max jobs", maxScale)
if scaleTo > maxScale {
scaleTo = maxScale
Expand Down Expand Up @@ -137,14 +141,21 @@ func (e *scaleExecutor) generateJobs(logger logr.Logger, scaledJob *kedav1alpha1
labels[key] = value
}

annotations := map[string]string{
"scaledjob.keda.sh/generation": strconv.FormatInt(scaledJob.Generation, 10),
}
for key, value := range scaledJob.ObjectMeta.Annotations {
annotations[key] = value
}

jobs := make([]*batchv1.Job, int(scaleTo))
for i := 0; i < int(scaleTo); i++ {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
GenerateName: scaledJob.GetName() + "-",
Namespace: scaledJob.GetNamespace(),
Labels: labels,
Annotations: scaledJob.ObjectMeta.Annotations,
Annotations: annotations,
},
Spec: *scaledJob.Spec.JobTargetRef.DeepCopy(),
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/scaling/executor/scale_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,11 @@ func TestCreateJobs(t *testing.T) {

func TestGenerateJobs(t *testing.T) {
var (
expectedAnnotations = map[string]string{"test": "test"}
expectedLabels = map[string]string{
expectedAnnotations = map[string]string{
"test": "test",
"scaledjob.keda.sh/generation": "0",
}
expectedLabels = map[string]string{
"app.kubernetes.io/managed-by": "keda-operator",
"app.kubernetes.io/name": "test",
"app.kubernetes.io/part-of": "test",
Expand Down
Loading