Skip to content

Commit

Permalink
Fixed Bind & Evict error handling issue.
Browse files Browse the repository at this point in the history
Signed-off-by: Da K. Ma <klaus1982.cn@gmail.com>
  • Loading branch information
k82cn committed Jul 1, 2018
1 parent a050a52 commit b2d1485
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 65 deletions.
4 changes: 4 additions & 0 deletions pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ func NewJobInfo(uid JobID) *JobInfo {
}
}

func (ps *JobInfo) UnsetSchedulingSpec() {
ps.SchedSpec = nil
}

func (ps *JobInfo) SetSchedulingSpec(spec *arbv1.SchedulingSpec) {
ps.Name = spec.Name
ps.Namespace = spec.Namespace
Expand Down
30 changes: 17 additions & 13 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,13 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {
}

func (ni *NodeInfo) Clone() *NodeInfo {
pods := make(map[TaskID]*TaskInfo, len(ni.Tasks))
res := NewNodeInfo(ni.Node)

for _, p := range ni.Tasks {
pods[PodKey(p.Pod)] = p.Clone()
res.AddTask(p)
}

return &NodeInfo{
Name: ni.Name,
Node: ni.Node,
Idle: ni.Idle.Clone(),
Used: ni.Used.Clone(),
Releasing: ni.Releasing.Clone(),
Allocatable: ni.Allocatable.Clone(),
Capability: ni.Capability.Clone(),

Tasks: pods,
}
return res
}

func (ni *NodeInfo) SetNode(node *v1.Node) {
Expand Down Expand Up @@ -183,3 +173,17 @@ func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error {

return ni.AddTask(ti)
}

func (ni NodeInfo) String() string {
res := ""

i := 0
for _, task := range ni.Tasks {
res = res + fmt.Sprintf("\n\t %d: %v", i, task)
i++
}

return fmt.Sprintf("Node (%s): idle <%v>, used <%v>, releasing <%v>%s",
ni.Name, ni.Idle, ni.Used, ni.Releasing, res)

}
106 changes: 93 additions & 13 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/golang/glog"

Expand Down Expand Up @@ -60,7 +61,8 @@ type SchedulerCache struct {
Jobs map[arbapi.JobID]*arbapi.JobInfo
Nodes map[string]*arbapi.NodeInfo

errTasks *cache.FIFO
errTasks *cache.FIFO
deletedJobs *cache.FIFO
}

type defaultBinder struct {
Expand Down Expand Up @@ -98,11 +100,40 @@ func (de *defaultEvictor) Evict(p *v1.Pod) error {
return nil
}

func taskKey(obj interface{}) (string, error) {
if obj == nil {
return "", fmt.Errorf("the object is nil")
}

task, ok := obj.(*arbapi.TaskInfo)

if !ok {
return "", fmt.Errorf("failed to convert %v to TaskInfo", obj)
}

return string(task.UID), nil
}

func jobKey(obj interface{}) (string, error) {
if obj == nil {
return "", fmt.Errorf("the object is nil")
}

job, ok := obj.(*arbapi.JobInfo)

if !ok {
return "", fmt.Errorf("failed to convert %v to TaskInfo", obj)
}

return string(job.UID), nil
}

func newSchedulerCache(config *rest.Config, schedulerName string) *SchedulerCache {
sc := &SchedulerCache{
Jobs: make(map[arbapi.JobID]*arbapi.JobInfo),
Nodes: make(map[string]*arbapi.NodeInfo),
errTasks: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
Jobs: make(map[arbapi.JobID]*arbapi.JobInfo),
Nodes: make(map[string]*arbapi.NodeInfo),
errTasks: cache.NewFIFO(taskKey),
deletedJobs: cache.NewFIFO(jobKey),
}

sc.kubeclient = kubernetes.NewForConfigOrDie(config)
Expand Down Expand Up @@ -200,6 +231,9 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {

// Re-sync error tasks.
go sc.resync()

// Cleanup jobs.
go sc.cleanupJobs()
}

func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool {
Expand Down Expand Up @@ -255,7 +289,7 @@ func (sc *SchedulerCache) Evict(taskInfo *arbapi.TaskInfo) error {
go func() {
err := sc.Evictor.Evict(p)
if err != nil {
sc.enqueue(task)
sc.resyncTask(task)
}
}()

Expand Down Expand Up @@ -294,35 +328,81 @@ func (sc *SchedulerCache) Bind(taskInfo *arbapi.TaskInfo, hostname string) error

go func() {
if err := sc.Binder.Bind(p, hostname); err != nil {
sc.enqueue(task)
sc.resyncTask(task)
}
}()

return nil
}

func (sc *SchedulerCache) enqueue(task *arbapi.TaskInfo) {
sc.errTasks.AddIfNotPresent(task.Pod)
func (sc *SchedulerCache) deleteJob(job *arbapi.JobInfo) {
time.AfterFunc(5*time.Second, func() {
sc.deletedJobs.AddIfNotPresent(job)
})
}

func (sc *SchedulerCache) cleanupJob(job *arbapi.JobInfo) error {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()

if job.SchedSpec == nil && len(job.Tasks) == 0 {
delete(sc.Jobs, job.UID)
return nil
}

return fmt.Errorf("Job is not ready to clean up")
}

func (sc *SchedulerCache) processCleanupJob() error {
_, err := sc.deletedJobs.Pop(func(obj interface{}) error {
job, ok := obj.(*arbapi.JobInfo)
if !ok {
return fmt.Errorf("failed to convert %v to *v1.Pod", obj)
}

if err := sc.cleanupJob(job); err != nil {
glog.Errorf("Failed to delete job <%v/%v>, retry ...", job.Namespace, job.Name)

sc.deleteJob(job)
return err
}
return nil
})

return err
}

func (sc *SchedulerCache) cleanupJobs() {
for {
err := sc.processCleanupJob()
if err != nil {
glog.Errorf("Failed to process job clean up: %v", err)
}
}
}

func (sc *SchedulerCache) resyncTask(task *arbapi.TaskInfo) {
sc.errTasks.AddIfNotPresent(task)
}

func (sc *SchedulerCache) resync() {
for {
err := sc.processWorkItem()
err := sc.processResyncTask()
if err != nil {
glog.Errorf("Failed to process resync: %v", err)
}
}
}

func (sc *SchedulerCache) processWorkItem() error {
func (sc *SchedulerCache) processResyncTask() error {
_, err := sc.errTasks.Pop(func(obj interface{}) error {
pod, ok := obj.(*v1.Pod)
task, ok := obj.(*arbapi.TaskInfo)
if !ok {
return fmt.Errorf("failed to convert %v to *v1.Pod", obj)
}

if err := sc.syncPod(pod); err != nil {
glog.Errorf("Failed to sync pod <%v/%v>", pod.Namespace, pod.Name)
if err := sc.syncTask(task); err != nil {
glog.Errorf("Failed to sync pod <%v/%v>", task.Namespace, task.Name)
return err
}
return nil
Expand Down
75 changes: 44 additions & 31 deletions pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ func isTerminated(status arbapi.TaskStatus) bool {
return status == arbapi.Succeeded || status == arbapi.Failed
}

// Assumes that lock is already acquired.
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
pi := arbapi.NewTaskInfo(pod)

func (sc *SchedulerCache) addTask(pi *arbapi.TaskInfo) error {
if len(pi.Job) != 0 {
if _, found := sc.Jobs[pi.Job]; !found {
sc.Jobs[pi.Job] = arbapi.NewJobInfo(pi.Job)
Expand All @@ -50,9 +47,6 @@ func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
// client-go issue, we need to dig deeper for that.
sc.Jobs[pi.Job].DeleteTaskInfo(pi)
sc.Jobs[pi.Job].AddTaskInfo(pi)
} else {
glog.Warningf("The controller of pod %v/%v is empty, can not schedule it.",
pod.Namespace, pod.Name)
}

if len(pi.NodeName) != 0 {
Expand All @@ -71,22 +65,39 @@ func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
return nil
}

func (sc *SchedulerCache) syncPod(oldPod *v1.Pod) error {
// Assumes that lock is already acquired.
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
pi := arbapi.NewTaskInfo(pod)

return sc.addTask(pi)
}

func (sc *SchedulerCache) syncTask(oldTask *arbapi.TaskInfo) error {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()

newPod, err := sc.kubeclient.CoreV1().Pods(oldPod.Namespace).Get(oldPod.Name, metav1.GetOptions{})
newPod, err := sc.kubeclient.CoreV1().Pods(oldTask.Namespace).Get(oldTask.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
sc.deletePod(oldPod)
glog.V(3).Infof("Pod <%v/%v> was deleted, removed from cache.", oldPod.Namespace, oldPod.Name)
sc.deleteTask(oldTask)
glog.V(3).Infof("Pod <%v/%v> was deleted, removed from cache.", oldTask.Namespace, oldTask.Name)

return nil
}
return fmt.Errorf("failed to get Pod <%v/%v>: err %v", oldPod.Namespace, oldPod.Name, err)
return fmt.Errorf("failed to get Pod <%v/%v>: err %v", oldTask.Namespace, oldTask.Name, err)
}

newTask := arbapi.NewTaskInfo(newPod)

return sc.updateTask(oldTask, newTask)
}

func (sc *SchedulerCache) updateTask(oldTask, newTask *arbapi.TaskInfo) error {
if err := sc.deleteTask(oldTask); err != nil {
return err
}

return sc.updatePod(oldPod, newPod)
return sc.addTask(newTask)
}

// Assumes that lock is already acquired.
Expand All @@ -97,17 +108,15 @@ func (sc *SchedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
return sc.addPod(newPod)
}

// Assumes that lock is already acquired.
func (sc *SchedulerCache) deletePod(pod *v1.Pod) error {
pi := arbapi.NewTaskInfo(pod)

func (sc *SchedulerCache) deleteTask(pi *arbapi.TaskInfo) error {
var jobErr, nodeErr error

if len(pi.Job) != 0 {
if job, found := sc.Jobs[pi.Job]; found {
jobErr = job.DeleteTaskInfo(pi)
} else {
jobErr = fmt.Errorf("failed to find Job for Task %v/%v", pi.Namespace, pi.Name)
jobErr = fmt.Errorf("failed to find Job <%v> for Task %v/%v",
pi.Job, pi.Namespace, pi.Name)
}
}

Expand All @@ -125,6 +134,12 @@ func (sc *SchedulerCache) deletePod(pod *v1.Pod) error {
return nil
}

// Assumes that lock is already acquired.
func (sc *SchedulerCache) deletePod(pod *v1.Pod) error {
pi := arbapi.NewTaskInfo(pod)
return sc.deleteTask(pi)
}

func (sc *SchedulerCache) AddPod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
Expand All @@ -140,6 +155,8 @@ func (sc *SchedulerCache) AddPod(obj interface{}) {
glog.Errorf("Failed to add pod <%s/%s> into cache: %v",
pod.Namespace, pod.Name, err)
return
} else {
glog.V(3).Infof("Added pod <%s/%v> into cache.", pod.Namespace, pod.Name)
}
return
}
Expand All @@ -164,6 +181,9 @@ func (sc *SchedulerCache) UpdatePod(oldObj, newObj interface{}) {
glog.Errorf("Failed to update pod %v in cache: %v", oldPod.Name, err)
return
}

glog.V(3).Infof("Updated pod <%s/%v> in cache.", oldPod.Namespace, oldPod.Name)

return
}

Expand Down Expand Up @@ -192,6 +212,8 @@ func (sc *SchedulerCache) DeletePod(obj interface{}) {
glog.Errorf("Failed to delete pod %v from cache: %v", pod.Name, err)
return
}

glog.V(3).Infof("Deleted pod <%s/%v> from cache.", pod.Namespace, pod.Name)
return
}

Expand Down Expand Up @@ -326,20 +348,11 @@ func (sc *SchedulerCache) deleteSchedulingSpec(ss *arbv1.SchedulingSpec) error {
return fmt.Errorf("can not found job %v:%v/%v", jobID, ss.Namespace, ss.Name)
}

// Removed tasks from nodes.
for _, task := range job.Tasks {
if len(task.NodeName) != 0 {
if node, found := sc.Nodes[task.NodeName]; found {
node.RemoveTask(task)
} else {
glog.V(3).Infof("Failed to find node <%v> for task %v:%v/%v",
task.NodeName, task.UID, task.Namespace, task.Name)
}
}
}
// Unset SchedulingSpec
job.UnsetSchedulingSpec()

// Deleted Job from cache.
delete(sc.Jobs, jobID)
// TODO (k82cn): find another way to clean up Job.
sc.deleteJob(job)

return nil
}
Expand Down
Loading

0 comments on commit b2d1485

Please sign in to comment.