@@ -43,7 +43,8 @@ type DaemonSetJobControllerConfig struct {
43
43
44
44
// DaemonSetJobLabelKey defines the key of label to be injected by DaemonSetJob controller
45
45
const (
46
- DaemonSetJobLabelKey = "kubernetes.io/DaemonSetJob"
46
+ DaemonSetJobLabelKey = "kubernetes.io/DaemonSetJob"
47
+ DaemonSetJobMD5AnnotationKey = "workflow.k8s.io/jobspec-md5"
47
48
)
48
49
49
50
// DaemonSetJobController represents the DaemonSetJob controller
@@ -288,6 +289,11 @@ func (d *DaemonSetJobController) manageDaemonSetJob(daemonsetjob *dapi.DaemonSet
288
289
daemonsetjobComplete := false
289
290
daemonsetjobFailed := false
290
291
292
+ md5JobSpec , err := generateMD5JobSpec (& daemonsetjob .Spec .JobTemplate .Spec )
293
+ if err != nil {
294
+ return fmt .Errorf ("unable to generates the JobSpec MD5, %v" , err )
295
+ }
296
+
291
297
jobSelector := labels.Set {DaemonSetJobLabelKey : daemonsetjob .Name }
292
298
jobs , err := d .JobLister .List (jobSelector .AsSelectorPreValidated ())
293
299
jobsByNodeName := make (map [string ]* batch.Job )
@@ -306,20 +312,40 @@ func (d *DaemonSetJobController) manageDaemonSetJob(daemonsetjob *dapi.DaemonSet
306
312
}
307
313
errs := []error {}
308
314
allJobs := []* batch.Job {}
315
+ jobToDelete := []* batch.Job {} // corresponding to the all DaemonSetJob Spec
309
316
for _ , node := range nodes {
317
+ newJobCreation := false
310
318
job , ok := jobsByNodeName [node .Name ]
311
319
if ! ok {
320
+ glog .V (6 ).Infof ("Job for the node %s not found %s/%s" , node .Name , daemonsetjob .Namespace , daemonsetjob .Name )
321
+ newJobCreation = true
322
+ } else if job != nil {
323
+ if ! compareJobSpecMD5Hash (md5JobSpec , job ) {
324
+ glog .V (6 ).Infof ("JobTemplateSpec has changed, %s/%s, previousMD5:%s, current:%s" , job .Namespace , job .Name , md5JobSpec , job .GetAnnotations ()[DaemonSetJobMD5AnnotationKey ])
325
+ jobToDelete = append (jobToDelete , job )
326
+ newJobCreation = true
327
+ } else {
328
+ allJobs = append (allJobs , job )
329
+ }
330
+ }
331
+
332
+ if newJobCreation {
312
333
job , err = d .JobControl .CreateJobFromDaemonSetJob (daemonsetjob .Namespace , daemonsetjob .Spec .JobTemplate , daemonsetjob , node .Name )
313
334
if err != nil {
314
335
errs = append (errs , err )
315
336
}
316
337
daemonsetjobToBeUpdated = true
317
338
}
339
+ }
318
340
319
- if job != nil {
320
- allJobs = append (allJobs , job )
341
+ // Delete old jobs
342
+ for _ , job := range jobToDelete {
343
+ // TODO maybe wait that the job is finished (success or failure) before deleting, need to define policy configuration
344
+ // Or maybe wait that the job is finished before creating the new job
345
+ err = d .JobControl .DeleteJob (job .Namespace , job .Name , job )
346
+ if err != nil {
347
+ errs = append (errs , err )
321
348
}
322
-
323
349
}
324
350
325
351
// build status
@@ -337,7 +363,6 @@ func (d *DaemonSetJobController) manageDaemonSetJob(daemonsetjob *dapi.DaemonSet
337
363
daemonsetjob .Status .Failed = failedJobs
338
364
daemonsetjobToBeUpdated = true
339
365
}
340
-
341
366
updateDaemonSetJobStatusConditions (& daemonsetjob .Status , now , daemonsetjobComplete , daemonsetjobFailed )
342
367
if daemonsetjobComplete {
343
368
glog .Infof ("Workflow %s/%s complete." , daemonsetjob .Namespace , daemonsetjob .Name )
@@ -541,6 +566,13 @@ func IsDaemonSetJobFinished(d *dapi.DaemonSetJob) bool {
541
566
return false
542
567
}
543
568
569
+ // InferDaemonSetJobLabelSelectorForJobs returns labels.Selector corresponding to the associated Jobs
570
+ func InferDaemonSetJobLabelSelectorForJobs (daemonsetjob * dapi.DaemonSetJob ) labels.Selector {
571
+ set := fetchLabelsSetFromLabelSelector (daemonsetjob .Spec .Selector )
572
+ set [DaemonSetJobLabelKey ] = daemonsetjob .Name
573
+ return labels .SelectorFromSet (set )
574
+ }
575
+
544
576
// get daemonsetjob by key method
545
577
func (d * DaemonSetJobController ) getDaemonSetJobByKey (key string ) (* dapi.DaemonSetJob , error ) {
546
578
namespace , name , err := cache .SplitMetaNamespaceKey (key )
@@ -590,7 +622,7 @@ func (d *DaemonSetJobController) pastActiveDeadline(daemonsetjob *dapi.DaemonSet
590
622
func (d * DaemonSetJobController ) deleteDaemonSetJobJobs (daemonsetjob * dapi.DaemonSetJob ) error {
591
623
glog .V (6 ).Infof ("deleting all jobs for daemonsetjob %s/%s" , daemonsetjob .Namespace , daemonsetjob .Name )
592
624
593
- jobsSelector := inferDaemonSetJobLabelSelectorForJobs (daemonsetjob )
625
+ jobsSelector := InferDaemonSetJobLabelSelectorForJobs (daemonsetjob )
594
626
return deleteJobsFromLabelSelector (daemonsetjob .Namespace , jobsSelector , d .JobLister , d .JobControl )
595
627
}
596
628
@@ -656,9 +688,3 @@ func newDaemonSetJobStatusCondition(conditionType dapi.DaemonSetJobConditionType
656
688
Message : message ,
657
689
}
658
690
}
659
-
660
- func inferDaemonSetJobLabelSelectorForJobs (daemonsetjob * dapi.DaemonSetJob ) labels.Selector {
661
- set := fetchLabelsSetFromLabelSelector (daemonsetjob .Spec .Selector )
662
- set [DaemonSetJobLabelKey ] = daemonsetjob .Name
663
- return labels .SelectorFromSet (set )
664
- }
0 commit comments