Skip to content

Commit

Permalink
Merge pull request volcano-sh#257 from k82cn/ka_256
Browse files Browse the repository at this point in the history
Added error handling for Bind & Evict.
  • Loading branch information
k82cn authored Jul 1, 2018
2 parents f367328 + 1a7a6c9 commit bf7e88c
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 95 deletions.
20 changes: 10 additions & 10 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
}
tasks := pendingTasks[job.UID]

glog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v:%v/%v>",
tasks.Len(), job.UID, job.Namespace, job.Name)
glog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
tasks.Len(), job.Namespace, job.Name)

for !tasks.Empty() {
task := tasks.Pop().(*api.TaskInfo)
Expand All @@ -83,19 +83,19 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
nodes = ssn.Nodes
}

glog.V(3).Infof("There are <%d> nodes for Job <%v:%v/%v>",
len(nodes), job.UID, job.Namespace, job.Name)
glog.V(3).Infof("There are <%d> nodes for Job <%v/%v>",
len(nodes), job.Namespace, job.Name)

for _, node := range nodes {
glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
task.Job, task.UID, node.Name, task.Resreq, node.Idle)
task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
// Allocate idle resource to the task.
if task.Resreq.LessEqual(node.Idle) {
glog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
task.Job, task.UID, node.Name)
task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node.Name); err != nil {
glog.Errorf("Failed to bind Task %v on %v in Session %v",
task.UID, node.Name, ssn.ID)
task.UID, node.Name, ssn.UID)
continue
}
assigned = true
Expand All @@ -104,11 +104,11 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {

// Allocate releasing resource to the task if any.
if task.Resreq.LessEqual(node.Releasing) {
glog.V(3).Infof("Pipelining Task <%v:%v/%v> to node <%v> for <%v> on <%v>",
task.UID, task.Namespace, task.Name, node.Name, task.Resreq, node.Releasing)
glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
task.Namespace, task.Name, node.Name, task.Resreq, node.Releasing)
if err := ssn.Pipeline(task, node.Name); err != nil {
glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
task.UID, node.Name, ssn.ID)
task.UID, node.Name, ssn.UID)
continue
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/actions/decorate/decorate.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (alloc *decorateAction) Execute(ssn *framework.Session) {

for _, job := range jobs {
job.Candidates = fetchMatchNodeForPodSet(job, nodes)
glog.V(3).Infof("Got %d candidate nodes for Job %v:%v/%v",
len(job.Candidates), job.UID, job.Namespace, job.Name)
glog.V(3).Infof("Got %d candidate nodes for Job %v/%v",
len(job.Candidates), job.Namespace, job.Name)
}
}

Expand Down
16 changes: 9 additions & 7 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
break
}

glog.V(3).Infof("The preemptor is %v:%v/%v, the preemptee is %v:%v/%v",
preemptorJob.UID, preemptorJob.Namespace, preemptorJob.Name,
preempteeJob.UID, preempteeJob.Namespace, preempteeJob.Name)
glog.V(3).Infof("The preemptor is %v/%v, the preemptee is %v/%v",
preemptorJob.Namespace, preemptorJob.Name,
preempteeJob.Namespace, preempteeJob.Name)

preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo)
preemptee := preempteeTasks[preempteeJob.UID].Pop().(*api.TaskInfo)
Expand All @@ -108,14 +108,16 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {

if ssn.Preemptable(preemptor, preemptee) {
if err := ssn.Preempt(preemptor, preemptee); err != nil {
glog.Errorf("Failed to evict task %v for task %v: %v", nil, nil, err)
glog.Errorf("Failed to evict task %v/%v for task %v/%v: %v",
preemptee.Namespace, preemptee.Name,
preemptor.Namespace, preemptor.Name, err)
} else {
preempted = true
}
} else {
glog.V(3).Infof("Can not preempt task <%v:%v/%v> for task <%v:%v/%v>",
preemptee.UID, preemptee.Namespace, preemptee.Name,
preemptor.UID, preemptor.Namespace, preemptor.Name)
glog.V(3).Infof("Can not preempt task <%v/%v> for task <%v/%v>",
preemptee.Namespace, preemptee.Name,
preemptor.Namespace, preemptor.Name)
}

// If preempted resource, put it back to the queue.
Expand Down
27 changes: 27 additions & 0 deletions pkg/scheduler/api/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,30 @@ func AllocatedStatus(status TaskStatus) bool {
return false
}
}

func MergeErrors(errs ...error) error {
msg := "errors: "

foundErr := false
i := 1

for _, e := range errs {
if e != nil {
if foundErr {
msg = fmt.Sprintf("%s, %d: ", msg, i)
} else {
msg = fmt.Sprintf("%s %d: ", msg, i)
}

msg = fmt.Sprintf("%s%v", msg, e)
foundErr = true
i++
}
}

if foundErr {
return fmt.Errorf("%s", msg)
}

return nil
}
20 changes: 12 additions & 8 deletions pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,28 +206,32 @@ func (ps *JobInfo) UpdateTaskStatus(task *TaskInfo, status TaskStatus) error {
return nil
}

func (ps *JobInfo) deleteTaskIndex(pi *TaskInfo) {
if ts, found := ps.TaskStatusIndex[pi.Status]; found {
delete(ts, pi.UID)
func (ps *JobInfo) deleteTaskIndex(ti *TaskInfo) {
if tasks, found := ps.TaskStatusIndex[ti.Status]; found {
delete(tasks, ti.UID)

if len(ts) == 0 {
delete(ps.TaskStatusIndex, pi.Status)
if len(tasks) == 0 {
delete(ps.TaskStatusIndex, ti.Status)
}
}
}

func (ps *JobInfo) DeleteTaskInfo(pi *TaskInfo) {
func (ps *JobInfo) DeleteTaskInfo(pi *TaskInfo) error {
if task, found := ps.Tasks[pi.UID]; found {
ps.TotalRequest.Sub(task.Resreq)

if AllocatedStatus(task.Status) {
ps.Allocated.Sub(task.Resreq)
}

delete(ps.Tasks, pi.UID)
delete(ps.Tasks, task.UID)

ps.deleteTaskIndex(task)
return nil
}

ps.deleteTaskIndex(pi)
return fmt.Errorf("failed to find task <%v/%v> in job <%v/%v>",
pi.Namespace, pi.Name, ps.Namespace, ps.Name)
}

func (ps *JobInfo) Clone() *JobInfo {
Expand Down
57 changes: 35 additions & 22 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package api

import (
"github.com/golang/glog"
"fmt"

"k8s.io/api/core/v1"
)
Expand Down Expand Up @@ -110,50 +110,56 @@ func (ni *NodeInfo) SetNode(node *v1.Node) {
ni.Capability = NewResource(node.Status.Capacity)
}

func (ni *NodeInfo) PipelineTask(task *TaskInfo) {
func (ni *NodeInfo) PipelineTask(task *TaskInfo) error {
key := PodKey(task.Pod)
if _, found := ni.Tasks[key]; found {
glog.Errorf("Task <%v/%v> already on node <%v>, should not add again.",
return fmt.Errorf("task <%v/%v> already on node <%v>",
task.Namespace, task.Name, ni.Name)
return
}

ti := task.Clone()

if ni.Node != nil {
ni.Releasing.Sub(task.Resreq)
ni.Used.Add(task.Resreq)
ni.Releasing.Sub(ti.Resreq)
ni.Used.Add(ti.Resreq)
}

ni.Tasks[key] = task
ni.Tasks[key] = ti

return nil
}

func (ni *NodeInfo) AddTask(task *TaskInfo) {
func (ni *NodeInfo) AddTask(task *TaskInfo) error {
key := PodKey(task.Pod)
if _, found := ni.Tasks[key]; found {
glog.Errorf("Task <%v/%v> already on node <%v>, should not add again.",
return fmt.Errorf("task <%v/%v> already on node <%v>",
task.Namespace, task.Name, ni.Name)
return
}

// Node will hold a copy of task to make sure the status
// change will not impact resource in node.
ti := task.Clone()

if ni.Node != nil {
if task.Status == Releasing {
ni.Releasing.Add(task.Resreq)
if ti.Status == Releasing {
ni.Releasing.Add(ti.Resreq)
}
ni.Idle.Sub(task.Resreq)
ni.Used.Add(task.Resreq)
ni.Idle.Sub(ti.Resreq)
ni.Used.Add(ti.Resreq)
}

glog.V(3).Infof("After added Task <%v> from Node <%v>: idle <%v>, used <%v>, releasing <%v>",
key, ni.Name, ni.Idle, ni.Used, ni.Releasing)
ni.Tasks[key] = ti

ni.Tasks[key] = task
return nil
}

func (ni *NodeInfo) RemoveTask(ti *TaskInfo) {
func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
key := PodKey(ti.Pod)

task, found := ni.Tasks[key]
if !found {
return
return fmt.Errorf("failed to find task <%v/%v> on host <%v>",
ti.Namespace, ti.Name, ni.Name)
}

if ni.Node != nil {
Expand All @@ -165,8 +171,15 @@ func (ni *NodeInfo) RemoveTask(ti *TaskInfo) {
ni.Used.Sub(task.Resreq)
}

glog.V(3).Infof("After removed Task <%v> from Node <%v>: idle <%v>, used <%v>, releasing <%v>",
key, ni.Name, ni.Idle, ni.Used, ni.Releasing)

delete(ni.Tasks, key)

return nil
}

func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error {
if err := ni.RemoveTask(ti); err != nil {
return err
}

return ni.AddTask(ti)
}
Loading

0 comments on commit bf7e88c

Please sign in to comment.