
Upgrade kube-batch to 0.4.2
Klaus Ma authored and k82cn committed Mar 31, 2019
1 parent 57b990b commit 95a1ac3
Showing 26 changed files with 357 additions and 102 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Gopkg.toml
@@ -38,7 +38,7 @@ required = [

[[constraint]]
name = "github.com/kubernetes-sigs/kube-batch"
version = "0.4.1"
version = "0.4.2"

[[constraint]]
name = "github.com/onsi/ginkgo"
22 changes: 13 additions & 9 deletions pkg/scheduler/actions/allocate/allocate.go
@@ -47,12 +47,16 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
jobsMap := map[api.QueueID]*util.PriorityQueue{}

for _, job := range ssn.Jobs {
- if _, found := jobsMap[job.Queue]; !found {
- jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
- }

if queue, found := ssn.Queues[job.Queue]; found {
queues.Push(queue)
+ } else {
+ glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
+ job.Namespace, job.Name, job.Queue)
+ continue
}

+ if _, found := jobsMap[job.Queue]; !found {
+ jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
+ }

glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
@@ -144,12 +148,12 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
selectedNodes := util.SelectBestNode(nodeScores)
for _, node := range selectedNodes {
// Allocate idle resource to the task.
- if task.Resreq.LessEqual(node.Idle) {
+ if task.InitResreq.LessEqual(node.Idle) {
glog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node.Name); err != nil {
glog.Errorf("Failed to bind Task %v on %v in Session %v",
task.UID, node.Name, ssn.UID)
glog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
task.UID, node.Name, ssn.UID, err)
continue
}
assigned = true
@@ -163,9 +167,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
}

// Allocate releasing resource to the task if any.
- if task.Resreq.LessEqual(node.Releasing) {
+ if task.InitResreq.LessEqual(node.Releasing) {
glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
- task.Namespace, task.Name, node.Name, task.Resreq, node.Releasing)
+ task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
if err := ssn.Pipeline(task, node.Name); err != nil {
glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
task.UID, node.Name, ssn.UID)
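Note: the Resreq → InitResreq switch recurs throughout this commit (allocate, backfill, preempt, and reclaim all adopt it). A plausible reading — an assumption, not stated in the diff — is that InitResreq is the pod's effective request including init containers, which Kubernetes computes as the maximum of the largest single init-container request and the sum of the main containers' requests. A minimal sketch of that rule, with illustrative types rather than kube-batch's api package:

// Sketch of the effective-request rule assumed to underlie InitResreq.
// Init containers run one at a time before the main containers start, so per
// resource a node must have max(largest init-container request, sum of main
// container requests) free for the pod to fit.
package main

import "fmt"

type resources map[string]int64 // e.g. CPU in millicores

func effectiveRequest(initCtrs, mainCtrs []resources) resources {
	eff := resources{}
	for _, c := range mainCtrs { // main containers run together: sum them
		for name, v := range c {
			eff[name] += v
		}
	}
	for _, ic := range initCtrs { // init containers run one by one: take the max
		for name, v := range ic {
			if v > eff[name] {
				eff[name] = v
			}
		}
	}
	return eff
}

func main() {
	initCtrs := []resources{{"cpu": 2000}}              // heavy one-off setup step
	appCtrs := []resources{{"cpu": 500}, {"cpu": 500}}  // two modest app containers
	fmt.Println(effectiveRequest(initCtrs, appCtrs))    // map[cpu:2000], not map[cpu:1000]
}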
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/backfill/backfill.go
@@ -44,7 +44,7 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) {
// TODO (k82cn): When backfilling, we also need to balance between queues.
for _, job := range ssn.Jobs {
for _, task := range job.TaskStatusIndex[api.Pending] {
- if task.Resreq.IsEmpty() {
+ if task.InitResreq.IsEmpty() {
// As the task did not request resources, it only needs to meet the predicates.
// TODO (k82cn): need to prioritize nodes to avoid pod hole.
for _, node := range ssn.Nodes {
17 changes: 12 additions & 5 deletions pkg/scheduler/actions/preempt/preempt.go
@@ -204,7 +204,7 @@ func preempt(

var preemptees []*api.TaskInfo
preempted := api.EmptyResource()
- resreq := preemptor.Resreq.Clone()
+ resreq := preemptor.InitResreq.Clone()

for _, task := range node.Tasks {
if filter == nil {
@@ -221,8 +221,15 @@
continue
}

- // Preempt victims for tasks.
- for _, preemptee := range victims {
+ victimsQueue := util.NewPriorityQueue(func(l, r interface{}) bool {
+ return !ssn.TaskOrderFn(l, r)
+ })
+ for _, victim := range victims {
+ victimsQueue.Push(victim)
+ }
+ // Preempt victims for tasks; pick the lowest-priority task first.
+ for !victimsQueue.Empty() {
+ preemptee := victimsQueue.Pop().(*api.TaskInfo)
glog.Errorf("Try to preempt Task <%s/%s> for Tasks <%s/%s>",
preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name)
if err := stmt.Evict(preemptee, "preempt"); err != nil {
@@ -239,9 +246,9 @@

metrics.RegisterPreemptionAttempts()
glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.",
- preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq)
+ preempted, preemptor.Namespace, preemptor.Name, preemptor.InitResreq)

- if preemptor.Resreq.LessEqual(preempted) {
+ if preemptor.InitResreq.LessEqual(preempted) {
if err := stmt.Pipeline(preemptor, node.Name); err != nil {
glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>",
preemptor.Namespace, preemptor.Name, node.Name)
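Note: the loop above no longer evicts victims in arbitrary map order. It drains them from a priority queue built on the negation of ssn.TaskOrderFn, so the cheapest (lowest-priority) victims are evicted first and eviction can stop as soon as enough resources are freed. A self-contained sketch of that inversion trick using container/heap — the task type and queue here are illustrative, not kube-batch's util.PriorityQueue:

package main

import (
	"container/heap"
	"fmt"
)

type task struct {
	name     string
	priority int
}

// taskOrder mirrors a TaskOrderFn: true if l should be scheduled before r.
func taskOrder(l, r task) bool { return l.priority > r.priority }

// victimQueue negates taskOrder, so Pop yields the LOWEST-priority task first.
type victimQueue []task

func (q victimQueue) Len() int            { return len(q) }
func (q victimQueue) Less(i, j int) bool  { return !taskOrder(q[i], q[j]) }
func (q victimQueue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *victimQueue) Push(x interface{}) { *q = append(*q, x.(task)) }
func (q *victimQueue) Pop() interface{} {
	old := *q
	n := len(old)
	item := old[n-1]
	*q = old[:n-1]
	return item
}

func main() {
	q := &victimQueue{{"critical", 10}, {"batch", 1}, {"web", 5}}
	heap.Init(q)
	for q.Len() > 0 {
		fmt.Print(heap.Pop(q).(task).name, " ") // batch web critical
	}
}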
9 changes: 4 additions & 5 deletions pkg/scheduler/actions/reclaim/reclaim.go
@@ -111,14 +111,13 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
}

assigned := false

for _, n := range ssn.Nodes {
// If predicates failed, next node.
if err := ssn.PredicateFn(task, n); err != nil {
continue
}

- resreq := task.Resreq.Clone()
+ resreq := task.InitResreq.Clone()
reclaimed := api.EmptyResource()

glog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
@@ -172,11 +171,11 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
}

glog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.",
- reclaimed, task.Namespace, task.Name, task.Resreq)
+ reclaimed, task.Namespace, task.Name, task.InitResreq)

- if task.Resreq.LessEqual(reclaimed) {
+ if task.InitResreq.LessEqual(reclaimed) {
if err := ssn.Pipeline(task, n.Name); err != nil {
glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>",
glog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
task.Namespace, task.Name, n.Name)
}

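Note: reclaim and preempt share the same accumulate-until-covered pattern: clone the pending task's InitResreq, take victims one at a time while adding their resources to a running total, and pipeline the task once the total covers the request. A scalar sketch of that loop — the real code compares full resource vectors via LessEqual:

package main

import "fmt"

// victimsNeeded returns how many victims (in order) must be evicted before
// their combined resources cover the pending task's request, or -1 if even
// evicting all of them is not enough.
func victimsNeeded(request int64, victims []int64) int {
	reclaimed := int64(0)
	for i, v := range victims {
		reclaimed += v
		if request <= reclaimed { // stands in for task.InitResreq.LessEqual(reclaimed)
			return i + 1
		}
	}
	return -1
}

func main() {
	fmt.Println(victimsNeeded(1500, []int64{400, 600, 700})) // 3
	fmt.Println(victimsNeeded(5000, []int64{400, 600}))      // -1
}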
6 changes: 4 additions & 2 deletions pkg/scheduler/plugins/conformance/conformance.go
@@ -25,10 +25,12 @@ import (
)

type conformancePlugin struct {
// Arguments given for the plugin
pluginArguments map[string]string
}

- func New() framework.Plugin {
- return &conformancePlugin{}
+ func New(arguments map[string]string) framework.Plugin {
+ return &conformancePlugin{pluginArguments: arguments}
}

func (pp *conformancePlugin) Name() string {
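Note: every plugin constructor in this commit gains the same signature — New now receives a map[string]string of per-plugin arguments, stored on the plugin for later use; the same change repeats in the drf, gang, and nodeorder plugins below. A plausible sketch of the builder-registry pattern this enables (the registry names here are illustrative; only the New signature comes from the diff):

package main

import "fmt"

type Plugin interface{ Name() string }

// PluginBuilder threads per-plugin arguments from the scheduler
// configuration into each plugin constructor.
type PluginBuilder func(arguments map[string]string) Plugin

var builders = map[string]PluginBuilder{}

func registerPluginBuilder(name string, pb PluginBuilder) { builders[name] = pb }

type conformancePlugin struct{ pluginArguments map[string]string }

func (pp *conformancePlugin) Name() string { return "conformance" }

func newConformance(arguments map[string]string) Plugin {
	return &conformancePlugin{pluginArguments: arguments}
}

func main() {
	registerPluginBuilder("conformance", newConformance)
	// Arguments would normally come from the tiers section of the scheduler
	// configuration (see the YAML example in the nodeorder diff below).
	p := builders["conformance"](map[string]string{"some.key": "some-value"})
	fmt.Println(p.Name())
}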
14 changes: 9 additions & 5 deletions pkg/scheduler/plugins/drf/drf.go
@@ -39,12 +39,16 @@ type drfPlugin struct {

// Key is Job ID
jobOpts map[api.JobID]*drfAttr

// Arguments given for the plugin
pluginArguments map[string]string
}

- func New() framework.Plugin {
+ func New(arguments map[string]string) framework.Plugin {
return &drfPlugin{
- totalResource: api.EmptyResource(),
- jobOpts: map[api.JobID]*drfAttr{},
+ totalResource: api.EmptyResource(),
+ jobOpts: map[api.JobID]*drfAttr{},
+ pluginArguments: arguments,
}
}

@@ -110,8 +114,8 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
lv := l.(*api.JobInfo)
rv := r.(*api.JobInfo)

glog.V(4).Infof("DRF JobOrderFn: <%v/%v> is ready: %d, <%v/%v> is ready: %d",
lv.Namespace, lv.Name, lv.Priority, rv.Namespace, rv.Name, rv.Priority)
glog.V(4).Infof("DRF JobOrderFn: <%v/%v> share state: %d, <%v/%v> share state: %d",
lv.Namespace, lv.Name, drf.jobOpts[lv.UID].share, rv.Namespace, rv.Name, drf.jobOpts[rv.UID].share)

if drf.jobOpts[lv.UID].share == drf.jobOpts[rv.UID].share {
return 0
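Note: the corrected log message reflects what DRF job ordering actually compares — each job's cached share, not its priority. Under dominant resource fairness, a job's share is conventionally the largest fraction it holds of any single cluster resource, and the job with the smaller share is served first. A simplified sketch of that computation (scalar maps instead of the plugin's drfAttr and api.Resource):

package main

import "fmt"

// dominantShare returns the largest fraction of any single cluster resource
// that a job currently holds.
func dominantShare(allocated, total map[string]float64) float64 {
	share := 0.0
	for name, alloc := range allocated {
		if t := total[name]; t > 0 && alloc/t > share {
			share = alloc / t
		}
	}
	return share
}

func main() {
	total := map[string]float64{"cpu": 64, "memory": 256}
	jobA := map[string]float64{"cpu": 16, "memory": 32} // dominant: cpu, 0.25
	jobB := map[string]float64{"cpu": 4, "memory": 128} // dominant: memory, 0.50
	// jobA has the smaller dominant share, so DRF orders it first.
	fmt.Println(dominantShare(jobA, total), dominantShare(jobB, total)) // 0.25 0.5
}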
6 changes: 4 additions & 2 deletions pkg/scheduler/plugins/gang/gang.go
@@ -31,10 +31,12 @@ import (
)

type gangPlugin struct {
// Arguments given for the plugin
pluginArguments map[string]string
}

- func New() framework.Plugin {
- return &gangPlugin{}
+ func New(arguments map[string]string) framework.Plugin {
+ return &gangPlugin{pluginArguments: arguments}
}

func (gp *gangPlugin) Name() string {
118 changes: 113 additions & 5 deletions pkg/scheduler/plugins/nodeorder/nodeorder.go
@@ -18,6 +18,7 @@ package nodeorder

import (
"fmt"
"strconv"

"github.com/golang/glog"

@@ -32,7 +33,20 @@ import (
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
)

const (
// NodeAffinityWeight is the key for providing Node Affinity Priority Weight in YAML
NodeAffinityWeight = "nodeaffinity.weight"
// PodAffinityWeight is the key for providing Pod Affinity Priority Weight in YAML
PodAffinityWeight = "podaffinity.weight"
// LeastRequestedWeight is the key for providing Least Requested Priority Weight in YAML
LeastRequestedWeight = "leastrequested.weight"
// BalancedResourceWeight is the key for providing Balanced Resource Priority Weight in YAML
BalancedResourceWeight = "balancedresource.weight"
)

type nodeOrderPlugin struct {
// Arguments given for the plugin
pluginArguments map[string]string
}

func getInterPodAffinityScore(name string, interPodAffinityScore schedulerapi.HostPriorityList) int {
@@ -145,17 +159,100 @@ func (nl *nodeLister) List() ([]*v1.Node, error) {
}

// New returns a nodeOrderPlugin object
- func New() framework.Plugin {
- return &nodeOrderPlugin{}
+ func New(arguments map[string]string) framework.Plugin {
+ return &nodeOrderPlugin{pluginArguments: arguments}
}

func (pp *nodeOrderPlugin) Name() string {
return "nodeorder"
}

type priorityWeight struct {
leastReqWeight int
nodeAffinityWeight int
podAffinityWeight int
balancedResourceWeight int
}

func calculateWeight(args map[string]string) priorityWeight {
/*
Users should provide priority weights with these keys: nodeaffinity.weight, podaffinity.weight, leastrequested.weight, and balancedresource.weight.
Weights are currently supported only for the nodeaffinity, podaffinity, leastrequested, and balancedresource priorities. For example:
actions: "reclaim, allocate, backfill, preempt"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: drf
- name: predicates
- name: proportion
- name: nodeorder
arguments:
nodeaffinity.weight: 2
podaffinity.weight: 2
leastrequested.weight: 2
balancedresource.weight: 2
*/

// Values are initialized to 1.
weight := priorityWeight{
leastReqWeight: 1,
nodeAffinityWeight: 1,
podAffinityWeight: 1,
balancedResourceWeight: 1,
}

// If nodeaffinity.weight is provided, override the default value in the weight struct.
if args[NodeAffinityWeight] != "" {
val, err := strconv.Atoi(args[NodeAffinityWeight])
if err != nil {
glog.Warningf("Not able to Parse Weight for %v because of error: %v", args[NodeAffinityWeight], err)
} else {
weight.nodeAffinityWeight = val
}
}

// If podaffinity.weight is provided, override the default value in the weight struct.
if args[PodAffinityWeight] != "" {
val, err := strconv.Atoi(args[PodAffinityWeight])
if err != nil {
glog.Warningf("Not able to Parse Weight for %v because of error: %v", args[PodAffinityWeight], err)
} else {
weight.podAffinityWeight = val
}
}

// If leastrequested.weight is provided, override the default value in the weight struct.
if args[LeastRequestedWeight] != "" {
val, err := strconv.Atoi(args[LeastRequestedWeight])
if err != nil {
glog.Warningf("Not able to Parse Weight for %v because of error: %v", args[LeastRequestedWeight], err)
} else {
weight.leastReqWeight = val
}
}

// If balancedresource.weight is provided, override the default value in the weight struct.
if args[BalancedResourceWeight] != "" {
val, err := strconv.Atoi(args[BalancedResourceWeight])
if err != nil {
glog.Warningf("Not able to Parse Weight for %v because of error: %v", args[BalancedResourceWeight], err)
} else {
weight.balancedResourceWeight = val
}
}

return weight
}

func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (int, error) {

weight := calculateWeight(pp.pluginArguments)

pl := &podLister{
session: ssn,
}
@@ -186,14 +283,24 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
glog.Warningf("Least Requested Priority Failed because of Error: %v", err)
return 0, err
}
- score = score + host.Score
+ // host.Score is multiplied by the configured leastReqWeight (default 1) before being added to the total score.
+ score = score + (host.Score * weight.leastReqWeight)

+ host, err = priorities.BalancedResourceAllocationMap(task.Pod, nil, nodeInfo)
+ if err != nil {
+ glog.Warningf("Balanced Resource Allocation Priority Failed because of Error: %v", err)
+ return 0, err
+ }
+ // host.Score is multiplied by the configured balancedResourceWeight (default 1) before being added to the total score.
+ score = score + (host.Score * weight.balancedResourceWeight)

host, err = priorities.CalculateNodeAffinityPriorityMap(task.Pod, nil, nodeInfo)
if err != nil {
glog.Warningf("Calculate Node Affinity Priority Failed because of Error: %v", err)
return 0, err
}
- score = score + host.Score
+ // host.Score is multiplied by the configured nodeAffinityWeight (default 1) before being added to the total score.
+ score = score + (host.Score * weight.nodeAffinityWeight)

mapFn := priorities.NewInterPodAffinityPriority(cn, nl, pl, v1.DefaultHardPodAffinitySymmetricWeight)
interPodAffinityScore, err = mapFn(task.Pod, nodeMap, nodeSlice)
@@ -202,7 +309,8 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
return 0, err
}
hostScore := getInterPodAffinityScore(node.Name, interPodAffinityScore)
- score = score + hostScore
+ // hostScore is multiplied by the configured podAffinityWeight (default 1) before being added to the total score.
+ score = score + (hostScore * weight.podAffinityWeight)

glog.V(4).Infof("Total Score for that node is: %d", score)
return score, nil
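Note: taken together, the node score is now a weighted sum — each sub-priority's host score is multiplied by its configured weight (default 1) and added to the total, so the YAML in calculateWeight's comment doubles the influence of every listed priority. A small worked example with hypothetical sub-scores:

package main

import "fmt"

func main() {
	// Hypothetical per-priority host scores for one node.
	scores := map[string]int{
		"leastrequested":   5,
		"balancedresource": 7,
		"nodeaffinity":     8,
		"podaffinity":      3,
	}
	// Weights as in the YAML snippet above; unspecified keys default to 1.
	weights := map[string]int{
		"leastrequested":   2,
		"balancedresource": 2,
		"nodeaffinity":     2,
		"podaffinity":      2,
	}
	total := 0
	for name, s := range scores {
		total += s * weights[name]
	}
	fmt.Println(total) // (5+7+8+3)*2 = 46
}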
