Skip to content

Commit

Permalink
Remove recollect segment stats during starting datacoord (#27562)
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <yun.zhang@zilliz.com>
  • Loading branch information
jaime0815 authored Oct 13, 2023
1 parent 237d96f commit 42d778e
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 147 deletions.
16 changes: 3 additions & 13 deletions internal/datacoord/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
"fmt"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/samber/lo"
"go.uber.org/zap"
)

// Cluster provides interfaces to interact with datanode cluster
Expand Down Expand Up @@ -110,17 +111,6 @@ func (c *Cluster) Import(ctx context.Context, nodeID int64, it *datapb.ImportTas
c.sessionManager.Import(ctx, nodeID, it)
}

// ReCollectSegmentStats walks the live node IDs reported by the session manager
// and issues a ReCollectSegmentStats call for each of them in turn.
// The first error returned by the session manager stops the walk and is
// returned to the caller; nil is returned when every call succeeds.
func (c *Cluster) ReCollectSegmentStats(ctx context.Context) error {
	liveNodeIDs := c.sessionManager.getLiveNodeIDs()
	for i := range liveNodeIDs {
		if err := c.sessionManager.ReCollectSegmentStats(ctx, liveNodeIDs[i]); err != nil {
			return err
		}
	}
	return nil
}

// GetSessions returns all sessions
func (c *Cluster) GetSessions() []*Session {
return c.sessionManager.GetSessions()
Expand Down
70 changes: 4 additions & 66 deletions internal/datacoord/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ import (
"time"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"stathat.com/c/consistent"

"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"stathat.com/c/consistent"
)

func getMetaKv(t *testing.T) kv.MetaKv {
Expand Down Expand Up @@ -585,66 +586,3 @@ func TestCluster_Import(t *testing.T) {
})
time.Sleep(500 * time.Millisecond)
}

// TestCluster_ReCollectSegmentStats starts a single-node cluster twice, once
// with a mocked DataNode session creator and once with the default session
// manager, and checks that Cluster.ReCollectSegmentStats does not panic.
func TestCluster_ReCollectSegmentStats(t *testing.T) {
	metaKV := getMetaKv(t)
	defer func() {
		// Wipe everything the subtests wrote before releasing the kv.
		metaKV.RemoveWithPrefix("")
		metaKV.Close()
	}()

	t.Run("recollect succeed", func(t *testing.T) {
		ctx, cancel := context.WithCancel(context.TODO())
		defer cancel()

		// Every session created by the manager is backed by a mock DataNode client.
		mockSessionCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNode, error) {
			return newMockDataNodeClient(1, nil)
		}
		sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator))
		channelManager, err := NewChannelManager(metaKV, newMockHandler())
		assert.Nil(t, err)

		cluster := NewCluster(sessionManager, channelManager)
		defer cluster.Close()

		err = cluster.Startup(ctx, []*NodeInfo{
			{
				Address: "localhost:8080",
				NodeID:  1,
			},
		})
		assert.Nil(t, err)

		err = cluster.Watch("chan-1", 1)
		assert.NoError(t, err)

		// The returned error is not inspected; only the absence of a panic is checked.
		assert.NotPanics(t, func() {
			cluster.ReCollectSegmentStats(ctx)
		})
		time.Sleep(500 * time.Millisecond)
	})

	t.Run("recollect failed", func(t *testing.T) {
		ctx, cancel := context.WithCancel(context.TODO())
		defer cancel()

		// No session creator override here: the session manager is built with its defaults.
		sessionManager := NewSessionManager()
		channelManager, err := NewChannelManager(metaKV, newMockHandler())
		assert.Nil(t, err)

		cluster := NewCluster(sessionManager, channelManager)
		defer cluster.Close()

		err = cluster.Startup(ctx, []*NodeInfo{
			{
				Address: "localhost:8080",
				NodeID:  1,
			},
		})
		assert.Nil(t, err)

		err = cluster.Watch("chan-1", 1)
		assert.NoError(t, err)

		// The returned error is not inspected; only the absence of a panic is checked.
		assert.NotPanics(t, func() {
			cluster.ReCollectSegmentStats(ctx)
		})
		time.Sleep(500 * time.Millisecond)
	})
}
29 changes: 1 addition & 28 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"

"github.com/milvus-io/milvus/internal/common"
datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
Expand Down Expand Up @@ -363,12 +364,6 @@ func (s *Server) startDataCoord() {
}
s.startServerLoop()

// DataCoord (re)starts successfully and starts to collect segment stats
// data from all DataNodes.
// This prevents DataCoord from missing any important segment stats
// data while offline.
log.Info("DataCoord (re)starts successfully and re-collecting segment stats from DataNodes")
s.reCollectSegmentStats(s.ctx)
s.stateCode.Store(commonpb.StateCode_Healthy)
}

Expand Down Expand Up @@ -1022,25 +1017,3 @@ func (s *Server) hasCollection(ctx context.Context, collectionID int64) (bool, e
}
return false, statusErr
}

// reCollectSegmentStats asks the cluster to re-collect segment stats, wrapping
// the call in retry.Do (20 attempts, 100ms initial sleep, 5s max sleep).
// It logs an error and returns immediately when the server has no channel
// manager, and panics when retry.Do ends with an error.
func (s *Server) reCollectSegmentStats(ctx context.Context) {
	if s.channelManager == nil {
		log.Error("null channel manager found, which should NOT happen in non-testing environment")
		return
	}

	liveNodeIDs := s.sessionManager.getLiveNodeIDs()
	log.Info("re-collecting segment stats from DataNodes",
		zap.Int64s("DataNode IDs", liveNodeIDs))

	err := retry.Do(
		ctx,
		func() error {
			return s.cluster.ReCollectSegmentStats(ctx)
		},
		retry.Attempts(20),
		retry.Sleep(time.Millisecond*100),
		retry.MaxSleepTime(5*time.Second),
	)
	if err != nil {
		panic(err)
	}
}
33 changes: 4 additions & 29 deletions internal/datacoord/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,23 @@ import (
"sync"
"time"

"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"

grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/retry"
"go.uber.org/zap"
)

const (
flushTimeout = 15 * time.Second
// TODO: evaluate and update import timeout.
importTimeout = 3 * time.Hour
reCollectTimeout = 5 * time.Second
importTimeout = 3 * time.Hour
)

// SessionManager provides the grpc interfaces of cluster
Expand Down Expand Up @@ -223,32 +224,6 @@ func (c *SessionManager) execImport(ctx context.Context, nodeID int64, itr *data
log.Info("success to import", zap.Int64("node", nodeID), zap.Any("import task", itr))
}

// ReCollectSegmentStats sends a ResendSegmentStats request to the DataNode
// identified by nodeID and logs the segment IDs reported in the response.
// The RPC runs under a context derived from ctx and bounded by reCollectTimeout.
// It returns the error from obtaining the client or from VerifyResponse, and
// nil when the call succeeds.
func (c *SessionManager) ReCollectSegmentStats(ctx context.Context, nodeID int64) error {
	client, err := c.getClient(ctx, nodeID)
	if err != nil {
		log.Warn("failed to get dataNode client", zap.Int64("DataNode ID", nodeID), zap.Error(err))
		return err
	}

	// Only the RPC itself is bounded by the timeout; the client lookup above used the caller's ctx.
	callCtx, cancel := context.WithTimeout(ctx, reCollectTimeout)
	defer cancel()

	request := &datapb.ResendSegmentStatsRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_ResendSegmentStats),
			commonpbutil.WithSourceID(Params.DataCoordCfg.GetNodeID()),
		),
	}
	resp, rpcErr := client.ResendSegmentStats(callCtx, request)
	if verifyErr := VerifyResponse(resp, rpcErr); verifyErr != nil {
		log.Warn("re-collect segment stats call failed",
			zap.Int64("DataNode ID", nodeID), zap.Error(verifyErr))
		return verifyErr
	}

	log.Info("re-collect segment stats call succeeded",
		zap.Int64("DataNode ID", nodeID),
		zap.Int64s("segment stat collected", resp.GetSegResent()))
	return nil
}

func (c *SessionManager) GetCompactionState() map[int64]*datapb.CompactionStateResult {
wg := sync.WaitGroup{}
ctx := context.Background()
Expand Down
8 changes: 2 additions & 6 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,17 +693,13 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen

// ResendSegmentStats resends un-flushed segment stats back upstream to DataCoord by resending the DataNode time tick message.
// It returns a list of segments to be sent.
//
// Deprecated: since 2.2.15; reserved only for compatibility during rollback.
func (node *DataNode) ResendSegmentStats(ctx context.Context, req *datapb.ResendSegmentStatsRequest) (*datapb.ResendSegmentStatsResponse, error) {
log.Info("start resending segment stats, if any",
zap.Int64("DataNode ID", Params.DataNodeCfg.GetNodeID()))
segResent := node.flowgraphManager.resendTT()
log.Info("found segment(s) with stats to resend",
zap.Int64s("segment IDs", segResent))
return &datapb.ResendSegmentStatsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
SegResent: segResent,
SegResent: make([]int64, 0),
}, nil
}

Expand Down
12 changes: 7 additions & 5 deletions internal/datanode/data_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ import (
"time"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"

"github.com/milvus-io/milvus/internal/common"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
Expand All @@ -47,9 +52,6 @@ import (
"github.com/milvus-io/milvus/internal/util/importutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

const returnError = "ReturnError"
Expand Down Expand Up @@ -1253,11 +1255,11 @@ func TestDataNode_ResendSegmentStats(t *testing.T) {
resp, err := node.ResendSegmentStats(node.ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.ElementsMatch(t, []UniqueID{0, 1, 2}, resp.GetSegResent())
assert.Empty(t, resp.GetSegResent())

// Duplicate call.
resp, err = node.ResendSegmentStats(node.ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.ElementsMatch(t, []UniqueID{0, 1, 2}, resp.GetSegResent())
assert.Empty(t, resp.GetSegResent())
}
1 change: 1 addition & 0 deletions internal/proto/data_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ service DataNode {
// https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load
rpc Import(ImportTaskRequest) returns(common.Status) {}

// Deprecated
rpc ResendSegmentStats(ResendSegmentStatsRequest) returns(ResendSegmentStatsResponse) {}

rpc AddImportSegment(AddImportSegmentRequest) returns(AddImportSegmentResponse) {}
Expand Down
2 changes: 2 additions & 0 deletions internal/proto/datapb/data_coord.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 42d778e

Please sign in to comment.