Skip to content

Commit

Permalink
enhance: Add more ut for l0CompactionTask (#35100)
Browse files Browse the repository at this point in the history
See also: #34796

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn authored Aug 9, 2024
1 parent fa41c79 commit 06f9ba2
Show file tree
Hide file tree
Showing 2 changed files with 320 additions and 91 deletions.
190 changes: 99 additions & 91 deletions internal/datacoord/compaction_task_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,52 +139,69 @@ func (t *l0CompactionTask) processExecuting() bool {
return false
}

func (t *l0CompactionTask) GetSpan() trace.Span {
return t.span
func (t *l0CompactionTask) processMetaSaved() bool {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
if err != nil {
log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
return false
}
return t.processCompleted()
}

func (t *l0CompactionTask) processCompleted() bool {
if t.hasAssignedWorker() {
err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
})
if err != nil {
log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
}
}

t.resetSegmentCompacting()
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
log.Info("l0CompactionTask processCompleted done", zap.Int64("planID", t.GetPlanID()))
return true
}

func (t *l0CompactionTask) processTimeout() bool {
t.resetSegmentCompacting()
return true
}

func (t *l0CompactionTask) processFailed() bool {
if t.hasAssignedWorker() {
err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
})
if err != nil {
log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
}
}

t.resetSegmentCompacting()
log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
return true
}

func (t *l0CompactionTask) GetResult() *datapb.CompactionPlanResult {
return t.result
}

func (t *l0CompactionTask) SetTask(task *datapb.CompactionTask) {
t.CompactionTask = task
func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) {
t.result = result
}

func (t *l0CompactionTask) SetSpan(span trace.Span) {
t.span = span
func (t *l0CompactionTask) SetTask(task *datapb.CompactionTask) {
t.CompactionTask = task
}

func (t *l0CompactionTask) SetPlan(plan *datapb.CompactionPlan) {
t.plan = plan
func (t *l0CompactionTask) GetSpan() trace.Span {
return t.span
}

func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
taskClone := &datapb.CompactionTask{
PlanID: t.GetPlanID(),
TriggerID: t.GetTriggerID(),
State: t.GetState(),
StartTime: t.GetStartTime(),
EndTime: t.GetEndTime(),
TimeoutInSeconds: t.GetTimeoutInSeconds(),
Type: t.GetType(),
CollectionTtl: t.CollectionTtl,
CollectionID: t.GetCollectionID(),
PartitionID: t.GetPartitionID(),
Channel: t.GetChannel(),
InputSegments: t.GetInputSegments(),
ResultSegments: t.GetResultSegments(),
TotalRows: t.TotalRows,
Schema: t.Schema,
NodeID: t.GetNodeID(),
FailReason: t.GetFailReason(),
RetryTimes: t.GetRetryTimes(),
Pos: t.GetPos(),
}
for _, opt := range opts {
opt(taskClone)
}
return taskClone
func (t *l0CompactionTask) SetSpan(span trace.Span) {
t.span = span
}

func (t *l0CompactionTask) EndSpan() {
Expand All @@ -193,20 +210,24 @@ func (t *l0CompactionTask) EndSpan() {
}
}

func (t *l0CompactionTask) GetLabel() string {
return fmt.Sprintf("%d-%s", t.PartitionID, t.GetChannel())
func (t *l0CompactionTask) SetPlan(plan *datapb.CompactionPlan) {
t.plan = plan
}

func (t *l0CompactionTask) GetPlan() *datapb.CompactionPlan {
return t.plan
}

func (t *l0CompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID)
func (t *l0CompactionTask) SetStartTime(startTime int64) {
t.StartTime = startTime
}

func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) {
t.result = result
func (t *l0CompactionTask) GetLabel() string {
return fmt.Sprintf("%d-%s", t.PartitionID, t.GetChannel())
}

func (t *l0CompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && (!t.hasAssignedWorker())
}

func (t *l0CompactionTask) CleanLogPath() {
Expand All @@ -229,6 +250,34 @@ func (t *l0CompactionTask) CleanLogPath() {
}
}

func (t *l0CompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
taskClone := &datapb.CompactionTask{
PlanID: t.GetPlanID(),
TriggerID: t.GetTriggerID(),
State: t.GetState(),
StartTime: t.GetStartTime(),
EndTime: t.GetEndTime(),
TimeoutInSeconds: t.GetTimeoutInSeconds(),
Type: t.GetType(),
CollectionTtl: t.CollectionTtl,
CollectionID: t.GetCollectionID(),
PartitionID: t.GetPartitionID(),
Channel: t.GetChannel(),
InputSegments: t.GetInputSegments(),
ResultSegments: t.GetResultSegments(),
TotalRows: t.TotalRows,
Schema: t.Schema,
NodeID: t.GetNodeID(),
FailReason: t.GetFailReason(),
RetryTimes: t.GetRetryTimes(),
Pos: t.GetPos(),
}
for _, opt := range opts {
opt(taskClone)
}
return taskClone
}

func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
beginLogID, _, err := t.allocator.allocN(1)
if err != nil {
Expand Down Expand Up @@ -306,53 +355,12 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
return plan, nil
}

func (t *l0CompactionTask) processMetaSaved() bool {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
if err != nil {
log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
return false
}
return t.processCompleted()
}

func (t *l0CompactionTask) processCompleted() bool {
if t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID {
err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
})
if err != nil {
log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
}
}

t.resetSegmentCompacting()
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
log.Info("l0CompactionTask processCompleted done", zap.Int64("planID", t.GetPlanID()))
return true
}

func (t *l0CompactionTask) resetSegmentCompacting() {
t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
}

func (t *l0CompactionTask) processTimeout() bool {
t.resetSegmentCompacting()
return true
}

func (t *l0CompactionTask) processFailed() bool {
if t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID {
err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
})
if err != nil {
log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
}
}

t.resetSegmentCompacting()
log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
return true
func (t *l0CompactionTask) hasAssignedWorker() bool {
return t.GetNodeID() != 0 && t.GetNodeID() != NullNodeID
}

func (t *l0CompactionTask) checkTimeout() bool {
Expand All @@ -373,6 +381,14 @@ func (t *l0CompactionTask) checkTimeout() bool {
return false
}

func (t *l0CompactionTask) SetNodeID(id UniqueID) error {
return t.updateAndSaveTaskMeta(setNodeID(id))
}

func (t *l0CompactionTask) SaveTaskMeta() error {
return t.saveTaskMeta(t.CompactionTask)
}

func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
Expand All @@ -383,18 +399,10 @@ func (t *l0CompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) erro
return nil
}

func (t *l0CompactionTask) SetNodeID(id UniqueID) error {
return t.updateAndSaveTaskMeta(setNodeID(id))
}

func (t *l0CompactionTask) saveTaskMeta(task *datapb.CompactionTask) error {
return t.meta.SaveCompactionTask(task)
}

func (t *l0CompactionTask) SaveTaskMeta() error {
return t.saveTaskMeta(t.CompactionTask)
}

func (t *l0CompactionTask) saveSegmentMeta() error {
result := t.result
var operators []UpdateOperator
Expand Down
Loading

0 comments on commit 06f9ba2

Please sign in to comment.