Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue refactor. #445

Merged
merged 1 commit into from
Sep 12, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 33 additions & 25 deletions pkg/controllers/queue/queue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package queue

import (
"fmt"
"reflect"
"sync"

"github.com/golang/glog"
Expand Down Expand Up @@ -137,40 +139,50 @@ func (c *Controller) processNextWorkItem() bool {
return true
}

func (c *Controller) syncQueue(key string) error {
glog.V(4).Infof("Begin sync queue %s", key)

var pending, running, unknown, inqueue int32
func (c *Controller) getPodGroups(key string) ([]string, error) {
c.pgMutex.RLock()
defer c.pgMutex.RUnlock()

if c.podGroups[key] == nil {
c.pgMutex.RUnlock()
glog.V(2).Infof("queue %s has not been seen or deleted", key)
return nil
return nil, fmt.Errorf("queue %s has not been seen or deleted", key)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is dangerous: consider a queue that is deleted before one of its PodGroups. Returning an error here will cause endless retries.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's handle this in a different PR; refer to the TODO in the following code :)

}
podGroups := make([]string, 0, len(c.podGroups[key]))
for pgKey := range c.podGroups[key] {
podGroups = append(podGroups, pgKey)
}
c.pgMutex.RUnlock()

return podGroups, nil
}

func (c *Controller) syncQueue(key string) error {
glog.V(4).Infof("Begin sync queue %s", key)

podGroups, err := c.getPodGroups(key)
if err != nil {
return err
}

queueStatus := schedulingv1alpha2.QueueStatus{}

for _, pgKey := range podGroups {
// Ignore error here, it can not occur.
ns, name, _ := cache.SplitMetaNamespaceKey(pgKey)

// TODO: check NotFound error and sync local cache.
pg, err := c.pgLister.PodGroups(ns).Get(name)
if err != nil {
return err
}

switch pg.Status.Phase {
case schedulingv1alpha2.PodGroupPending:
pending++
queueStatus.Pending++
case schedulingv1alpha2.PodGroupRunning:
running++
queueStatus.Running++
case schedulingv1alpha2.PodGroupUnknown:
unknown++
queueStatus.Unknown++
case schedulingv1alpha2.PodGroupInqueue:
inqueue++
queueStatus.Inqueue++
}
}

Expand All @@ -180,21 +192,17 @@ func (c *Controller) syncQueue(key string) error {
glog.V(2).Infof("queue %s has been deleted", key)
return nil
}
// TODO: do not retry to syncQueue for this error
return err
}

glog.V(4).Infof("queue %s jobs pending %d, running %d, unknown %d", key, pending, running, unknown)
// ignore update when status doesnot change
if pending == queue.Status.Pending && running == queue.Status.Running &&
unknown == queue.Status.Unknown && inqueue == queue.Status.Inqueue {
// ignore update when status does not change
if reflect.DeepEqual(queueStatus, queue.Status) {
return nil
}

newQueue := queue.DeepCopy()
newQueue.Status.Pending = pending
newQueue.Status.Running = running
newQueue.Status.Unknown = unknown
newQueue.Status.Inqueue = inqueue
newQueue.Status = queueStatus

if _, err := c.kbClient.SchedulingV1alpha2().Queues().UpdateStatus(newQueue); err != nil {
glog.Errorf("Failed to update status of Queue %s: %v", newQueue.Name, err)
Expand Down Expand Up @@ -225,20 +233,21 @@ func (c *Controller) deleteQueue(obj interface{}) {
}

c.pgMutex.Lock()
defer c.pgMutex.Unlock()
delete(c.podGroups, queue.Name)
c.pgMutex.Unlock()
}

func (c *Controller) addPodGroup(obj interface{}) {
pg := obj.(*schedulingv1alpha2.PodGroup)
key, _ := cache.MetaNamespaceKeyFunc(obj)

c.pgMutex.Lock()
defer c.pgMutex.Unlock()

if c.podGroups[pg.Spec.Queue] == nil {
c.podGroups[pg.Spec.Queue] = make(map[string]struct{})
}
c.podGroups[pg.Spec.Queue][key] = struct{}{}
c.pgMutex.Unlock()

// enqueue
c.queue.Add(pg.Spec.Queue)
Expand All @@ -251,10 +260,8 @@ func (c *Controller) updatePodGroup(old, new interface{}) {
// Note: we have no use case update PodGroup.Spec.Queue
// So do not consider it here.
if oldPG.Status.Phase != newPG.Status.Phase {
// enqueue
c.queue.Add(newPG.Spec.Queue)
}

}

func (c *Controller) deletePodGroup(obj interface{}) {
Expand All @@ -275,8 +282,9 @@ func (c *Controller) deletePodGroup(obj interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(obj)

c.pgMutex.Lock()
defer c.pgMutex.Unlock()

delete(c.podGroups[pg.Spec.Queue], key)
c.pgMutex.Unlock()

c.queue.Add(pg.Spec.Queue)
}