Skip to content

Commit

Permalink
Merge pull request #35 from lminzhw/performance_improvement
Browse files Browse the repository at this point in the history
Performance improvement
  • Loading branch information
volcano-sh-bot authored Jun 28, 2019
2 parents c93ffa9 + 9697ace commit e09c367
Show file tree
Hide file tree
Showing 15 changed files with 537 additions and 121 deletions.
6 changes: 5 additions & 1 deletion pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}
if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
glog.V(4).Infof("Job <%s/%s> Queue <%s> skip allocate, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}

if queue, found := ssn.Queues[job.Queue]; found {
queues.Push(queue)
Expand Down Expand Up @@ -150,7 +154,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
break
}

nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
nodeScores := util.PrioritizeNodes(task, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

node := util.SelectBestNode(nodeScores)
// Allocate idle resource to the task.
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}
if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
glog.V(4).Infof("Job <%s/%s> Queue <%s> skip backfill, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}

for _, task := range job.TaskStatusIndex[api.Pending] {
if task.InitResreq.IsEmpty() {
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}
if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
glog.V(4).Infof("Job <%s/%s> Queue <%s> skip preemption, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}

if queue, found := ssn.Queues[job.Queue]; !found {
continue
Expand Down Expand Up @@ -186,7 +190,7 @@ func preempt(

predicateNodes := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)

nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

selectedNodes := util.SortNodes(nodeScores)
for _, node := range selectedNodes {
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}
if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
glog.V(4).Infof("Job <%s/%s> Queue <%s> skip reclaim, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}

if queue, found := ssn.Queues[job.Queue]; !found {
glog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (ji *JobInfo) Clone() *JobInfo {
NodesFitDelta: make(NodeResourceMap),

PDB: ji.PDB,
PodGroup: ji.PodGroup,
PodGroup: ji.PodGroup.DeepCopy(),

TaskStatusIndex: map[TaskStatus]tasksMap{},
Tasks: tasksMap{},
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ type EvictableFn func(*TaskInfo, []*TaskInfo) []*TaskInfo
// NodeOrderFn is the func declaration used to get priority score for a node for a particular task.
type NodeOrderFn func(*TaskInfo, *NodeInfo) (float64, error)

// BatchNodeOrderFn is the func declaration used to get priority score for ALL nodes for a particular task.
type BatchNodeOrderFn func(*TaskInfo, []*NodeInfo) (map[string]float64, error)

// NodeMapFn is the func declaration used to get priority score for a node for a particular task.
type NodeMapFn func(*TaskInfo, *NodeInfo) (float64, error)

Expand Down
115 changes: 86 additions & 29 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,31 @@ type defaultStatusUpdater struct {
kbclient *kbver.Clientset
}

// following the same logic as podutil.UpdatePodCondition
func podConditionHaveUpdate(status *v1.PodStatus, condition *v1.PodCondition) bool {
lastTransitionTime := metav1.Now()
// Try to find this pod condition.
_, oldCondition := podutil.GetPodCondition(status, condition.Type)

if oldCondition == nil {
// We are adding new pod condition.
return true
}
// We are updating an existing condition, so we need to check if it has changed.
if condition.Status == oldCondition.Status {
lastTransitionTime = oldCondition.LastTransitionTime
}

isEqual := condition.Status == oldCondition.Status &&
condition.Reason == oldCondition.Reason &&
condition.Message == oldCondition.Message &&
condition.LastProbeTime.Equal(&oldCondition.LastProbeTime) &&
lastTransitionTime.Equal(&oldCondition.LastTransitionTime)

// Return true if one of the fields have changed.
return !isEqual
}

// UpdatePodCondition will Update pod with podCondition
func (su *defaultStatusUpdater) UpdatePodCondition(pod *v1.Pod, condition *v1.PodCondition) (*v1.Pod, error) {
glog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status)
Expand Down Expand Up @@ -184,22 +209,35 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error {
}

func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue string) *SchedulerCache {
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init kubeClient, with err: %v", err))
}
kbClient, err := kbver.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init kbClient, with err: %v", err))
}
eventClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init eventClient, with err: %v", err))
}

sc := &SchedulerCache{
Jobs: make(map[kbapi.JobID]*kbapi.JobInfo),
Nodes: make(map[string]*kbapi.NodeInfo),
Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo),
PriorityClasses: make(map[string]*v1beta1.PriorityClass),
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
kubeclient: kubernetes.NewForConfigOrDie(config),
kbclient: kbver.NewForConfigOrDie(config),
kubeclient: kubeClient,
kbclient: kbClient,
defaultQueue: defaultQueue,
schedulerName: schedulerName,
}

// Prepare event clients.
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: sc.kubeclient.CoreV1().Events("")})
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: eventClient.CoreV1().Events("")})
sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: schedulerName})

sc.Binder = &defaultBinder{
Expand Down Expand Up @@ -460,22 +498,27 @@ func (sc *SchedulerCache) BindVolumes(task *api.TaskInfo) error {

// taskUnschedulable updates pod status of pending task
func (sc *SchedulerCache) taskUnschedulable(task *api.TaskInfo, message string) error {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
pod := task.Pod

pod := task.Pod.DeepCopy()

// The reason field in 'Events' should be "FailedScheduling", there is not constants defined for this in
// k8s core, so using the same string here.
// The reason field in PodCondition should be "Unschedulable"
sc.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", message)
if _, err := sc.StatusUpdater.UpdatePodCondition(pod, &v1.PodCondition{
condition := &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: message,
}); err != nil {
return err
}

if podConditionHaveUpdate(&pod.Status, condition) {
pod = pod.DeepCopy()

// The reason field in 'Events' should be "FailedScheduling", there is not constants defined for this in
// k8s core, so using the same string here.
// The reason field in PodCondition should be "Unschedulable"
sc.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", message)
if _, err := sc.StatusUpdater.UpdatePodCondition(pod, condition); err != nil {
return err
}
} else {
glog.V(4).Infof("task unscheduleable %s/%s, message: %s, skip by no condition update", pod.Namespace, pod.Name, message)
}

return nil
Expand Down Expand Up @@ -560,6 +603,30 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {
snapshot.Queues[value.UID] = value.Clone()
}

var cloneJobLock sync.Mutex
var wg sync.WaitGroup

cloneJob := func(value *api.JobInfo) {
if value.PodGroup != nil {
value.Priority = sc.defaultPriority

priName := value.PodGroup.Spec.PriorityClassName
if priorityClass, found := sc.PriorityClasses[priName]; found {
value.Priority = priorityClass.Value
}

glog.V(4).Infof("The priority of job <%s/%s> is <%s/%d>",
value.Namespace, value.Name, priName, value.Priority)
}

clonedJob := value.Clone()

cloneJobLock.Lock()
snapshot.Jobs[value.UID] = clonedJob
cloneJobLock.Unlock()
wg.Done()
}

for _, value := range sc.Jobs {
// If no scheduling spec, does not handle it.
if value.PodGroup == nil && value.PDB == nil {
Expand All @@ -575,20 +642,10 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {
continue
}

if value.PodGroup != nil {
value.Priority = sc.defaultPriority

priName := value.PodGroup.Spec.PriorityClassName
if priorityClass, found := sc.PriorityClasses[priName]; found {
value.Priority = priorityClass.Value
}

glog.V(4).Infof("The priority of job <%s/%s> is <%s/%d>",
value.Namespace, value.Name, priName, value.Priority)
}

snapshot.Jobs[value.UID] = value.Clone()
wg.Add(1)
go cloneJob(value)
}
wg.Wait()

glog.V(3).Infof("There are <%d> Jobs, <%d> Queues and <%d> Nodes in total for scheduling.",
len(snapshot.Jobs), len(snapshot.Queues), len(snapshot.Nodes))
Expand Down Expand Up @@ -658,8 +715,8 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {
}

// UpdateJobStatus update the status of job and its tasks.
func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo) (*kbapi.JobInfo, error) {
if !shadowPodGroup(job.PodGroup) {
func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo, updatePG bool) (*kbapi.JobInfo, error) {
if updatePG && !shadowPodGroup(job.PodGroup) {
pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Cache interface {
RecordJobStatusEvent(job *api.JobInfo)

// UpdateJobStatus puts job in backlog for a while.
UpdateJobStatus(job *api.JobInfo) (*api.JobInfo, error)
UpdateJobStatus(job *api.JobInfo, updatePG bool) (*api.JobInfo, error)

// AllocateVolumes allocates volume on the host to the task
AllocateVolumes(task *api.TaskInfo, hostname string) error
Expand Down
122 changes: 122 additions & 0 deletions pkg/scheduler/framework/job_updater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package framework

import (
"context"
"math/rand"
"reflect"
"time"

"github.com/golang/glog"

"k8s.io/client-go/util/workqueue"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
)

const (
jobUpdaterWorker = 16

jobConditionUpdateTime = time.Minute
jobConditionUpdateTimeJitter = 30 * time.Second
)

// TimeJitterAfter means: new after old + duration + jitter
func TimeJitterAfter(new, old time.Time, duration, maxJitter time.Duration) bool {
var jitter int64
if maxJitter > 0 {
jitter = rand.Int63n(int64(maxJitter))
}
return new.After(old.Add(duration + time.Duration(jitter)))
}

type jobUpdater struct {
ssn *Session
jobQueue []*api.JobInfo
}

func newJobUpdater(ssn *Session) *jobUpdater {
queue := make([]*api.JobInfo, 0, len(ssn.Jobs))
for _, job := range ssn.Jobs {
queue = append(queue, job)
}

ju := &jobUpdater{
ssn: ssn,
jobQueue: queue,
}
return ju
}

func (ju *jobUpdater) UpdateAll() {
workqueue.ParallelizeUntil(context.TODO(), jobUpdaterWorker, len(ju.jobQueue), ju.updateJob)
}

func isPodGroupConditionsUpdated(newCondition, oldCondition []v1alpha1.PodGroupCondition) bool {
if len(newCondition) != len(oldCondition) {
return true
}

for index, newCond := range newCondition {
oldCond := oldCondition[index]

newTime := newCond.LastTransitionTime
oldTime := oldCond.LastTransitionTime
if TimeJitterAfter(newTime.Time, oldTime.Time, jobConditionUpdateTime, jobConditionUpdateTimeJitter) {
return true
}

// if newCond is not new enough, we treat it the same as the old one
newCond.LastTransitionTime = oldTime

// comparing should ignore the TransitionID
newTransitionID := newCond.TransitionID
newCond.TransitionID = oldCond.TransitionID

shouldUpdate := !reflect.DeepEqual(&newCond, &oldCond)

newCond.LastTransitionTime = newTime
newCond.TransitionID = newTransitionID
if shouldUpdate {
return true
}
}

return false
}

func isPodGroupStatusUpdated(newStatus, oldStatus *v1alpha1.PodGroupStatus) bool {
newCondition := newStatus.Conditions
newStatus.Conditions = nil
oldCondition := oldStatus.Conditions
oldStatus.Conditions = nil

shouldUpdate := !reflect.DeepEqual(newStatus, oldStatus) || isPodGroupConditionsUpdated(newCondition, oldCondition)

newStatus.Conditions = newCondition
oldStatus.Conditions = oldCondition

return shouldUpdate
}

// updateJob update specified job
func (ju *jobUpdater) updateJob(index int) {
job := ju.jobQueue[index]
ssn := ju.ssn

// If job is using PDB, ignore it.
// TODO(k82cn): remove it when removing PDB support
if job.PodGroup == nil {
ssn.cache.RecordJobStatusEvent(job)
return
}

job.PodGroup.Status = jobStatus(ssn, job)
oldStatus, found := ssn.podGroupStatus[job.UID]
updatePG := !found || isPodGroupStatusUpdated(&job.PodGroup.Status, oldStatus)

if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil {
glog.Errorf("Failed to update job <%s/%s>: %v",
job.Namespace, job.Name, err)
}
}
Loading

0 comments on commit e09c367

Please sign in to comment.