Skip to content

Commit

Permalink
fix: refuse schedule compaction tasks if there is no slot (#37589)
Browse files Browse the repository at this point in the history
See #37621

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
  • Loading branch information
tedxu authored Nov 13, 2024
1 parent 3389a6b commit 1a49da2
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 47 deletions.
104 changes: 68 additions & 36 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,21 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
}

func (c *compactionPlanHandler) schedule() []CompactionTask {
selected := make([]CompactionTask, 0)
if c.queueTasks.Len() == 0 {
return selected
}
var (
parallelism = Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
slots map[int64]int64
)

c.executingGuard.Lock()
if len(c.executingTasks) >= parallelism {
return selected
}
c.executingGuard.Unlock()

l0ChannelExcludes := typeutil.NewSet[string]()
mixChannelExcludes := typeutil.NewSet[string]()
clusterChannelExcludes := typeutil.NewSet[string]()
Expand Down Expand Up @@ -227,21 +242,20 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
c.queueTasks.Enqueue(t)
}
}()
selected := make([]CompactionTask, 0)

p := getPrioritizer()
if &c.queueTasks.prioritizer != &p {
c.queueTasks.UpdatePrioritizer(p)
}

c.executingGuard.Lock()
tasksToGo := Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt() - len(c.executingTasks)
c.executingGuard.Unlock()
for len(selected) < tasksToGo && c.queueTasks.Len() > 0 {
// The schedule loop will stop if either:
// 1. no more task to schedule (the task queue is empty)
// 2. the parallelism of running tasks is reached
// 3. no avaiable slots
for {
t, err := c.queueTasks.Dequeue()
if err != nil {
// Will never go here
return selected
break // 1. no more task to schedule
}

switch t.GetTaskProto().GetType() {
Expand Down Expand Up @@ -273,11 +287,27 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
selected = append(selected, t)
}

if t.NeedReAssignNodeID() {
if slots == nil {
slots = c.cluster.QuerySlots()
}
id := assignNodeID(slots, t)
if id == NullNodeID {
log.RatedWarn(10, "not enough slots for compaction task", zap.Int64("planID", t.GetTaskProto().GetPlanID()))
selected = selected[:len(selected)-1]
excluded = append(excluded, t)
break // 3. no avaiable slots
}
}

c.executingGuard.Lock()
c.executingTasks[t.GetTaskProto().GetPlanID()] = t
if len(c.executingTasks) >= parallelism {
break // 2. the parallelism of running tasks is reached
}
c.executingGuard.Unlock()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
}
return selected
}
Expand Down Expand Up @@ -592,49 +622,51 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (Com
return task, nil
}

func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
slots := c.cluster.QuerySlots()
func assignNodeID(slots map[int64]int64, t CompactionTask) int64 {
if len(slots) == 0 {
return
return NullNodeID
}

for _, t := range tasks {
nodeID, useSlot := c.pickAnyNode(slots, t)
if nodeID == NullNodeID {
log.Info("compactionHandler cannot find datanode for compaction task",
zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()), zap.String("vchannel", t.GetTaskProto().GetChannel()))
continue
}
err := t.SetNodeID(nodeID)
if err != nil {
log.Info("compactionHandler assignNodeID failed",
zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("vchannel", t.GetTaskProto().GetChannel()), zap.Error(err))
} else {
// update the input nodeSlots
slots[nodeID] = slots[nodeID] - useSlot
log.Info("compactionHandler assignNodeID success",
zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("vchannel", t.GetTaskProto().GetChannel()), zap.Any("nodeID", nodeID))
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
}
nodeID, useSlot := pickAnyNode(slots, t)
if nodeID == NullNodeID {
log.Info("compactionHandler cannot find datanode for compaction task",
zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("type", t.GetTaskProto().GetType().String()), zap.String("vchannel", t.GetTaskProto().GetChannel()))
return NullNodeID
}
err := t.SetNodeID(nodeID)
if err != nil {
log.Info("compactionHandler assignNodeID failed",
zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("vchannel", t.GetTaskProto().GetChannel()), zap.Error(err))
return NullNodeID
}
// update the input nodeSlots
slots[nodeID] = slots[nodeID] - useSlot
log.Info("compactionHandler assignNodeID success",
zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.String("vchannel", t.GetTaskProto().GetChannel()), zap.Any("nodeID", nodeID))
return nodeID
}

func (c *compactionPlanHandler) checkCompaction() error {
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
// for DC might add new task while GetCompactionState.

var needAssignIDTasks []CompactionTask
// Assign node id if needed
var slots map[int64]int64
c.executingGuard.RLock()
for _, t := range c.executingTasks {
if t.NeedReAssignNodeID() {
needAssignIDTasks = append(needAssignIDTasks, t)
if slots == nil {
slots = c.cluster.QuerySlots()
}
id := assignNodeID(slots, t)
if id == NullNodeID {
break
}
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc()
}
}
c.executingGuard.RUnlock()
if len(needAssignIDTasks) > 0 {
c.assignNodeIDs(needAssignIDTasks)
}

var finishedTasks []CompactionTask
c.executingGuard.RLock()
Expand All @@ -658,7 +690,7 @@ func (c *compactionPlanHandler) checkCompaction() error {
return nil
}

func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
func pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
nodeID = NullNodeID
var maxSlots int64 = -1

Expand Down
3 changes: 3 additions & 0 deletions internal/datacoord/compaction_task_mix.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func (t *mixCompactionTask) processPipelining() bool {

err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), t.GetPlan())
if err != nil {
// Compaction tasks may be refused by DataNode because of slot limit. In this case, the node id is reset
// to enable a retry in compaction.checkCompaction().
// This is tricky, we should remove the reassignment here.
log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return false
Expand Down
94 changes: 83 additions & 11 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ type CompactionPlanHandlerSuite struct {
mockCm *MockChannelManager
mockSessMgr *session.MockDataNodeManager
handler *compactionPlanHandler
cluster Cluster
cluster *MockCluster
}

func (s *CompactionPlanHandlerSuite) SetupTest() {
s.mockMeta = NewMockCompactionMeta(s.T())
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Maybe()
s.mockAlloc = allocator.NewMockAllocator(s.T())
s.mockCm = NewMockChannelManager(s.T())
s.mockSessMgr = session.NewMockDataNodeManager(s.T())
Expand Down Expand Up @@ -231,6 +232,80 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
}
}

func (s *CompactionPlanHandlerSuite) TestScheduleWithSlotLimit() {
tests := []struct {
description string
tasks []CompactionTask
plans []*datapb.CompactionPlan
expectedOut []UniqueID // planID
}{
{
"2 L0 tasks, only 1 can be scheduled",
[]CompactionTask{
newL0CompactionTask(&datapb.CompactionTask{
PlanID: 10,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-10",
}, nil, s.mockMeta, s.mockSessMgr),
newL0CompactionTask(&datapb.CompactionTask{
PlanID: 11,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
}, nil, s.mockMeta, s.mockSessMgr),
},
[]*datapb.CompactionPlan{
{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction},
{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction},
},
[]UniqueID{10},
},
{
"2 Mix tasks, only 1 can be scheduled",
[]CompactionTask{
newMixCompactionTask(&datapb.CompactionTask{
PlanID: 14,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-2",
}, nil, s.mockMeta, s.mockSessMgr),
newMixCompactionTask(&datapb.CompactionTask{
PlanID: 13,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_pipelining,
Channel: "ch-11",
}, nil, s.mockMeta, s.mockSessMgr),
},
[]*datapb.CompactionPlan{
{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction},
{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction},
},
[]UniqueID{13},
},
}

for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.cluster.EXPECT().QuerySlots().Return(map[int64]int64{
101: 8,
}).Maybe()
s.generateInitTasksForSchedule()
// submit the testing tasks
for i, t := range test.tasks {
t.SetPlan(test.plans[i])
s.handler.submitTask(t)
}

gotTasks := s.handler.schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t CompactionTask, _ int) int64 {
return t.GetTaskProto().GetPlanID()
}))
})
}
}

func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
// dataNode 102's paralleTasks has running L0 tasks
// nothing of the same channel will be able to schedule
Expand Down Expand Up @@ -378,7 +453,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
Type: datapb.CompactionType_MixCompaction,
}, nil, nil, nil)
task1.slotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
node, useSlot := s.handler.pickAnyNode(nodeSlots, task1)
node, useSlot := pickAnyNode(nodeSlots, task1)
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot

Expand All @@ -387,7 +462,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
}, nil, nil, nil)
task2.slotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()

node, useSlot = s.handler.pickAnyNode(nodeSlots, task2)
node, useSlot = pickAnyNode(nodeSlots, task2)
s.Equal(int64(100), node)
nodeSlots[node] = nodeSlots[node] - useSlot

Expand All @@ -396,11 +471,11 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
}, nil, nil, nil)
task3.slotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()

node, useSlot = s.handler.pickAnyNode(nodeSlots, task3)
node, useSlot = pickAnyNode(nodeSlots, task3)
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot

node, useSlot = s.handler.pickAnyNode(map[int64]int64{}, &mixCompactionTask{})
node, useSlot = pickAnyNode(map[int64]int64{}, &mixCompactionTask{})
s.Equal(int64(NullNodeID), node)
}

Expand All @@ -414,7 +489,7 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeSlotUsageShouldNotBeZero() {
Type: datapb.CompactionType_MixCompaction,
}, nil, nil, nil)
task1.slotUsage = 0
nodeID, useSlot := s.handler.pickAnyNode(nodeSlots, task1)
nodeID, useSlot := pickAnyNode(nodeSlots, task1)
s.Equal(int64(NullNodeID), nodeID)
s.Equal(int64(0), useSlot)
}
Expand All @@ -441,11 +516,11 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
executingTasks[1] = task1
executingTasks[2] = task2
s.handler.executingTasks = executingTasks
node, useSlot := s.handler.pickAnyNode(nodeSlots, task1)
node, useSlot := pickAnyNode(nodeSlots, task1)
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot

node, useSlot = s.handler.pickAnyNode(nodeSlots, task2)
node, useSlot = pickAnyNode(nodeSlots, task2)
s.Equal(int64(NullNodeID), node)
}

Expand Down Expand Up @@ -555,7 +630,6 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
s.SetupTest()
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Maybe()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)

task := &datapb.CompactionTask{
Expand Down Expand Up @@ -673,7 +747,6 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {

// s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil)
// s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).RunAndReturn(
func(t *datapb.CompactionTask, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
if t.GetPlanID() == 2 {
Expand Down Expand Up @@ -755,7 +828,6 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
s.SetupTest()

// s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(
Expand Down

0 comments on commit 1a49da2

Please sign in to comment.