Skip to content

Commit

Permalink
Merge pull request #1637 from Thor-wl/0719-issue1425
Browse files Browse the repository at this point in the history
fix overused judgement when deal with allocate and proportion
  • Loading branch information
volcano-sh-bot authored Aug 31, 2021
2 parents 5349bcd + 791abb7 commit 012c2a2
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 9 deletions.
9 changes: 8 additions & 1 deletion pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func (alloc *Action) Execute(ssn *framework.Session) {
delete(queueInNamespace, queueID)
continue
}

if jobs, found := queueInNamespace[currentQueue.UID]; found && jobs.Empty() {
continue
}
Expand Down Expand Up @@ -203,6 +202,14 @@ func (alloc *Action) Execute(ssn *framework.Session) {
for !tasks.Empty() {
task := tasks.Pop().(*api.TaskInfo)

// Check whether the queue is overused on dimension that the task requested
taskRequest := task.Resreq.ResourceNames()

if !ssn.UnderusedResources(queue).Contains(taskRequest) {
klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name)
continue
}

klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(nodes), job.Namespace, job.Name)

predicateNodes, fitErrors := util.PredicateNodes(task, nodes, predicateFn)
Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ func (ra *Action) Execute(ssn *framework.Session) {
task = tasks.Pop().(*api.TaskInfo)
}

// Check whether the queue is overused on dimension that the task requested
taskRequest := task.Resreq.ResourceNames()
if !ssn.UnderusedResources(queue).Contains(taskRequest) {
klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name)
continue
}

assigned := false
for _, n := range ssn.Nodes {
// If predicates failed, next node.
Expand Down
6 changes: 6 additions & 0 deletions pkg/scheduler/actions/reclaim/reclaim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ import (
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/conformance"
"volcano.sh/volcano/pkg/scheduler/plugins/gang"
"volcano.sh/volcano/pkg/scheduler/plugins/proportion"
"volcano.sh/volcano/pkg/scheduler/util"
)

func TestReclaim(t *testing.T) {
framework.RegisterPluginBuilder("conformance", conformance.New)
framework.RegisterPluginBuilder("gang", gang.New)
framework.RegisterPluginBuilder("proportion", proportion.New)
defer framework.CleanupPluginBuilders()

tests := []struct {
Expand Down Expand Up @@ -164,6 +166,10 @@ func TestReclaim(t *testing.T) {
Name: "gang",
EnabledReclaimable: &trueValue,
},
{
Name: "proportion",
EnabledReclaimable: &trueValue,
},
},
},
}, nil)
Expand Down
48 changes: 41 additions & 7 deletions pkg/scheduler/api/resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,21 @@ func (r *Resource) String() string {
}

// ResourceNames returns all resource types
func (r *Resource) ResourceNames() []v1.ResourceName {
resNames := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory}
func (r *Resource) ResourceNames() ResourceNameList {
resNames := ResourceNameList{}

for rName := range r.ScalarResources {
resNames = append(resNames, rName)
if r.MilliCPU >= minResource {
resNames = append(resNames, v1.ResourceCPU)
}

if r.Memory >= minResource {
resNames = append(resNames, v1.ResourceMemory)
}

for rName, rMount := range r.ScalarResources {
if rMount >= minResource {
resNames = append(resNames, rName)
}
}

return resNames
Expand Down Expand Up @@ -194,10 +204,10 @@ func (r *Resource) Sub(rr *Resource) *Resource {
r.MilliCPU -= rr.MilliCPU
r.Memory -= rr.Memory

if r.ScalarResources == nil {
return r
}
for rrName, rrQuant := range rr.ScalarResources {
if r.ScalarResources == nil {
return r
}
r.ScalarResources[rrName] -= rrQuant
}

Expand Down Expand Up @@ -566,3 +576,27 @@ func ParseResourceList(m map[string]string) (v1.ResourceList, error) {
}
return rl, nil
}

func GetMinResource() float64 {
return minResource
}

// ResourceNameList struct defines resource name collection
type ResourceNameList []v1.ResourceName

// Contains judges whether rr is subset of r
func (r ResourceNameList) Contains(rr ResourceNameList) bool {
for _, rrName := range ([]v1.ResourceName)(rr) {
isResourceExist := false
for _, rName := range ([]v1.ResourceName)(r) {
if rName == rrName {
isResourceExist = true
break
}
}
if !isResourceExist {
return false
}
}
return true
}
3 changes: 3 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,6 @@ type ReservedNodesFn func()

// VictimTasksFn is the func declaration used to select victim tasks
type VictimTasksFn func() []*TaskInfo

// UnderUsedResourceFn is the func declaration used to get under used resource list for queue
type UnderUsedResourceFn func(*QueueInfo) ResourceNameList
2 changes: 2 additions & 0 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Session struct {
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
underUsedFns map[string]api.UnderUsedResourceFn
jobReadyFns map[string]api.ValidateFn
jobPipelinedFns map[string]api.VoteFn
jobValidFns map[string]api.ValidateExFn
Expand Down Expand Up @@ -113,6 +114,7 @@ func openSession(cache cache.Cache) *Session {
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
underUsedFns: map[string]api.UnderUsedResourceFn{},
jobReadyFns: map[string]api.ValidateFn{},
jobPipelinedFns: map[string]api.VoteFn{},
jobValidFns: map[string]api.ValidateExFn{},
Expand Down
21 changes: 21 additions & 0 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func (ssn *Session) AddOverusedFn(name string, fn api.ValidateFn) {
ssn.overusedFns[name] = fn
}

// AddUnderusedResourceFn add underused function
func (ssn *Session) AddUnderusedResourceFn(name string, fn api.UnderUsedResourceFn) {
ssn.underUsedFns[name] = fn
}

// AddJobValidFn add jobvalid function
func (ssn *Session) AddJobValidFn(name string, fn api.ValidateExFn) {
ssn.jobValidFns[name] = fn
Expand Down Expand Up @@ -257,6 +262,22 @@ func (ssn *Session) Overused(queue *api.QueueInfo) bool {
return false
}

// UnderusedResources invoke underused function of the plugins
func (ssn *Session) UnderusedResources(queue *api.QueueInfo) api.ResourceNameList {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
of, found := ssn.underUsedFns[plugin.Name]
if !found {
continue
}
underUsedResourceList := of(queue)
return underUsedResourceList
}
}

return api.ResourceNameList{}
}

// JobReady invoke jobready function of the plugins
func (ssn *Session) JobReady(obj interface{}) bool {
for _, tier := range ssn.Tiers {
Expand Down
9 changes: 9 additions & 0 deletions pkg/scheduler/plugins/drf/hdrf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog"

schedulingv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/scheduler/actions/allocate"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/proportion"
"volcano.sh/volcano/pkg/scheduler/util"
)

Expand Down Expand Up @@ -54,6 +56,7 @@ func TestHDRF(t *testing.T) {
s.RegisterOptions()

framework.RegisterPluginBuilder(PluginName, New)
framework.RegisterPluginBuilder("proportion", proportion.New)
defer framework.CleanupPluginBuilders()

tests := []struct {
Expand Down Expand Up @@ -253,6 +256,12 @@ func TestHDRF(t *testing.T) {
EnabledQueueOrder: &trueValue,
EnabledJobOrder: &trueValue,
},
{
Name: "proportion",
EnabledJobEnqueued: &trueValue,
EnabledQueueOrder: &trueValue,
EnabledReclaimable: &trueValue,
},
},
},
}, nil)
Expand Down
24 changes: 23 additions & 1 deletion pkg/scheduler/plugins/proportion/proportion.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package proportion
import (
"reflect"

v1 "k8s.io/api/core/v1"
"k8s.io/klog"

"volcano.sh/apis/pkg/apis/scheduling"
Expand Down Expand Up @@ -239,7 +240,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
queue := obj.(*api.QueueInfo)
attr := pp.queueOpts[queue.UID]

overused := !attr.allocated.LessEqual(attr.deserved, api.Zero)
overused := attr.deserved.LessEqual(attr.allocated, api.Zero)
metrics.UpdateQueueOverused(attr.name, overused)
if overused {
klog.V(3).Infof("Queue <%v>: deserved <%v>, allocated <%v>, share <%v>",
Expand All @@ -249,6 +250,27 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
return overused
})

ssn.AddUnderusedResourceFn(pp.Name(), func(queue *api.QueueInfo) api.ResourceNameList {
underUsedResNames := api.ResourceNameList{}
attr := pp.queueOpts[queue.UID]

_, underUsedResource := attr.allocated.Diff(attr.deserved)
if underUsedResource.MilliCPU >= api.GetMinResource() {
underUsedResNames = append(underUsedResNames, v1.ResourceCPU)
}
if underUsedResource.Memory >= api.GetMinResource() {
underUsedResNames = append(underUsedResNames, v1.ResourceMemory)
}
for rName := range underUsedResource.ScalarResources {
underUsedResNames = append(underUsedResNames, rName)
}
klog.V(3).Infof("Queue <%v>: deserved <%v>, allocated <%v>, share <%v>, underUsedResName %v",
queue.Name, attr.deserved, attr.allocated, attr.share, underUsedResNames)

return underUsedResNames

})

ssn.AddJobEnqueueableFn(pp.Name(), func(obj interface{}) int {
job := obj.(*api.JobInfo)
queueID := job.Queue
Expand Down

0 comments on commit 012c2a2

Please sign in to comment.