Add network-topology-aware plugin and hyperNode score callback #3894
base: network-topology
Welcome @ecosysbin! |
/assign @Monokaix |
2f15078
to
947107e
Compare
pkg/scheduler/api/job_info.go
Outdated
|
||
// The RootHypernode property of a job is the hypernode that serves as the smallest root in the hypernode tree. | ||
// A job has multiple tasks, each belonging to a hypernode. This RootHypernode is the topmost and lowest common ancestor among the hypernodes of all tasks within the job. | ||
RootHyperNode string |
There was a problem hiding this comment.
When is RootHyperNode assigned? I didn't find any code that assigns a value to RootHyperNode.
There was a problem hiding this comment.
When a task of the job is committed, the hyperNode of that task will be assigned to the job. I will add the code later.
pkg/scheduler/api/types.go
Outdated
@@ -300,6 +301,9 @@ type NodeReduceFn func(*TaskInfo, k8sframework.NodeScoreList) error | |||
// NodeOrderMapFn is the func declaration used to get priority score of all plugins for a node for a particular task. | |||
type NodeOrderMapFn func(*TaskInfo, *NodeInfo) (map[string]float64, float64, error) | |||
|
|||
// HyperNodeOrderFn is the func declaration used to score hyperNodes for job. | |||
type HyperNodeOrderFn func(*JobInfo, map[string][]*NodeInfo, []int, map[int][]string, map[string]sets.Set[string]) (map[string]float64, error) |
There was a problem hiding this comment.
You may need to rebase this PR and refactor your code against https://github.com/volcano-sh/volcano/pull/3874/files; the HyperNodeOrderFn definition has changed.
There was a problem hiding this comment.
OK, I will do it later.
pkg/scheduler/framework/session.go
Outdated
// hyperNode can gain a better performance, the lower the tier of hyperNode, the better performance. | ||
HyperNodesListByTier map[int][]string | ||
|
||
HyperNodesTiers []int |
There was a problem hiding this comment.
Same as above: there are not so many fields now, so you may need to rebase and refactor: https://github.com/volcano-sh/volcano/pull/3874/files
There was a problem hiding this comment.
OK. According to the discussion, there will be a hyperNode tree here; that will be done in another PR.
|
||
// Goals: | ||
// - The tier to which the rootHypernode of a job belongs should be as low as possible. | ||
func networkTopologyAwareScore(hyperNode string, job *api.JobInfo, hyperNodesTiers []int, hyperNodesListByTier map[int][]string, hyperNodesMap map[string]sets.Set[string]) float64 { |
There was a problem hiding this comment.
I didn’t get the point for this score func. Does this have a design plan or a picture? After refactoring based on this PR: https://github.com/volcano-sh/volcano/pull/3874/files, could you add more details or description about how to score hypernodes in your pr?
There was a problem hiding this comment.
Explanation:
The RootHypernode property of a job is the hypernode that serves as the smallest root in the hypernode tree.
A job has multiple tasks, each belonging to a hypernode. The RootHypernode is the lowest common ancestor (LCA) of the hypernodes of all tasks within the job.
Goals:
The tier to which the rootHypernode of a job belongs should be as low as possible.
I will change the name 'RootHypernode' to 'LCAHyperNode' later.
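To make the LCA idea above concrete, here is a minimal sketch. It assumes a hypothetical child-to-parent map (`parentOf`) for the hyperNode tree, which is not the data structure used in this PR, and the helper names `ancestors` and `findLCA` are illustrative only. It also assumes the map is acyclic.

```go
package main

import "fmt"

// parentOf is a hypothetical child -> parent map describing a hyperNode tree.
// The PR uses a different structure; this is only an illustration.
type parentOf map[string]string

// ancestors returns the path from the given hyperNode up to the root, inclusive.
// It assumes the map contains no cycles.
func ancestors(tree parentOf, name string) []string {
	path := []string{name}
	for {
		p, ok := tree[name]
		if !ok {
			return path
		}
		path = append(path, p)
		name = p
	}
}

// findLCA returns the lowest common ancestor of hyperNodes a and b,
// or "" when they share no ancestor.
func findLCA(tree parentOf, a, b string) string {
	seen := map[string]bool{}
	for _, n := range ancestors(tree, a) {
		seen[n] = true
	}
	// Walk upward from b; the first node also reachable from a is the LCA.
	for _, n := range ancestors(tree, b) {
		if seen[n] {
			return n
		}
	}
	return ""
}

func main() {
	// s0 and s1 sit under s2; s2 and s3 sit under s4.
	tree := parentOf{"s0": "s2", "s1": "s2", "s2": "s4", "s3": "s4"}
	fmt.Println(findLCA(tree, "s0", "s1"))
	fmt.Println(findLCA(tree, "s0", "s3"))
}
```

With this tree, tasks placed in s0 and s1 give an LCA one tier up (s2), while tasks in s0 and s3 push the LCA to the top tier (s4), which is the case the stated goal tries to avoid.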
d1da751
to
414e547
Compare
414e547
to
5036976
Compare
889afed
to
b9a291b
Compare
// - Tasks under a job should be scheduled to one hyperNode as much as possible. | ||
func networkTopologyAwareScore(hyperNodeName string, job *api.JobInfo, hyperNodeTree []map[string][]string) float64 { | ||
jobHyperNode := job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] | ||
// job fist first scheduler. |
There was a problem hiding this comment.
-> //job is scheduled for the first time
There was a problem hiding this comment.
done
// HyperNodeTree is the hypernode tree of all hypernodes in the cluster. | ||
// currentJobLCAHyperNode is the hypernode of the job's LCAHyperNode. | ||
var ( | ||
HyperNodeTree []map[string][]string |
There was a problem hiding this comment.
Do we need to maintain a separate global variable here? Also, this data structure should not really be called HyperNodeTree, because it does not reflect hierarchical relationships.
There was a problem hiding this comment.
I have moved the hyperNodeTree to the session. According to the discussion, this variable will be replaced by Gu Peng's HyperNodeTree later.
return taskCount | ||
} | ||
|
||
// FindOutRootHyperNode find out the root hypernode of the job when the hypernode join the job. |
There was a problem hiding this comment.
-> //FindLCAHyperNode finds out the common ancestor of the current hypernode and the hypernode where the job is scheduled
There was a problem hiding this comment.
done
} | ||
} | ||
|
||
func TestFindLCAHyperNode(t *testing.T) { |
There was a problem hiding this comment.
None of the use cases specify currentJobLCAHyperNode?
There was a problem hiding this comment.
done
} | ||
|
||
hyperNodesMap := make(map[string]sets.Set[string]) | ||
for i := 0; i < len(revertHyperNodeTree); i++ { |
There was a problem hiding this comment.
Extract a shared function; FindLCAHyperNode contains the same code.
There was a problem hiding this comment.
done
// PluginName indicates name of volcano scheduler plugin. | ||
PluginName = "networktopologyaware" | ||
BaseScore = 100 | ||
TaskBaseScore = 10 |
There was a problem hiding this comment.
Is 10 points reasonable? We need to discuss this. /cc @Monokaix
There was a problem hiding this comment.
This ensures that the scores for the task count are distributed between 0 and 10, so that the scores for the tier level will be higher than those for the task count.
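A rough sketch of how the two constants could bound the two score components. The formulas below are assumptions for illustration, not the PR's implementation; only the constants `BaseScore` and `TaskBaseScore` and the `(tier, minTier, maxTier)` parameter list come from the diff.

```go
package main

import "fmt"

const (
	BaseScore     = 100 // upper bound of the tier-based score
	TaskBaseScore = 10  // upper bound of the task-count score
)

// tierScore maps a tier to [0, BaseScore]; a lower tier gets a higher score.
// The linear formula is an assumption made for this sketch.
func tierScore(tier, minTier, maxTier int) float64 {
	if minTier == maxTier {
		return BaseScore
	}
	return BaseScore * float64(maxTier-tier) / float64(maxTier-minTier)
}

// taskNumScore maps the share of the job's tasks already placed in a
// hyperNode to [0, TaskBaseScore]. Also an assumed formula.
func taskNumScore(placed, total int) float64 {
	if total == 0 {
		return 0
	}
	return TaskBaseScore * float64(placed) / float64(total)
}

func main() {
	for tier := 1; tier <= 3; tier++ {
		fmt.Printf("tier %d: %.1f\n", tier, tierScore(tier, 1, 3))
	}
	fmt.Printf("3 of 4 tasks placed: %.1f\n", taskNumScore(3, 4))
}
```

Under these assumed formulas, a one-tier difference is worth `BaseScore / (maxTier - minTier)` points, so it stays larger than the whole task-count range only while the tiers span fewer than 10 levels. Whether that margin is enough is the question raised in the review comment above.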
pkg/scheduler/framework/session.go
Outdated
@@ -127,8 +126,10 @@ func openSession(cache cache.Cache) *Session { | |||
|
|||
TotalResource: api.EmptyResource(), | |||
TotalGuarantee: api.EmptyResource(), | |||
podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{}, | |||
|
|||
PodGroupCache: api.PodGroupCache{ |
There was a problem hiding this comment.
It is better to rename this to PodGroupOldState, since it is used to record some old PodGroup data.
There was a problem hiding this comment.
done
pkg/scheduler/cache/cache.go
Outdated
job.PodGroup = pg | ||
} | ||
|
||
sc.Jobs[job.UID] = job |
There was a problem hiding this comment.
No need to update here
There was a problem hiding this comment.
done
pkg/scheduler/cache/cache.go
Outdated
@@ -1536,9 +1537,12 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b | |||
if err != nil { | |||
return nil, err | |||
} | |||
sc.Mutex.Lock() |
There was a problem hiding this comment.
I think there is no need to add a lock here. session --> UpdateJobStatus still runs in the same goroutine, and no other goroutine reads or writes this map.
There was a problem hiding this comment.
According to the discussion, locks need to be added for all operations on the cache.
There was a problem hiding this comment.
This code path updates the PodGroup in the apiserver. If you only want to update the cache, that should not be done here.
There was a problem hiding this comment.
Yes, I need to update the PodGroup in the apiserver.
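For reference, the "lock every cache operation" convention mentioned in this thread could look like the sketch below. The `jobCache` type and its methods are hypothetical stand-ins, not the scheduler cache from this repository.

```go
package main

import (
	"fmt"
	"sync"
)

// jobCache is a hypothetical stand-in for a scheduler cache: every read and
// write of the map goes through the mutex, even if today only one goroutine
// touches it.
type jobCache struct {
	mu   sync.Mutex
	jobs map[string]string // job ID -> allocated hyperNode (illustrative)
}

func newJobCache() *jobCache {
	return &jobCache{jobs: map[string]string{}}
}

// setHyperNode records the hyperNode for a job under the lock.
func (c *jobCache) setHyperNode(jobID, hyperNode string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.jobs[jobID] = hyperNode
}

// hyperNode reads the recorded hyperNode for a job under the lock.
func (c *jobCache) hyperNode(jobID string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	hn, ok := c.jobs[jobID]
	return hn, ok
}

func main() {
	c := newJobCache()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.setHyperNode(fmt.Sprintf("job-%d", i), "s0")
		}(i)
	}
	wg.Wait()
	hn, ok := c.hyperNode("job-3")
	fmt.Println(hn, ok)
}
```

The trade-off raised by the reviewer still applies: the lock is redundant while a single goroutine owns the map, and it only pays off once another goroutine starts reading or writing it.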
@@ -245,6 +247,14 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu | |||
break | |||
} | |||
for _, hyperNodeName := range ssn.HyperNodesListByTier[tier] { | |||
if jobHyperNode == "" { | |||
// job first scheduler |
There was a problem hiding this comment.
-> //job is scheduled for the first time
There was a problem hiding this comment.
done
2e8f63b
to
a82f026
Compare
2f44b62
to
325f845
Compare
c9c6f67
to
6cd178c
Compare
6cd178c
to
2e934b2
Compare
return nodeScores, nil | ||
} | ||
|
||
ssn.AddHyperNodeOrederFn(nta.Name(), hyperNodeFn) |
There was a problem hiding this comment.
AddHyperNodeOrederFn has a typo; it should be AddHyperNodeOrderFn. Could you also help to fix it?
There was a problem hiding this comment.
I'm glad to do it. Done.
pkg/scheduler/framework/session.go
Outdated
@@ -202,6 +204,7 @@ func openSession(cache cache.Cache) *Session { | |||
ssn.NodeList = util.GetNodeList(snapshot.Nodes, snapshot.NodeList) | |||
ssn.HyperNodes = snapshot.HyperNodes | |||
ssn.HyperNodesSetByTier = snapshot.HyperNodesSetByTier | |||
parseHyperNodesTiers(ssn) |
There was a problem hiding this comment.
Would it be better to make parseHyperNodesTiers one of the session's methods? That seems more semantically consistent, like ssn.parseHyperNodesTiers()
There was a problem hiding this comment.
I agree, done.
klog.V(5).Infof("Leaving networkTopologyAware plugin ...") | ||
}() | ||
|
||
weight := calculateWeight(nta.pluginArguments) |
There was a problem hiding this comment.
Better to set the weight in the New method of networktopologyawareplugin, and keep weight as one of the fields of networktopologyawareplugin, just as the binpack plugin did:
volcano/pkg/scheduler/plugins/binpack/binpack.go
Lines 89 to 92 in bff75bb
func New(aruguments framework.Arguments) framework.Plugin { | |
weight := calculateWeight(aruguments) | |
return &binpackPlugin{weight: weight} | |
} |
There was a problem hiding this comment.
Good idea, done.
return taskNumScore | ||
} | ||
|
||
func scoreHyperNodeWithTier(tier int, minTier int, maxTier int) float64 { |
There was a problem hiding this comment.
Why not take this func as one of networkTopologyAwarePlugin's methods?
There was a problem hiding this comment.
good idea, I have done it
return PluginName | ||
} | ||
|
||
func calculateWeight(args framework.Arguments) int { |
There was a problem hiding this comment.
Please add an example conf to show users what the weight means and how to configure it, just as binpack did:
volcano/pkg/scheduler/plugins/binpack/binpack.go
Lines 95 to 110 in bff75bb
/* | |
User Should give priorityWeight in this format(binpack.weight, binpack.cpu, binpack.memory). | |
Support change the weight about cpu, memory and additional resource by arguments. | |
actions: "enqueue, reclaim, allocate, backfill, preempt" | |
tiers: | |
- plugins: | |
- name: binpack | |
arguments: | |
binpack.weight: 10 | |
binpack.cpu: 5 | |
binpack.memory: 1 | |
binpack.resources: nvidia.com/gpu, example.com/foo | |
binpack.resources.nvidia.com/gpu: 2 | |
binpack.resources.example.com/foo: 3 | |
*/ |
There was a problem hiding this comment.
ok, done
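As an illustration of what such a comment plus parser could look like for this plugin, here is a sketch modelled on the binpack comment quoted above. The argument key `weight`, the default of 1, and the `Arguments` stand-in type are all assumptions, not taken from the PR.

```go
package main

import "fmt"

/*
Hypothetical example configuration (the "weight" key is an assumption):

actions: "enqueue, allocate, backfill"
tiers:
- plugins:
  - name: networktopologyaware
    arguments:
      weight: 10
*/

// Arguments is a simplified stand-in for framework.Arguments.
type Arguments map[string]interface{}

// weightKey is the assumed argument name used in this sketch.
const weightKey = "weight"

// calculateWeight reads the plugin weight from the arguments and falls back
// to 1 when the key is missing, has an unexpected type, or is not positive.
func calculateWeight(args Arguments) int {
	weight := 1
	switch v := args[weightKey].(type) {
	case int:
		if v > 0 {
			weight = v
		}
	case float64: // numbers decoded from YAML/JSON may arrive as float64
		if v > 0 {
			weight = int(v)
		}
	}
	return weight
}

func main() {
	fmt.Println(calculateWeight(Arguments{"weight": 10}))
	fmt.Println(calculateWeight(Arguments{}))
}
```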
} | ||
// Calculate score based on the number of tasks scheduled for the job when max score of hyperNode has more than one. | ||
if maxScore != ZeroScore && len(scoreHyperNode[maxScore]) > 1 { | ||
reScoreHyperNodes := scoreHyperNode[maxScore] |
There was a problem hiding this comment.
The name reScoreHyperNodes is strange; better to rename it to candidates or candidateHyperNodes.
There was a problem hiding this comment.
OK, done.
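The tie-break step in the snippet above (re-score only when several hyperNodes share the top score) can be sketched as follows. The helper names `topCandidates` and `breakTies` and the task-count formula are illustrative assumptions; only the idea of skipping a zero top score and re-scoring tied hyperNodes comes from the diff.

```go
package main

import (
	"fmt"
	"sort"
)

const TaskBaseScore = 10

// topCandidates returns the hyperNodes that share the highest score,
// sorted by name so the result is deterministic.
func topCandidates(scores map[string]float64) []string {
	var best float64
	var names []string
	first := true
	for name, s := range scores {
		switch {
		case first || s > best:
			best, names, first = s, []string{name}, false
		case s == best:
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names
}

// breakTies adds a task-count bonus to the tied top candidates only.
// placed maps a hyperNode to the number of the job's tasks already in it.
func breakTies(scores map[string]float64, placed map[string]int, total int) {
	cands := topCandidates(scores)
	if len(cands) < 2 || total == 0 || scores[cands[0]] == 0 {
		return // nothing to break, or the top score is zero
	}
	for _, name := range cands {
		scores[name] += TaskBaseScore * float64(placed[name]) / float64(total)
	}
}

func main() {
	scores := map[string]float64{"s0": 100, "s1": 100, "s2": 50}
	breakTies(scores, map[string]int{"s0": 1, "s1": 3}, 4)
	fmt.Println(scores["s0"], scores["s1"], scores["s2"])
}
```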
return hyperNodeScores, nil | ||
} | ||
|
||
nodeFn := func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) { |
There was a problem hiding this comment.
Is this nodeFn added for soft mode? Doesn’t the hard mode count twice? The HyperNode score is counted once, and the Node score is counted again.
There was a problem hiding this comment.
Yes, I have modified it. You can check again.
} | ||
// The job still has remaining tasks to be scheduled, calculate score based on LCAHyperNode tier. | ||
maxScore := ZeroScore | ||
scoreNodes := map[float64][]string{} |
There was a problem hiding this comment.
Same here: please rename scoreNodes and reScoreNodes.
There was a problem hiding this comment.
done
nodeScores := make(map[string]float64) | ||
taskJob := ssn.Jobs[task.Job] | ||
|
||
jobAllocatedHyperNode := taskJob.PodGroup.GetAnnotations()[api.JobAllocatedHyperNode] |
There was a problem hiding this comment.
In soft mode, this annotation is still never updated. For example, when a job is scheduled for the first time, the for loop traverses each pod in the job; once some pods have been allocated, the subsequent pods should be scored based on the number of the job's pods already in each hypernode.
There was a problem hiding this comment.
Yes, I have modified it. You can check again.
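One way to picture the soft-mode behavior requested above: score a candidate node by how many of the job's already-allocated pods sit in the same hyperNode. The maps, function names, and formula below are hypothetical and only illustrate the idea.

```go
package main

import "fmt"

// podsPerHyperNode counts the job's already-allocated pods per hyperNode.
// allocated maps pod name -> node name; nodeToHyperNode maps node name ->
// hyperNode name. Both maps are hypothetical inputs for this sketch.
func podsPerHyperNode(allocated, nodeToHyperNode map[string]string) map[string]int {
	counts := map[string]int{}
	for _, node := range allocated {
		if hn, ok := nodeToHyperNode[node]; ok {
			counts[hn]++
		}
	}
	return counts
}

// scoreNode scores a candidate node by the share of the job's allocated pods
// that already run in the candidate's hyperNode, scaled to [0, maxScore].
func scoreNode(node string, allocated, nodeToHyperNode map[string]string, maxScore float64) float64 {
	hn, ok := nodeToHyperNode[node]
	if !ok || len(allocated) == 0 {
		return 0 // unknown node, or the job is scheduled for the first time
	}
	counts := podsPerHyperNode(allocated, nodeToHyperNode)
	return maxScore * float64(counts[hn]) / float64(len(allocated))
}

func main() {
	nodeToHyperNode := map[string]string{"n0": "s0", "n1": "s0", "n2": "s1", "n3": "s1"}
	allocated := map[string]string{"p0": "n0", "p1": "n1", "p2": "n2", "p3": "n0"}
	fmt.Println(scoreNode("n1", allocated, nodeToHyperNode, 10))
	fmt.Println(scoreNode("n3", allocated, nodeToHyperNode, 10))
}
```

In this example three of the four allocated pods are in s0, so a candidate node in s0 scores higher than one in s1, which keeps later pods of the job close to the earlier ones.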
} | ||
} | ||
|
||
func TestNetworkTopologyAwareHyperNodeScore(t *testing.T) { |
There was a problem hiding this comment.
We also need to add test cases for hyperNodes.
There was a problem hiding this comment.
ok, done.
c0711fc
to
2fa0ab7
Compare
Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com>
2fa0ab7
to
d1b5a92
Compare
There was a problem hiding this comment.
PR Overview
This PR adds a new network-topology-aware scheduling plugin with a hyperNode score callback and integrates it into the Volcano scheduler. Key changes include:
- Introduction of the networktopologyaware plugin in pkg/scheduler/plugins/networktopologyaware.
- Updates and additions in scheduler utilities and tests to support network topology and hyperNode operations.
- Modifications in the allocation and session code to incorporate hyperNode-based scheduling and scoring.
Reviewed Changes
File | Description |
---|---|
pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go | New plugin implementation with network topology scoring functions. |
pkg/scheduler/util/scheduler_helper_test.go | New tests for hyperNode matching and task counts. |
pkg/scheduler/util/scheduler_helper.go | Consolidation of hyperNode helper functions. |
pkg/scheduler/actions/allocate/allocate.go | Updates to use session’s hyperNodesTiers and hyperNode selection logic. |
pkg/scheduler/framework/session.go | Added parsing of hyper node tiers into the session object. |
pkg/scheduler/conf/scheduler_conf.go | New configuration options for network topology. |
pkg/scheduler/api/job_info.go | Added JobAllocatedHyperNode field in job annotations. |
pkg/scheduler/framework/session_plugins.go | Fixed a typographical error in the hyperNode order function name. |
pkg/scheduler/uthelper/helper.go | Updated test common structure to include RealNodesList. |
Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.
bestNode, highestScore := alloc.prioritizeNodes(ssn, task, predicateNodes) | ||
if bestNode == nil { | ||
continue | ||
} | ||
|
||
if hyperNode == "" { |
There was a problem hiding this comment.
The variable 'hyperNode' is used without being declared or initialized. Please declare it before this block, for example initializing it as an empty string.
There was a problem hiding this comment.
The 'hyperNode' is a function parameter.
@@ -220,6 +223,20 @@ func openSession(cache cache.Cache) *Session { | |||
return ssn | |||
} | |||
|
|||
func (ssn *Session) parseHyperNodesTiers() { |
There was a problem hiding this comment.
Why move to ssn?
@@ -0,0 +1,224 @@ | |||
/* | |||
Copyright 2019 The Volcano Authors. |
There was a problem hiding this comment.
2019->2025
@@ -0,0 +1,224 @@ | |||
/* |
There was a problem hiding this comment.
network-topology-aware is better.
What type of PR is this?
/kind feature
/area scheduling
What this PR does / why we need it:
Add network-topology-aware plugin and hyperNode score callback
Which issue(s) this PR fixes:
#3885