Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network topology scheduling implementations of volcano scheduler #3874

Merged
merged 3 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Network topology implements of scheduler
Signed-off-by: Monokaix <changxuzheng@huawei.com>
  • Loading branch information
Monokaix committed Dec 26, 2024
commit 90d1c5245cb1d05965b935c778007ed57dd33834
231 changes: 190 additions & 41 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package allocate

import (
"sort"
"time"

"k8s.io/klog/v2"
Expand All @@ -33,11 +34,13 @@ type Action struct {
session *framework.Session
// configured flag for error cache
enablePredicateErrorCache bool
hyperNodesTiers []int
}

// New builds an allocate Action with the predicate error cache switched on
// and an empty (non-nil) list of hyperNode tiers.
func New() *Action {
	action := new(Action)
	action.enablePredicateErrorCache = true // default to enable it
	action.hyperNodesTiers = []int{}
	return action
}

Expand All @@ -52,11 +55,26 @@ func (alloc *Action) parseArguments(ssn *framework.Session) {
arguments.GetBool(&alloc.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
}

// parseHyperNodesTiers collects the tier keys of ssn.HyperNodesListByTier,
// sorts them in ascending order and stores them in alloc.hyperNodesTiers.
//
// The list is rebuilt from scratch on every call, including when the session
// has no hyperNodes at all. Execute calls this once per invocation, so
// returning early on an empty map would leave tiers recorded by an earlier
// call in place.
func (alloc *Action) parseHyperNodesTiers(ssn *framework.Session) {
	// len of a nil map is 0, so no separate nil check is needed and the
	// result is an empty, non-nil slice when there is nothing to traverse.
	tiers := make([]int, 0, len(ssn.HyperNodesListByTier))
	for tier := range ssn.HyperNodesListByTier {
		tiers = append(tiers, tier)
	}

	// Map iteration order is random; sort to guarantee the traverse order is
	// from the lowest tier to the highest.
	sort.Ints(tiers)
	alloc.hyperNodesTiers = tiers
}

func (alloc *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Allocate ...")
defer klog.V(5).Infof("Leaving Allocate ...")

alloc.parseArguments(ssn)
alloc.parseHyperNodesTiers(ssn)

// the allocation for pod may have many stages
// 1. pick a queue named Q (using ssn.QueueOrderFn)
Expand Down Expand Up @@ -176,26 +194,155 @@ func (alloc *Action) allocateResources(queues *util.PriorityQueue, jobsMap map[a
klog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
tasks.Len(), job.Namespace, job.Name)

alloc.allocateResourcesForTasks(tasks, job, jobs, queue, allNodes)
hardMode, highestAllowedTier := job.HasTopologyHardConstrain()
var stmt *framework.Statement
var tasksQueue *util.PriorityQueue
if hardMode {
stmt, tasksQueue = alloc.allocateResourceForTasksWithTopology(tasks, job, queue, highestAllowedTier)
// There are still left tasks that need to be allocated when min available < replicas, put the job back and set pending tasks.
if tasksQueue != nil {
jobs.Push(job)
pendingTasks[job.UID] = tasksQueue
}
} else {
stmt = alloc.allocateResourcesForTasks(tasks, job, queue, allNodes, "")
// There are still left tasks that need to be allocated when min available < replicas, put the job back
if tasks.Len() > 0 {
jobs.Push(job)
}
}

if stmt != nil {
stmt.Commit()
}

// Put back the queue to priority queue after job's resource allocating finished,
// To ensure that the priority of the queue is calculated based on the latest resource allocation situation.
queues.Push(queue)
}
}

func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQueue, job *api.JobInfo, jobs *util.PriorityQueue, queue *api.QueueInfo, allNodes []*api.NodeInfo) {
// allocateResourceForTasksWithTopology tries to fit the job's tasks into a
// single hyperNode, walking alloc.hyperNodesTiers in order (lowest tier first)
// and never going above highestAllowedTier.
//
// Every hyperNode of the tier being examined is tried independently: the task
// queue is cloned, the job's fit errors are reset, and the tasks are allocated
// against that hyperNode's nodes. A successful attempt has its operations
// saved and is then discarded so the next candidate is evaluated on its own.
// The search stops at the first tier that yields at least one candidate.
//
// It returns the statement selectBestHyperNode produces for the chosen
// hyperNode (nil when no hyperNode fits) together with the queue of tasks that
// were still unallocated for that hyperNode (nil when there are none).
func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQueue, job *api.JobInfo, queue *api.QueueInfo, highestAllowedTier int) (*framework.Statement, *util.PriorityQueue) {
	jobStmtsByTier := make(map[int]map[string]*framework.Statement)
	hyperNodesWithLeftTasks := make(map[string]*util.PriorityQueue)
	ssn := alloc.session
	selectedTier := 0

	// Find a suitable hyperNode in one tier from down to top everytime to ensure that the selected hyperNode spans the least tier.
	for _, tier := range alloc.hyperNodesTiers {
		// Compare the tier value itself rather than its position in the sorted
		// slice: the tiers present in the session need not be contiguous or
		// start at 1, and a positional check would then let the search climb
		// above the allowed tier.
		if tier > highestAllowedTier {
			klog.V(4).ErrorS(nil, "Skip search for higher tier cause highest allowed tier reached", "jobName", job.UID, "highestAllowedTier", highestAllowedTier, "tier", tier)
			break
		}
		if len(jobStmtsByTier) > 0 {
			klog.V(4).InfoS("Skip search for higher tier cause has found a suitable one", "tier", tier)
			break
		}
		for _, hyperNodeName := range ssn.HyperNodesListByTier[tier] {
			nodes, ok := ssn.HyperNodes[hyperNodeName]
			if !ok {
				klog.ErrorS(nil, "HyperNode not exists.", "jobName", job.UID, "name", hyperNodeName, "tier", tier)
				continue
			}

			// Clone the task queue and reset the job's fit errors so that each
			// hyperNode is filtered against a clean state and attempts do not
			// affect each other.
			tasksQueue := tasks.Clone()
			job.ResetFitErr()
			klog.V(3).InfoS("Try to allocate resource for job in hyperNode", "jobName", job.UID, "hyperNodeName", hyperNodeName, "tier", tier)
			stmt := alloc.allocateResourcesForTasks(tasksQueue, job, queue, nodes, hyperNodeName)
			if stmt == nil {
				klog.V(4).InfoS("Cannot allocate resources for job with network topology constrains", "jobName", job.UID, "hyperNodeName", hyperNodeName, "tier", tier)
				continue
			}

			// Found an available hyperNode.
			if _, ok = jobStmtsByTier[tier]; !ok {
				jobStmtsByTier[tier] = make(map[string]*framework.Statement)
			}
			selectedTier = tier
			// Just cache the allocation result because we haven't chosen the best hyperNode.
			jobStmtsByTier[tier][hyperNodeName] = stmt.SaveOperations()
			// Rollback current statement and try next hyperNode.
			stmt.Discard()

			// Remember the tasks that are still unallocated for this hyperNode
			// so the caller can continue scheduling them later.
			if tasksQueue.Len() > 0 {
				hyperNodesWithLeftTasks[hyperNodeName] = tasksQueue
			}
		}
	}

	if len(jobStmtsByTier) > 0 {
		hyperNodes := make([]string, 0, len(jobStmtsByTier[selectedTier]))
		for hyperNodeName := range jobStmtsByTier[selectedTier] {
			hyperNodes = append(hyperNodes, hyperNodeName)
		}
		klog.V(4).InfoS("Find available hyperNodes for job", "jobName", job.UID, "tier", selectedTier, "hyperNodes", hyperNodes)
	}
	// When nothing was found jobStmtsByTier[selectedTier] is a nil map, which
	// selectBestHyperNode handles through its zero-candidate case.
	stmt, hyperNode := alloc.selectBestHyperNode(jobStmtsByTier[selectedTier], job)
	return stmt, hyperNodesWithLeftTasks[hyperNode]
}

// selectBestHyperNode picks one hyperNode out of jobStmts (saved statements
// keyed by hyperNode name) and returns a fresh statement with that hyperNode's
// saved operations recovered into it, together with the hyperNode's name.
//
// With no candidates it returns (nil, ""). With exactly one candidate that one
// is used. With several, the candidates are scored through
// util.PrioritizeHyperNodes and the winner is taken from
// util.SelectBestHyperNode. A nil statement is returned whenever scoring,
// lookup or recovery fails.
func (alloc *Action) selectBestHyperNode(jobStmts map[string]*framework.Statement, job *api.JobInfo) (*framework.Statement, string) {
	session := alloc.session

	if len(jobStmts) == 0 {
		klog.V(3).InfoS("Failed to allocate resource for job, no available hyperNode is under highest allowed tier", "jobName", job.UID)
		return nil, ""
	}

	var (
		chosenName string
		chosenStmt *framework.Statement
	)
	if len(jobStmts) == 1 {
		// Single candidate: nothing to score.
		for name, saved := range jobStmts {
			chosenName, chosenStmt = name, saved
		}
	} else {
		// Several candidates: score each hyperNode's node group and keep the best.
		candidates := make(map[string][]*api.NodeInfo)
		for name := range jobStmts {
			candidates[name] = session.HyperNodes[name]
		}

		scores, err := util.PrioritizeHyperNodes(candidates, job, session.HyperNodeOrderMapFn)
		if err != nil {
			klog.V(3).ErrorS(err, "Failed to allocate resource for job", "jobName", job.UID)
			return nil, ""
		}

		chosenName = util.SelectBestHyperNode(scores)
		saved, found := jobStmts[chosenName]
		if !found {
			klog.ErrorS(nil, "Couldn't find best hyperNode in statements", "jobName", job.UID, "hyperNode", chosenName)
			return nil, chosenName
		}
		chosenStmt = saved
	}

	if chosenStmt == nil || chosenName == "" {
		return nil, chosenName
	}

	// Replay the saved operations onto a new statement and hand it back.
	recovered := framework.NewStatement(session)
	if err := recovered.RecoverOperations(chosenStmt); err != nil {
		klog.ErrorS(err, "Failed to recover operations", "jobName", job.UID, "hyperNode", chosenName)
		return nil, chosenName
	}
	klog.V(3).InfoS("Allocate job to hyperNode", "jobName", job.UID, "hyperNode", chosenName)
	return recovered, chosenName
}

func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *api.JobInfo, jobs *util.PriorityQueue, queue *api.QueueInfo, allNodes []*api.NodeInfo) {
func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *api.JobInfo, queue *api.QueueInfo, allNodes []*api.NodeInfo, hyperNode string) *framework.Statement {
ssn := alloc.session
stmt := framework.NewStatement(ssn)
ph := util.NewPredicateHelper()

for !tasks.Empty() {
task := tasks.Pop().(*api.TaskInfo)

if !ssn.Allocatable(queue, task) {
klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name)
continue
Expand Down Expand Up @@ -280,54 +427,56 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
}
}

// Allocate idle resource to the task.
if task.InitResreq.LessEqual(bestNode.Idle, api.Zero) {
klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, bestNode.Name)
if err := stmt.Allocate(task, bestNode); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
task.UID, bestNode.Name, ssn.UID, err)
if rollbackErr := stmt.UnAllocate(task); rollbackErr != nil {
klog.Errorf("Failed to unallocate Task %v on %v in Session %v for %v.",
task.UID, bestNode.Name, ssn.UID, rollbackErr)
}
} else {
metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
}
} else {
klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
task.Namespace, task.Name, bestNode.Name)

// Allocate releasing resource to the task if any.
if task.InitResreq.LessEqual(bestNode.FutureIdle(), api.Zero) {
klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
task.Namespace, task.Name, bestNode.Name, task.InitResreq, bestNode.Releasing)
if err := stmt.Pipeline(task, bestNode.Name, false); err != nil {
klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.",
task.UID, bestNode.Name, ssn.UID, err)
if rollbackErr := stmt.UnPipeline(task); rollbackErr != nil {
klog.Errorf("Failed to unpipeline Task %v on %v in Session %v for %v.",
task.UID, bestNode.Name, ssn.UID, rollbackErr)
}
} else {
metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
}
}
}
alloc.allocateResourcesForTask(stmt, task, bestNode, job)

if ssn.JobReady(job) && !tasks.Empty() {
jobs.Push(job)
break
}
}

if ssn.JobReady(job) {
stmt.Commit()
klog.V(3).InfoS("Job ready, return statement", "jobName", job.UID)
return stmt
} else {
if !ssn.JobPipelined(job) {
stmt.Discard()
}
return nil
}
}

// allocateResourcesForTask places a single task on node through stmt.
//
// If the task's initial request fits the node's idle resources it is
// allocated; a failed allocation is logged and undone with UnAllocate.
// Otherwise, if the request fits the node's future idle resources, the task is
// pipelined onto the node. On either successful path the job's end-to-end
// scheduling metrics are updated. Nothing is returned; failures are only
// logged.
func (alloc *Action) allocateResourcesForTask(stmt *framework.Statement, task *api.TaskInfo, node *api.NodeInfo, job *api.JobInfo) {
	// recordScheduled publishes the per-job scheduling metrics after a
	// successful allocate or pipeline.
	recordScheduled := func() {
		metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
		metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
	}

	if !task.InitResreq.LessEqual(node.Idle, api.Zero) {
		klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
			task.Namespace, task.Name, node.Name)

		// Fall back to resources that are being released, if they suffice.
		if !task.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
			return
		}
		klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
			task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
		if err := stmt.Pipeline(task, node.Name, false); err != nil {
			klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.",
				task.UID, node.Name, alloc.session.UID, err)
			return
		}
		recordScheduled()
		return
	}

	// The request fits the idle resources: allocate directly.
	klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
	err := stmt.Allocate(task, node)
	if err == nil {
		recordScheduled()
		return
	}

	klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
		task.UID, node.Name, alloc.session.UID, err)
	if rollbackErr := stmt.UnAllocate(task); rollbackErr != nil {
		klog.Errorf("Failed to unallocate Task %v on %v in Session %v for %v.",
			task.UID, node.Name, alloc.session.UID, rollbackErr)
	}
}

Expand Down
Loading