Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 committed Jan 8, 2025
1 parent 935a9ef commit 8e637ee
Show file tree
Hide file tree
Showing 18 changed files with 232 additions and 55 deletions.
47 changes: 47 additions & 0 deletions internal/datacoord/session/mock_worker_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion internal/datacoord/task_analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,10 @@ func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler)
at.req.MinClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMinClusterSizeRatio.GetAsFloat()
at.req.MaxClusterSizeRatio = Params.DataCoordCfg.ClusteringCompactionMaxClusterSizeRatio.GetAsFloat()
at.req.MaxClusterSize = Params.DataCoordCfg.ClusteringCompactionMaxClusterSize.GetAsSize()
taskSlot := Params.DataCoordCfg.AnalyzeTaskSlotUsage.GetAsInt64()
at.req.TaskSlot = taskSlot

return true, defaultTaskSlot
return true, taskSlot
}

func (at *analyzeTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {
Expand Down
5 changes: 4 additions & 1 deletion internal/datacoord/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
}
}

taskSlot := Params.DataCoordCfg.AnalyzeTaskSlotUsage.GetAsInt64()

it.req = &workerpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath),
Expand All @@ -256,11 +258,12 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
OptionalScalarFields: optionalFields,
Field: field,
PartitionKeyIsolation: partitionKeyIsolation,
TaskSlot: taskSlot,
}

log.Ctx(ctx).Info("index task pre check successfully", zap.Int64("taskID", it.GetTaskID()),
zap.Int64("segID", segment.GetID()))
return true, defaultTaskSlot
return true, taskSlot
}

func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {
Expand Down
3 changes: 1 addition & 2 deletions internal/datacoord/task_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (

const (
reqTimeoutInterval = time.Second * 10
defaultTaskSlot = 16
)

type taskScheduler struct {
Expand Down Expand Up @@ -427,7 +426,7 @@ func (s *taskScheduler) processInit(task Task, nodeSlots map[int64]int64) bool {
log.Ctx(s.ctx).Info("update task meta state to InProgress success", zap.Int64("taskID", task.GetTaskID()),
zap.Int64("nodeID", nodeID))
nodeSlots[nodeID] = nodeSlots[nodeID] - taskSLot
return s.processInProgress(task)
return true
}

func (s *taskScheduler) processFinished(task Task) bool {
Expand Down
33 changes: 25 additions & 8 deletions internal/datacoord/task_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,9 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil)

workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().PickClient().Return(s.nodeID, in)
workerManager.EXPECT().QuerySlots().Return(map[int64]int64{
1: 16,
})
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true)

mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)), withIndexMeta(createIndexMeta(catalog)),
Expand Down Expand Up @@ -972,6 +974,9 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {

catalog := catalogmocks.NewDataCoordCatalog(s.T())
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().QuerySlots().Return(map[int64]int64{
1: 16,
})

mt := createMeta(catalog,
withAnalyzeMeta(&analyzeMeta{
Expand Down Expand Up @@ -1033,6 +1038,9 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in := mocks.NewMockIndexNodeClient(s.T())

workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().QuerySlots().Return(map[int64]int64{
1: 16,
})

mt := createMeta(catalog, withAnalyzeMeta(s.createAnalyzeMeta(catalog)),
withIndexMeta(&indexMeta{
Expand Down Expand Up @@ -1075,14 +1083,12 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()

// pick client fail --> state: init
workerManager.EXPECT().PickClient().Return(0, nil).Once()

// update version failed --> state: init
workerManager.EXPECT().PickClient().Return(s.nodeID, in)
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(errors.New("catalog update version error")).Once()

// assign task to indexNode fail --> state: retry
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(&commonpb.Status{
Code: 65535,
Expand All @@ -1101,6 +1107,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()

// update state to building failed --> state: retry
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Once()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(errors.New("catalog update building state error")).Once()
Expand All @@ -1110,6 +1117,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()

// assign success --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()

Expand Down Expand Up @@ -1163,6 +1171,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()

// init --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()

Expand All @@ -1177,6 +1186,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()

// init --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()

Expand All @@ -1193,6 +1203,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()

// init --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()

Expand All @@ -1204,6 +1215,7 @@ func (s *taskSchedulerSuite) Test_analyzeTaskFailCase() {
in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()

// init --> state: InProgress
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Twice()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil).Once()

Expand Down Expand Up @@ -1271,6 +1283,9 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() {
catalog := catalogmocks.NewDataCoordCatalog(s.T())
in := mocks.NewMockIndexNodeClient(s.T())
workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().QuerySlots().Return(map[int64]int64{
1: 16,
})

mt := createMeta(catalog,
withAnalyzeMeta(&analyzeMeta{
Expand Down Expand Up @@ -1361,7 +1376,7 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() {
}, nil).Once()

// assign failed --> retry
workerManager.EXPECT().PickClient().Return(s.nodeID, in).Once()
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil).Once()
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *workerpb.CreateJobV2Request, option ...grpc.CallOption) (*commonpb.Status, error) {
indexNodeTasks[request.GetTaskID()]++
Expand All @@ -1378,7 +1393,7 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() {
}).Once()

// init --> inProgress
workerManager.EXPECT().PickClient().Return(s.nodeID, in).Once()
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true).Once()
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil).Twice()
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{
ID: collID,
Expand Down Expand Up @@ -1457,7 +1472,9 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
in := mocks.NewMockIndexNodeClient(s.T())

workerManager := session.NewMockWorkerManager(s.T())
workerManager.EXPECT().PickClient().Return(s.nodeID, in)
workerManager.EXPECT().QuerySlots().Return(map[int64]int64{
1: 16,
})
workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true)

minNumberOfRowsToBuild := paramtable.Get().DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() + 1
Expand Down
14 changes: 13 additions & 1 deletion internal/datacoord/task_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package datacoord
import (
"context"
"fmt"
"math"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -147,6 +148,14 @@ func (st *statsTask) UpdateMetaBuildingState(meta *meta) error {
return meta.statsTaskMeta.UpdateBuildingTask(st.taskID)
}

func (st *statsTask) calculateTaskSlotPolicy(segmentSize int64) int64 {
maxSegmentSize := Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024
statsTaskSlotUsage := Params.DataCoordCfg.StatsTaskSlotUsage.GetAsFloat()

slot := int64(math.Ceil(float64(segmentSize) / float64(maxSegmentSize) * statsTaskSlotUsage))
return slot
}

func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) (bool, int64) {
// set segment compacting
log := log.Ctx(ctx).With(zap.Int64("taskID", st.taskID), zap.Int64("segmentID", st.segmentID))
Expand Down Expand Up @@ -187,6 +196,8 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) (b
return false, 0
}

taskSlot := st.calculateTaskSlotPolicy(segment.getSegmentSize())

st.req = &workerpb.CreateStatsRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
TaskID: st.GetTaskID(),
Expand All @@ -206,9 +217,10 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) (b
CollectionTtl: collTtl.Nanoseconds(),
CurrentTs: tsoutil.GetCurrentTime(),
BinlogMaxSize: Params.DataNodeCfg.BinLogMaxSize.GetAsUint64(),
TaskSlot: taskSlot,
}

return true, defaultTaskSlot
return true, taskSlot
}

func (st *statsTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {
Expand Down
21 changes: 15 additions & 6 deletions internal/datacoord/task_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

"go.uber.org/atomic"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

Expand Down Expand Up @@ -69,6 +71,7 @@ func (s *statsTaskSuite) SetupSuite() {
MaxRowNum: 65535,
Level: datapb.SegmentLevel_L2,
},
size: *atomic.NewInt64(512 * 1024 * 1024),
},
},
secondaryIndexes: segmentInfoIndexes{
Expand Down Expand Up @@ -206,35 +209,38 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() {
s.Run("segment not healthy", func() {
s.mt.segments.segments[s.segID].State = commonpb.SegmentState_Dropped

checkPass := st.PreCheck(context.Background(), &taskScheduler{
checkPass, taskSlot := st.PreCheck(context.Background(), &taskScheduler{
meta: s.mt,
})

s.False(checkPass)
s.Zero(taskSlot)
})

s.Run("segment is sorted", func() {
s.mt.segments.segments[s.segID].State = commonpb.SegmentState_Flushed
s.mt.segments.segments[s.segID].IsSorted = true

checkPass := st.PreCheck(context.Background(), &taskScheduler{
checkPass, taskSlot := st.PreCheck(context.Background(), &taskScheduler{
meta: s.mt,
})

s.False(checkPass)
s.Zero(taskSlot)
})

s.Run("get collection failed", func() {
s.mt.segments.segments[s.segID].IsSorted = false

handler := NewNMockHandler(s.T())
handler.EXPECT().GetCollection(context.Background(), collID).Return(nil, fmt.Errorf("mock error")).Once()
checkPass := st.PreCheck(context.Background(), &taskScheduler{
checkPass, taskSlot := st.PreCheck(context.Background(), &taskScheduler{
meta: s.mt,
handler: handler,
})

s.False(checkPass)
s.Zero(taskSlot)
})

s.Run("get collection ttl failed", func() {
Expand Down Expand Up @@ -266,12 +272,13 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() {
Properties: map[string]string{common.CollectionTTLConfigKey: "false"},
}, nil).Once()

checkPass := st.PreCheck(context.Background(), &taskScheduler{
checkPass, taskSlot := st.PreCheck(context.Background(), &taskScheduler{
meta: s.mt,
handler: handler,
})

s.False(checkPass)
s.Zero(taskSlot)
})

s.Run("alloc failed", func() {
Expand Down Expand Up @@ -306,13 +313,14 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() {
Properties: map[string]string{common.CollectionTTLConfigKey: "100"},
}, nil)

checkPass := st.PreCheck(context.Background(), &taskScheduler{
checkPass, taskSlot := st.PreCheck(context.Background(), &taskScheduler{
meta: s.mt,
handler: handler,
allocator: alloc,
})

s.False(checkPass)
s.Zero(taskSlot)
})

s.Run("normal case", func() {
Expand Down Expand Up @@ -347,13 +355,14 @@ func (s *statsTaskSuite) TestTaskStats_PreCheck() {
Properties: map[string]string{common.CollectionTTLConfigKey: "100"},
}, nil)

checkPass := st.PreCheck(context.Background(), &taskScheduler{
checkPass, taskSlot := st.PreCheck(context.Background(), &taskScheduler{
meta: s.mt,
handler: handler,
allocator: alloc,
})

s.True(checkPass)
s.Equal(int64(2), taskSlot)
})
})

Expand Down
2 changes: 2 additions & 0 deletions internal/indexnode/indexnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ type IndexNode struct {

binlogIO io.BinlogIO

usingSlot int64

initOnce sync.Once
stateLock sync.Mutex
indexTasks map[taskKey]*indexTaskInfo
Expand Down
Loading

0 comments on commit 8e637ee

Please sign in to comment.