Skip to content

Commit

Permalink
add events for pod with pipelined state
Browse files Browse the repository at this point in the history
  • Loading branch information
sivanzcw committed Nov 3, 2019
1 parent 8d613c0 commit 367b3ee
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 48 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {
}

// Update podCondition for tasks Allocated and Pending before job discarded
for _, status := range []api.TaskStatus{api.Allocated, api.Pending} {
for _, status := range []api.TaskStatus{api.Allocated, api.Pending, api.Pipelined} {
for _, taskInfo := range job.TaskStatusIndex[status] {
msg := baseErrorMessage
fitError := job.NodesFitErrors[taskInfo.UID]
Expand Down
51 changes: 4 additions & 47 deletions pkg/scheduler/plugins/proportion/proportion.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ type queueAttr struct {
weight int32
share float64

deserved *api.Resource
unSatisfied *api.Resource
allocated *api.Resource
request *api.Resource
deserved *api.Resource
allocated *api.Resource
request *api.Resource
}

// New return proportion action
Expand Down Expand Up @@ -103,7 +102,6 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
}

remaining := pp.totalResource.Clone()
totalResource := pp.totalResource.Clone()
meet := map[api.QueueID]struct{}{}
for {
totalWeight := int32(0)
Expand Down Expand Up @@ -133,11 +131,10 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
}

oldDeserved := attr.deserved.Clone()
attr.deserved.Add(totalResource.Clone().Multi(float64(attr.weight) / float64(totalWeight)))
attr.deserved.Add(remaining.Clone().Multi(float64(attr.weight) / float64(totalWeight)))

if attr.request.Less(attr.deserved) {
attr.deserved = helpers.Min(attr.deserved, attr.request)
attr.unSatisfied = attr.request.Sub(attr.deserved)
meet[attr.queueID] = struct{}{}
glog.V(4).Infof("queue <%s> is meet", attr.name)

Expand All @@ -159,46 +156,6 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
}
}

for {
if true == remaining.IsEmpty() {
break
}

totalWeight := int32(0)
for _, attr := range pp.queueOpts {
if false == attr.unSatisfied.IsEmpty() {
totalWeight += attr.weight
}
}

// If no queues, break
if totalWeight == 0 {
glog.V(4).Infof("Exiting when total weight is 0")
break
}

for _, attr := range pp.queueOpts {
if attr.unSatisfied.IsEmpty() {
continue
}

added := remaining.Clone().Multi(float64(attr.weight) / float64(totalWeight))
if true == added.LessEqual(attr.unSatisfied) {
attr.deserved.Add(added)
attr.unSatisfied.Sub(added)
remaining.Sub(added)
} else {
attr.deserved.Add(attr.unSatisfied)
attr.unSatisfied = &api.Resource{}
remaining.Sub(attr.unSatisfied)
}

if remaining.IsEmpty() {
break
}
}
}

ssn.AddQueueOrderFn(pp.Name(), func(l, r interface{}) int {
lv := l.(*api.QueueInfo)
rv := r.(*api.QueueInfo)
Expand Down

0 comments on commit 367b3ee

Please sign in to comment.