diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 683b0b7a4d1bb..cb5368c8d4381 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -439,9 +439,6 @@ func (node *DataNode) BackGroundGC(vChannelCh <-chan string) { } } -// FilterThreshold is the start time ouf DataNode -var FilterThreshold Timestamp - // Start will update DataNode state to HEALTHY func (node *DataNode) Start() error { if err := node.idAllocator.Start(); err != nil { @@ -486,8 +483,6 @@ func (node *DataNode) Start() error { return errors.New("DataNode fail to start") } - FilterThreshold = rep.GetTimestamp() - go node.BackGroundGC(node.clearSignal) go node.compactionExecutor.start(node.ctx) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index b1706f51c96a3..590b931be4bb8 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -163,32 +163,6 @@ func (dsService *dataSyncService) clearGlobalFlushingCache() { dsService.flushingSegCache.Remove(segments...) } -// getSegmentInfos return the SegmentInfo details according to the given ids through RPC to datacoord -func (dsService *dataSyncService) getSegmentInfos(segmentIds []int64) ([]*datapb.SegmentInfo, error) { - var segmentInfos []*datapb.SegmentInfo - infoResp, err := dsService.dataCoord.GetSegmentInfo(dsService.ctx, &datapb.GetSegmentInfoRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_SegmentInfo, - MsgID: 0, - Timestamp: 0, - SourceID: Params.ProxyCfg.GetNodeID(), - }, - SegmentIDs: segmentIds, - IncludeUnHealthy: true, - }) - if err != nil { - log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err)) - return nil, err - } - if infoResp.GetStatus().ErrorCode != commonpb.ErrorCode_Success { - err = errors.New(infoResp.GetStatus().Reason) - log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err)) - return nil, err - } - segmentInfos = infoResp.Infos - return segmentInfos, nil -} - // initNodes inits a TimetickedFlowGraph func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error { dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) @@ -208,6 +182,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro } futures := make([]*concurrency.Future, 0, len(unflushedSegmentInfos)+len(flushedSegmentInfos)) + for _, us := range unflushedSegmentInfos { if us.CollectionID != dsService.collectionID || us.GetInsertChannel() != vchanInfo.ChannelName { @@ -220,10 +195,10 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro continue } - log.Info("Recover Segment NumOfRows form checkpoints", - zap.String("InsertChannel", us.GetInsertChannel()), - zap.Int64("SegmentID", us.GetID()), - zap.Int64("NumOfRows", us.GetNumOfRows()), + log.Info("recover growing segments from checkpoints", + zap.String("vChannelName", us.GetInsertChannel()), + zap.Int64("segmentID", us.GetID()), + zap.Int64("numRows", us.GetNumOfRows()), ) var cp *segmentCheckPoint if us.GetDmlPosition() != nil { @@ -255,11 +230,10 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro ) continue } - - log.Info("Recover Segment NumOfRows form checkpoints", - zap.String("InsertChannel", fs.GetInsertChannel()), - zap.Int64("SegmentID", fs.GetID()), - zap.Int64("NumOfRows", fs.GetNumOfRows()), + log.Info("recover sealed segments from checkpoints", + zap.String("vChannelName", fs.GetInsertChannel()), +
zap.Int64("segmentID", fs.GetID()), + zap.Int64("numRows", fs.GetNumOfRows()), ) // avoid closure capture iteration variable segment := fs @@ -294,7 +268,20 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro return err } - var ddNode Node = newDDNode(dsService.ctx, dsService.collectionID, vchanInfo, unflushedSegmentInfos, dsService.msFactory, dsService.compactor) + var ddNode Node + ddNode, err = newDDNode( + dsService.ctx, + dsService.collectionID, + vchanInfo.GetChannelName(), + vchanInfo.GetDroppedSegmentIds(), + flushedSegmentInfos, + unflushedSegmentInfos, + dsService.msFactory, + dsService.compactor) + if err != nil { + return err + } + var insertBufferNode Node insertBufferNode, err = newInsertBufferNode( dsService.ctx, @@ -361,3 +348,29 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro } return nil } + +// getSegmentInfos returns the SegmentInfo details according to the given ids through RPC to datacoord +func (dsService *dataSyncService) getSegmentInfos(segmentIDs []int64) ([]*datapb.SegmentInfo, error) { + var segmentInfos []*datapb.SegmentInfo + infoResp, err := dsService.dataCoord.GetSegmentInfo(dsService.ctx, &datapb.GetSegmentInfoRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_SegmentInfo, + MsgID: 0, + Timestamp: 0, + SourceID: Params.ProxyCfg.GetNodeID(), + }, + SegmentIDs: segmentIDs, + IncludeUnHealthy: true, + }) + if err != nil { + log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err)) + return nil, err + } + if infoResp.GetStatus().ErrorCode != commonpb.ErrorCode_Success { + err = errors.New(infoResp.GetStatus().Reason) + log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err)) + return nil, err + } + segmentInfos = infoResp.Infos + return segmentInfos, nil +} diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 3b84e30c5df59..3484b483ca349 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "reflect" - "sync" "sync/atomic" "github.com/opentracing/opentracing-go" @@ -46,14 +45,13 @@ var _ flowgraph.Node = (*ddNode)(nil) // ddNode filters messages from message streams. // // ddNode recives all the messages from message stream dml channels, including insert messages, -// delete messages and ddl messages like CreateCollectionMsg. +// delete messages and ddl messages like CreateCollectionMsg and DropCollectionMsg. // -// ddNode filters insert messages according to the `flushedSegment` and `FilterThreshold`. -// If the timestamp of the insert message is earlier than `FilterThreshold`, ddNode will -// filter out the insert message for those who belong to `flushedSegment` +// ddNode filters insert messages according to the `sealedSegment`. +// ddNode will filter out insert messages that belong to `sealedSegment`. // // When receiving a `DropCollection` message, ddNode will send a signal to DataNode `BackgroundGC` -// goroutinue, telling DataNode to release the resources of this perticular flow graph. +// goroutine, telling DataNode to release the resources of this particular flow graph.
// // After the filtering process, ddNode passes all the valid insert messages and delete message // to the following flow graph node, which in DataNode is `insertBufferNode` @@ -62,20 +60,21 @@ type ddNode struct { ctx context.Context collectionID UniqueID - - segID2SegInfo sync.Map // segment ID to *SegmentInfo - flushedSegmentIDs []int64 - droppedSegmentIDs []int64 - vchannelName string + vChannelName string deltaMsgStream msgstream.MsgStream dropMode atomic.Value compactionExecutor *compactionExecutor + + // for recovery + growingSegInfo map[UniqueID]*datapb.SegmentInfo // segmentID + sealedSegInfo map[UniqueID]*datapb.SegmentInfo // segmentID + droppedSegmentIDs []int64 } // Name returns node name, implementing flowgraph.Node func (ddn *ddNode) Name() string { - return fmt.Sprintf("ddNode-%d-%s", ddn.collectionID, ddn.vchannelName) + return fmt.Sprintf("ddNode-%d-%s", ddn.collectionID, ddn.vChannelName) } // Operate handles input messages, implementing flowgrpah.Node @@ -104,7 +103,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { if load := ddn.dropMode.Load(); load != nil && load.(bool) { log.Debug("ddNode in dropMode", - zap.String("vchannel name", ddn.vchannelName), + zap.String("vChannelName", ddn.vChannelName), zap.Int64("collection ID", ddn.collectionID)) return []Msg{} } @@ -127,39 +126,41 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID { log.Info("Receiving DropCollection msg", zap.Any("collectionID", ddn.collectionID), - zap.String("vChannelName", ddn.vchannelName)) + zap.String("vChannelName", ddn.vChannelName)) ddn.dropMode.Store(true) - log.Info("Stop compaction of vChannel", zap.String("vChannelName", ddn.vchannelName)) - ddn.compactionExecutor.stopExecutingtaskByVChannelName(ddn.vchannelName) + log.Info("Stop compaction of vChannel", zap.String("vChannelName", ddn.vChannelName)) + ddn.compactionExecutor.stopExecutingtaskByVChannelName(ddn.vChannelName) fgMsg.dropCollection = true } + case commonpb.MsgType_Insert: imsg := msg.(*msgstream.InsertMsg) if imsg.CollectionID != ddn.collectionID { - log.Warn("filter invalid InsertMsg, collection mis-match", + log.Warn("filter invalid insert message, collection mismatch", zap.Int64("Get collID", imsg.CollectionID), zap.Int64("Expected collID", ddn.collectionID)) continue } - if msg.EndTs() < FilterThreshold { - log.Info("Filtering Insert Messages", - zap.Uint64("Message endts", msg.EndTs()), - zap.Uint64("FilterThreshold", FilterThreshold), + + if ddn.tryToFilterSegmentInsertMessages(imsg) { + log.Info("filter insert messages", + zap.Int64("filter segment ID", imsg.GetSegmentID()), + zap.Uint64("message timestamp", msg.EndTs()), ) - if ddn.filterFlushedSegmentInsertMessages(imsg) { - continue - } + continue } + log.Debug("DDNode receive insert messages", zap.Int("numRows", len(imsg.GetRowIDs())), - zap.String("vChannelName", ddn.vchannelName)) + zap.String("vChannelName", ddn.vChannelName)) fgMsg.insertMessages = append(fgMsg.insertMessages, imsg) + case commonpb.MsgType_Delete: dmsg := msg.(*msgstream.DeleteMsg) log.Debug("DDNode receive delete messages", zap.Int64("num", dmsg.NumRows), - zap.String("vChannelName", ddn.vchannelName)) + zap.String("vChannelName", ddn.vChannelName)) for i := int64(0); i < dmsg.NumRows; i++ { dmsg.HashValues = append(dmsg.HashValues, uint32(0)) } @@ -177,7 +178,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { return ddn.forwardDeleteMsg(forwardMsgs, msMsg.TimestampMin(), msMsg.TimestampMax()) }, flowGraphRetryOpt) if err
!= nil { - err = fmt.Errorf("DDNode forward delete msg failed, vChannel = %s, err = %s", ddn.vchannelName, err) + err = fmt.Errorf("DDNode forward delete msg failed, vChannel = %s, err = %s", ddn.vChannelName, err) log.Error(err.Error()) if !common.IsIgnorableError(err) { panic(err) @@ -194,27 +195,31 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { return []Msg{&fgMsg} } -func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg) bool { - if ddn.isFlushed(msg.GetSegmentID()) || ddn.isDropped(msg.GetSegmentID()) { +func (ddn *ddNode) tryToFilterSegmentInsertMessages(msg *msgstream.InsertMsg) bool { + // Filter all dropped segments + if ddn.isDropped(msg.GetSegmentID()) { return true } - if si, ok := ddn.segID2SegInfo.Load(msg.GetSegmentID()); ok { - if msg.EndTs() <= si.(*datapb.SegmentInfo).GetDmlPosition().GetTimestamp() { - return true + // Filter all sealed segments until current ts > sealed segment checkpoint + for segID, segInfo := range ddn.sealedSegInfo { + if msg.EndTs() > segInfo.GetDmlPosition().GetTimestamp() { + delete(ddn.sealedSegInfo, segID) } - - ddn.segID2SegInfo.Delete(msg.GetSegmentID()) } - return false -} + if _, ok := ddn.sealedSegInfo[msg.GetSegmentID()]; ok { + return true + } -func (ddn *ddNode) isFlushed(segmentID UniqueID) bool { - for _, s := range ddn.flushedSegmentIDs { - if s == segmentID { + // Filter all growing segments until current ts > growing segment dmlPosition + if si, ok := ddn.growingSegInfo[msg.GetSegmentID()]; ok { + if msg.EndTs() <= si.GetDmlPosition().GetTimestamp() { return true } + + delete(ddn.growingSegInfo, msg.GetSegmentID()) } + return false } @@ -273,7 +278,7 @@ func (ddn *ddNode) sendDeltaTimeTick(ts Timestamp) error { zap.Any("collectionID", ddn.collectionID), zap.Any("ts", ts), zap.Any("ts_p", p), - zap.Any("channel", ddn.vchannelName), + zap.Any("channel", ddn.vChannelName), ) return nil } @@ -284,35 +289,28 @@ func (ddn *ddNode) Close() { } } -func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelInfo, - unflushedSegments []*datapb.SegmentInfo, msFactory msgstream.Factory, compactor *compactionExecutor) *ddNode { +func newDDNode(ctx context.Context, collID UniqueID, vChannelName string, droppedSegmentIDs []UniqueID, + sealedSegments []*datapb.SegmentInfo, growingSegments []*datapb.SegmentInfo, + msFactory msgstream.Factory, compactor *compactionExecutor) (*ddNode, error) { + baseNode := BaseNode{} baseNode.SetMaxQueueLength(Params.DataNodeCfg.FlowGraphMaxQueueLength) baseNode.SetMaxParallelism(Params.DataNodeCfg.FlowGraphMaxParallelism) - fs := make([]int64, 0, len(vchanInfo.GetFlushedSegmentIds())) - fs = append(fs, vchanInfo.GetFlushedSegmentIds()...) - log.Info("ddNode add flushed segment", - zap.Int64("collectionID", vchanInfo.GetCollectionID()), - zap.Int("No.
Segment", len(vchanInfo.FlushedSegmentIds)), - ) - deltaStream, err := msFactory.NewMsgStream(ctx) if err != nil { - log.Error(err.Error()) - return nil + return nil, err } - pChannelName := funcutil.ToPhysicalChannel(vchanInfo.ChannelName) - log.Info("ddNode add flushed segment", - zap.String("channelName", vchanInfo.ChannelName), + pChannelName := funcutil.ToPhysicalChannel(vChannelName) + log.Info("ddNode convert vChannel to pChannel", + zap.String("vChannelName", vChannelName), zap.String("pChannelName", pChannelName), ) + deltaChannelName, err := funcutil.ConvertChannelName(pChannelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta) if err != nil { - log.Error(err.Error()) - return nil + return nil, err } - deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) deltaStream.AsProducer([]string{deltaChannelName}) metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Inc() @@ -324,23 +322,28 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI ctx: ctx, BaseNode: baseNode, collectionID: collID, - flushedSegmentIDs: fs, - droppedSegmentIDs: vchanInfo.GetDroppedSegmentIds(), - vchannelName: vchanInfo.ChannelName, + sealedSegInfo: make(map[UniqueID]*datapb.SegmentInfo, len(sealedSegments)), + growingSegInfo: make(map[UniqueID]*datapb.SegmentInfo, len(growingSegments)), + droppedSegmentIDs: droppedSegmentIDs, + vChannelName: vChannelName, deltaMsgStream: deltaMsgStream, compactionExecutor: compactor, } dd.dropMode.Store(false) - for _, us := range unflushedSegments { - dd.segID2SegInfo.Store(us.GetID(), us) + for _, s := range sealedSegments { + dd.sealedSegInfo[s.GetID()] = s } - log.Info("ddNode add unflushed segment", + for _, s := range growingSegments { + dd.growingSegInfo[s.GetID()] = s + } + log.Info("ddNode add sealed and growing segments", zap.Int64("collectionID", collID), - zap.Int("No. Segment", len(vchanInfo.GetUnflushedSegmentIds())), + zap.Int("No. sealed segments", len(sealedSegments)), + zap.Int("No. 
growing segments", len(growingSegments)), ) - return dd + return dd, nil } diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index c91cafc35f45f..3e90dd2f19d1d 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -21,90 +21,74 @@ import ( "fmt" "testing" - "github.com/milvus-io/milvus/internal/util/retry" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" + + "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/flowgraph" + "github.com/milvus-io/milvus/internal/util/retry" ) -type mockFactory struct { - msgstream.Factory -} - -func TestFlowGraph_DDNode_newDDNode(te *testing.T) { +func TestFlowGraph_DDNode_newDDNode(t *testing.T) { tests := []struct { - inCollID UniqueID - - inFlushedSegs []UniqueID - inFlushedChannelTs Timestamp - inUnFlushedSegID UniqueID - inUnFlushedChannelTs Timestamp - description string + + inSealedSegs []*datapb.SegmentInfo + inGrowingSegs []*datapb.SegmentInfo }{ - {UniqueID(1), []UniqueID{100, 101, 102}, 666666, 200, 666666, - "Input VchannelInfo with 3 flushed segs and 1 unflushed seg"}, - {UniqueID(2), []UniqueID{103}, 666666, 200, 666666, - "Input VchannelInfo with 1 flushed seg and 1 unflushed seg"}, - {UniqueID(3), []UniqueID{}, 666666, 200, 666666, - "Input VchannelInfo with 0 flushed segs and 1 unflushed seg"}, - {UniqueID(3), []UniqueID{104}, 666666, 0, 0, - "Input VchannelInfo with 1 flushed seg and empty unflushed seg"}, + { + "3 sealed segments and 1 growing segment", + []*datapb.SegmentInfo{ + getSegmentInfo(100, 10000), + getSegmentInfo(101, 10000), + getSegmentInfo(102, 10000)}, + []*datapb.SegmentInfo{ + getSegmentInfo(200, 10000)}, + }, + { + "0 sealed segments and 0 growing segments", + []*datapb.SegmentInfo{}, + []*datapb.SegmentInfo{}, + }, } - for _, test := range tests { - te.Run(test.description, func(t *testing.T) { - di := &datapb.SegmentInfo{} - - if test.inUnFlushedSegID != 0 { - di.ID = test.inUnFlushedSegID - di.DmlPosition = &internalpb.MsgPosition{Timestamp: test.inUnFlushedChannelTs} - } + var ( + collectionID = UniqueID(1) + channelName = fmt.Sprintf("by-dev-rootcoord-dml-%s", t.Name()) + droppedSegIDs = []UniqueID{} + ) - var fi []*datapb.SegmentInfo - for _, id := range test.inFlushedSegs { - s := &datapb.SegmentInfo{ID: id} - fi = append(fi, s) - } - - mmf := &mockMsgStreamFactory{ - true, true, - } - ddNode := newDDNode( + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + mockFactory := &mockMsgStreamFactory{true, true} + ddNode, err := newDDNode( context.Background(), - test.inCollID, - &datapb.VchannelInfo{ - FlushedSegmentIds: test.inFlushedSegs, - UnflushedSegmentIds: []int64{di.ID}, - ChannelName: "by-dev-rootcoord-dml-test", - }, - []*datapb.SegmentInfo{di}, - mmf, + collectionID, + channelName, + droppedSegIDs, + test.inSealedSegs, + test.inGrowingSegs, + mockFactory, newCompactionExecutor(), ) + require.NoError(t, err) require.NotNil(t, ddNode) - assert.Equal(t, fmt.Sprintf("ddNode-%d-%s", ddNode.collectionID, ddNode.vchannelName), ddNode.Name()) - assert.Equal(t, test.inCollID, ddNode.collectionID) - assert.Equal(t, len(test.inFlushedSegs),
len(ddNode.flushedSegmentIDs)) - assert.ElementsMatch(t, test.inFlushedSegs, ddNode.flushedSegmentIDs) - - si, ok := ddNode.segID2SegInfo.Load(test.inUnFlushedSegID) - assert.True(t, ok) - assert.Equal(t, test.inUnFlushedSegID, si.(*datapb.SegmentInfo).GetID()) - assert.Equal(t, test.inUnFlushedChannelTs, si.(*datapb.SegmentInfo).GetDmlPosition().GetTimestamp()) + + assert.Equal(t, fmt.Sprintf("ddNode-%d-%s", ddNode.collectionID, ddNode.vChannelName), ddNode.Name()) + + assert.Equal(t, len(test.inSealedSegs), len(ddNode.sealedSegInfo)) + assert.Equal(t, len(test.inGrowingSegs), len(ddNode.growingSegInfo)) }) } } -func TestFlowGraph_DDNode_Operate(to *testing.T) { - to.Run("Test DDNode Operate DropCollection Msg", func(te *testing.T) { +func TestFlowGraph_DDNode_Operate(t *testing.T) { + t.Run("Test DDNode Operate DropCollection Msg", func(t *testing.T) { // invalid inputs invalidInTests := []struct { in []Msg @@ -119,7 +103,7 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { } for _, test := range invalidInTests { - te.Run(test.description, func(t *testing.T) { + t.Run(test.description, func(t *testing.T) { ddn := ddNode{} rt := ddn.Operate(test.in) assert.Empty(t, rt) @@ -142,7 +126,7 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { } for _, test := range tests { - te.Run(test.description, func(t *testing.T) { + t.Run(test.description, func(t *testing.T) { factory := dependency.NewDefaultFactory(true) deltaStream, err := factory.NewMsgStream(context.Background()) assert.Nil(t, err) @@ -152,7 +136,7 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { ctx: context.Background(), collectionID: test.ddnCollID, deltaMsgStream: deltaStream, - vchannelName: "ddn_drop_msg", + vChannelName: "ddn_drop_msg", compactionExecutor: newCompactionExecutor(), } @@ -177,68 +161,32 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { } }) - to.Run("Test DDNode Operate Insert Msg", func(te *testing.T) { - tests := []struct { - ddnCollID UniqueID - inMsgCollID UniqueID - - MsgEndTs Timestamp - threshold Timestamp - - ddnFlushedSegment UniqueID - inMsgSegID UniqueID + t.Run("Test DDNode Operate and filter insert msg", func(t *testing.T) { + factory := dependency.NewDefaultFactory(true) + deltaStream, err := factory.NewMsgStream(context.Background()) + require.Nil(t, err) + deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) + deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"}) - expectedRtLen int - description string - }{ - {1, 1, 2000, 3000, 100, 100, 0, - "MsgEndTs(2000) < threshold(3000), inMsgSegID(100) IN ddnFlushedSeg {100}"}, - {1, 1, 2000, 3000, 100, 200, 1, - "MsgEndTs(2000) < threshold(3000), inMsgSegID(200) NOT IN ddnFlushedSeg {100}"}, - {1, 1, 4000, 3000, 100, 101, 1, - "Seg 101, MsgEndTs(4000) > FilterThreshold(3000)"}, - {1, 1, 4000, 3000, 100, 200, 1, - "Seg 200, MsgEndTs(4000) > FilterThreshold(3000)"}, - {1, 2, 4000, 3000, 100, 100, 0, - "inMsgCollID(2) != ddnCollID"}, + var ( + collectionID UniqueID = 1 + ) + // Prepare ddNode states + ddn := ddNode{ + ctx: context.Background(), + collectionID: collectionID, + droppedSegmentIDs: []UniqueID{100}, + deltaMsgStream: deltaStream, } - for _, test := range tests { - te.Run(test.description, func(t *testing.T) { - factory := dependency.NewDefaultFactory(true) - deltaStream, err := factory.NewMsgStream(context.Background()) - assert.Nil(t, err) - deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) - deltaStream.AsProducer([]string{"DataNode-test-delta-channel-0"}) - // Prepare ddNode states - ddn := ddNode{ - ctx: 
context.Background(), - flushedSegmentIDs: []int64{test.ddnFlushedSegment}, - collectionID: test.ddnCollID, - deltaMsgStream: deltaStream, - } - FilterThreshold = test.threshold - - // Prepare insert messages - var iMsg msgstream.TsMsg = &msgstream.InsertMsg{ - BaseMsg: msgstream.BaseMsg{EndTimestamp: test.MsgEndTs}, - InsertRequest: internalpb.InsertRequest{ - Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Insert}, - CollectionID: test.inMsgCollID, - SegmentID: test.inMsgSegID, - }, - } - tsMessages := []msgstream.TsMsg{iMsg} - var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil) + tsMessages := []msgstream.TsMsg{getInsertMsg(100, 10000), getInsertMsg(200, 20000)} + var msgStreamMsg Msg = flowgraph.GenerateMsgStreamMsg(tsMessages, 0, 0, nil, nil) - // Test - rt := ddn.Operate([]Msg{msgStreamMsg}) - assert.Equal(t, test.expectedRtLen, len(rt[0].(*flowGraphMsg).insertMessages)) - }) - } + rt := ddn.Operate([]Msg{msgStreamMsg}) + assert.Equal(t, 1, len(rt[0].(*flowGraphMsg).insertMessages)) }) - to.Run("Test DDNode Operate Delete Msg", func(te *testing.T) { + t.Run("Test DDNode Operate Delete Msg", func(t *testing.T) { tests := []struct { ddnCollID UniqueID inMsgCollID UniqueID @@ -253,7 +201,7 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { } for _, test := range tests { - te.Run(test.description, func(t *testing.T) { + t.Run(test.description, func(t *testing.T) { factory := dependency.NewDefaultFactory(true) deltaStream, err := factory.NewMsgStream(context.Background()) assert.Nil(t, err) @@ -287,10 +235,10 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { } }) - to.Run("Test forwardDeleteMsg failed", func(te *testing.T) { + t.Run("Test forwardDeleteMsg failed", func(t *testing.T) { factory := dependency.NewDefaultFactory(true) deltaStream, err := factory.NewMsgStream(context.Background()) - assert.Nil(to, err) + assert.Nil(t, err) deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc) // Prepare ddNode states ddn := ddNode{ @@ -315,121 +263,231 @@ func TestFlowGraph_DDNode_Operate(to *testing.T) { // Test flowGraphRetryOpt = retry.Attempts(1) - assert.Panics(te, func() { + assert.Panics(t, func() { ddn.Operate([]Msg{msgStreamMsg}) }) }) } -func TestFlowGraph_DDNode_filterMessages(te *testing.T) { +func TestFlowGraph_DDNode_filterMessages(t *testing.T) { tests := []struct { - ddnFlushedSegments []UniqueID - ddnSegID2Ts map[UniqueID]Timestamp + description string - inMsgSegID UniqueID - inMsgSegEntTs Timestamp - expectedOut bool + droppedSegIDs []UniqueID + sealedSegInfo map[UniqueID]*datapb.SegmentInfo + growingSegInfo map[UniqueID]*datapb.SegmentInfo - description string + inMsg *msgstream.InsertMsg + expected bool }{ - {[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 1, 1500, true, - "Seg 1 in flushedSegs {1, 2, 3}"}, - {[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 2, 1500, true, - "Seg 2 in flushedSegs {1, 2, 3}"}, - {[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 3, 1500, true, - "Seg 3 in flushedSegs {1, 2, 3}"}, - {[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 4, 1500, false, - "Seg 4, inMsgSegEntTs(1500) > SegCheckPoint(1000)"}, - {[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 4, 500, true, - "Seg 4, inMsgSegEntTs(500) <= SegCheckPoint(1000)"}, - {[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 4, 1000, true, - "Seg 4 inMsgSegEntTs(1000) <= SegCheckPoint(1000)"}, - {[]UniqueID{1, 2, 3}, map[UniqueID]Timestamp{4: 1000, 5: 2000}, 5, 1500, true, - 
"Seg 5 inMsgSegEntTs(1500) <= SegCheckPoint(2000)"}, + {"test dropped segments true", + []UniqueID{100}, + nil, + nil, + getInsertMsg(100, 10000), + true}, + {"test dropped segments true 2", + []UniqueID{100, 101, 102}, + nil, + nil, + getInsertMsg(102, 10000), + true}, + {"test sealed segments msgTs <= segmentTs true", + []UniqueID{}, + map[UniqueID]*datapb.SegmentInfo{ + 200: getSegmentInfo(200, 50000), + 300: getSegmentInfo(300, 50000), + }, + nil, + getInsertMsg(200, 10000), + true}, + {"test sealed segments msgTs <= segmentTs true", + []UniqueID{}, + map[UniqueID]*datapb.SegmentInfo{ + 200: getSegmentInfo(200, 50000), + 300: getSegmentInfo(300, 50000), + }, + nil, + getInsertMsg(200, 50000), + true}, + {"test sealed segments msgTs > segmentTs false", + []UniqueID{}, + map[UniqueID]*datapb.SegmentInfo{ + 200: getSegmentInfo(200, 50000), + 300: getSegmentInfo(300, 50000), + }, + nil, + getInsertMsg(222, 70000), + false}, + {"test growing segments msgTs <= segmentTs true", + []UniqueID{}, + nil, + map[UniqueID]*datapb.SegmentInfo{ + 200: getSegmentInfo(200, 50000), + 300: getSegmentInfo(300, 50000), + }, + getInsertMsg(200, 10000), + true}, + {"test growing segments msgTs > segmentTs false", + []UniqueID{}, + nil, + map[UniqueID]*datapb.SegmentInfo{ + 200: getSegmentInfo(200, 50000), + 300: getSegmentInfo(300, 50000), + }, + getInsertMsg(200, 70000), + false}, + {"test not exist", + []UniqueID{}, + map[UniqueID]*datapb.SegmentInfo{ + 400: getSegmentInfo(500, 50000), + 500: getSegmentInfo(400, 50000), + }, + map[UniqueID]*datapb.SegmentInfo{ + 200: getSegmentInfo(200, 50000), + 300: getSegmentInfo(300, 50000), + }, + getInsertMsg(111, 70000), + false}, } for _, test := range tests { - te.Run(test.description, func(t *testing.T) { - factory := dependency.NewDefaultFactory(true) - deltaStream, err := factory.NewMsgStream(context.Background()) - assert.Nil(t, err) + t.Run(test.description, func(t *testing.T) { // Prepare ddNode states ddn := ddNode{ - flushedSegmentIDs: test.ddnFlushedSegments, - deltaMsgStream: deltaStream, - } - - for k, v := range test.ddnSegID2Ts { - ddn.segID2SegInfo.Store(k, &datapb.SegmentInfo{DmlPosition: &internalpb.MsgPosition{Timestamp: v}}) - } - - // Prepare insert messages - var iMsg = &msgstream.InsertMsg{ - BaseMsg: msgstream.BaseMsg{EndTimestamp: test.inMsgSegEntTs}, - InsertRequest: internalpb.InsertRequest{ - Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Insert}, - SegmentID: test.inMsgSegID, - }, + droppedSegmentIDs: test.droppedSegIDs, + sealedSegInfo: test.sealedSegInfo, + growingSegInfo: test.growingSegInfo, } // Test - rt := ddn.filterFlushedSegmentInsertMessages(iMsg) - assert.Equal(t, test.expectedOut, rt) - - si, ok := ddn.segID2SegInfo.Load(iMsg.GetSegmentID()) - if !rt { - assert.False(t, ok) - assert.Nil(t, si) - } + got := ddn.tryToFilterSegmentInsertMessages(test.inMsg) + assert.Equal(t, test.expected, got) }) } -} -func TestFlowGraph_DDNode_isFlushed(te *testing.T) { - tests := []struct { - influshedSegment []UniqueID - inSeg UniqueID + t.Run("Test delete segment from sealed segments", func(t *testing.T) { + tests := []struct { + description string + segRemained bool - expectedOut bool + segTs Timestamp + msgTs Timestamp - description string - }{ - {[]UniqueID{1, 2, 3}, 1, true, - "Input seg 1 in flushedSegs{1, 2, 3}"}, - {[]UniqueID{1, 2, 3}, 2, true, - "Input seg 2 in flushedSegs{1, 2, 3}"}, - {[]UniqueID{1, 2, 3}, 3, true, - "Input seg 3 in flushedSegs{1, 2, 3}"}, - {[]UniqueID{1, 2, 3}, 4, false, - "Input seg 4 not in flushedSegs{1, 
2, 3}"}, - {[]UniqueID{}, 5, false, - "Input seg 5, no flushedSegs {}"}, - } + t.Run("Test delete segment from sealed segments", func(t *testing.T) { + tests := []struct { + description string + segRemained bool + + segTs Timestamp + msgTs Timestamp + + sealedSegInfo map[UniqueID]*datapb.SegmentInfo + inMsg *msgstream.InsertMsg + msgFiltered bool + }{ + {"msgTs>segTs", + false, + 50000, + 10000, + map[UniqueID]*datapb.SegmentInfo{ + 100: getSegmentInfo(100, 70000), + 101: getSegmentInfo(101, 50000)}, + getInsertMsg(300, 60000), + false, + }, + } - for _, test := range tests { - te.Run(test.description, func(t *testing.T) { - fs := []*datapb.SegmentInfo{} - for _, id := range test.influshedSegment { - s := &datapb.SegmentInfo{ID: id} - fs = append(fs, s) - } - factory := dependency.NewDefaultFactory(true) - deltaStream, err := factory.NewMsgStream(context.Background()) - assert.Nil(t, err) - ddn := &ddNode{flushedSegmentIDs: test.influshedSegment, deltaMsgStream: deltaStream} - assert.Equal(t, test.expectedOut, ddn.isFlushed(test.inSeg)) - }) - } -} + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + ddn := &ddNode{sealedSegInfo: test.sealedSegInfo} + + got := ddn.tryToFilterSegmentInsertMessages(test.inMsg) + assert.Equal(t, test.msgFiltered, got) + + if test.segRemained { + assert.Equal(t, 2, len(ddn.sealedSegInfo)) + } else { + assert.Equal(t, 1, len(ddn.sealedSegInfo)) + } -func TestFlowGraph_DDNode_isDropped(te *testing.T) { - genSegmentInfoByID := func(segmentID UniqueID) *datapb.SegmentInfo { - return &datapb.SegmentInfo{ - ID: segmentID, + _, ok := ddn.sealedSegInfo[test.inMsg.GetSegmentID()] + assert.Equal(t, test.segRemained, ok) + }) } - } + }) + + t.Run("Test delete segment from growing segments", func(t *testing.T) { + tests := []struct { + description string + segRemained bool + + growingSegInfo map[UniqueID]*datapb.SegmentInfo + inMsg *msgstream.InsertMsg + msgFiltered bool + }{ + {"msgTs>segTs", + false, + map[UniqueID]*datapb.SegmentInfo{ + 100: getSegmentInfo(100, 50000), + 101: getSegmentInfo(101, 50000)}, + getInsertMsg(100, 60000), + false, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + ddn := &ddNode{ + growingSegInfo: test.growingSegInfo, + } + + got := ddn.tryToFilterSegmentInsertMessages(test.inMsg) + assert.Equal(t, test.msgFiltered, got) + + if test.segRemained { + assert.Equal(t, 2, len(ddn.growingSegInfo)) + } else { + assert.Equal(t, 1, len(ddn.growingSegInfo)) + } + + _, ok := ddn.growingSegInfo[test.inMsg.GetSegmentID()] + assert.Equal(t, test.segRemained, ok) + }) + } + }) +} +func TestFlowGraph_DDNode_isDropped(t *testing.T) { tests := []struct { indroppedSegment []*datapb.SegmentInfo inSeg UniqueID @@ -438,20 +496,20 @@ func TestFlowGraph_DDNode_isDropped(te *testing.T) { description string }{ - {[]*datapb.SegmentInfo{genSegmentInfoByID(1), genSegmentInfoByID(2), genSegmentInfoByID(3)}, 1, true, - "Input seg 1 in droppedSegs{1, 2, 3}"}, - {[]*datapb.SegmentInfo{genSegmentInfoByID(1), genSegmentInfoByID(2), genSegmentInfoByID(3)}, 2, true, - "Input seg 2 in droppedSegs{1, 2, 3}"}, - {[]*datapb.SegmentInfo{genSegmentInfoByID(1), genSegmentInfoByID(2), genSegmentInfoByID(3)}, 3, true, - "Input seg 3 in droppedSegs{1, 2, 3}"}, - {[]*datapb.SegmentInfo{genSegmentInfoByID(1), genSegmentInfoByID(2), genSegmentInfoByID(3)}, 4, false, - "Input seg 4 not in droppedSegs{1, 2, 3}"}, + {[]*datapb.SegmentInfo{getSegmentInfo(1, 0), getSegmentInfo(2, 0), getSegmentInfo(3, 0)}, 1, true, + "Input seg 1 in droppedSegs{1,2,3}"}, + {[]*datapb.SegmentInfo{getSegmentInfo(1, 0), getSegmentInfo(2, 0), getSegmentInfo(3, 0)}, 2, true, + "Input seg 2 in droppedSegs{1,2,3}"}, +
{[]*datapb.SegmentInfo{getSegmentInfo(1, 0), getSegmentInfo(2, 0), getSegmentInfo(3, 0)}, 3, true, + "Input seg 3 in droppedSegs{1,2,3}"}, + {[]*datapb.SegmentInfo{getSegmentInfo(1, 0), getSegmentInfo(2, 0), getSegmentInfo(3, 0)}, 4, false, + "Input seg 4 not in droppedSegs{1,2,3}"}, {[]*datapb.SegmentInfo{}, 5, false, "Input seg 5, no droppedSegs {}"}, } for _, test := range tests { - te.Run(test.description, func(t *testing.T) { + t.Run(test.description, func(t *testing.T) { dsIDs := []int64{} for _, seg := range test.indroppedSegment { dsIDs = append(dsIDs, seg.GetID()) @@ -464,3 +522,25 @@ func TestFlowGraph_DDNode_isDropped(te *testing.T) { }) } } + +func getSegmentInfo(segmentID UniqueID, ts Timestamp) *datapb.SegmentInfo { + return &datapb.SegmentInfo{ + ID: segmentID, + DmlPosition: &internalpb.MsgPosition{Timestamp: ts}, + } +} + +func getInsertMsg(segmentID UniqueID, ts Timestamp) *msgstream.InsertMsg { + return &msgstream.InsertMsg{ + BaseMsg: msgstream.BaseMsg{EndTimestamp: ts}, + InsertRequest: internalpb.InsertRequest{ + Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_Insert}, + SegmentID: segmentID, + CollectionID: 1, + }, + } +} + +type mockFactory struct { + msgstream.Factory +}
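For context on the behavior change above: the global `FilterThreshold` (set once in `DataNode.Start()` from an allocated timestamp) is gone, and `tryToFilterSegmentInsertMessages` instead consults per-segment checkpoints recovered into `sealedSegInfo` and `growingSegInfo`. The following is a minimal, self-contained Go sketch of that decision sequence; the segment IDs and timestamps are invented for illustration, and this paraphrases the patch's logic rather than excerpting it.

package main

import "fmt"

// segment mirrors the only field of datapb.SegmentInfo the filter consults:
// the DmlPosition (checkpoint) timestamp.
type segment struct{ checkpointTs uint64 }

type filter struct {
	dropped map[int64]bool
	sealed  map[int64]*segment
	growing map[int64]*segment
}

// shouldFilter follows tryToFilterSegmentInsertMessages: dropped segments are
// always filtered; a sealed segment filters its messages until one arrives
// past its checkpoint (the entry is then purged); a growing segment filters
// messages at or before its checkpoint and is dropped from tracking by the
// first message past it.
func (f *filter) shouldFilter(segID int64, msgEndTs uint64) bool {
	if f.dropped[segID] {
		return true
	}
	// Purge every sealed segment whose checkpoint is behind this message.
	for id, s := range f.sealed {
		if msgEndTs > s.checkpointTs {
			delete(f.sealed, id)
		}
	}
	if _, ok := f.sealed[segID]; ok {
		return true
	}
	if s, ok := f.growing[segID]; ok {
		if msgEndTs <= s.checkpointTs {
			return true
		}
		delete(f.growing, segID)
	}
	return false
}

func main() {
	f := &filter{
		dropped: map[int64]bool{100: true},
		sealed:  map[int64]*segment{200: {checkpointTs: 50000}},
		growing: map[int64]*segment{300: {checkpointTs: 50000}},
	}
	fmt.Println(f.shouldFilter(100, 99999)) // true: dropped segments never pass
	fmt.Println(f.shouldFilter(200, 10000)) // true: sealed, msg ts <= checkpoint
	fmt.Println(f.shouldFilter(300, 60000)) // false: growing, msg ts past checkpoint
	fmt.Println(f.shouldFilter(300, 10000)) // false: 300 is no longer tracked
}

Note that the sealed-segment loop purges every entry whose checkpoint lies behind the incoming message, not just the entry for the message's own segment, so a single sufficiently late message can retire several sealed segments at once.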
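On the recovery side, `newDDNode` no longer digs segment IDs out of `VchannelInfo`; `initNodes` now hands it the dropped segment IDs plus resolved `SegmentInfo` lists for sealed (flushed) and growing (unflushed) segments. The hunks above don't show where `flushedSegmentInfos` and `unflushedSegmentInfos` are populated, but given the relocated `getSegmentInfos` helper, the call site presumably resembles this hypothetical sketch (error handling abbreviated):

flushedSegmentInfos, err := dsService.getSegmentInfos(vchanInfo.GetFlushedSegmentIds())
if err != nil {
	return err
}
unflushedSegmentInfos, err := dsService.getSegmentInfos(vchanInfo.GetUnflushedSegmentIds())
if err != nil {
	return err
}
// Flushed infos seed ddNode.sealedSegInfo; unflushed infos seed
// ddNode.growingSegInfo (see the newDDNode hunk above).
ddNode, err := newDDNode(
	dsService.ctx,
	dsService.collectionID,
	vchanInfo.GetChannelName(),
	vchanInfo.GetDroppedSegmentIds(),
	flushedSegmentInfos,
	unflushedSegmentInfos,
	dsService.msFactory,
	dsService.compactor,
)

Keeping this state per flow graph is what lets the `FilterThreshold` global, and its initialization in `Start()`, be deleted without a replacement.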