Skip to content

Commit

Permalink
Merge pull request #445 from k82cn/queue_refactor
Browse files Browse the repository at this point in the history
Queue refactor.
  • Loading branch information
volcano-sh-bot authored Sep 12, 2019
2 parents 51132ff + a91b066 commit 97217fd
Showing 1 changed file with 33 additions and 25 deletions.
58 changes: 33 additions & 25 deletions pkg/controllers/queue/queue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package queue

import (
"fmt"
"reflect"
"sync"

"github.com/golang/glog"
Expand Down Expand Up @@ -137,40 +139,50 @@ func (c *Controller) processNextWorkItem() bool {
return true
}

func (c *Controller) syncQueue(key string) error {
glog.V(4).Infof("Begin sync queue %s", key)

var pending, running, unknown, inqueue int32
func (c *Controller) getPodGroups(key string) ([]string, error) {
c.pgMutex.RLock()
defer c.pgMutex.RUnlock()

if c.podGroups[key] == nil {
c.pgMutex.RUnlock()
glog.V(2).Infof("queue %s has not been seen or deleted", key)
return nil
return nil, fmt.Errorf("queue %s has not been seen or deleted", key)
}
podGroups := make([]string, 0, len(c.podGroups[key]))
for pgKey := range c.podGroups[key] {
podGroups = append(podGroups, pgKey)
}
c.pgMutex.RUnlock()

return podGroups, nil
}

func (c *Controller) syncQueue(key string) error {
glog.V(4).Infof("Begin sync queue %s", key)

podGroups, err := c.getPodGroups(key)
if err != nil {
return err
}

queueStatus := schedulingv1alpha2.QueueStatus{}

for _, pgKey := range podGroups {
// Ignore error here, it cannot occur.
ns, name, _ := cache.SplitMetaNamespaceKey(pgKey)

// TODO: check NotFound error and sync local cache.
pg, err := c.pgLister.PodGroups(ns).Get(name)
if err != nil {
return err
}

switch pg.Status.Phase {
case schedulingv1alpha2.PodGroupPending:
pending++
queueStatus.Pending++
case schedulingv1alpha2.PodGroupRunning:
running++
queueStatus.Running++
case schedulingv1alpha2.PodGroupUnknown:
unknown++
queueStatus.Unknown++
case schedulingv1alpha2.PodGroupInqueue:
inqueue++
queueStatus.Inqueue++
}
}

Expand All @@ -180,21 +192,17 @@ func (c *Controller) syncQueue(key string) error {
glog.V(2).Infof("queue %s has been deleted", key)
return nil
}
// TODO: do not retry to syncQueue for this error
return err
}

glog.V(4).Infof("queue %s jobs pending %d, running %d, unknown %d", key, pending, running, unknown)
// ignore update when status doesnot change
if pending == queue.Status.Pending && running == queue.Status.Running &&
unknown == queue.Status.Unknown && inqueue == queue.Status.Inqueue {
// ignore update when status does not change
if reflect.DeepEqual(queueStatus, queue.Status) {
return nil
}

newQueue := queue.DeepCopy()
newQueue.Status.Pending = pending
newQueue.Status.Running = running
newQueue.Status.Unknown = unknown
newQueue.Status.Inqueue = inqueue
newQueue.Status = queueStatus

if _, err := c.kbClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil {
glog.Errorf("Failed to update status of Queue %s: %v", newQueue.Name, err)
Expand Down Expand Up @@ -225,20 +233,21 @@ func (c *Controller) deleteQueue(obj interface{}) {
}

c.pgMutex.Lock()
defer c.pgMutex.Unlock()
delete(c.podGroups, queue.Name)
c.pgMutex.Unlock()
}

func (c *Controller) addPodGroup(obj interface{}) {
pg := obj.(*schedulingv1alpha2.PodGroup)
key, _ := cache.MetaNamespaceKeyFunc(obj)

c.pgMutex.Lock()
defer c.pgMutex.Unlock()

if c.podGroups[pg.Spec.Queue] == nil {
c.podGroups[pg.Spec.Queue] = make(map[string]struct{})
}
c.podGroups[pg.Spec.Queue][key] = struct{}{}
c.pgMutex.Unlock()

// enqueue
c.queue.Add(pg.Spec.Queue)
Expand All @@ -251,10 +260,8 @@ func (c *Controller) updatePodGroup(old, new interface{}) {
// Note: we have no use case for updating PodGroup.Spec.Queue,
// so it is not considered here.
if oldPG.Status.Phase != newPG.Status.Phase {
// enqueue
c.queue.Add(newPG.Spec.Queue)
}

}

func (c *Controller) deletePodGroup(obj interface{}) {
Expand All @@ -275,8 +282,9 @@ func (c *Controller) deletePodGroup(obj interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(obj)

c.pgMutex.Lock()
defer c.pgMutex.Unlock()

delete(c.podGroups[pg.Spec.Queue], key)
c.pgMutex.Unlock()

c.queue.Add(pg.Spec.Queue)
}

0 comments on commit 97217fd

Please sign in to comment.