enhance: Add more ut for l0CompactionTask (#35101)
See also: #34796
pr: #35100

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn authored Jul 31, 2024
1 parent 0bec383 commit cf56a14
Showing 2 changed files with 310 additions and 86 deletions.
176 changes: 90 additions & 86 deletions internal/datacoord/compaction_task_l0.go
@@ -137,6 +137,51 @@ func (t *l0CompactionTask) processExecuting() bool {
	return false
}

func (t *l0CompactionTask) processMetaSaved() bool {
	err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
	if err != nil {
		log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
		return false
	}
	return t.processCompleted()
}

func (t *l0CompactionTask) processCompleted() bool {
	if t.hasAssignedWorker() {
		err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
			PlanID: t.GetPlanID(),
		})
		if err != nil {
			log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
		}
	}

	t.resetSegmentCompacting()
	UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
	log.Info("l0CompactionTask processCompleted done", zap.Int64("planID", t.GetPlanID()))
	return true
}

func (t *l0CompactionTask) processTimeout() bool {
	t.resetSegmentCompacting()
	return true
}

func (t *l0CompactionTask) processFailed() bool {
	if t.hasAssignedWorker() {
		err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
			PlanID: t.GetPlanID(),
		})
		if err != nil {
			log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
		}
	}

	t.resetSegmentCompacting()
	log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
	return true
}

func (t *l0CompactionTask) GetSpan() trace.Span {
	return t.span
}
@@ -153,46 +198,14 @@ func (t *l0CompactionTask) SetSpan(span trace.Span) {
	t.span = span
}

func (t *l0CompactionTask) SetPlan(plan *datapb.CompactionPlan) {
	t.plan = plan
}

func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
	taskClone := &datapb.CompactionTask{
		PlanID: t.GetPlanID(),
		TriggerID: t.GetTriggerID(),
		State: t.GetState(),
		StartTime: t.GetStartTime(),
		EndTime: t.GetEndTime(),
		TimeoutInSeconds: t.GetTimeoutInSeconds(),
		Type: t.GetType(),
		CollectionTtl: t.CollectionTtl,
		CollectionID: t.GetCollectionID(),
		PartitionID: t.GetPartitionID(),
		Channel: t.GetChannel(),
		InputSegments: t.GetInputSegments(),
		ResultSegments: t.GetResultSegments(),
		TotalRows: t.TotalRows,
		Schema: t.Schema,
		NodeID: t.GetNodeID(),
		FailReason: t.GetFailReason(),
		RetryTimes: t.GetRetryTimes(),
		Pos: t.GetPos(),
	}
	for _, opt := range opts {
		opt(taskClone)
	}
	return taskClone
}

func (t *l0CompactionTask) EndSpan() {
	if t.span != nil {
		t.span.End()
	}
}

func (t *l0CompactionTask) GetLabel() string {
	return fmt.Sprintf("%d-%s", t.PartitionID, t.GetChannel())
func (t *l0CompactionTask) SetPlan(plan *datapb.CompactionPlan) {
	t.plan = plan
}

func (t *l0CompactionTask) GetPlan() *datapb.CompactionPlan {
@@ -203,8 +216,12 @@ func (t *l0CompactionTask) SetStartTime(startTime int64) {
	t.StartTime = startTime
}

func (t *l0CompactionTask) GetLabel() string {
	return fmt.Sprintf("%d-%s", t.PartitionID, t.GetChannel())
}

func (t *l0CompactionTask) NeedReAssignNodeID() bool {
	return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID)
	return t.GetState() == datapb.CompactionTaskState_pipelining && (!t.hasAssignedWorker())
}

func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) {
@@ -231,6 +248,34 @@ func (t *l0CompactionTask) CleanLogPath() {
	}
}

func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
	taskClone := &datapb.CompactionTask{
		PlanID: t.GetPlanID(),
		TriggerID: t.GetTriggerID(),
		State: t.GetState(),
		StartTime: t.GetStartTime(),
		EndTime: t.GetEndTime(),
		TimeoutInSeconds: t.GetTimeoutInSeconds(),
		Type: t.GetType(),
		CollectionTtl: t.CollectionTtl,
		CollectionID: t.GetCollectionID(),
		PartitionID: t.GetPartitionID(),
		Channel: t.GetChannel(),
		InputSegments: t.GetInputSegments(),
		ResultSegments: t.GetResultSegments(),
		TotalRows: t.TotalRows,
		Schema: t.Schema,
		NodeID: t.GetNodeID(),
		FailReason: t.GetFailReason(),
		RetryTimes: t.GetRetryTimes(),
		Pos: t.GetPos(),
	}
	for _, opt := range opts {
		opt(taskClone)
	}
	return taskClone
}

func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
	plan := &datapb.CompactionPlan{
		PlanID: t.GetPlanID(),
@@ -303,53 +348,12 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
	return plan, nil
}

func (t *l0CompactionTask) processMetaSaved() bool {
	err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
	if err != nil {
		log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
		return false
	}
	return t.processCompleted()
}

func (t *l0CompactionTask) processCompleted() bool {
	if t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID {
		err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
			PlanID: t.GetPlanID(),
		})
		if err != nil {
			log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
		}
	}

	t.resetSegmentCompacting()
	UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
	log.Info("l0CompactionTask processCompleted done", zap.Int64("planID", t.GetPlanID()))
	return true
}

func (t *l0CompactionTask) resetSegmentCompacting() {
	t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
}

func (t *l0CompactionTask) processTimeout() bool {
	t.resetSegmentCompacting()
	return true
}

func (t *l0CompactionTask) processFailed() bool {
	if t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID {
		err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
			PlanID: t.GetPlanID(),
		})
		if err != nil {
			log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
		}
	}

	t.resetSegmentCompacting()
	log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
	return true
func (t *l0CompactionTask) hasAssignedWorker() bool {
	return t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID
}

func (t *l0CompactionTask) checkTimeout() bool {
@@ -370,6 +374,14 @@ func (t *l0CompactionTask) checkTimeout() bool {
	return false
}

func (t *l0CompactionTask) SetNodeID(id UniqueID) error {
	return t.updateAndSaveTaskMeta(setNodeID(id))
}

func (t *l0CompactionTask) SaveTaskMeta() error {
	return t.saveTaskMeta(t.CompactionTask)
}

func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
	task := t.ShadowClone(opts...)
	err := t.saveTaskMeta(task)
@@ -380,18 +392,10 @@ func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
	return nil
}

func (t *l0CompactionTask) SetNodeID(id UniqueID) error {
	return t.updateAndSaveTaskMeta(setNodeID(id))
}

func (t *l0CompactionTask) saveTaskMeta(task *datapb.CompactionTask) error {
	return t.meta.SaveCompactionTask(task)
}

func (t *l0CompactionTask) SaveTaskMeta() error {
	return t.saveTaskMeta(t.CompactionTask)
}

func (t *l0CompactionTask) saveSegmentMeta() error {
	result := t.result
	var operators []UpdateOperator
(The commit's second changed file, presumably the added l0CompactionTask unit tests, did not render here; it accounts for the remaining 220 additions and 0 deletions.)
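Since that test file is not visible in this view, the following is only a hedged sketch of what one of the added unit tests could look like, based on the refactored code above. It assumes the datacoord package's mockery-generated NewMockCompactionMeta and NewMockSessionManager helpers and the 2.4-era datapb import path; the plan, node, and segment IDs are made up, and the PR's actual tests may be structured differently.

package datacoord

import (
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"github.com/milvus-io/milvus/internal/proto/datapb"
)

// Hypothetical sketch, not the PR's actual test code: mock constructors,
// import paths, and IDs are assumptions for illustration only.
func TestL0CompactionTask_ProcessMetaSaved_Sketch(t *testing.T) {
	meta := NewMockCompactionMeta(t)
	// The completed path must persist the state change ...
	meta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
	// ... and release the compacting flag on every input segment.
	meta.EXPECT().SetSegmentsCompacting([]int64{100, 101}, false).Return().Once()

	sessions := NewMockSessionManager(t)
	// With a worker assigned (NodeID != 0), the plan is dropped on that node.
	sessions.EXPECT().DropCompactionPlan(int64(1), mock.Anything).Return(nil).Once()

	task := &l0CompactionTask{
		CompactionTask: &datapb.CompactionTask{
			PlanID:        19530,
			Type:          datapb.CompactionType_Level0DeleteCompaction,
			State:         datapb.CompactionTaskState_meta_saved,
			NodeID:        1,
			InputSegments: []int64{100, 101},
		},
		meta:     meta,
		sessions: sessions,
		result:   &datapb.CompactionPlanResult{},
	}

	// processMetaSaved marks the task completed and runs the completed path.
	require.True(t, task.processMetaSaved())
}

The exact-argument expectations on SetSegmentsCompacting and DropCompactionPlan are what give such a test its value: they check that the completed path both releases the input segments and cleans up the plan on the assigned worker, which is the behavior the refactored processCompleted and hasAssignedWorker shown above encode.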
