Skip to content

Job controller optimization: reduce work duration time & minimize cache locking #132305

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions pkg/controller/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ type Controller struct {
// A store of pods, populated by the podController
podStore corelisters.PodLister

// podIndexer allows looking up pods by ControllerRef UID
podIndexer cache.Indexer

// Jobs that need to be updated
queue workqueue.TypedRateLimitingInterface[string]

Expand Down Expand Up @@ -223,6 +226,9 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
jm.podStore = podInformer.Lister()
jm.podStoreSynced = podInformer.Informer().HasSynced

controller.AddPodControllerUIDIndexer(podInformer.Informer())
jm.podIndexer = podInformer.Informer().GetIndexer()

jm.updateStatusHandler = jm.updateJobStatus
jm.patchJobHandler = jm.patchJob
jm.syncHandler = jm.syncJob
Expand Down Expand Up @@ -758,12 +764,13 @@ func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Po
if err != nil {
return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
}
// List all pods to include those that don't match the selector anymore
// but have a ControllerRef pointing to this controller.
pods, err := jm.podStore.Pods(j.Namespace).List(labels.Everything())

// List the pods whose ControllerRef points at this Job, plus orphaned pods, using the pod indexer.
pods, err := jm.getJobPodsByIndexer(ctx, j)
if err != nil {
return nil, err
}

// If any adoptions are attempted, we should first recheck for deletion
// with an uncached quorum read sometime after listing Pods (see #42639).
canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
Expand Down Expand Up @@ -799,6 +806,27 @@ func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Po
return pods, err
}

// getJobPodsByIndexer returns the set of pods that this Job should manage.
func (jm *Controller) getJobPodsByIndexer(ctx context.Context, j *batch.Job) ([]*v1.Pod, error) {
podsForJob := []*v1.Pod{}
for _, key := range []string{string(j.UID), controller.OrphanPodIndexKey} {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How the orphan pods get populated in the index, can you provide some reference?

pods, err := jm.podIndexer.ByIndex(controller.PodControllerUIDIndex, key)
if err != nil {
return nil, err
}

for _, obj := range pods {
pod, ok := obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type in pod indexer: %v", obj))
continue
}
podsForJob = append(podsForJob, pod)
}
}
return podsForJob, nil
}

// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
Expand Down