Skip to content

Commit

Permalink
Improve DataCoord and DataNode logs (#20265)
Browse files Browse the repository at this point in the history
/kind improvement

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
  • Loading branch information
soothing-rain authored Nov 3, 2022
1 parent d908198 commit 5708352
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 25 deletions.
2 changes: 1 addition & 1 deletion internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
}

func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionResult) error {
oldSegments, modSegments, newSegment := c.meta.GetCompleteCompactionMeta(plan.GetSegmentBinlogs(), result)
oldSegments, modSegments, newSegment := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result)
log := log.With(zap.Int64("planID", plan.GetPlanID()))

modInfos := make([]*datapb.SegmentInfo, len(modSegments))
Expand Down
25 changes: 22 additions & 3 deletions internal/datacoord/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -65,6 +66,8 @@ func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID Uniq
droppedIDs = make(typeutil.UniqueSet)
seekPosition *internalpb.MsgPosition
)
var minPosSegID int64
var minPosTs uint64
for _, s := range segments {
if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
Expand All @@ -91,20 +94,36 @@ func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID Uniq
segmentPosition = s.GetStartPosition()
}
if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
minPosSegID = s.GetID()
minPosTs = segmentPosition.GetTimestamp()
seekPosition = segmentPosition
}
}

// use collection start position when segment position is not found
if seekPosition == nil {
if seekPosition != nil {
log.Info("channel seek position set as the minimal segment position",
zap.Int64("segment ID", minPosSegID),
zap.Uint64("position timestamp", minPosTs),
zap.String("realworld position timestamp", tsoutil.ParseAndFormatHybridTs(minPosTs)),
)
} else {
// use collection start position when segment position is not found
if channel.StartPositions == nil {
collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
if collection != nil && err == nil {
seekPosition = getCollectionStartPosition(channel.Name, collection)
}
log.Info("NEITHER segment position or channel start position are found, setting channel seek position to collection start position",
zap.Uint64("position timestamp", seekPosition.GetTimestamp()),
zap.String("realworld position timestamp", tsoutil.ParseAndFormatHybridTs(seekPosition.GetTimestamp())),
)
} else {
// use passed start positions, skip to ask rootcoord.
// use passed start positions, skip to ask RootCoord.
seekPosition = toMsgPosition(channel.Name, channel.StartPositions)
log.Info("segment position not found, setting channel seek position to channel start position",
zap.Uint64("position timestamp", seekPosition.GetTimestamp()),
zap.String("realworld position timestamp", tsoutil.ParseAndFormatHybridTs(seekPosition.GetTimestamp())),
)
}
}

Expand Down
43 changes: 28 additions & 15 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,10 +625,16 @@ func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) *SegmentInfo {
// since the channel unwatching operation is not atomic here
// ** the removal flag is always with last batch
// ** the last batch must contains at least one segment
// 1. when failure occurs between batches, failover mechanism will continue with the earlist checkpoint of this channel
// since the flag is not marked so DataNode can re-consume the drop collection msg
// 2. when failure occurs between save meta and unwatch channel, the removal flag shall be check before let datanode watch this channel
// 1. when failure occurs between batches, failover mechanism will continue with the earliest checkpoint of this channel
//    since the flag is not marked, so DataNode can re-consume the drop collection msg
// 2. when failure occurs between saving meta and unwatching the channel, the removal flag shall be checked before letting DataNode watch this channel
func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*SegmentInfo) error {
var modSegIDs []int64
for k := range modSegments {
modSegIDs = append(modSegIDs, k)
}
log.Info("meta update: batch save drop segments",
zap.Int64s("drop segments", modSegIDs))
segments := make([]*datapb.SegmentInfo, 0)
for _, seg := range modSegments {
segments = append(segments, seg.SegmentInfo)
Expand Down Expand Up @@ -863,13 +869,13 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
m.segments.SetIsCompacting(segmentID, compacting)
}

// GetCompleteCompactionMeta returns
// PrepareCompleteCompactionMutation returns
// - the segment info of compactedFrom segments before compaction to revert
// - the segment info of compactedFrom segments after compaction to alter
// - the segment info of compactedTo segment after compaction to add
// The compactedTo segment could contain 0 numRows
func (m *meta) GetCompleteCompactionMeta(compactionLogs []*datapb.CompactionSegmentBinlogs, result *datapb.CompactionResult) ([]*datapb.SegmentInfo, []*SegmentInfo, *SegmentInfo) {
log.Info("meta update: get complete compaction meta")
func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.CompactionSegmentBinlogs, result *datapb.CompactionResult) ([]*datapb.SegmentInfo, []*SegmentInfo, *SegmentInfo) {
log.Info("meta update: prepare for complete compaction mutation")
m.Lock()
defer m.Unlock()

Expand Down Expand Up @@ -939,26 +945,33 @@ func (m *meta) GetCompleteCompactionMeta(compactionLogs []*datapb.CompactionSegm
}
segment := NewSegmentInfo(segmentInfo)

log.Info("meta update: get complete compaction meta - complete",
zap.Int64("segmentID", segmentInfo.ID),
zap.Int64("collectionID", segmentInfo.CollectionID),
zap.Int64("partitionID", segmentInfo.PartitionID),
zap.Int64("NumOfRows", segmentInfo.NumOfRows),
zap.Any("compactionFrom", segmentInfo.CompactionFrom))
log.Info("meta update: prepare for complete compaction mutation - complete",
zap.Int64("collection ID", segment.GetCollectionID()),
zap.Int64("partition ID", segment.GetPartitionID()),
zap.Int64("new segment ID", segment.GetID()),
zap.Int64("new segment num of rows", segment.GetNumOfRows()),
zap.Any("compacted from", segment.GetCompactionFrom()))

return oldSegments, modSegments, segment
}

// alterMetaStoreAfterCompaction persists the result of a compaction to the
// meta store in one catalog operation: the compacted-from segments in
// modSegments are updated (callers pass them already marked as dropped) and
// the compacted-to newSegment is added.
// Returns any error reported by the underlying catalog.
func (m *meta) alterMetaStoreAfterCompaction(modSegments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error {
	// Collect the IDs only for logging purposes.
	modSegIDs := make([]int64, 0, len(modSegments))
	for _, seg := range modSegments {
		modSegIDs = append(modSegIDs, seg.GetID())
	}
	log.Info("meta update: alter meta store for compaction updates",
		zap.Int64s("compact from segments (segments to be updated as dropped)", modSegIDs),
		zap.Int64("compact to segment", newSegment.GetID()))
	return m.catalog.AlterSegmentsAndAddNewSegment(m.ctx, modSegments, newSegment)
}

func (m *meta) revertAlterMetaStoreAfterCompaction(oldSegments []*datapb.SegmentInfo, removalSegment *datapb.SegmentInfo) error {
log.Info("revert metastore after compaction failure",
log.Info("meta update: revert metastore after compaction failure",
zap.Int64("collectionID", removalSegment.CollectionID),
zap.Int64("partitionID", removalSegment.PartitionID),
zap.Int64("compactedTo", removalSegment.ID),
zap.Int64s("compactedFrom", removalSegment.GetCompactionFrom()),
zap.Int64("compactedTo (segment to remove)", removalSegment.ID),
zap.Int64s("compactedFrom (segments to add back)", removalSegment.GetCompactionFrom()),
)
return m.catalog.RevertAlterSegmentsAndAddNewSegment(m.ctx, oldSegments, removalSegment)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ func TestMeta_alterInMemoryMetaAfterCompaction(t *testing.T) {

}

func TestMeta_GetCompleteCompactionMeta(t *testing.T) {
func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
prepareSegments := &SegmentsInfo{
map[UniqueID]*SegmentInfo{
1: {SegmentInfo: &datapb.SegmentInfo{
Expand Down Expand Up @@ -781,7 +781,7 @@ func TestMeta_GetCompleteCompactionMeta(t *testing.T) {
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog5")},
NumOfRows: 1,
}
beforeCompact, afterCompact, newSegment := m.GetCompleteCompactionMeta(inCompactionLogs, inCompactionResult)
beforeCompact, afterCompact, newSegment := m.PrepareCompleteCompactionMutation(inCompactionLogs, inCompactionResult)
assert.NotNil(t, beforeCompact)
assert.NotNil(t, afterCompact)
assert.NotNil(t, newSegment)
Expand Down
9 changes: 5 additions & 4 deletions internal/datanode/channel_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"time"

"github.com/milvus-io/milvus-proto/go-api/schemapb"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
Expand All @@ -33,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)

type (
Expand Down Expand Up @@ -328,7 +327,8 @@ func (c *ChannelMeta) RollPKstats(segID UniqueID, stats []*storage.PrimaryKeySta
}

// listNewSegmentsStartPositions gets all *New Segments* start positions and
// transfer segments states from *New* to *Normal*.
//
// transfer segments states from *New* to *Normal*.
func (c *ChannelMeta) listNewSegmentsStartPositions() []*datapb.SegmentStartPosition {
c.segMu.Lock()
defer c.segMu.Unlock()
Expand Down Expand Up @@ -451,7 +451,8 @@ func (c *ChannelMeta) getCollectionID() UniqueID {
}

// getCollectionSchema gets collection schema from rootcoord for a certain timestamp.
// If you want the latest collection schema, ts should be 0.
//
// If you want the latest collection schema, ts should be 0.
func (c *ChannelMeta) getCollectionSchema(collID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) {
if !c.validCollection(collID) {
return nil, fmt.Errorf("mismatch collection, want %d, actual %d", c.collectionID, collID)
Expand Down
6 changes: 6 additions & 0 deletions internal/util/tsoutil/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ func ParseHybridTs(ts uint64) (int64, int64) {
return int64(physical), int64(logical)
}

// ParseAndFormatHybridTs parses the hybrid timestamp and returns its physical
// part formatted as a human-readable RFC3339 string.
//
// NOTE(review): the physical component of a hybrid (TSO) timestamp is
// milliseconds since the Unix epoch — TODO confirm against ComposeTS/ParseTS
// in this package. Passing it to time.Unix as whole seconds would render
// dates millennia in the future, so it is split into seconds + nanoseconds.
func ParseAndFormatHybridTs(ts uint64) string {
	physicalTs, _ := ParseHybridTs(ts)
	// physicalTs is in milliseconds; convert to (sec, nsec) for time.Unix.
	t := time.Unix(physicalTs/1000, (physicalTs%1000)*int64(time.Millisecond))
	return t.Format(time.RFC3339)
}

// CalculateDuration returns the number of milliseconds obtained by subtracting ts2 from ts1.
func CalculateDuration(ts1, ts2 typeutil.Timestamp) int64 {
p1, _ := ParseHybridTs(ts1)
Expand Down

0 comments on commit 5708352

Please sign in to comment.