Skip to content

Commit

Permalink
enhance: Add ut for l0CompactionTask processExecuting (milvus-io#34800)
Browse files Browse the repository at this point in the history
See also: milvus-io#34796

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn committed Jul 29, 2024
1 parent 4a3d98d commit 9a07790
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 41 deletions.
25 changes: 17 additions & 8 deletions internal/datacoord/compaction_task_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,11 @@ func (t *l0CompactionTask) processExecuting() bool {
}
switch result.GetState() {
case datapb.CompactionTaskState_executing:
// will L0Compaction be timeouted?
if t.checkTimeout() {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
if err != nil {
log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
log.Warn("l0CompactionTask failed to set task timeout state", zap.Error(err))
return false
}
return t.processTimeout()
Expand All @@ -124,12 +125,13 @@ func (t *l0CompactionTask) processExecuting() bool {
}

if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved)); err != nil {
log.Warn("l0CompactionTask failed to save task meta_saved state", zap.Error(err))
return false
}
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)); err != nil {
log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
log.Warn("l0CompactionTask failed to set task failed state", zap.Error(err))
return false
}
return t.processFailed()
Expand Down Expand Up @@ -318,10 +320,13 @@ func (t *l0CompactionTask) processMetaSaved() bool {
}

func (t *l0CompactionTask) processCompleted() bool {
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
}); err != nil {
log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
if t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID {
err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
})
if err != nil {
log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
}
}

t.resetSegmentCompacting()
Expand Down Expand Up @@ -356,11 +361,15 @@ func (t *l0CompactionTask) processFailed() bool {

func (t *l0CompactionTask) checkTimeout() bool {
if t.GetTimeoutInSeconds() > 0 {
diff := time.Since(time.Unix(t.GetStartTime(), 0)).Seconds()
start := time.Unix(t.GetStartTime(), 0)
diff := time.Since(start).Seconds()
if diff > float64(t.GetTimeoutInSeconds()) {
log.Warn("compaction timeout",
zap.Int64("taskID", t.GetTriggerID()),
zap.Int64("planID", t.GetPlanID()),
zap.Int64("nodeID", t.GetNodeID()),
zap.Int32("timeout in seconds", t.GetTimeoutInSeconds()),
zap.Int64("startTime", t.GetStartTime()),
zap.Time("startTime", start),
)
return true
}
Expand Down
Loading

0 comments on commit 9a07790

Please sign in to comment.