
Commit f840db7
enhance: refine error handling in clustering compaction task
Signed-off-by: wayblink <anyang.wang@zilliz.com>
wayblink committed Jul 25, 2024
1 parent 4f6cbfd commit f840db7
Showing 3 changed files with 277 additions and 50 deletions.
9 changes: 1 addition & 8 deletions internal/datacoord/compaction.go
@@ -589,14 +589,7 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com
 			sessions:  c.sessions,
 		}
 	case datapb.CompactionType_ClusteringCompaction:
-		task = &clusteringCompactionTask{
-			CompactionTask:   t,
-			allocator:        c.allocator,
-			meta:             c.meta,
-			sessions:         c.sessions,
-			handler:          c.handler,
-			analyzeScheduler: c.analyzeScheduler,
-		}
+		task = newClusteringCompactionTask(t, c.allocator, c.meta, c.sessions, c.handler, c.analyzeScheduler)
	default:
		return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
	}
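The hunk above swaps an inline struct literal for a constructor, so that defaults such as the new maxRetryTimes field live in one place instead of at every creation site. A minimal runnable sketch of that pattern, using simplified stand-in types rather than the actual Milvus definitions:

// Sketch of the constructor-over-literal refactor; clusteringTask and its
// fields are simplified stand-ins, not the real Milvus types.
package main

import "fmt"

type clusteringTask struct {
	planID        int64
	maxRetryTimes int32
}

// newClusteringTask centralizes defaults like maxRetryTimes, so every
// creation path (and every test) starts from the same configuration.
func newClusteringTask(planID int64) *clusteringTask {
	return &clusteringTask{
		planID:        planID,
		maxRetryTimes: 3,
	}
}

func main() {
	t := newClusteringTask(1001)
	fmt.Printf("plan %d allows %d retries\n", t.planID, t.maxRetryTimes)
}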
62 changes: 36 additions & 26 deletions internal/datacoord/compaction_task_clustering.go
@@ -40,10 +40,6 @@ import (
 
 var _ CompactionTask = (*clusteringCompactionTask)(nil)
 
-const (
-	taskMaxRetryTimes = int32(3)
-)
-
 type clusteringCompactionTask struct {
 	*datapb.CompactionTask
 	plan *datapb.CompactionPlan
@@ -55,6 +51,20 @@ type clusteringCompactionTask struct {
 	sessions         SessionManager
 	handler          Handler
 	analyzeScheduler *taskScheduler
+
+	maxRetryTimes int32
 }
+
+func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator, meta CompactionMeta, session SessionManager, handler Handler, analyzeScheduler *taskScheduler) *clusteringCompactionTask {
+	return &clusteringCompactionTask{
+		CompactionTask:   t,
+		allocator:        allocator,
+		meta:             meta,
+		sessions:         session,
+		handler:          handler,
+		analyzeScheduler: analyzeScheduler,
+		maxRetryTimes:    3,
+	}
+}
 
 func (t *clusteringCompactionTask) Process() bool {
@@ -63,12 +73,15 @@ func (t *clusteringCompactionTask) Process() bool {
 	err := t.retryableProcess()
 	if err != nil {
 		log.Warn("fail in process task", zap.Error(err))
-		if merr.IsRetryableErr(err) && t.RetryTimes < taskMaxRetryTimes {
+		if merr.IsRetryableErr(err) && t.RetryTimes < t.maxRetryTimes {
 			// retry in next Process
-			t.updateAndSaveTaskMeta(setRetryTimes(t.RetryTimes + 1))
+			err = t.updateAndSaveTaskMeta(setRetryTimes(t.RetryTimes + 1))
 		} else {
 			log.Error("task fail with unretryable reason or meet max retry times", zap.Error(err))
-			t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
+			err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
 		}
+		if err != nil {
+			log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
+		}
 	}
 	// task state update, refresh retry times count
@@ -80,16 +93,20 @@ func (t *clusteringCompactionTask) Process() bool {
 		metrics.DataCoordCompactionLatency.
 			WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
 			Observe(float64(lastStateDuration))
-		t.updateAndSaveTaskMeta(setRetryTimes(0), setLastStateStartTime(ts))
+		updateOps := []compactionTaskOpt{setRetryTimes(0), setLastStateStartTime(ts)}
 
 		if t.State == datapb.CompactionTaskState_completed {
-			t.updateAndSaveTaskMeta(setEndTime(ts))
+			updateOps = append(updateOps, setEndTime(ts))
 			elapse := ts - t.StartTime
 			log.Info("clustering compaction task total elapse", zap.Int64("elapse", elapse))
 			metrics.DataCoordCompactionLatency.
 				WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), "total").
 				Observe(float64(elapse))
 		}
+		err = t.updateAndSaveTaskMeta(updateOps...)
+		if err != nil {
+			log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
+		}
 	}
 	log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
 	return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned
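Taken together, the two hunks above change Process so that every updateAndSaveTaskMeta result is checked: a retryable error increments RetryTimes up to maxRetryTimes, any other error (or an exhausted retry budget) marks the task failed, and a failure to persist the transition is logged rather than swallowed, since Process will run again. A runnable sketch of that shape, with hypothetical names standing in for the Milvus types:

// Minimal sketch (hypothetical names, not Milvus code) of bounded retry
// where the meta-save error is logged but does not derail the state machine.
package main

import (
	"errors"
	"fmt"
	"log"
)

var errRetryable = errors.New("retryable")

type task struct {
	retryTimes, maxRetryTimes int32
	failed                    bool
}

// saveMeta stands in for updateAndSaveTaskMeta; assume it can fail.
func (t *task) saveMeta(apply func(*task)) error {
	apply(t)
	return nil
}

func (t *task) process(step func() error) {
	err := step()
	if err == nil {
		return
	}
	var saveErr error
	if errors.Is(err, errRetryable) && t.retryTimes < t.maxRetryTimes {
		saveErr = t.saveMeta(func(t *task) { t.retryTimes++ }) // retry next round
	} else {
		saveErr = t.saveMeta(func(t *task) { t.failed = true })
	}
	if saveErr != nil {
		log.Printf("failed to save task meta: %v", saveErr) // log, don't mask the outcome
	}
}

func main() {
	t := &task{maxRetryTimes: 3}
	for i := 0; i < 4; i++ {
		t.process(func() error { return errRetryable })
	}
	fmt.Printf("retries=%d failed=%v\n", t.retryTimes, t.failed) // retries=3 failed=true
}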
@@ -183,8 +200,6 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
 
 func (t *clusteringCompactionTask) processPipelining() error {
 	log := log.With(zap.Int64("triggerID", t.TriggerID), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("planID", t.GetPlanID()))
-	ts := time.Now().UnixMilli()
-	t.updateAndSaveTaskMeta(setStartTime(ts))
 	var operators []UpdateOperator
 	for _, segID := range t.InputSegments {
 		operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L2))
@@ -218,8 +233,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
 		if errors.Is(err, merr.ErrNodeNotFound) {
 			log.Warn("GetCompactionPlanResult fail", zap.Error(err))
 			// setNodeID(NullNodeID) to trigger reassign node ID
-			t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
-			return nil
+			return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
 		}
 		return err
 	}
@@ -230,7 +244,6 @@ func (t *clusteringCompactionTask) processExecuting() error {
 	result := t.result
 	if len(result.GetSegments()) == 0 {
 		log.Warn("illegal compaction results, this should not happen")
-		t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
 		return merr.WrapErrCompactionResult("compaction result is empty")
 	}
 
@@ -260,8 +273,10 @@ func (t *clusteringCompactionTask) processExecuting() error {
 		return nil
 	case datapb.CompactionTaskState_failed:
 		return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
+	default:
+		log.Error("not support compaction task state", zap.String("state", result.GetState().String()))
+		return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
 	}
-	return nil
 }
 
 func (t *clusteringCompactionTask) processMetaSaved() error {
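This hunk replaces a silent fall-through — the old return nil after the switch meant an unrecognized result state was treated as success — with an explicit default arm that logs and marks the task failed. A small self-contained illustration of the difference, with made-up states:

// Illustration only: an unknown state now produces an error instead of
// falling through to success. States and messages are hypothetical.
package main

import (
	"fmt"
	"log"
)

type resultState string

func handle(s resultState) error {
	switch s {
	case "completed":
		return nil
	case "failed":
		return fmt.Errorf("compaction failed")
	default:
		log.Printf("not support compaction task state: %s", s)
		return fmt.Errorf("unsupported state %q", s)
	}
}

func main() {
	fmt.Println(handle("completed")) // <nil>
	fmt.Println(handle("corrupted")) // unsupported state "corrupted"
}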
@@ -382,8 +397,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
 		log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
 	}
 
-	t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
-	return nil
+	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
 }
 
 func (t *clusteringCompactionTask) doAnalyze() error {
@@ -409,9 +423,8 @@ func (t *clusteringCompactionTask) doAnalyze() error {
 			State: indexpb.JobState_JobStateInit,
 		},
 	})
-	t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing))
 	log.Info("submit analyze task", zap.Int64("planID", t.GetPlanID()), zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("id", t.GetAnalyzeTaskID()))
-	return nil
+	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing))
 }
 
 func (t *clusteringCompactionTask) doCompact() error {
@@ -445,21 +458,18 @@ func (t *clusteringCompactionTask) doCompact() error {
 	t.plan, err = t.BuildCompactionRequest()
 	if err != nil {
 		log.Warn("Failed to BuildCompactionRequest", zap.Error(err))
-		return merr.WrapErrBuildCompactionRequestFail(err) // retryable
+		return err
 	}
 	err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
 	if err != nil {
 		if errors.Is(err, merr.ErrDataNodeSlotExhausted) {
 			log.Warn("fail to notify compaction tasks to DataNode because the node slots exhausted")
-			t.updateAndSaveTaskMeta(setNodeID(NullNodeID))
-			return nil
+			return t.updateAndSaveTaskMeta(setNodeID(NullNodeID))
 		}
 		log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
-		t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
-		return err
+		return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
 	}
-	t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
-	return nil
+	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
 }
 
 func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
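The remaining hunks all follow one pattern: instead of calling updateAndSaveTaskMeta and then returning nil (or a separate error), the update's own error becomes the function's return value, so a lost state transition surfaces to the caller and can be retried. A compact before/after sketch, with illustrative names only:

// Illustrative sketch (not Milvus code) of propagating the meta-update error.
package main

import (
	"errors"
	"fmt"
)

var errMetaStore = errors.New("meta store unavailable")

// saveState stands in for updateAndSaveTaskMeta; its error matters now.
func saveState(state string, storeDown bool) error {
	if storeDown {
		return errMetaStore
	}
	return nil
}

// Old style: the update result was dropped, so the caller saw success even
// when the state transition was never persisted.
func doCompactOld(storeDown bool) error {
	saveState("executing", storeDown) // error ignored (the bug)
	return nil
}

// New style: the update is the return value, so the caller can retry.
func doCompactNew(storeDown bool) error {
	return saveState("executing", storeDown)
}

func main() {
	fmt.Println(doCompactOld(true)) // <nil> — failure hidden
	fmt.Println(doCompactNew(true)) // meta store unavailable
}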
