Skip to content

Commit

Permalink
Merge pull request #2267 from Thor-wl/0601-1.6CP
Browse files Browse the repository at this point in the history
[cherry-pick] cherry pick important patches to release-1.6
  • Loading branch information
volcano-sh-bot authored Jun 1, 2022
2 parents 172e126 + a4f20dc commit 37e26cb
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/onsi/ginkgo/v2 v2.0.0
github.com/onsi/gomega v1.17.0
github.com/prometheus/client_golang v1.12.0
github.com/prometheus/common v0.32.1
github.com/spf13/cobra v1.2.1
github.com/spf13/pflag v1.0.5
go.uber.org/automaxprocs v1.4.0
Expand Down Expand Up @@ -72,7 +73,6 @@ require (
github.com/opencontainers/selinux v1.8.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.0 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions installer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ The following are the list configurable parameters of Volcano Chart and their de
|`basic.admission_app_name`|Admission Controller App Name|`volcano-admission`|
|`basic.controller_app_name`|Controller App Name|`volcano-controller`|
|`basic.scheduler_app_name`|Scheduler App Name|`volcano-scheduler`|
|`custom.metrics_enable`|Whether to Enable Metrics|`false`|
|`custom.admission_enable`|Whether to Enable Admission|`true`|
|`custom.controller_enable`|Whether to Enable Controller|`true`|
|`custom.scheduler_enable`|Whether to Enable Scheduler|`true`|

Specify each parameter using the `--set key=value[,key=value]` argument to `helm install`. For example,

Expand Down
2 changes: 2 additions & 0 deletions installer/helm/chart/volcano/templates/admission.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{{- if .Values.custom.admission_enable }}
apiVersion: v1
kind: ConfigMap
metadata:
Expand Down Expand Up @@ -153,3 +154,4 @@ spec:
imagePullPolicy: IfNotPresent
command: ["./gen-admission-secret.sh", "--service", "{{ .Release.Name }}-admission-service", "--namespace",
"{{ .Release.Namespace }}", "--secret", "{{.Values.basic.admission_secret_name}}"]
{{- end }}
2 changes: 2 additions & 0 deletions installer/helm/chart/volcano/templates/controllers.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{{- if .Values.custom.controller_enable }}
apiVersion: v1
kind: ServiceAccount
metadata:
Expand Down Expand Up @@ -105,3 +106,4 @@ spec:
- -v=4
- 2>&1
imagePullPolicy: "IfNotPresent"
{{- end }}
4 changes: 3 additions & 1 deletion installer/helm/chart/volcano/templates/scheduler.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{{- if .Values.custom.scheduler_enable }}
apiVersion: v1
kind: ConfigMap
metadata:
Expand Down Expand Up @@ -145,4 +146,5 @@ spec:
targetPort: 8080
selector:
app: volcano-scheduler
type: ClusterIP
type: ClusterIP
{{- end }}
4 changes: 3 additions & 1 deletion installer/helm/chart/volcano/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ basic:
admission_secret_name: "volcano-admission-secret"
admission_config_file: "config/volcano-admission.conf"
scheduler_config_file: "config/volcano-scheduler.conf"

image_pull_secret: ""
admission_port: 8443
custom:
metrics_enable: false
admission_enable: true
controller_enable: true
scheduler_enable: true
16 changes: 14 additions & 2 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package preempt

import (
"fmt"

"k8s.io/klog"

"volcano.sh/volcano/pkg/scheduler/api"
Expand Down Expand Up @@ -209,6 +211,14 @@ func preempt(
nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

selectedNodes := util.SortNodes(nodeScores)

job, found := ssn.Jobs[preemptor.Job]
if !found {
return false, fmt.Errorf("Job %s not found in SSN", preemptor.Job)
}

currentQueue := ssn.Queues[job.Queue]

for _, node := range selectedNodes {
klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
preemptor.Namespace, preemptor.Name, node.Name)
Expand Down Expand Up @@ -240,7 +250,8 @@ func preempt(

for !victimsQueue.Empty() {
// If reclaimed enough resources, break loop to avoid Sub panic.
if preemptor.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
// If the preemptor's queue is overused, the preemptor cannot be allocated, so there is no need to check the node's idle resources.
if !ssn.Overused(currentQueue) && preemptor.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
break
}
preemptee := victimsQueue.Pop().(*api.TaskInfo)
Expand All @@ -258,7 +269,8 @@ func preempt(
klog.V(3).Infof("Preempted <%v> for Task <%s/%s> requested <%v>.",
preempted, preemptor.Namespace, preemptor.Name, preemptor.InitResreq)

if preemptor.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
// If the preemptor's queue is overused, the preemptor cannot be allocated, so there is no need to check the node's idle resources.
if !ssn.Overused(currentQueue) && preemptor.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
if err := stmt.Pipeline(preemptor, node.Name); err != nil {
klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
preemptor.Namespace, preemptor.Name, node.Name)
Expand Down
27 changes: 27 additions & 0 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package api
import (
"fmt"
"strconv"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/klog"
Expand Down Expand Up @@ -327,6 +328,11 @@ func (ni *NodeInfo) setNodeGPUInfo(node *v1.Node) {
for i := 0; i < int(gpuNumber); i++ {
ni.GPUDevices[i] = NewGPUDevice(i, memoryPerCard)
}
unhealthyGPUs := ni.getUnhealthyGPUs(node)
for i := range unhealthyGPUs {
klog.V(4).Infof("delete unhealthy gpu id %d from GPUDevices", unhealthyGPUs[i])
delete(ni.GPUDevices, unhealthyGPUs[i])
}
}

// SetNode sets kubernetes node object to nodeInfo object
Expand Down Expand Up @@ -580,3 +586,24 @@ func (ni *NodeInfo) SubGPUResource(pod *v1.Pod) {
}
}
}

// getUnhealthyGPUs returns the IDs of all GPUs that the node marks as unhealthy.
//
// The IDs are read from the node annotation keyed by UnhealthyGPUIDs, whose
// value is a comma-separated list of integers (e.g. "0,2"). Whitespace around
// each entry is ignored, and empty entries (from an empty annotation value or a
// stray comma) are skipped silently. Entries that are not valid integers are
// logged as warnings and skipped. The returned slice is always non-nil.
func (ni *NodeInfo) getUnhealthyGPUs(node *v1.Node) (unhealthyGPUs []int) {
	unhealthyGPUs = []int{}
	if node == nil {
		return unhealthyGPUs
	}

	devicesStr, ok := node.Annotations[UnhealthyGPUIDs]
	if !ok {
		return unhealthyGPUs
	}

	for _, sid := range strings.Split(devicesStr, ",") {
		// Tolerate values such as "0, 2" or "0,2," so that valid IDs are not
		// dropped because of formatting.
		sid = strings.TrimSpace(sid)
		if sid == "" {
			continue
		}

		id, err := strconv.Atoi(sid)
		if err != nil {
			klog.Warningf("Failed to parse unhealthy gpu id %s due to %v", sid, err)
			continue
		}
		unhealthyGPUs = append(unhealthyGPUs, id)
	}
	return unhealthyGPUs
}
3 changes: 3 additions & 0 deletions pkg/scheduler/api/well_known_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (
// GPUIndex is the key of gpu index
GPUIndex = "volcano.sh/gpu-index"

// UnhealthyGPUIDs list of unhealthy gpu ids
UnhealthyGPUIDs = "volcano.sh/gpu-unhealthy-ids"

// OversubscriptionNode is the key of node oversubscription
OversubscriptionNode = "volcano.sh/oversubscription"
// OversubscriptionCPU is the key of cpu oversubscription
Expand Down
16 changes: 14 additions & 2 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/prometheus/client_golang/api"
prometheusv1 "github.com/prometheus/client_golang/api/prometheus/v1"
pmodel "github.com/prometheus/common/model"

v1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
Expand Down Expand Up @@ -1225,8 +1226,17 @@ func (sc *SchedulerCache) GetMetricsData() {
if len(warnings) > 0 {
klog.V(3).Infof("Warning querying Prometheus: %v", warnings)
}

rowValues := strings.Split(strings.TrimSpace(res.String()), "=>")
if res == nil || res.String() == "" {
klog.Warningf("Warning querying Prometheus: no data found for %s", queryStr)
continue
}
// The usage plugin only needs Prometheus results of type pmodel.ValVector.
if res.Type() != pmodel.ValVector {
continue
}
// The data is obtained through res.String(); for a pmodel.ValVector result it looks like, e.g.: "{k1:v1, ...} => #[value] @#[timestamp]\n {k2:v2, ...} => ..."
firstRowValVector := strings.Split(res.String(), "\n")[0]
rowValues := strings.Split(strings.TrimSpace(firstRowValVector), "=>")
value := strings.Split(strings.TrimSpace(rowValues[1]), " ")
switch metric {
case cpuUsageAvg:
Expand All @@ -1247,9 +1257,11 @@ func (sc *SchedulerCache) GetMetricsData() {
func (sc *SchedulerCache) setMetricsData(usageInfo map[string]*schedulingapi.NodeUsage) {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()

for k := range usageInfo {
nodeInfo, ok := sc.Nodes[k]
if ok {
klog.V(3).Infof("node: %s, ResourceUsage: %+v => %+v", k, *nodeInfo.ResourceUsage, *usageInfo[k])
nodeInfo.ResourceUsage = usageInfo[k]
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/webhooks/admission/jobs/mutate/mutate_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ func Jobs(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
Allowed: true,
Patch: patchBytes,
}
pt := admissionv1.PatchTypeJSONPatch
reviewResponse.PatchType = &pt
if len(patchBytes) > 0 {
pt := admissionv1.PatchTypeJSONPatch
reviewResponse.PatchType = &pt
}

return &reviewResponse
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/webhooks/admission/podgroups/mutate/mutate_podgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,15 @@ func PodGroups(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
}
}

pt := admissionv1.PatchTypeJSONPatch
return &admissionv1.AdmissionResponse{
Allowed: true,
Patch: patchBytes,
PatchType: &pt,
reviewResponse := admissionv1.AdmissionResponse{
Allowed: true,
Patch: patchBytes,
}
if len(patchBytes) > 0 {
pt := admissionv1.PatchTypeJSONPatch
reviewResponse.PatchType = &pt
}
return &reviewResponse
}

func createPodGroupPatch(podgroup *schedulingv1beta1.PodGroup) ([]byte, error) {
Expand Down
7 changes: 5 additions & 2 deletions pkg/webhooks/admission/pods/mutate/mutate_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package mutate
import (
"encoding/json"
"fmt"

admissionv1 "k8s.io/api/admission/v1"
whv1 "k8s.io/api/admissionregistration/v1"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -90,8 +91,10 @@ func Pods(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
Allowed: true,
Patch: patchBytes,
}
pt := admissionv1.PatchTypeJSONPatch
reviewResponse.PatchType = &pt
if len(patchBytes) > 0 {
pt := admissionv1.PatchTypeJSONPatch
reviewResponse.PatchType = &pt
}

return &reviewResponse
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/webhooks/admission/queues/mutate/mutate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,15 @@ func Queues(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
}
}

pt := admissionv1.PatchTypeJSONPatch
return &admissionv1.AdmissionResponse{
Allowed: true,
Patch: patchBytes,
PatchType: &pt,
reviewResponse := admissionv1.AdmissionResponse{
Allowed: true,
Patch: patchBytes,
}
if len(patchBytes) > 0 {
pt := admissionv1.PatchTypeJSONPatch
reviewResponse.PatchType = &pt
}
return &reviewResponse
}

func createQueuePatch(queue *schedulingv1beta1.Queue) ([]byte, error) {
Expand Down

0 comments on commit 37e26cb

Please sign in to comment.