Skip to content

Commit

Permalink
Filter sealed segments (milvus-io#18065)
Browse files Browse the repository at this point in the history
- Remove FilterThreshold in DataNode
- Alter filter logic in flowgraph DDNode

See also: milvus-io#17943

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn authored Jul 7, 2022
1 parent 58a9a0b commit d9e8231
Show file tree
Hide file tree
Showing 4 changed files with 416 additions and 325 deletions.
5 changes: 0 additions & 5 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,6 @@ func (node *DataNode) BackGroundGC(vChannelCh <-chan string) {
}
}

// FilterThreshold is the start time of DataNode
var FilterThreshold Timestamp

// Start will update DataNode state to HEALTHY
func (node *DataNode) Start() error {
if err := node.idAllocator.Start(); err != nil {
Expand Down Expand Up @@ -486,8 +483,6 @@ func (node *DataNode) Start() error {
return errors.New("DataNode fail to start")
}

FilterThreshold = rep.GetTimestamp()

go node.BackGroundGC(node.clearSignal)

go node.compactionExecutor.start(node.ctx)
Expand Down
85 changes: 49 additions & 36 deletions internal/datanode/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,32 +163,6 @@ func (dsService *dataSyncService) clearGlobalFlushingCache() {
dsService.flushingSegCache.Remove(segments...)
}

// getSegmentInfos returns the SegmentInfo details for the given segment IDs,
// fetched from datacoord through the GetSegmentInfo RPC.
//
// The request is sent with IncludeUnHealthy set to true. An error is returned
// when the RPC itself fails, or when the response status carries an error code
// other than commonpb.ErrorCode_Success; in the latter case the status reason
// is used as the error message. Otherwise the Infos of the response are
// returned unchanged.
func (dsService *dataSyncService) getSegmentInfos(segmentIds []int64) ([]*datapb.SegmentInfo, error) {
	// NOTE(review): SourceID is read from Params.ProxyCfg although this code
	// lives in the datanode package; presumably the datanode's own node ID is
	// intended here. Confirm before changing.
	infoResp, err := dsService.dataCoord.GetSegmentInfo(dsService.ctx, &datapb.GetSegmentInfoRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_SegmentInfo,
			MsgID:     0,
			Timestamp: 0,
			SourceID:  Params.ProxyCfg.GetNodeID(),
		},
		SegmentIDs:       segmentIds,
		IncludeUnHealthy: true,
	})
	if err != nil {
		log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
		return nil, err
	}

	// Read the status through the generated getters: they tolerate a nil
	// receiver, so a response without a status cannot cause a nil-pointer
	// dereference the way direct field access would.
	if status := infoResp.GetStatus(); status.GetErrorCode() != commonpb.ErrorCode_Success {
		err = errors.New(status.GetReason())
		log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
		return nil, err
	}
	return infoResp.GetInfos(), nil
}

// initNodes inits a TimetickedFlowGraph
func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error {
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
Expand All @@ -208,6 +182,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
}

futures := make([]*concurrency.Future, 0, len(unflushedSegmentInfos)+len(flushedSegmentInfos))

for _, us := range unflushedSegmentInfos {
if us.CollectionID != dsService.collectionID ||
us.GetInsertChannel() != vchanInfo.ChannelName {
Expand All @@ -220,10 +195,10 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
continue
}

log.Info("Recover Segment NumOfRows form checkpoints",
zap.String("InsertChannel", us.GetInsertChannel()),
zap.Int64("SegmentID", us.GetID()),
zap.Int64("NumOfRows", us.GetNumOfRows()),
log.Info("recover growing segments form checkpoints",
zap.String("vChannelName", us.GetInsertChannel()),
zap.Int64("segmentID", us.GetID()),
zap.Int64("numRows", us.GetNumOfRows()),
)
var cp *segmentCheckPoint
if us.GetDmlPosition() != nil {
Expand Down Expand Up @@ -255,11 +230,10 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
)
continue
}

log.Info("Recover Segment NumOfRows form checkpoints",
zap.String("InsertChannel", fs.GetInsertChannel()),
zap.Int64("SegmentID", fs.GetID()),
zap.Int64("NumOfRows", fs.GetNumOfRows()),
log.Info("recover sealed segments form checkpoints",
zap.String("vChannelName", fs.GetInsertChannel()),
zap.Int64("segmentID", fs.GetID()),
zap.Int64("numRows", fs.GetNumOfRows()),
)
// avoid closure capture iteration variable
segment := fs
Expand Down Expand Up @@ -294,7 +268,20 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
return err
}

var ddNode Node = newDDNode(dsService.ctx, dsService.collectionID, vchanInfo, unflushedSegmentInfos, dsService.msFactory, dsService.compactor)
var ddNode Node
ddNode, err = newDDNode(
dsService.ctx,
dsService.collectionID,
vchanInfo.GetChannelName(),
vchanInfo.GetDroppedSegmentIds(),
flushedSegmentInfos,
unflushedSegmentInfos,
dsService.msFactory,
dsService.compactor)
if err != nil {
return err
}

var insertBufferNode Node
insertBufferNode, err = newInsertBufferNode(
dsService.ctx,
Expand Down Expand Up @@ -361,3 +348,29 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
}
return nil
}

// getSegmentInfos returns the SegmentInfo details for the given segment IDs,
// fetched from datacoord through the GetSegmentInfo RPC.
//
// The request is sent with IncludeUnHealthy set to true. An error is returned
// when the RPC itself fails, or when the response status carries an error code
// other than commonpb.ErrorCode_Success; in the latter case the status reason
// is used as the error message. Otherwise the Infos of the response are
// returned unchanged.
func (dsService *dataSyncService) getSegmentInfos(segmentIDs []int64) ([]*datapb.SegmentInfo, error) {
	// NOTE(review): SourceID is read from Params.ProxyCfg although this code
	// lives in the datanode package; presumably the datanode's own node ID is
	// intended here. Confirm before changing.
	infoResp, err := dsService.dataCoord.GetSegmentInfo(dsService.ctx, &datapb.GetSegmentInfoRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_SegmentInfo,
			MsgID:     0,
			Timestamp: 0,
			SourceID:  Params.ProxyCfg.GetNodeID(),
		},
		SegmentIDs:       segmentIDs,
		IncludeUnHealthy: true,
	})
	if err != nil {
		log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
		return nil, err
	}

	// Read the status through the generated getters: they tolerate a nil
	// receiver, so a response without a status cannot cause a nil-pointer
	// dereference the way direct field access would.
	if status := infoResp.GetStatus(); status.GetErrorCode() != commonpb.ErrorCode_Success {
		err = errors.New(status.GetReason())
		log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
		return nil, err
	}
	return infoResp.GetInfos(), nil
}
Loading

0 comments on commit d9e8231

Please sign in to comment.