Create a new Agent scheduler for Agent workload fast scheduling #4804

Conversation
Code Review
This pull request introduces a new agent scheduler designed for fast-path scheduling of workloads like AI agents. This is a significant feature addition that includes a new scheduler binary, associated CRDs like NodeShard, and a conflict-aware binding mechanism to improve scheduling reliability. The implementation thoughtfully reuses and adapts components from the existing Volcano scheduler framework. Additionally, the PR refactors the predicates and nodeorder plugins to better align with the upstream Kubernetes scheduler framework, which is a commendable improvement for long-term maintainability. My review identified a couple of minor issues, including an ungraceful panic on initialization error and a minor copy-paste error in a file header. Overall, this is a well-structured and substantial contribution towards enhancing Volcano's scheduling capabilities.
cmd/agent-scheduler/app/server.go
Outdated
	"volcano.sh/apis/pkg/apis/helpers"
	"volcano.sh/volcano/cmd/agent-scheduler/app/options"
	scheduler "volcano.sh/volcano/pkg/agentscheduler"
	"volcano.sh/volcano/pkg/agentscheduler/metrics"
	"volcano.sh/volcano/pkg/kube"
	"volcano.sh/volcano/pkg/signals"
	commonutil "volcano.sh/volcano/pkg/util"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/uuid"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/klog/v2"

	utilfeature "k8s.io/apiserver/pkg/util/feature"

	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/tools/record"
	basecompatibility "k8s.io/component-base/compatibility"

	// Register rest client metrics
	_ "k8s.io/component-base/metrics/prometheus/restclient"
nit: group imports with the Go standard library first, then third-party packages, and volcano packages last
sure, I changed the order
)

-type nodeOrderPlugin struct {
+type NodeOrderPlugin struct {
Is this just a refactor, or are there other changes? Can you point them out?
The changes under scheduler/plugins are just refactoring, like abstracting functions so that they can be used in both the batch scheduler and the agent scheduler.
The modification on line 69 is to allow this plugin to be reused in the agent-scheduler package.
This seems like a refactor; if there are any other changes, please point them out to simplify review.
The change in predicates is a refactor, to allow the functions to be used in both the agent scheduler and the batch scheduler.
	node, found := nodeMap[nodeName]
	if !found {
		klog.Errorf("predicates, update pod %s/%s allocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
		node, err := pp.Handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
I am concerned about this change: previously it got nodes from the session, now it gets them from the informer list.
Will that bring any caveats?
https://github.com/qi-min/volcano/blob/5e134b9d039065acea682f15132a49f3f05d16fe/pkg/scheduler/plugins/predicates/predicates.go#L190-L191 Actually we still use the nodeMap in the session here; there's a check in NewFramework:
https://github.com/qi-min/volcano/blob/5e134b9d039065acea682f15132a49f3f05d16fe/pkg/scheduler/plugins/util/k8s/framework.go#L223-L227
so the logic of getting the nodeMap from the session hasn't changed.
pkg/agentscheduler/scheduler.go
Outdated
	}
}

cache := schedcache.New(config, opt.SchedulerNames, opt.NodeSelector, opt.NodeWorkerThreads, opt.ResyncPeriod)
We do not need multiple SchedulerNames.
OK, changed it to use a single name.
pkg/agentscheduler/scheduler.go
Outdated
	worker := &Worker{}
	worker.framework = framework.NewFramework(sched.actions, sched.tiers, sched.cache, sched.configurations)
	index := i
	go wait.Until(func() { worker.runOnce(index) }, 0, stopCh)
When there are no pods to schedule, wouldn't this run in a busy loop?
When there is no pod to schedule, runOnce blocks on popping a pod from the queue until a new Pending pod is enqueued.
You mean the queue read is blocking; that's OK.
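The blocking-Pop behavior described above can be sketched with a condition variable. This is an illustrative stand-in, not the PR's actual queue implementation; all names here (podQueue, Push, Pop) are assumptions:

```go
// Minimal sketch of a queue whose Pop blocks while empty, which is why an
// idle worker does not busy-loop. Not the PR's real scheduling queue.
package main

import (
	"fmt"
	"sync"
)

type podQueue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []string
}

func newPodQueue() *podQueue {
	q := &podQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Push enqueues a pod name and wakes one blocked Pop.
func (q *podQueue) Push(pod string) {
	q.mu.Lock()
	q.items = append(q.items, pod)
	q.mu.Unlock()
	q.cond.Signal()
}

// Pop blocks until an item is available, then removes and returns it.
func (q *podQueue) Pop() string {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait() // releases the lock while waiting; no CPU spin
	}
	pod := q.items[0]
	q.items = q.items[1:]
	return pod
}

func main() {
	q := newPodQueue()
	go q.Push("pending-pod")
	fmt.Println(q.Pop()) // pending-pod
}
```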
	task := scheduleResult.SchedCtx.Task
	// 1. Check conflict
	node := binder.FindNonConflictingNode(scheduleResult)
	if node == nil {
How many pods are scheduled at once, and when could node == nil?
One pod is scheduled by each worker at a time. The scheduleResult (candidate nodes, 3 by default) is sent from the worker to the binder. FindNonConflictingNode checks whether these nodes conflict with a previous binding: if a node's version was not used in a previous binding, the node is used for binding; otherwise it is rejected. Nil is returned if all candidate nodes are rejected.
E.g. the candidate nodes are n1(v1), n2(v1), n3(v2). If nodes with these versions were used in previous bindings, then all candidates are dropped and nil is returned. If the previous binding on n3 was at version v1, then n3 is used.
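The version-based conflict check described in this reply can be sketched as follows. The names (candidate, binder, findNonConflictingNode) and the integer-version representation are hypothetical simplifications, not the PR's API:

```go
// Sketch of rejecting candidate nodes whose cached version was already
// consumed by a previous in-flight binding. Assumed names throughout.
package main

import "fmt"

// candidate pairs a node name with the cache version the scheduling
// decision was computed against.
type candidate struct {
	name    string
	version int
}

type binder struct {
	// usedVersions records, per node, the cache version consumed by a
	// previous (still in-flight) binding.
	usedVersions map[string]int
}

// findNonConflictingNode returns the first candidate whose version was not
// already used by a previous binding, or nil if every candidate conflicts.
func (b *binder) findNonConflictingNode(cands []candidate) *candidate {
	for i := range cands {
		c := &cands[i]
		if used, ok := b.usedVersions[c.name]; ok && used == c.version {
			continue // same snapshot already consumed: reject this node
		}
		b.usedVersions[c.name] = c.version // consume this version
		return c
	}
	return nil
}

func main() {
	// Previous bindings consumed n1@v1, n2@v1, n3@v1.
	b := &binder{usedVersions: map[string]int{"n1": 1, "n2": 1, "n3": 1}}
	// n1(v1) and n2(v1) conflict; n3 is now at v2, so it is accepted.
	got := b.findNonConflictingNode([]candidate{{"n1", 1}, {"n2", 1}, {"n3", 2}})
	fmt.Println(got.name) // n3
}
```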
	default:
	}
	if len(binder.BindCheckChannel) == 0 {
		break
I am also wondering about the CPU usage when there are no pods to bind.
The function is updated; it now blocks if no result is sent to the channel.
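The updated blocking behavior can be sketched as a select on the result channel instead of polling its length. BindCheckResult and runBinder are placeholder names for illustration, not the PR's types:

```go
// Sketch: a blocking receive parks the goroutine while the channel is
// empty, so an idle binder consumes no CPU (unlike a len()==0 poll loop).
package main

import "fmt"

type BindCheckResult struct{ pod string }

func runBinder(results <-chan BindCheckResult, stopCh <-chan struct{}, handled chan<- string) {
	for {
		select {
		case r := <-results: // blocks while empty; no busy-wait
			handled <- r.pod
		case <-stopCh:
			return
		}
	}
}

func main() {
	results := make(chan BindCheckResult, 1)
	stopCh := make(chan struct{})
	handled := make(chan string, 1)
	go runBinder(results, stopCh, handled)
	results <- BindCheckResult{pod: "agent-pod"}
	fmt.Println(<-handled) // agent-pod
	close(stopCh)
}
```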
		return
	}

	binder.schedulingQueue.Done(task.Pod.UID) // Mark previous processing as done to clean in-flight records, avoid memory leak
Done should be called in a pair with Pop; otherwise it is very easy to miss calling Done.
Although event-based retries are not yet implemented, if we called Done immediately after Pop and scheduling then failed, the pod would no longer be in the in-flight list:
volcano/third_party/kubernetes/pkg/scheduler/backend/queue/active_queue.go
Lines 392 to 406 in 76c3253

	inFlightPod, ok := aq.inFlightPods[pInfo.Pod.UID]
	if !ok {
		return nil, fmt.Errorf("in flight Pod isn't found in the scheduling queue. If you see this error log, it's likely a bug in the scheduler")
	}
	var events []*clusterEvent
	for event := inFlightPod.Next(); event != nil; event = event.Next() {
		e, ok := event.Value.(*clusterEvent)
		if !ok {
			// Must be another in-flight Pod (*v1.Pod). Can be ignored.
			continue
		}
		events = append(events, e)
	}
	return events, nil
Event changes that occur while the pod is being scheduled would then prevent the pod from retrying quickly.
Currently, after Pop, Done is handled without omission: when scheduling fails, Done is called in AddUnschedulableIfNotPresent, and when scheduling succeeds, Done is called once CheckAndBind is reached, so we don't miss it.
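The Pop/Done pairing described above can be sketched as follows. The types and method names here are simplified stand-ins for the real scheduling queue, just to show that both the failure path and the success path end in Done:

```go
// Sketch: every Pop starts tracking a pod in an in-flight map, and both
// outcome paths (unschedulable, or bound) call Done to stop tracking,
// so the map cannot leak. Names are illustrative, not the PR's API.
package main

import "fmt"

type queue struct {
	inFlight map[string]bool
}

// Pop begins tracking cluster events for the popped pod.
func (q *queue) Pop(uid string) {
	q.inFlight[uid] = true
}

// Done stops tracking; forgetting this would leak in-flight records.
func (q *queue) Done(uid string) {
	delete(q.inFlight, uid)
}

// addUnschedulableIfNotPresent models the failure path: requeue, then Done.
func (q *queue) addUnschedulableIfNotPresent(uid string) {
	q.Done(uid)
}

// checkAndBind models the success path: bind, then Done.
func (q *queue) checkAndBind(uid string) {
	q.Done(uid)
}

func main() {
	q := &queue{inFlight: map[string]bool{}}
	q.Pop("pod-a")
	q.checkAndBind("pod-a") // success path
	q.Pop("pod-b")
	q.addUnschedulableIfNotPresent("pod-b") // failure path
	fmt.Println(len(q.inFlight)) // 0
}
```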
What is the difference with pkg/scheduler/cache/cache.go? Document it and add a TODO: can we merge them or abstract the common parts?
In the agent scheduler, we reuse informers, nodeInfo, nodeList, and Binder from the scheduler cache, and also add/modify several fields and functions like the conflictAwareBinder, the scheduling queue, etc. I will add a TODO to abstract the common parts of the agent/batch schedulers.
cmd/agent-scheduler/main.go
Outdated
	_ "volcano.sh/volcano/pkg/agentscheduler/plugins"

	// init assert
	_ "volcano.sh/volcano/pkg/scheduler/util/assert"
Remove this; it should only occur in tests.
done
)

// ServerOption is the main context object for the controller manager.
type ServerOption struct {
This is mostly copied from the scheduler, and RegisterOptions has some coupling with the scheduler (it sets ServerOpts.EnableCSIStorage).
Could we eliminate that?
The plugins in the batch scheduler depend on options.ServerOpts, but ServerOpts is a global variable at the cmd/scheduler/app/options level, so it is not initialized with flag values when the agent scheduler starts up. RegisterOptions passes the options to options.ServerOpts so that it can pick up values from the agent scheduler's flags.
In the next PR for sharing, the logic is changed to pass all options of the agent scheduler to the batch scheduler, with no need to specifically set EnableCSIStorage:
voptions.ServerOpts = ServerOpts.ServerOption
	commonutil.LeaderElectionDefault(&s.LeaderElection)
	s.LeaderElection.ResourceName = commonutil.GenerateComponentName(s.SchedulerNames)
	componentbaseoptions.BindLeaderElectionFlags(&s.LeaderElection, fs)
I would prefer moving the leader-election related code to app.Run.
	}

	// Align default feature-gates with the connected cluster's version.
	if err := setupComponentGlobals(config); err != nil {
What does this do?
It seems it is used to discover the kube-apiserver version and set feature-gate defaults to match the cluster.
		return err
	}

	metrics.InitKubeSchedulerRelatedMetrics()
The backoff queue depends on these metrics, but I don't think we ever update them.
Yes, the scheduling queue we import under the third-party path depends on the metrics; a nil error is reported if the metrics are not initialized. I think we need a TODO to decouple the kube metrics from metric recording.
Just found that the batch scheduler also started calling InitKubeSchedulerRelatedMetrics to initialize metrics a few weeks ago, for #4731.
// Register registers or updates a binder for the given plugin name. The plugin can be such as preBinder or postBinder.
// It always overwrites the existing binder map to support plugin configuration updates
// during runtime, as plugins may be reconfigured without restarting the scheduler.
func (r *BinderRegistry) Register(name string, binder interface{}) {
Note: this is not called, so volume binding checks are not supported.
It is called in OnPluginInit in the predicates plugin.
	OnActionInit(configurations []conf.Configuration)

	// Initialize initializes the allocator plugins.
	Initialize()
can merge
@hzxuzhonghu what needs to be merged?
I mean OnActionInit and Initialize can be merged.
	conf.EnabledActionMap[action.Name()] = true
}

schedCtx, err := worker.generateNextSchedulingContext()
You can add a for loop here instead of depending on wait.Until(func() { worker.runOnce(index) }, 0, stopCh) to do that at function granularity, which could be less efficient and produce confusing logs.
Good suggestion, I will change it.
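The suggested for-loop structure might look roughly like this; Worker and runOnce are minimal stand-ins for the PR's types, not its actual code:

```go
// Sketch of the reviewer's suggestion: a plain for-loop inside the worker
// goroutine instead of wrapping every iteration in wait.Until with a zero
// period, keeping the stop check in one obvious place.
package main

import "fmt"

type Worker struct{ processed int }

// runOnce is a placeholder for one scheduling iteration.
func (w *Worker) runOnce(index int) { w.processed++ }

func (w *Worker) run(index int, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return // single exit point, no per-iteration wrapper
		default:
		}
		w.runOnce(index)
	}
}

func main() {
	w := &Worker{}
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() {
		w.run(0, stop)
		close(done)
	}()
	close(stop) // signal shutdown; the loop observes it and returns
	<-done
	fmt.Println("worker stopped")
}
```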
	SchedCtx:    schedCtx,
	BindContext: alloc.CreateBindContext(schedCtx),
}
alloc.EnqueueSchedulerResultForTask(result)
Seems we need a clearer name, like sendToBinder.
done
	agentapi "volcano.sh/volcano/pkg/agentscheduler/api"
	"volcano.sh/volcano/pkg/agentscheduler/framework"
	"volcano.sh/volcano/pkg/scheduler/api"
	vcache "volcano.sh/volcano/pkg/scheduler/cache"
The agentscheduler replicates the implementations of cache and framework, but it also references the cache and framework under the scheduler directory here. What are the principles for distinguishing between the cache and framework components in these two directories?
Some structure definitions are reused from the batch scheduler, which is why scheduler/cache is also imported. We need to abstract the common parts of the cache for the batch/agent schedulers, so that only the common cache is referenced in future. I will add a TODO.
It is recommended to file a separate issue to track this optimization.
	PrePredicateFns   map[string]api.PrePredicateFn
	NodeOrderFns      map[string]api.NodeOrderFn
	BatchNodeOrderFns map[string]api.BatchNodeOrderFn
	NodeMapFns        map[string]api.NodeMapFn
Do the two callback functions NodeMapFns and NodeReduceFns have specific scenario requirements in the agent-scheduler context, or are they merely intended to align with the existing scheduling logic?
Just to align with the current scheduling logic so that the existing plugins can be reused.
// Framework manages the scheduler plugins and their execution points.
type Framework struct {
	*k8sutil.Framework // Embedding Framework to implement k8sframework.Handle interface
The k8sutil package belongs to the common directory under scheduler/plugin, and it is currently intended to be referenced by multiple algorithm plugins. Is it appropriate for the current agentscheduler to reference it directly?
*k8sutil.Framework defines some new fields and functions that are also needed in the agent scheduler, which is why it is referenced.
snapshot *Snapshot
This is not a good way to depend on each other. Normally, plugins should be able to reference the cache and framework packages, but the framework should not depend on the plugin packages in return. It is recommended to file a separate issue for rectification and optimization.
pkg/agentscheduler/scheduler.go
Outdated
	sched.tiers = tiers
	sched.configurations = configurations
	sched.metricsConf = metricsConf
	sched.mutex.Unlock()
Would it be better to use defer sched.mutex.Unlock() here, to avoid the risk of deadlock as code is added in the future?
OK
Signed-off-by: Haoran <97868579@qq.com>
Signed-off-by: handan-yxh <yxh953167434@163.com>
… deploy Signed-off-by: qi-min <qim_34@163.com>
Signed-off-by: JesseStutler <chenzicong4@huawei.com>
Signed-off-by: handan-yxh <yxh953167434@163.com>
Signed-off-by: qi-min <qim_34@163.com>
Signed-off-by: qi-min <qim_34@163.com>
@hzxuzhonghu @wangyang0616 review comments have been handled; could you please review again? Thanks.
Signed-off-by: JesseStutler <chenzicong4@huawei.com>
/lgtm
Please confirm the closure status of the review comments. @hzxuzhonghu
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: hzxuzhonghu The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
What type of PR is this?
feature

What this PR does / why we need it:
Add a new scheduler to execute fast-path scheduling for workloads like AI agents. The scheduler focuses on fast scheduling and on scheduling based on sharding. This PR includes the basic functionality to schedule agents.
For more details please refer to: proposal | design doc

Which issue(s) this PR fixes:
Fixes #4722

Special notes for your reviewer:

Does this PR introduce a user-facing change?