diff --git a/go.mod b/go.mod index 03ccc2651e..54a96a69c8 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/onsi/ginkgo/v2 v2.0.0 github.com/onsi/gomega v1.17.0 github.com/prometheus/client_golang v1.12.0 + github.com/prometheus/common v0.32.1 github.com/spf13/cobra v1.2.1 github.com/spf13/pflag v1.0.5 go.uber.org/automaxprocs v1.4.0 @@ -72,7 +73,6 @@ require ( github.com/opencontainers/selinux v1.8.2 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect - github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect go.etcd.io/etcd/api/v3 v3.5.0 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.0 // indirect diff --git a/installer/README.md b/installer/README.md index 66309b8f8a..8567269b76 100644 --- a/installer/README.md +++ b/installer/README.md @@ -79,6 +79,10 @@ The following are the list configurable parameters of Volcano Chart and their de |`basic.admission_app_name`|Admission Controller App Name|`volcano-admission`| |`basic.controller_app_name`|Controller App Name|`volcano-controller`| |`basic.scheduler_app_name`|Scheduler App Name|`volcano-scheduler`| +|`custom.metrics_enable`|Whether to Enable Metrics|`false`| +|`custom.admission_enable`|Whether to Enable Admission|`true`| +|`custom.controller_enable`|Whether to Enable Controller|`true`| +|`custom.scheduler_enable`|Whether to Enable Scheduler|`true`| Specify each parameter using the `--set key=value[,key=value]` argument to `helm install`. 
For example, diff --git a/installer/helm/chart/volcano/templates/admission.yaml b/installer/helm/chart/volcano/templates/admission.yaml index 5700b5a5bc..50256f61d6 100644 --- a/installer/helm/chart/volcano/templates/admission.yaml +++ b/installer/helm/chart/volcano/templates/admission.yaml @@ -1,3 +1,4 @@ +{{- if .Values.custom.admission_enable }} apiVersion: v1 kind: ConfigMap metadata: @@ -153,3 +154,4 @@ spec: imagePullPolicy: IfNotPresent command: ["./gen-admission-secret.sh", "--service", "{{ .Release.Name }}-admission-service", "--namespace", "{{ .Release.Namespace }}", "--secret", "{{.Values.basic.admission_secret_name}}"] +{{- end }} \ No newline at end of file diff --git a/installer/helm/chart/volcano/templates/controllers.yaml b/installer/helm/chart/volcano/templates/controllers.yaml index 70db686ca6..5354fa06b4 100644 --- a/installer/helm/chart/volcano/templates/controllers.yaml +++ b/installer/helm/chart/volcano/templates/controllers.yaml @@ -1,3 +1,4 @@ +{{- if .Values.custom.controller_enable }} apiVersion: v1 kind: ServiceAccount metadata: @@ -105,3 +106,4 @@ spec: - -v=4 - 2>&1 imagePullPolicy: "IfNotPresent" +{{- end }} \ No newline at end of file diff --git a/installer/helm/chart/volcano/templates/scheduler.yaml b/installer/helm/chart/volcano/templates/scheduler.yaml index a382b57891..650390bff5 100644 --- a/installer/helm/chart/volcano/templates/scheduler.yaml +++ b/installer/helm/chart/volcano/templates/scheduler.yaml @@ -1,3 +1,4 @@ +{{- if .Values.custom.scheduler_enable }} apiVersion: v1 kind: ConfigMap metadata: @@ -145,4 +146,5 @@ spec: targetPort: 8080 selector: app: volcano-scheduler - type: ClusterIP \ No newline at end of file + type: ClusterIP +{{- end }} \ No newline at end of file diff --git a/installer/helm/chart/volcano/values.yaml b/installer/helm/chart/volcano/values.yaml index eeba4f01a0..dec749edf9 100644 --- a/installer/helm/chart/volcano/values.yaml +++ b/installer/helm/chart/volcano/values.yaml @@ -6,8 +6,10 @@ basic: 
admission_secret_name: "volcano-admission-secret" admission_config_file: "config/volcano-admission.conf" scheduler_config_file: "config/volcano-scheduler.conf" - image_pull_secret: "" admission_port: 8443 custom: metrics_enable: false + admission_enable: true + controller_enable: true + scheduler_enable: true diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index ffb2e76d12..080b6d4e8c 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -17,6 +17,8 @@ limitations under the License. package preempt import ( + "fmt" + "k8s.io/klog" "volcano.sh/volcano/pkg/scheduler/api" @@ -209,6 +211,14 @@ func preempt( nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) selectedNodes := util.SortNodes(nodeScores) + + job, found := ssn.Jobs[preemptor.Job] + if !found { + return false, fmt.Errorf("Job %s not found in SSN", preemptor.Job) + } + + currentQueue := ssn.Queues[job.Queue] + for _, node := range selectedNodes { klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", preemptor.Namespace, preemptor.Name, node.Name) @@ -240,7 +250,8 @@ func preempt( for !victimsQueue.Empty() { // If reclaimed enough resources, break loop to avoid Sub panic. - if preemptor.InitResreq.LessEqual(node.FutureIdle(), api.Zero) { + // If preemptor's queue is overused, it means preemptor can not be allocated. So no need to care about the node idle resource + if !ssn.Overused(currentQueue) && preemptor.InitResreq.LessEqual(node.FutureIdle(), api.Zero) { break } preemptee := victimsQueue.Pop().(*api.TaskInfo) @@ -258,7 +269,8 @@ func preempt( klog.V(3).Infof("Preempted <%v> for Task <%s/%s> requested <%v>.", preempted, preemptor.Namespace, preemptor.Name, preemptor.InitResreq) - if preemptor.InitResreq.LessEqual(node.FutureIdle(), api.Zero) { + // If preemptor's queue is overused, it means preemptor can not be allocated. 
So no need to care about the node idle resource + if !ssn.Overused(currentQueue) && preemptor.InitResreq.LessEqual(node.FutureIdle(), api.Zero) { if err := stmt.Pipeline(preemptor, node.Name); err != nil { klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>", preemptor.Namespace, preemptor.Name, node.Name) diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go index b1a55f8478..1ffed8719b 100644 --- a/pkg/scheduler/api/node_info.go +++ b/pkg/scheduler/api/node_info.go @@ -19,6 +19,7 @@ package api import ( "fmt" "strconv" + "strings" v1 "k8s.io/api/core/v1" "k8s.io/klog" @@ -327,6 +328,11 @@ func (ni *NodeInfo) setNodeGPUInfo(node *v1.Node) { for i := 0; i < int(gpuNumber); i++ { ni.GPUDevices[i] = NewGPUDevice(i, memoryPerCard) } + unhealthyGPUs := ni.getUnhealthyGPUs(node) + for i := range unhealthyGPUs { + klog.V(4).Infof("delete unhealthy gpu id %d from GPUDevices", unhealthyGPUs[i]) + delete(ni.GPUDevices, unhealthyGPUs[i]) + } } // SetNode sets kubernetes node object to nodeInfo object @@ -580,3 +586,24 @@ func (ni *NodeInfo) SubGPUResource(pod *v1.Pod) { } } } + +// getUnhealthyGPUs returns the ids of all the unhealthy GPUs. 
+func (ni *NodeInfo) getUnhealthyGPUs(node *v1.Node) (unhealthyGPUs []int) { + unhealthyGPUs = []int{} + devicesStr, ok := node.Annotations[UnhealthyGPUIDs] + + if !ok { + return + } + + idsStr := strings.Split(devicesStr, ",") + for _, sid := range idsStr { + id, err := strconv.Atoi(sid) + if err != nil { + klog.Warningf("Failed to parse unhealthy gpu id %s due to %v", sid, err) + } else { + unhealthyGPUs = append(unhealthyGPUs, id) + } + } + return +} diff --git a/pkg/scheduler/api/well_known_labels.go b/pkg/scheduler/api/well_known_labels.go index 52197f88fa..2e1db71c74 100644 --- a/pkg/scheduler/api/well_known_labels.go +++ b/pkg/scheduler/api/well_known_labels.go @@ -28,6 +28,9 @@ const ( // GPUIndex is the key of gpu index GPUIndex = "volcano.sh/gpu-index" + // UnhealthyGPUIDs is the key of the list of unhealthy gpu ids + UnhealthyGPUIDs = "volcano.sh/gpu-unhealthy-ids" + // OversubscriptionNode is the key of node oversubscription OversubscriptionNode = "volcano.sh/oversubscription" // OversubscriptionCPU is the key of cpu oversubscription diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 60a660cd74..289b8228ea 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/client_golang/api" prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1" + pmodel "github.com/prometheus/common/model" v1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" @@ -1225,8 +1226,17 @@ func (sc *SchedulerCache) GetMetricsData() { if len(warnings) > 0 { klog.V(3).Infof("Warning querying Prometheus: %v", warnings) } - - rowValues := strings.Split(strings.TrimSpace(res.String()), "=>") + if res == nil || res.String() == "" { + klog.Warningf("Warning querying Prometheus: no data found for %s", queryStr) + continue + } + // plugin.usage only needs type pmodel.ValVector in Prometheus.rules + if res.Type() != pmodel.ValVector { + continue + } + // only method res.String() can get data, 
dataType []pmodel.ValVector, eg: "{k1:v1, ...} => #[value] @#[timestamp]\n {k2:v2, ...} => ..." + firstRowValVector := strings.Split(res.String(), "\n")[0] + rowValues := strings.Split(strings.TrimSpace(firstRowValVector), "=>") value := strings.Split(strings.TrimSpace(rowValues[1]), " ") switch metric { case cpuUsageAvg: @@ -1247,9 +1257,11 @@ func (sc *SchedulerCache) GetMetricsData() { func (sc *SchedulerCache) setMetricsData(usageInfo map[string]*schedulingapi.NodeUsage) { sc.Mutex.Lock() defer sc.Mutex.Unlock() + for k := range usageInfo { nodeInfo, ok := sc.Nodes[k] if ok { + klog.V(3).Infof("node: %s, ResourceUsage: %+v => %+v", k, *nodeInfo.ResourceUsage, *usageInfo[k]) nodeInfo.ResourceUsage = usageInfo[k] } } diff --git a/pkg/webhooks/admission/jobs/mutate/mutate_job.go b/pkg/webhooks/admission/jobs/mutate/mutate_job.go index f96cc8967f..374d729d5b 100644 --- a/pkg/webhooks/admission/jobs/mutate/mutate_job.go +++ b/pkg/webhooks/admission/jobs/mutate/mutate_job.go @@ -98,8 +98,10 @@ func Jobs(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse { Allowed: true, Patch: patchBytes, } - pt := admissionv1.PatchTypeJSONPatch - reviewResponse.PatchType = &pt + if len(patchBytes) > 0 { + pt := admissionv1.PatchTypeJSONPatch + reviewResponse.PatchType = &pt + } return &reviewResponse } diff --git a/pkg/webhooks/admission/podgroups/mutate/mutate_podgroup.go b/pkg/webhooks/admission/podgroups/mutate/mutate_podgroup.go index c65acc8b86..c7b8864cde 100644 --- a/pkg/webhooks/admission/podgroups/mutate/mutate_podgroup.go +++ b/pkg/webhooks/admission/podgroups/mutate/mutate_podgroup.go @@ -87,12 +87,15 @@ func PodGroups(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse { } } - pt := admissionv1.PatchTypeJSONPatch - return &admissionv1.AdmissionResponse{ - Allowed: true, - Patch: patchBytes, - PatchType: &pt, + reviewResponse := admissionv1.AdmissionResponse{ + Allowed: true, + Patch: patchBytes, } + if len(patchBytes) > 0 { + pt := 
admissionv1.PatchTypeJSONPatch + reviewResponse.PatchType = &pt + } + return &reviewResponse } func createPodGroupPatch(podgroup *schedulingv1beta1.PodGroup) ([]byte, error) { diff --git a/pkg/webhooks/admission/pods/mutate/mutate_pod.go b/pkg/webhooks/admission/pods/mutate/mutate_pod.go index cd1d084280..309390ccf5 100644 --- a/pkg/webhooks/admission/pods/mutate/mutate_pod.go +++ b/pkg/webhooks/admission/pods/mutate/mutate_pod.go @@ -19,6 +19,7 @@ package mutate import ( "encoding/json" "fmt" + admissionv1 "k8s.io/api/admission/v1" whv1 "k8s.io/api/admissionregistration/v1" v1 "k8s.io/api/core/v1" @@ -90,8 +91,10 @@ func Pods(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse { Allowed: true, Patch: patchBytes, } - pt := admissionv1.PatchTypeJSONPatch - reviewResponse.PatchType = &pt + if len(patchBytes) > 0 { + pt := admissionv1.PatchTypeJSONPatch + reviewResponse.PatchType = &pt + } return &reviewResponse } diff --git a/pkg/webhooks/admission/queues/mutate/mutate_queue.go b/pkg/webhooks/admission/queues/mutate/mutate_queue.go index b39546b0b5..4845c41cd9 100644 --- a/pkg/webhooks/admission/queues/mutate/mutate_queue.go +++ b/pkg/webhooks/admission/queues/mutate/mutate_queue.go @@ -88,12 +88,15 @@ func Queues(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse { } } - pt := admissionv1.PatchTypeJSONPatch - return &admissionv1.AdmissionResponse{ - Allowed: true, - Patch: patchBytes, - PatchType: &pt, + reviewResponse := admissionv1.AdmissionResponse{ + Allowed: true, + Patch: patchBytes, } + if len(patchBytes) > 0 { + pt := admissionv1.PatchTypeJSONPatch + reviewResponse.PatchType = &pt + } + return &reviewResponse } func createQueuePatch(queue *schedulingv1beta1.Queue) ([]byte, error) {