Skip to content

Commit

Permalink
feat: add segment/channel/task/slow query render
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <yun.zhang@zilliz.com>
  • Loading branch information
jaime0815 committed Nov 12, 2024
1 parent b5b0035 commit ab158f6
Show file tree
Hide file tree
Showing 43 changed files with 2,673 additions and 705 deletions.
6 changes: 1 addition & 5 deletions internal/datacoord/compaction_task_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatal
ctx: ctx,
catalog: catalog,
compactionTasks: make(map[int64]map[int64]*datapb.CompactionTask, 0),
taskStats: expirable.NewLRU[UniqueID, *metricsinfo.CompactionTask](1024, nil, time.Minute*60),
taskStats: expirable.NewLRU[UniqueID, *metricsinfo.CompactionTask](32, nil, time.Minute*15),
}
if err := csm.reloadFromKV(); err != nil {
return nil, err
Expand Down Expand Up @@ -178,10 +178,6 @@ func (csm *compactionTaskMeta) DropCompactionTask(task *datapb.CompactionTask) e

func (csm *compactionTaskMeta) TaskStatsJSON() string {
tasks := csm.taskStats.Values()
if len(tasks) == 0 {
return ""
}

ret, err := json.Marshal(tasks)
if err != nil {
return ""
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_task_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (suite *CompactionTaskMetaSuite) TestTaskStatsJSON() {

// testing return empty string
actualJSON := suite.meta.TaskStatsJSON()
suite.Equal("", actualJSON)
suite.Equal("[]", actualJSON)

err := suite.meta.SaveCompactionTask(task1)
suite.NoError(err)
Expand Down
5 changes: 1 addition & 4 deletions internal/datacoord/import_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type importTasks struct {
func newImportTasks() *importTasks {
return &importTasks{
tasks: make(map[int64]ImportTask),
taskStats: expirable.NewLRU[UniqueID, ImportTask](4096, nil, time.Minute*60),
taskStats: expirable.NewLRU[UniqueID, ImportTask](64, nil, time.Minute*30),
}
}

Expand Down Expand Up @@ -301,9 +301,6 @@ func (m *importMeta) RemoveTask(taskID int64) error {

func (m *importMeta) TaskStatsJSON() string {
tasks := m.tasks.listTaskStats()
if len(tasks) == 0 {
return ""
}

ret, err := json.Marshal(tasks)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/import_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func TestTaskStatsJSON(t *testing.T) {
assert.NoError(t, err)

statsJSON := im.TaskStatsJSON()
assert.Equal(t, "", statsJSON)
assert.Equal(t, "[]", statsJSON)

task1 := &importTask{
ImportTaskV2: &datapb.ImportTaskV2{
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/import_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (p *preImportTask) MarshalJSON() ([]byte, error) {
NodeID: p.GetNodeID(),
State: p.GetState().String(),
Reason: p.GetReason(),
TaskType: "PreImportTask",
TaskType: p.GetType().String(),
CreatedTime: p.GetCreatedTime(),
CompleteTime: p.GetCompleteTime(),
}
Expand Down Expand Up @@ -231,7 +231,7 @@ func (t *importTask) MarshalJSON() ([]byte, error) {
NodeID: t.GetNodeID(),
State: t.GetState().String(),
Reason: t.GetReason(),
TaskType: "ImportTask",
TaskType: t.GetType().String(),
CreatedTime: t.GetCreatedTime(),
CompleteTime: t.GetCompleteTime(),
}
Expand Down
6 changes: 1 addition & 5 deletions internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func newSegmentIndexBuildInfo() *segmentBuildInfo {
// build ID -> segment index
buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex),
// build ID -> task stats
taskStats: expirable.NewLRU[UniqueID, *indexTaskStats](1024, nil, time.Minute*60),
taskStats: expirable.NewLRU[UniqueID, *indexTaskStats](64, nil, time.Minute*30),
}
}

Expand Down Expand Up @@ -1075,10 +1075,6 @@ func (m *indexMeta) HasIndex(collectionID int64) bool {

func (m *indexMeta) TaskStatsJSON() string {
tasks := m.segmentBuildInfo.GetTaskStats()
if len(tasks) == 0 {
return ""
}

ret, err := json.Marshal(tasks)
if err != nil {
return ""
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/index_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1543,7 +1543,7 @@ func TestBuildIndexTaskStatsJSON(t *testing.T) {
}

actualJSON := im.TaskStatsJSON()
assert.Equal(t, "", actualJSON)
assert.Equal(t, "[]", actualJSON)

im.segmentBuildInfo.Add(si1)
im.segmentBuildInfo.Add(si2)
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() {
allocator: alloc,
tasks: make(map[int64]Task),
meta: mt,
taskStats: expirable.NewLRU[UniqueID, Task](1024, nil, time.Minute*5),
taskStats: expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*5),
},
allocator: alloc,
}
Expand Down
6 changes: 3 additions & 3 deletions internal/datacoord/metrics_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func TestGetSyncTaskMetrics(t *testing.T) {
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
svr.cluster = mockCluster

expectedJSON := ""
expectedJSON := "null"
actualJSON, err := svr.getSyncTaskJSON(ctx, req)
assert.NoError(t, err)
assert.Equal(t, expectedJSON, actualJSON)
Expand Down Expand Up @@ -449,7 +449,7 @@ func TestGetSegmentsJSON(t *testing.T) {
mockCluster.EXPECT().GetSessions().Return([]*session.Session{session.NewSession(&session.NodeInfo{NodeID: 1}, dataNodeCreator)})
svr.cluster = mockCluster

expectedJSON := ""
expectedJSON := "null"
actualJSON, err := svr.getSegmentsJSON(ctx, req)
assert.NoError(t, err)
assert.Equal(t, expectedJSON, actualJSON)
Expand Down Expand Up @@ -591,7 +591,7 @@ func TestGetChannelsJSON(t *testing.T) {
svr.cluster = mockCluster
svr.meta = &meta{channelCPs: newChannelCps()}

expectedJSON := ""
expectedJSON := "null"
actualJSON, err := svr.getChannelsJSON(ctx, req)
assert.NoError(t, err)
assert.Equal(t, expectedJSON, actualJSON)
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/task_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func newTaskScheduler(
handler: handler,
indexEngineVersionManager: indexEngineVersionManager,
allocator: allocator,
taskStats: expirable.NewLRU[UniqueID, Task](1024, nil, time.Minute*5),
taskStats: expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*15),
}
ts.reloadFromMeta()
return ts
Expand Down
21 changes: 21 additions & 0 deletions internal/distributed/proxy/httpserver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func (h *Handlers) RegisterRoutesTo(router gin.IRouter) {
router.GET("/health", wrapHandler(h.handleGetHealth))
router.POST("/dummy", wrapHandler(h.handleDummy))

router.GET("/databases", wrapHandler(h.handleListDatabases))
router.GET("/database", wrapHandler(h.handleDescribeDatabases))

router.POST("/collection", wrapHandler(h.handleCreateCollection))
router.DELETE("/collection", wrapHandler(h.handleDropCollection))
router.GET("/collection/existence", wrapHandler(h.handleHasCollection))
Expand Down Expand Up @@ -96,6 +99,24 @@ func (h *Handlers) handleDummy(c *gin.Context) (interface{}, error) {
return h.proxy.Dummy(c, &req)
}

// handleListDatabases parses the request body into a ListDatabasesRequest
// and forwards it to the proxy. Malformed bodies are reported as a
// bad-request error wrapping errBadRequest.
func (h *Handlers) handleListDatabases(c *gin.Context) (interface{}, error) {
	req := milvuspb.ListDatabasesRequest{}
	// Bind and validate in one guarded step; bail out early on parse failure.
	if err := shouldBind(c, &req); err != nil {
		return nil, fmt.Errorf("%w: parse body failed: %v", errBadRequest, err)
	}
	return h.proxy.ListDatabases(c, &req)
}

// handleDescribeDatabases parses the request body into a
// DescribeDatabaseRequest and forwards it to the proxy. A body that fails
// to bind yields an error wrapping errBadRequest.
func (h *Handlers) handleDescribeDatabases(c *gin.Context) (interface{}, error) {
	req := milvuspb.DescribeDatabaseRequest{}
	// Early return keeps the happy path unindented.
	if err := shouldBind(c, &req); err != nil {
		return nil, fmt.Errorf("%w: parse body failed: %v", errBadRequest, err)
	}
	return h.proxy.DescribeDatabase(c, &req)
}

func (h *Handlers) handleCreateCollection(c *gin.Context) (interface{}, error) {
wrappedReq := WrappedCreateCollectionRequest{}
err := shouldBind(c, &wrappedReq)
Expand Down
2 changes: 1 addition & 1 deletion internal/flushcommon/syncmgr/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewSyncManager(chunkManager storage.ChunkManager) SyncManager {
keyLockDispatcher: dispatcher,
chunkManager: chunkManager,
tasks: typeutil.NewConcurrentMap[string, Task](),
taskStats: expirable.NewLRU[string, Task](512, nil, time.Minute*15),
taskStats: expirable.NewLRU[string, Task](16, nil, time.Minute*15),
}
// setup config update watcher
params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasks.Key, config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler))
Expand Down
2 changes: 2 additions & 0 deletions internal/http/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ const (
ClusterDependenciesPath = "/_cluster/dependencies"
// HookConfigsPath is the path to get hook configurations.
HookConfigsPath = "/_hook/configs"
// SlowQueryPath is the path to get slow queries metrics
SlowQueryPath = "/_cluster/slow_query"

// QCDistPath is the path to get QueryCoord distribution.
QCDistPath = "/_qc/dist"
Expand Down
158 changes: 0 additions & 158 deletions internal/http/webui/channels.html

This file was deleted.

Loading

0 comments on commit ab158f6

Please sign in to comment.