Fix DN flowgraph delete buffer logs (milvus-io#18657)
1. Remove logs about segments that do not exist.
2. Group logs by timestamp.
3. Log changed segments only.
4. Pair the segment reference lock and unlock logs by taskID (a standalone sketch of the intended log shape follows below).

Resolves: milvus-io#18655
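
A minimal, self-contained Go sketch of the log shape these changes aim for (illustrative only: plain zap with made-up segment IDs, timestamp, and taskID, not the actual datanode code):

package main

import "go.uber.org/zap"

func main() {
    log := zap.NewExample()

    // Hypothetical delete buffer: segment ID -> number of buffered delete entries.
    delBuf := map[int64]int64{111: 10, 112: 10, 113: 1}

    // Log status only for the segments actually touched in this tick,
    // all stamped with the same timestamp so the lines group together.
    changed := []int64{111, 113}
    ts := uint64(100)
    for _, segID := range changed {
        log.Debug("delta buffer status",
            zap.Uint64("timestamp", ts),
            zap.Int64("segment ID", segID),
            zap.Int64("entries", delBuf[segID]))
    }

    // Lock and unlock logs both carry the taskID, so the pair can be matched later.
    taskID := int64(42)
    log.Info("add reference lock on segments", zap.Int64("taskID", taskID), zap.Int64s("segIDs", changed))
    log.Info("remove reference lock on segments", zap.Int64("taskID", taskID), zap.Int64s("segIDs", changed))
}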

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

XuanYang-cn authored Aug 15, 2022
1 parent d983a06 commit 4c86cc6
Showing 3 changed files with 57 additions and 15 deletions.
internal/datacoord/segment_reference_manager.go (1 addition, 1 deletion)
@@ -109,7 +109,7 @@ func (srm *SegmentReferenceManager) AddSegmentsLock(taskID int64, segIDs []UniqueID
     for _, segID := range segIDs {
         srm.segmentReferCnt[segID]++
     }
-    log.Info("add reference lock on segments successfully", zap.Int64s("segIDs", segIDs), zap.Int64("nodeID", nodeID))
+    log.Info("add reference lock on segments successfully", zap.Int64("taskID", taskID), zap.Int64s("segIDs", segIDs), zap.Int64("nodeID", nodeID))
     return nil
 }

internal/datanode/flow_graph_delete_node.go (14 additions, 14 deletions)
@@ -105,7 +105,7 @@ func (dn *deleteNode) Close() {
     log.Info("Flowgraph Delete Node closing")
 }
 
-func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) error {
+func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) ([]UniqueID, error) {
     log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys), zap.String("vChannelName", dn.channelName))
 
     // Update delBuf for merged segments
@@ -129,11 +129,14 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) error {
     primaryKeys := storage.ParseIDs2PrimaryKeys(msg.PrimaryKeys)
     segIDToPks, segIDToTss := dn.filterSegmentByPK(msg.PartitionID, primaryKeys, msg.Timestamps)
 
+    segIDs := make([]UniqueID, 0, len(segIDToPks))
     for segID, pks := range segIDToPks {
+        segIDs = append(segIDs, segID)
+
         rows := len(pks)
         tss, ok := segIDToTss[segID]
         if !ok || rows != len(tss) {
-            return fmt.Errorf("primary keys and timestamp's element num mis-match, segmentID = %d", segID)
+            return nil, fmt.Errorf("primary keys and timestamp's element num mis-match, segmentID = %d", segID)
         }
 
         var delDataBuf *DelDataBuf
@@ -164,23 +164,18 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) error {
         dn.delBuf.Store(segID, delDataBuf)
     }
 
-    return nil
+    return segIDs, nil
 }
 
-func (dn *deleteNode) showDelBuf() {
-    segments := dn.replica.filterSegments(dn.channelName, common.InvalidPartitionID)
-    for _, seg := range segments {
-        segID := seg.segmentID
+func (dn *deleteNode) showDelBuf(segIDs []UniqueID, ts Timestamp) {
+    for _, segID := range segIDs {
         if v, ok := dn.delBuf.Load(segID); ok {
             delDataBuf, _ := v.(*DelDataBuf)
             log.Debug("delta buffer status",
+                zap.Uint64("timestamp", ts),
                 zap.Int64("segment ID", segID),
                 zap.Int64("entries", delDataBuf.GetEntriesNum()),
                 zap.String("vChannel", dn.channelName))
-        } else {
-            log.Debug("segment not exist",
-                zap.Int64("segment ID", segID),
-                zap.String("vChannel", dn.channelName))
         }
     }
 }
@@ -211,22 +209,24 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
         msg.SetTraceCtx(ctx)
     }
 
+    var segIDs []UniqueID
     for i, msg := range fgMsg.deleteMessages {
         traceID, _, _ := trace.InfoFromSpan(spans[i])
         log.Info("Buffer delete request in DataNode", zap.String("traceID", traceID))
 
-        err := dn.bufferDeleteMsg(msg, fgMsg.timeRange)
+        tmpSegIDs, err := dn.bufferDeleteMsg(msg, fgMsg.timeRange)
         if err != nil {
             // error occurs only when deleteMsg is misaligned, should not happen
             err = fmt.Errorf("buffer delete msg failed, err = %s", err)
             log.Error(err.Error())
             panic(err)
         }
+        segIDs = append(segIDs, tmpSegIDs...)
     }
 
-    // show all data in dn.delBuf
+    // show changed segment's status in dn.delBuf of a certain ts
     if len(fgMsg.deleteMessages) != 0 {
-        dn.showDelBuf()
+        dn.showDelBuf(segIDs, fgMsg.timeRange.timestampMax)
     }
 
     // handle flush
internal/datanode/flow_graph_delete_node_test.go (42 additions, 0 deletions)
@@ -31,6 +31,7 @@ import (
     "github.com/milvus-io/milvus/internal/storage"
     "github.com/milvus-io/milvus/internal/util/flowgraph"
     "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
 )
 
 var deleteNodeTestDir = "/tmp/milvus_test/deleteNode"
@@ -426,3 +427,44 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
         })
     })
 }
+
+func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
+    cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir))
+    defer cm.RemoveWithPrefix("")
+
+    fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, &mockReplica{}, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
+
+    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+    defer cancel()
+
+    chanName := "datanode-test-FlowGraphDeletenode-showDelBuf"
+    testPath := "/test/datanode/root/meta"
+    assert.NoError(t, clearEtcd(testPath))
+    Params.EtcdCfg.MetaRootPath = testPath
+    Params.DataNodeCfg.DeleteBinlogRootPath = testPath
+
+    c := &nodeConfig{
+        replica:      &mockReplica{},
+        allocator:    NewAllocatorFactory(),
+        vChannelName: chanName,
+    }
+    delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c)
+    require.NoError(t, err)
+
+    tests := []struct {
+        seg     UniqueID
+        numRows int64
+    }{
+        {111, 10},
+        {112, 10},
+        {113, 1},
+    }
+
+    for _, test := range tests {
+        delBuf := newDelDataBuf()
+        delBuf.updateSize(test.numRows)
+        delNode.delBuf.Store(test.seg, delBuf)
+    }
+
+    delNode.showDelBuf([]UniqueID{111, 112, 113}, 100)
+}
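
Assuming a checkout of the Milvus repository at this commit, the new test can be run on its own with the standard Go tooling; note that the datanode tests may expect their usual test environment (for example a reachable etcd for clearEtcd):

go test ./internal/datanode/ -run TestFlowGraphDeleteNode_showDelBuf -v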
