-
Notifications
You must be signed in to change notification settings - Fork 64
Error handling updates in quota management usage #367
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
847b0b9
5ebd199
38b5144
f25812c
3d82520
e287f1d
d816fb5
8caa3ae
5ee871e
37fdd3a
eb3a0e9
4d29d6c
80d1e58
986ac60
cf60966
b32182c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
Logging improvements; additional test cases in quotaforest manager
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1103,10 +1103,10 @@ func (qjm *XController) ScheduleNext() { | |
// amount of resources asked by the job | ||
qj, err := qjm.qjqueue.Pop() | ||
if err != nil { | ||
klog.V(3).Infof("[ScheduleNext] Cannot pop QueueJob from qjqueue! err=%#v", err) | ||
klog.Errorf("[ScheduleNext] Cannot pop QueueJob from qjqueue! err=%#v", err) | ||
return // Try to pop qjqueue again | ||
} else { | ||
klog.V(3).Infof("[ScheduleNext] activeQ.Pop %s *Delay=%.6f seconds RemainingLength=%d &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(), qj, qj.ResourceVersion, qj.Status) | ||
klog.Infof("[ScheduleNext] activeQ.Pop %s *Delay=%.6f seconds RemainingLength=%d &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.Length(), qj, qj.ResourceVersion, qj.Status) | ||
} | ||
|
||
apiCacheAWJob, e := qjm.queueJobLister.AppWrappers(qj.Namespace).Get(qj.Name) | ||
|
@@ -1125,6 +1125,7 @@ func (qjm *XController) ScheduleNext() { | |
|
||
// Re-compute SystemPriority for DynamicPriority policy | ||
if qjm.serverOption.DynamicPriority { | ||
klog.V(4).Info("[ScheduleNext] dynamic priority enabled") | ||
// Create newHeap to temporarily store qjqueue jobs for updating SystemPriority | ||
tempQ := newHeap(cache.MetaNamespaceKeyFunc, HigherSystemPriorityQJ) | ||
qj.Status.SystemPriority = float64(qj.Spec.Priority) + qj.Spec.PrioritySlope*(time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time)).Seconds() | ||
|
@@ -1140,12 +1141,12 @@ func (qjm *XController) ScheduleNext() { | |
qjm.qjqueue.AddIfNotPresent(qjtemp.(*arbv1.AppWrapper)) | ||
} | ||
// Print qjqueue.ativeQ for debugging | ||
if klog.V(10).Enabled() { | ||
if klog.V(4).Enabled() { | ||
pq := qjm.qjqueue.(*PriorityQueue) | ||
if qjm.qjqueue.Length() > 0 { | ||
for key, element := range pq.activeQ.data.items { | ||
qjtemp := element.obj.(*arbv1.AppWrapper) | ||
klog.V(10).Infof("[ScheduleNext] AfterCalc: qjqLength=%d Key=%s index=%d Priority=%.1f SystemPriority=%.1f QueueJobState=%s", | ||
klog.V(4).Infof("[ScheduleNext] AfterCalc: qjqLength=%d Key=%s index=%d Priority=%.1f SystemPriority=%.1f QueueJobState=%s", | ||
qjm.qjqueue.Length(), key, element.index, float64(qjtemp.Spec.Priority), qjtemp.Status.SystemPriority, qjtemp.Status.QueueJobState) | ||
} | ||
} | ||
|
@@ -1162,7 +1163,7 @@ func (qjm *XController) ScheduleNext() { | |
} | ||
|
||
if qj.Status.CanRun { | ||
klog.V(10).Infof("[ScheduleNext] AppWrapper job: %s from prioirty queue is already scheduled. Ignoring request: Status=%+v\n", qj.Name, qj.Status) | ||
klog.V(4).Infof("[ScheduleNext] AppWrapper job: %s from prioirty queue is already scheduled. Ignoring request: Status=%+v\n", qj.Name, qj.Status) | ||
return | ||
} | ||
apiCacheAppWrapper, err := qjm.queueJobLister.AppWrappers(qj.Namespace).Get(qj.Name) | ||
|
@@ -1171,7 +1172,7 @@ func (qjm *XController) ScheduleNext() { | |
return | ||
} | ||
if apiCacheAppWrapper.Status.CanRun { | ||
klog.V(10).Infof("[ScheduleNext] AppWrapper job: %s from API is already scheduled. Ignoring request: Status=%+v\n", apiCacheAppWrapper.Name, apiCacheAppWrapper.Status) | ||
klog.V(4).Infof("[ScheduleNext] AppWrapper job: %s from API is already scheduled. Ignoring request: Status=%+v\n", apiCacheAppWrapper.Name, apiCacheAppWrapper.Status) | ||
return | ||
} | ||
|
||
|
@@ -1182,11 +1183,11 @@ func (qjm *XController) ScheduleNext() { | |
qjm.updateEtcd(qj, "ScheduleNext - setHOL") | ||
qjm.qjqueue.AddUnschedulableIfNotPresent(qj) // working on qj, avoid other threads putting it back to activeQ | ||
|
||
klog.V(10).Infof("[ScheduleNext] after Pop qjqLength=%d qj %s Version=%s activeQ=%t Unsched=%t Status=%+v", qjm.qjqueue.Length(), qj.Name, qj.ResourceVersion, qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.Status) | ||
klog.V(4).Infof("[ScheduleNext] after Pop qjqLength=%d qj %s Version=%s activeQ=%t Unsched=%t Status=%+v", qjm.qjqueue.Length(), qj.Name, qj.ResourceVersion, qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj.Status) | ||
if qjm.isDispatcher { | ||
klog.V(2).Infof("[ScheduleNext] [Dispatcher Mode] Dispatch Next QueueJob: %s\n", qj.Name) | ||
klog.Infof("[ScheduleNext] [Dispatcher Mode] Dispatch Next QueueJob: %s\n", qj.Name) | ||
} else { | ||
klog.V(2).Infof("[ScheduleNext] [Agent Mode] Deploy Next QueueJob: %s Status=%+v\n", qj.Name, qj.Status) | ||
klog.Infof("[ScheduleNext] [Agent Mode] Deploy Next QueueJob: %s Status=%+v\n", qj.Name, qj.Status) | ||
} | ||
|
||
dispatchFailedReason := "AppWrapperNotRunnable." | ||
|
@@ -1240,8 +1241,11 @@ func (qjm *XController) ScheduleNext() { | |
// HeadOfLine logic | ||
HOLStartTime := time.Now() | ||
forwarded := false | ||
fowardingLoopCount := 1 | ||
quotaFits := false | ||
// Try to forward to eventQueue for at most HeadOfLineHoldingTime | ||
for !forwarded { | ||
klog.Infof("[ScheduleNext] Forwarding loop iteration: %d", fowardingLoopCount) | ||
priorityindex := qj.Status.SystemPriority | ||
// Support for Non-Preemption | ||
if !qjm.serverOption.Preemption { | ||
|
@@ -1253,17 +1257,19 @@ func (qjm *XController) ScheduleNext() { | |
} | ||
resources, proposedPreemptions := qjm.getAggregatedAvailableResourcesPriority( | ||
qjm.cache.GetUnallocatedResources(), priorityindex, qj, "") | ||
klog.V(2).Infof("[ScheduleNext] XQJ %s with resources %v to be scheduled on aggregated idle resources %v", qj.Name, aggqj, resources) | ||
klog.Infof("[ScheduleNext] XQJ %s with resources %v to be scheduled on aggregated idle resources %v", qj.Name, aggqj, resources) | ||
|
||
if aggqj.LessEqual(resources) && qjm.nodeChecks(qjm.cache.GetUnallocatedHistograms(), qj) { | ||
//Now evaluate quota | ||
fits := true | ||
klog.V(10).Infof("[ScheduleNext] HOL available resourse successful check for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) | ||
klog.V(4).Infof("[ScheduleNext] HOL available resourse successful check for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) | ||
if qjm.serverOption.QuotaEnabled { | ||
if qjm.quotaManager != nil { | ||
quotaFits, preemptAWs, msg := qjm.quotaManager.Fits(qj, aggqj, proposedPreemptions) | ||
var msg string | ||
var preemptAWs []*arbv1.AppWrapper | ||
quotaFits, preemptAWs, msg = qjm.quotaManager.Fits(qj, aggqj, proposedPreemptions) | ||
if quotaFits { | ||
klog.V(4).Infof("[ScheduleNext] HOL quota evaluation successful %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) | ||
klog.Infof("[ScheduleNext] HOL quota evaluation successful %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) | ||
// Set any jobs that are marked for preemption | ||
qjm.preemptAWJobs(preemptAWs) | ||
} else { // Not enough free quota to dispatch appwrapper | ||
|
@@ -1279,26 +1285,30 @@ func (qjm *XController) ScheduleNext() { | |
} else { | ||
fits = false | ||
//Quota manager not initialized | ||
dispatchFailedMessage = "Quota evaluation is enable but not initialize. Insufficient quota to dispatch AppWrapper." | ||
klog.Errorf("[ScheduleNext] Quota evaluation is enable but not initialize. AppWrapper %s/%s does not have enough quota\n", qj.Name, qj.Namespace) | ||
dispatchFailedMessage = "Quota evaluation is enabled but not initialized. Insufficient quota to dispatch AppWrapper." | ||
klog.Errorf("[ScheduleNext] Quota evaluation is enabled but not initialized. AppWrapper %s/%s does not have enough quota\n", qj.Name, qj.Namespace) | ||
} | ||
} else { | ||
klog.V(10).Infof("[ScheduleNext] HOL quota evaluation not enabled for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) | ||
klog.V(4).Infof("[ScheduleNext] HOL quota evaluation not enabled for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) | ||
} | ||
|
||
// If quota evalauation is set or quota evaluation not enabled set the appwrapper to be dispatched | ||
// If quota evalauation sucedeed or quota evaluation not enabled set the appwrapper to be dispatched | ||
if fits { | ||
// aw is ready to go! | ||
apiQueueJob, e := qjm.queueJobLister.AppWrappers(qj.Namespace).Get(qj.Name) | ||
// apiQueueJob's ControllerFirstTimestamp is only microsecond level instead of nanosecond level | ||
if e != nil { | ||
klog.Errorf("[ScheduleNext] Unable to get AW %s from API cache &aw=%p Version=%s Status=%+v err=%#v", qj.Name, qj, qj.ResourceVersion, qj.Status, err) | ||
if qjm.quotaManager != nil && quotaFits { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @asm582, @dmatch01, @KPostOffice and @metalcycling I believe that this is one of the causes for the linked issue (reference lost in extraction). cc: @tardieu There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Quota is not released when an AW is deleted by clients, which is a different issue. |
||
//Quota was allocated for this appwrapper, release it. | ||
qjm.quotaManager.Release(qj) | ||
} | ||
return | ||
} | ||
// make sure qj has the latest information | ||
if larger(apiQueueJob.ResourceVersion, qj.ResourceVersion) { | ||
klog.V(10).Infof("[ScheduleNext] %s found more recent copy from cache &qj=%p qj=%+v", qj.Name, qj, qj) | ||
klog.V(10).Infof("[ScheduleNext] %s found more recent copy from cache &apiQueueJob=%p apiQueueJob=%+v", apiQueueJob.Name, apiQueueJob, apiQueueJob) | ||
klog.V(4).Infof("[ScheduleNext] %s found more recent copy from cache &qj=%p qj=%+v", qj.Name, qj, qj) | ||
klog.V(4).Infof("[ScheduleNext] %s found more recent copy from cache &apiQueueJob=%p apiQueueJob=%+v", apiQueueJob.Name, apiQueueJob, apiQueueJob) | ||
apiQueueJob.DeepCopyInto(qj) | ||
} | ||
desired := int32(0) | ||
|
@@ -1317,23 +1327,39 @@ func (qjm *XController) ScheduleNext() { | |
} else { // successful add to eventQueue, remove from qjqueue | ||
qjm.qjqueue.Delete(qj) | ||
forwarded = true | ||
klog.V(3).Infof("[ScheduleNext] %s Delay=%.6f seconds eventQueue.Add_afterHeadOfLine activeQ=%t, Unsched=%t &aw=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) | ||
klog.V(4).Infof("[ScheduleNext] %s Delay=%.6f seconds eventQueue.Add_afterHeadOfLine activeQ=%t, Unsched=%t &aw=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) | ||
} | ||
} //updateEtcd | ||
} //fits | ||
} else { // Not enough free resources to dispatch HOL | ||
dispatchFailedMessage = "Insufficient resources to dispatch AppWrapper." | ||
klog.V(3).Infof("[ScheduleNext] HOL Blocking by %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) | ||
klog.V(4).Infof("[ScheduleNext] HOL Blocking by %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) | ||
} | ||
// stop trying to dispatch after HeadOfLineHoldingTime | ||
if forwarded || time.Now().After(HOLStartTime.Add(time.Duration(qjm.serverOption.HeadOfLineHoldingTime)*time.Second)) { | ||
schedulingTimeExpired := time.Now().After(HOLStartTime.Add(time.Duration(qjm.serverOption.HeadOfLineHoldingTime) * time.Second)) | ||
if forwarded { | ||
break | ||
} else if schedulingTimeExpired { | ||
// stop trying to dispatch after HeadOfLineHoldingTime | ||
// release quota if allocated | ||
if qjm.quotaManager != nil && quotaFits { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @asm582, @KPostOffice, @metalcycling in your testing, have you seen this condition happen? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @z103cb We have not seen this condition happen. Who sets `HeadOfLineHoldingTime`? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @asm582 `HeadOfLineHoldingTime` is a command line option (the link to its definition was lost in extraction). AFAIK, the helm chart does not define an override. |
||
//Quota was allocated for this appwrapper, release it. | ||
qjm.quotaManager.Release(qj) | ||
} | ||
break | ||
} else { // Try to dispatch again after one second | ||
if qjm.quotaManager != nil && quotaFits { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @asm582, @metalcycling have you seen this happen in your testing? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think retries could be explicitly avoided here; an undispatched AW should be picked up in the next dispatch cycle. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @asm582 can you be more specific? I really don't understand your comment. |
||
//release any quota as the qj will be tried again and the quota might have been allocated. | ||
qjm.quotaManager.Release(qj) | ||
} | ||
time.Sleep(time.Second * 1) | ||
} | ||
fowardingLoopCount += 1 | ||
} | ||
if !forwarded { // start thread to backoff | ||
klog.V(3).Infof("[ScheduleNext] HOL backoff %s after waiting for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) | ||
if qjm.quotaManager != nil && quotaFits { | ||
qjm.quotaManager.Release(qj) | ||
} | ||
go qjm.backoff(qj, dispatchFailedReason, dispatchFailedMessage) | ||
} | ||
} | ||
|
@@ -1414,9 +1440,10 @@ func (qjm *XController) waitForPodCountUpdates(searchCond *arbv1.AppWrapperCondi | |
|
||
// Don't reserve resources if timeout is hit | ||
if timeSinceDispatched.Microseconds() > timeoutMicroSeconds { | ||
return false | ||
klog.V(10).Infof("[waitForPodCountUpdates] Dispatch duration time %d microseconds has reached timeout value of %d microseconds", | ||
timeSinceDispatched.Microseconds(), timeoutMicroSeconds) | ||
|
||
return false | ||
} | ||
|
||
klog.V(10).Infof("[waitForPodCountUpdates] Dispatch duration time %d microseconds has not reached timeout value of %d microseconds", | ||
|
@@ -1578,7 +1605,7 @@ func (qjm *XController) UpdateQueueJobs() { | |
newjob.Status.SystemPriority = float64(newjob.Spec.Priority) | ||
newjob.Status.QueueJobState = arbv1.AppWrapperCondInit | ||
newjob.Status.Conditions = []arbv1.AppWrapperCondition{ | ||
arbv1.AppWrapperCondition{ | ||
{ | ||
Type: arbv1.AppWrapperCondInit, | ||
Status: v1.ConditionTrue, | ||
LastUpdateMicroTime: metav1.NowMicro(), | ||
|
@@ -2110,6 +2137,7 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool | |
updateQj = qj.DeepCopy() | ||
} | ||
cc.updateEtcd(updateQj, "[syncQueueJob] setCompleted") | ||
cc.quotaManager.Release(updateQj) | ||
} | ||
|
||
// Bugfix to eliminate performance problem of overloading the event queue. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@asm582, @dmatch01, @KPostOffice and @metalcycling please take a very close look at the changes starting from this line.