Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 15 additions & 53 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package job
import (
"context"
"fmt"
"hash/fnv"
"sync"
"time"

Expand Down Expand Up @@ -136,7 +135,7 @@ type jobcontroller struct {
queueSynced func() bool

// queue that need to sync up
queueList []workqueue.TypedRateLimitingInterface[any]
queue workqueue.TypedRateLimitingInterface[any]
commandQueue workqueue.TypedRateLimitingInterface[any]
cache jobcache.Cache
// Job Event recorder
Expand Down Expand Up @@ -170,7 +169,7 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {
recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})

cc.informerFactory = sharedInformers
cc.queueList = make([]workqueue.TypedRateLimitingInterface[any], workers)
cc.queue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
cc.commandQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
cc.cache = jobcache.New()
cc.errTasks = newRateLimitingQueue()
Expand All @@ -181,11 +180,6 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {
cc.maxRequeueNum = -1
}

var i uint32
for i = 0; i < workers; i++ {
cc.queueList[i] = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
}

factory := opt.VCSharedInformerFactory
cc.vcInformerFactory = factory
if utilfeature.DefaultFeatureGate.Enabled(features.VolcanoJobSupport) {
Expand Down Expand Up @@ -292,14 +286,13 @@ func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
go wait.Until(cc.handleCommands, 0, stopCh)
var i uint32
for i = 0; i < cc.workers; i++ {
go func(num uint32) {
wait.Until(
func() {
cc.worker(num)
},
time.Second,
stopCh)
}(i)
go wait.Until(
func() {
for cc.processNextReq() {
}
},
time.Second,
stopCh)
}

go cc.cache.Run(stopCh)
Expand All @@ -310,48 +303,17 @@ func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
klog.Infof("JobController is running ...... ")
}

// worker is the processing loop for the worker with index i.
// It logs a start message and then keeps calling processNextReq(i),
// returning as soon as that call reports false.
func (cc *jobcontroller) worker(i uint32) {
	klog.Infof("worker %d start ...... ", i)

	for {
		if !cc.processNextReq(i) {
			return
		}
	}
}

// belongsToThisRoutine reports whether key is assigned to the worker whose
// index is count, i.e. whether genHash(key) modulo cc.workers equals count.
//
// cc.workers must be non-zero: an integer modulo by zero panics at runtime.
func (cc *jobcontroller) belongsToThisRoutine(key string, count uint32) bool {
	return cc.genHash(key)%cc.workers == count
}

// getWorkerQueue returns the entry of cc.queueList selected for key.
// The slot index is genHash(key) modulo cc.workers, so the same key always
// resolves to the same queue for a given worker count.
//
// cc.workers must be non-zero and cc.queueList must hold at least
// cc.workers entries, otherwise the modulo or the index panics.
func (cc *jobcontroller) getWorkerQueue(key string) workqueue.TypedRateLimitingInterface[any] {
	idx := cc.genHash(key) % cc.workers
	return cc.queueList[idx]
}

// genHash returns the 32-bit FNV-1 hash of key.
// It reads no state from the receiver.
func (cc *jobcontroller) genHash(key string) uint32 {
	h := fnv.New32()
	// hash.Hash documents that Write never returns an error, so both
	// results are discarded on purpose.
	_, _ = h.Write([]byte(key))
	return h.Sum32()
}

func (cc *jobcontroller) processNextReq(count uint32) bool {
queue := cc.queueList[count]
obj, shutdown := queue.Get()
func (cc *jobcontroller) processNextReq() bool {
obj, shutdown := cc.queue.Get()
if shutdown {
klog.Errorf("Fail to pop item from queue")
return false
}

req := obj.(apis.Request)
defer queue.Done(req)
defer cc.queue.Done(req)

key := jobcache.JobKeyByReq(&req)
if !cc.belongsToThisRoutine(key, count) {
klog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
queueLocal := cc.getWorkerQueue(key)
queueLocal.Add(req)
return true
}

klog.V(3).Infof("Try to handle request <%v>", req)

Expand Down Expand Up @@ -393,12 +355,12 @@ func (cc *jobcontroller) processNextReq(count uint32) bool {
action := GetStateAction(delayAct)

if err := st.Execute(action); err != nil {
cc.handleJobError(queue, req, st, err, delayAct.action)
cc.handleJobError(cc.queue, req, st, err, delayAct.action)
return true
}

// If no error, forget it.
queue.Forget(req)
cc.queue.Forget(req)

// If the action is not an internal action, cancel all delayed actions
if !isInternalAction(delayAct.action) {
Expand Down Expand Up @@ -496,7 +458,7 @@ func (cc *jobcontroller) AddDelayActionForJob(req apis.Request, delayAct *delayA
jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name)
return
}
queue := cc.getWorkerQueue(delayAct.jobKey)
queue := cc.queue

if err := st.Execute(GetStateAction(delayAct)); err != nil {
cc.handleJobError(queue, req, st, err, delayAct.action)
Expand Down
29 changes: 7 additions & 22 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
scheduling "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/apis"
jobcache "volcano.sh/volcano/pkg/controllers/cache"
jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
"volcano.sh/volcano/pkg/controllers/job/state"
)

Expand Down Expand Up @@ -68,9 +67,7 @@ func (cc *jobcontroller) addJob(obj interface{}) {
klog.Errorf("Failed to add job <%s/%s>: %v in cache",
job.Namespace, job.Name, err)
}
key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
cc.queue.Add(req)
}

func (cc *jobcontroller) updateJob(oldObj, newObj interface{}) {
Expand Down Expand Up @@ -109,9 +106,7 @@ func (cc *jobcontroller) updateJob(oldObj, newObj interface{}) {
JobName: newJob.Name,
Event: bus.OutOfSyncEvent,
}
key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
cc.queue.Add(req)
}

func (cc *jobcontroller) deleteJob(obj interface{}) {
Expand Down Expand Up @@ -203,9 +198,7 @@ func (cc *jobcontroller) addPod(obj interface{}) {
klog.Errorf("Failed to add Pod <%s/%s>: %v to cache",
pod.Namespace, pod.Name, err)
}
key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
cc.queue.Add(req)
}

func (cc *jobcontroller) updatePod(oldObj, newObj interface{}) {
Expand Down Expand Up @@ -317,9 +310,7 @@ func (cc *jobcontroller) updatePod(oldObj, newObj interface{}) {
JobVersion: int32(dVersion),
}

key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
cc.queue.Add(req)
}

func (cc *jobcontroller) deletePod(obj interface{}) {
Expand Down Expand Up @@ -396,9 +387,7 @@ func (cc *jobcontroller) deletePod(obj interface{}) {
pod.Namespace, pod.Name, err)
}

key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
cc.queue.Add(req)
}

func (cc *jobcontroller) recordJobEvent(namespace, name string, event batch.JobEvent, message string) {
Expand Down Expand Up @@ -442,9 +431,7 @@ func (cc *jobcontroller) processNextCommand() bool {
Action: bus.Action(cmd.Action),
}

key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
cc.queue.Add(req)

return true
}
Expand Down Expand Up @@ -485,9 +472,7 @@ func (cc *jobcontroller) updatePodGroup(oldObj, newObj interface{}) {
case scheduling.PodGroupUnknown:
req.Event = bus.JobUnknownEvent
}
key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
cc.queue.Add(req)
}
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/controllers/job/job_controller_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestJobAddFunc(t *testing.T) {
if job == nil || err != nil {
t.Errorf("Error while Adding Job in case %d with error %s", i, err)
}
queue := controller.getWorkerQueue(key)
queue := controller.queue
len := queue.Len()
if testcase.ExpectValue != len {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
Expand Down Expand Up @@ -614,8 +614,7 @@ func TestUpdatePodGroupFunc(t *testing.T) {
t.Run(testcase.Name, func(t *testing.T) {
controller := newController()
controller.updatePodGroup(testcase.oldPodGroup, testcase.newPodGroup)
key := fmt.Sprintf("%s/%s", testcase.oldPodGroup.Namespace, testcase.oldPodGroup.Name)
queue := controller.getWorkerQueue(key)
queue := controller.queue
len := queue.Len()
if testcase.ExpectValue != len {
t.Errorf("case %d (%s): expected: %v, got %v ", i, testcase.Name, testcase.ExpectValue, len)
Expand Down
Loading