-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Create a new Agent scheduler for Agent workload fast scheduling #4804
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
volcano-sh-bot
merged 8 commits into
volcano-sh:master
from
qi-min:agent-scheduler-development
Dec 22, 2025
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
4e771a9
agent scheduler: create nodeshard CRD
0YHR0 c2a01b1
agent scheduler: setup agent scheduler application
handan-yxh 2535030
agent scheduler: update build and deploy scripts to support build and…
qi-min e541a0f
agent scheduler: setup scheduler framework
JesseStutler eb01ce6
agent scheduler: support clone nodes from cache to snapshot in framework
handan-yxh f353059
agent scheduler: add action and support multi worker scheduling
qi-min 603f68e
agent-scheduler: updated based on comments
qi-min 6363d12
agent-scheduler: updated based on 2nd round review comments
JesseStutler File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,204 @@ | ||
| /* | ||
| Copyright 2017 The Kubernetes Authors. | ||
| Copyright 2025 The Volcano Authors. | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package options | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "os" | ||
| "time" | ||
|
|
||
| "github.com/spf13/pflag" | ||
| "k8s.io/apimachinery/pkg/util/validation/field" | ||
| "k8s.io/component-base/config" | ||
| componentbaseconfigvalidation "k8s.io/component-base/config/validation" | ||
|
|
||
| voptions "volcano.sh/volcano/cmd/scheduler/app/options" | ||
| "volcano.sh/volcano/pkg/kube" | ||
| ) | ||
|
|
||
| const ( | ||
| agentSchedulerName = "agent-scheduler" | ||
| defaultSchedulerPeriod = time.Second | ||
| defaultResyncPeriod = 0 | ||
| defaultQueue = "default" | ||
| defaultListenAddress = ":8080" | ||
| defaultHealthzAddress = ":11251" | ||
| defaultPluginsDir = "" | ||
|
|
||
| defaultQPS = 2000.0 | ||
| defaultBurst = 2000 | ||
|
|
||
| // Default parameters to control the number of feasible nodes to find and score | ||
| defaultMinPercentageOfNodesToFind = 5 | ||
| defaultMinNodesToFind = 100 | ||
| defaultPercentageOfNodesToFind = 0 | ||
| defaultLockObjectNamespace = "volcano-system" | ||
| defaultNodeWorkers = 20 | ||
| defaultScheduleWorkerCount = 1 | ||
| ) | ||
|
|
||
| // ServerOption is the main context object for the controller manager. | ||
| type ServerOption struct { | ||
| KubeClientOptions kube.ClientOptions | ||
| CertFile string | ||
| KeyFile string | ||
| CaCertFile string | ||
| CertData []byte | ||
| KeyData []byte | ||
| CaCertData []byte | ||
| SchedulerName string | ||
| SchedulerConf string | ||
| ResyncPeriod time.Duration | ||
| // leaderElection defines the configuration of leader election. | ||
| LeaderElection config.LeaderElectionConfiguration | ||
| // Deprecated: use ResourceNamespace instead. | ||
| LockObjectNamespace string | ||
| PrintVersion bool | ||
| EnableMetrics bool | ||
| ListenAddress string | ||
| EnableCSIStorage bool | ||
| EnableHealthz bool | ||
| // HealthzBindAddress is the IP address and port for the health check server to serve on | ||
| // defaulting to :11251 | ||
| HealthzBindAddress string | ||
| // Parameters for scheduling tuning: the number of feasible nodes to find and score | ||
| MinNodesToFind int32 | ||
| MinPercentageOfNodesToFind int32 | ||
| PercentageOfNodesToFind int32 | ||
|
|
||
| NodeSelector []string | ||
| CacheDumpFileDir string | ||
| EnableCacheDumper bool | ||
| NodeWorkerThreads uint32 | ||
|
|
||
| // DisableDefaultSchedulerConfig indicates if the scheduler should fallback to default | ||
| // config if the current scheduler config is invalid | ||
| DisableDefaultSchedulerConfig bool | ||
|
|
||
| //Count of workers for scheduling | ||
| ScheduleWorkerCount uint32 | ||
| } | ||
|
|
||
| // DecryptFunc is custom function to parse ca file | ||
| type DecryptFunc func(c *ServerOption) error | ||
|
|
||
| // ServerOpts server options. | ||
| var ServerOpts *ServerOption | ||
|
|
||
| // NewServerOption creates a new CMServer with a default config. | ||
| func NewServerOption() *ServerOption { | ||
| return &ServerOption{} | ||
| } | ||
|
|
||
| // AddFlags adds flags for a specific CMServer to the specified FlagSet. | ||
| func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { | ||
| fs.StringVar(&s.KubeClientOptions.Master, "master", s.KubeClientOptions.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)") | ||
| fs.StringVar(&s.KubeClientOptions.KubeConfig, "kubeconfig", s.KubeClientOptions.KubeConfig, "Path to kubeconfig file with authorization and master location information") | ||
| fs.StringVar(&s.CaCertFile, "ca-cert-file", s.CaCertFile, "File containing the x509 Certificate for HTTPS.") | ||
| fs.StringVar(&s.CertFile, "tls-cert-file", s.CertFile, ""+ | ||
| "File containing the default x509 Certificate for HTTPS. (CA cert, if any, concatenated "+ | ||
| "after server cert).") | ||
| fs.StringVar(&s.KeyFile, "tls-private-key-file", s.KeyFile, "File containing the default x509 private key matching --tls-cert-file.") | ||
| fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", "", "Define the namespace of the lock object; it is volcano-system by default.") | ||
| fs.MarkDeprecated("lock-object-namespace", "This flag is deprecated and will be removed in a future release. Please use --leader-elect-resource-namespace instead.") | ||
| // volcano scheduler will ignore pods with scheduler names other than specified with the option | ||
| fs.StringVar(&s.SchedulerName, "scheduler-name", agentSchedulerName, "vc-agent-scheduler will handle pods whose .spec.SchedulerName is same as scheduler-name") | ||
| fs.StringVar(&s.SchedulerConf, "scheduler-conf", "", "The absolute path of scheduler configuration file") | ||
| fs.DurationVar(&s.ResyncPeriod, "resync-period", defaultResyncPeriod, "The default resync period for k8s native informer factory") | ||
| fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit") | ||
| fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.") | ||
| fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.") | ||
| fs.Float32Var(&s.KubeClientOptions.QPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver") | ||
| fs.IntVar(&s.KubeClientOptions.Burst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver") | ||
|
|
||
| // Minimum number of feasible nodes to find and score | ||
| fs.Int32Var(&s.MinNodesToFind, "minimum-feasible-nodes", defaultMinNodesToFind, "The minimum number of feasible nodes to find and score") | ||
|
|
||
| // Minimum percentage of nodes to find and score | ||
| fs.Int32Var(&s.MinPercentageOfNodesToFind, "minimum-percentage-nodes-to-find", defaultMinPercentageOfNodesToFind, "The minimum percentage of nodes to find and score") | ||
|
|
||
| // The percentage of nodes that would be scored in each scheduling cycle; if <= 0, an adaptive percentage will be calculated | ||
| fs.Int32Var(&s.PercentageOfNodesToFind, "percentage-nodes-to-find", defaultPercentageOfNodesToFind, "The percentage of nodes to find and score, if <=0 will be calculated based on the cluster size") | ||
|
|
||
| fs.BoolVar(&s.EnableCSIStorage, "csi-storage", false, | ||
| "Enable tracking of available storage capacity that CSI drivers provide; it is false by default") | ||
| fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default") | ||
| fs.BoolVar(&s.EnableMetrics, "enable-metrics", false, "Enable the metrics function; it is false by default") | ||
| fs.StringSliceVar(&s.NodeSelector, "node-selector", nil, "volcano only work with the labeled node, like: --node-selector=volcano.sh/role:train --node-selector=volcano.sh/role:serving") | ||
| fs.BoolVar(&s.EnableCacheDumper, "cache-dumper", true, "Enable the cache dumper, it's true by default") | ||
| fs.StringVar(&s.CacheDumpFileDir, "cache-dump-dir", "/tmp", "The target dir where the json file put at when dump cache info to json file") | ||
| fs.Uint32Var(&s.NodeWorkerThreads, "node-worker-threads", defaultNodeWorkers, "The number of threads syncing node operations.") | ||
| fs.BoolVar(&s.DisableDefaultSchedulerConfig, "disable-default-scheduler-config", false, "The flag indicates whether the scheduler should avoid using the default configuration if the provided scheduler configuration is invalid.") | ||
| fs.Uint32Var(&s.ScheduleWorkerCount, "worker-count", defaultScheduleWorkerCount, "The flag indicates the number of worker threads for scheduling.") | ||
| } | ||
|
|
||
| // CheckOptionOrDie check leader election flag when LeaderElection is enabled. | ||
| func (s *ServerOption) CheckOptionOrDie() error { | ||
| return componentbaseconfigvalidation.ValidateLeaderElectionConfiguration(&s.LeaderElection, field.NewPath("leaderElection")).ToAggregate() | ||
| } | ||
|
|
||
| // RegisterOptions registers options. | ||
| func (s *ServerOption) RegisterOptions() { | ||
| ServerOpts = s | ||
| //some package from scheduler pkg rely on options defined in scheduler pkg | ||
| voptions.ServerOpts = voptions.NewServerOption() | ||
| } | ||
|
|
||
| // readCAFiles read data from ca file path | ||
| func (s *ServerOption) readCAFiles() error { | ||
| var err error | ||
|
|
||
| s.CaCertData, err = os.ReadFile(s.CaCertFile) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to read cacert file (%s): %v", s.CaCertFile, err) | ||
| } | ||
|
|
||
| s.CertData, err = os.ReadFile(s.CertFile) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to read cert file (%s): %v", s.CertFile, err) | ||
| } | ||
|
|
||
| s.KeyData, err = os.ReadFile(s.KeyFile) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to read key file (%s): %v", s.KeyFile, err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // ParseCAFiles parse ca file by decryptFunc | ||
| func (s *ServerOption) ParseCAFiles(decryptFunc DecryptFunc) error { | ||
| if err := s.readCAFiles(); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // users can add one function to decrypt tha data by their own way if CA data is encrypted | ||
| if decryptFunc != nil { | ||
| return decryptFunc(s) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Default new and registry a default one | ||
| func Default() *ServerOption { | ||
| s := NewServerOption() | ||
| s.AddFlags(pflag.CommandLine) | ||
| s.RegisterOptions() | ||
| return s | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| /* | ||
| Copyright 2019 The Kubernetes Authors. | ||
| Copyright 2025 The Volcano Authors. | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package options | ||
|
|
||
| import ( | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/spf13/pflag" | ||
| "github.com/stretchr/testify/assert" | ||
| "k8s.io/apimachinery/pkg/api/equality" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| utilfeature "k8s.io/apiserver/pkg/util/feature" | ||
| "k8s.io/client-go/tools/leaderelection/resourcelock" | ||
| "k8s.io/component-base/config" | ||
| componentbaseoptions "k8s.io/component-base/config/options" | ||
| "k8s.io/component-base/featuregate" | ||
|
|
||
| "volcano.sh/volcano/pkg/features" | ||
| "volcano.sh/volcano/pkg/kube" | ||
| commonutil "volcano.sh/volcano/pkg/util" | ||
| ) | ||
|
|
||
| func TestAddFlags(t *testing.T) { | ||
| fs := pflag.NewFlagSet("addflagstest", pflag.ExitOnError) | ||
| s := NewServerOption() | ||
| commonutil.LeaderElectionDefault(&s.LeaderElection) | ||
| componentbaseoptions.BindLeaderElectionFlags(&s.LeaderElection, fs) | ||
| s.AddFlags(fs) | ||
| utilfeature.DefaultMutableFeatureGate.AddFlag(fs) | ||
|
|
||
| args := []string{ | ||
| "--resync-period=0", | ||
| "--cache-dumper=false", | ||
| "--leader-elect-lease-duration=60s", | ||
| "--leader-elect-renew-deadline=20s", | ||
| "--leader-elect-retry-period=10s", | ||
| "--feature-gates=PodDisruptionBudgetsSupport=false,VolcanoJobSupport=true", | ||
| } | ||
| fs.Parse(args) | ||
|
|
||
| // This is a snapshot of expected options parsed by args. | ||
| expected := &ServerOption{ | ||
| SchedulerName: agentSchedulerName, | ||
| ResyncPeriod: 0, | ||
| LeaderElection: config.LeaderElectionConfiguration{ | ||
| LeaderElect: true, | ||
| LeaseDuration: metav1.Duration{Duration: 60 * time.Second}, | ||
| RenewDeadline: metav1.Duration{Duration: 20 * time.Second}, | ||
| RetryPeriod: metav1.Duration{Duration: 10 * time.Second}, | ||
| ResourceLock: resourcelock.LeasesResourceLock, | ||
| ResourceNamespace: defaultLockObjectNamespace, | ||
| }, | ||
| ListenAddress: defaultListenAddress, | ||
| KubeClientOptions: kube.ClientOptions{ | ||
| Master: "", | ||
| KubeConfig: "", | ||
| QPS: defaultQPS, | ||
| Burst: defaultBurst, | ||
| }, | ||
| HealthzBindAddress: ":11251", | ||
| MinNodesToFind: defaultMinNodesToFind, | ||
| MinPercentageOfNodesToFind: defaultMinPercentageOfNodesToFind, | ||
| PercentageOfNodesToFind: defaultPercentageOfNodesToFind, | ||
| NodeWorkerThreads: defaultNodeWorkers, | ||
| CacheDumpFileDir: "/tmp", | ||
| DisableDefaultSchedulerConfig: false, | ||
| ScheduleWorkerCount: defaultScheduleWorkerCount, | ||
| } | ||
| expectedFeatureGates := map[featuregate.Feature]bool{ | ||
| features.PodDisruptionBudgetsSupport: false, | ||
| features.VolcanoJobSupport: true, | ||
| } | ||
|
|
||
| if !equality.Semantic.DeepEqual(expected, s) { | ||
| t.Errorf("Got different run options than expected.\nGot: %+v\nExpected: %+v\n", s, expected) | ||
| } | ||
| for k, v := range expectedFeatureGates { | ||
| assert.Equal(t, v, utilfeature.DefaultFeatureGate.Enabled(k)) | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is mostly copied from scheduler and
RegisterOptionshas some coupling with scheduler. setServerOpts.EnableCSIStorageCould we eliminate that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the plugins in batch scheduler depend on options.ServerOpts, but ServerOpts is a global variable at cmd/scheduler /app/options level. So it is not inited with flag values when agent scheduler startup.
RegisterOptionspass options tooptions.ServerOptsso that it can pick up value from flags of agent scheduler.In next pr for sharing, the logic is changed to pass all options of agented scheduler to batch scheduler, no need to specially specify
EnableCSIStoragevoptions.ServerOpts = ServerOpts.ServerOption