Add sorted field for segment info returned to SDK
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
xiaocai2333 committed Sep 26, 2024
1 parent cfd636e commit df5a77b
Showing 16 changed files with 107 additions and 75 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -22,7 +22,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497
github.com/minio/minio-go/v7 v7.0.61
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
2 changes: 2 additions & 0 deletions go.sum
@@ -605,6 +605,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd h1:x0b0+foTe23sKcVFseR1DE8+BB08EH6ViiRHaz8PEik=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497 h1:t4sQMbSy05p8qgMGvEGyLYYLoZ9fD1dushS1bj5X6+0=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
66 changes: 39 additions & 27 deletions internal/datacoord/meta.go
@@ -121,9 +121,9 @@ func newChannelCps() *channelCPs {

// A local cache of segment metric update. Must call commit() to take effect.
type segMetricMutation struct {
stateChange map[string]map[string]int // segment state, seg level -> state change count (to increase or decrease).
rowCountChange int64 // Change in # of rows.
rowCountAccChange int64 // Total # of historical added rows, accumulated.
stateChange map[string]map[string]map[string]int // segment state, seg level -> state -> isSorted change count (to increase or decrease).
rowCountChange int64 // Change in # of rows.
rowCountAccChange int64 // Total # of historical added rows, accumulated.
}

type collectionInfo struct {
@@ -197,7 +197,7 @@ func (m *meta) reloadFromKV() error {
for _, segment := range segments {
// segments from catalog.ListSegments will not have logPath
m.segments.SetSegment(segment.ID, NewSegmentInfo(segment))
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Inc()
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String(), getSortStatus(segment.GetIsSorted())).Inc()
if segment.State == commonpb.SegmentState_Flushed {
numStoredRows += segment.NumOfRows

@@ -511,7 +511,7 @@ func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error {
}
m.segments.SetSegment(segment.GetID(), segment)

metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Inc()
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String(), getSortStatus(segment.GetIsSorted())).Inc()
log.Info("meta update: adding segment - complete", zap.Int64("segmentID", segment.GetID()))
return nil
}
@@ -533,7 +533,7 @@ func (m *meta) DropSegment(segmentID UniqueID) error {
zap.Error(err))
return err
}
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Dec()
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String(), getSortStatus(segment.GetIsSorted())).Dec()
coll, ok := m.collections[segment.CollectionID]
if ok {
metrics.CleanupDataCoordSegmentMetrics(coll.DatabaseName, segment.CollectionID, segment.ID)
@@ -637,7 +637,7 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
// Persist segment updates first.
clonedSegment := curSegInfo.Clone()
metricMutation := &segMetricMutation{
stateChange: make(map[string]map[string]int),
stateChange: make(map[string]map[string]map[string]int),
}
if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
// Update segment state and prepare segment metric update.
@@ -748,7 +748,7 @@ func CreateL0Operator(collectionID, partitionID, segmentID int64, channel string
Level: datapb.SegmentLevel_L0,
},
}
modPack.metricMutation.addNewSeg(commonpb.SegmentState_Flushed, datapb.SegmentLevel_L0, 0)
modPack.metricMutation.addNewSeg(commonpb.SegmentState_Flushed, datapb.SegmentLevel_L0, false, 0)
}
return true
}
@@ -1016,7 +1016,7 @@ func (m *meta) UpdateSegmentsInfo(operators ...UpdateOperator) error {
segments: make(map[int64]*SegmentInfo),
increments: make(map[int64]metastore.BinlogsIncrement),
metricMutation: &segMetricMutation{
stateChange: make(map[string]map[string]int),
stateChange: make(map[string]map[string]map[string]int),
},
}

@@ -1055,7 +1055,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI

// Prepare segment metric mutation.
metricMutation := &segMetricMutation{
stateChange: make(map[string]map[string]int),
stateChange: make(map[string]map[string]map[string]int),
}
modSegments := make(map[UniqueID]*SegmentInfo)
// save new segments flushed from buffer data
@@ -1096,7 +1096,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
// mergeDropSegment merges drop segment information with meta segments
func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) (*SegmentInfo, *segMetricMutation) {
metricMutation := &segMetricMutation{
stateChange: make(map[string]map[string]int),
stateChange: make(map[string]map[string]map[string]int),
}

segment := m.segments.GetSegment(seg2Drop.ID)
@@ -1412,7 +1412,7 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
zap.Int64("partitionID", t.PartitionID),
zap.String("channel", t.GetChannel()))

metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}
compactFromSegIDs := make([]int64, 0)
compactToSegIDs := make([]int64, 0)
compactFromSegInfos := make([]*SegmentInfo, 0)
@@ -1460,7 +1460,7 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
segment := NewSegmentInfo(segmentInfo)
compactToSegInfos = append(compactToSegInfos, segment)
compactToSegIDs = append(compactToSegIDs, segment.GetID())
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetIsSorted(), segment.GetNumOfRows())
}

log = log.With(zap.Int64s("compact from", compactFromSegIDs), zap.Int64s("compact to", compactToSegIDs))
@@ -1504,7 +1504,7 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
zap.Int64("partitionID", t.PartitionID),
zap.String("channel", t.GetChannel()))

metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}
var compactFromSegIDs []int64
var compactFromSegInfos []*SegmentInfo
for _, segmentID := range t.GetInputSegments() {
@@ -1558,7 +1558,7 @@ func (m *meta) completeMixCompactionMutation(t *datapb.CompactionTask, result *d
// L1 segment with NumRows=0 will be discarded, so no need to change the metric
if compactToSegmentInfo.GetNumOfRows() > 0 {
// metrics mutation for compactTo segments
metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetNumOfRows())
metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetIsSorted(), compactToSegmentInfo.GetNumOfRows())
} else {
compactToSegmentInfo.State = commonpb.SegmentState_Dropped
}
@@ -1812,11 +1812,15 @@ func (m *meta) GetEarliestStartPositionOfGrowingSegments(label *CompactionGroupL
}

// addNewSeg update metrics update for a new segment.
func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, level datapb.SegmentLevel, rowCount int64) {
func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, level datapb.SegmentLevel, isSorted bool, rowCount int64) {
if _, ok := s.stateChange[level.String()]; !ok {
s.stateChange[level.String()] = make(map[string]int)
s.stateChange[level.String()] = make(map[string]map[string]int)
}
s.stateChange[level.String()][state.String()] += 1
if _, ok := s.stateChange[level.String()][state.String()]; !ok {
s.stateChange[level.String()][state.String()] = make(map[string]int)

}
s.stateChange[level.String()][state.String()][getSortStatus(isSorted)] += 1

s.rowCountChange += rowCount
s.rowCountAccChange += rowCount
@@ -1826,20 +1830,28 @@ func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, level datapb.
// has persisted in Etcd.
func (s *segMetricMutation) commit() {
for level, submap := range s.stateChange {
for state, change := range submap {
metrics.DataCoordNumSegments.WithLabelValues(state, level).Add(float64(change))
for state, sortedMap := range submap {
for sortedLabel, change := range sortedMap {
metrics.DataCoordNumSegments.WithLabelValues(state, level, sortedLabel).Add(float64(change))
}
}
}
}

// append updates current segMetricMutation when segment state change happens.
func (s *segMetricMutation) append(oldState, newState commonpb.SegmentState, level datapb.SegmentLevel, rowCountUpdate int64) {
func (s *segMetricMutation) append(oldState, newState commonpb.SegmentState, level datapb.SegmentLevel, isSorted bool, rowCountUpdate int64) {
if oldState != newState {
if _, ok := s.stateChange[level.String()]; !ok {
s.stateChange[level.String()] = make(map[string]int)
s.stateChange[level.String()] = make(map[string]map[string]int)
}
if _, ok := s.stateChange[level.String()][oldState.String()]; !ok {
s.stateChange[level.String()][oldState.String()] = make(map[string]int)
}
if _, ok := s.stateChange[level.String()][newState.String()]; !ok {
s.stateChange[level.String()][newState.String()] = make(map[string]int)
}
s.stateChange[level.String()][oldState.String()] -= 1
s.stateChange[level.String()][newState.String()] += 1
s.stateChange[level.String()][oldState.String()][getSortStatus(isSorted)] -= 1
s.stateChange[level.String()][newState.String()][getSortStatus(isSorted)] += 1
}
// Update # of rows on new flush operations and drop operations.
if isFlushState(newState) && !isFlushState(oldState) {
@@ -1863,7 +1875,7 @@ func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commo
zap.String("old state", segToUpdate.GetState().String()),
zap.String("new state", targetState.String()),
zap.Int64("# of rows", segToUpdate.GetNumOfRows()))
metricMutation.append(segToUpdate.GetState(), targetState, segToUpdate.GetLevel(), segToUpdate.GetNumOfRows())
metricMutation.append(segToUpdate.GetState(), targetState, segToUpdate.GetLevel(), segToUpdate.GetIsSorted(), segToUpdate.GetNumOfRows())
segToUpdate.State = targetState
if targetState == commonpb.SegmentState_Dropped {
segToUpdate.DroppedAt = uint64(time.Now().UnixNano())
@@ -1957,7 +1969,7 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
zap.Int64("old segmentID", oldSegmentID),
zap.Int64("target segmentID", result.GetSegmentID()))

metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}

oldSegment := m.segments.GetSegment(oldSegmentID)
if oldSegment == nil {
@@ -1994,7 +2006,7 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
}
segment := NewSegmentInfo(segmentInfo)
if segment.GetNumOfRows() > 0 {
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetIsSorted(), segment.GetNumOfRows())
} else {
segment.State = commonpb.SegmentState_Dropped
}
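
The bulk of the meta.go change is mechanical: segMetricMutation.stateChange grows from a two-level map (level -> state -> count) to a three-level map (level -> state -> sort status -> count), and every producer (addNewSeg, append) and the consumer (commit) is updated so DataCoordNumSegments can carry the new third label. The standalone sketch below mirrors that bookkeeping with simplified types; the struct shape and getSortStatus follow the diff, but the bump method name and the print-based commit are illustrative only, not the actual Milvus code.

package main

import "fmt"

// Simplified sketch of the nested state-change bookkeeping: level -> state ->
// sort status -> count delta. In the real code (internal/datacoord/meta.go) the
// commit step feeds metrics.DataCoordNumSegments instead of printing.
type segMetricMutation struct {
	stateChange map[string]map[string]map[string]int
}

func getSortStatus(sorted bool) string {
	if sorted {
		return "sorted"
	}
	return "unsorted"
}

// bump lazily creates the inner maps before adjusting a counter, matching the
// pattern used by addNewSeg and append in the diff above.
func (s *segMetricMutation) bump(level, state string, sorted bool, delta int) {
	if _, ok := s.stateChange[level]; !ok {
		s.stateChange[level] = make(map[string]map[string]int)
	}
	if _, ok := s.stateChange[level][state]; !ok {
		s.stateChange[level][state] = make(map[string]int)
	}
	s.stateChange[level][state][getSortStatus(sorted)] += delta
}

// commit walks all three levels; the real implementation calls
// metrics.DataCoordNumSegments.WithLabelValues(state, level, sortedLabel).Add(...).
func (s *segMetricMutation) commit() {
	for level, byState := range s.stateChange {
		for state, bySorted := range byState {
			for sortedLabel, change := range bySorted {
				fmt.Printf("level=%s state=%s sorted=%s delta=%+d\n", level, state, sortedLabel, change)
			}
		}
	}
}

func main() {
	m := &segMetricMutation{stateChange: make(map[string]map[string]map[string]int)}
	m.bump("L1", "Flushed", false, +1) // a segment flushed unsorted
	m.bump("L1", "Flushed", false, -1) // later replaced by its sorted counterpart
	m.bump("L1", "Flushed", true, +1)
	m.commit()
}
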
2 changes: 1 addition & 1 deletion internal/datacoord/meta_test.go
@@ -124,7 +124,7 @@ func (suite *MetaReloadSuite) TestReloadFromKV() {
_, err := newMeta(ctx, suite.catalog, nil)
suite.NoError(err)

suite.MetricsEqual(metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel, datapb.SegmentLevel_Legacy.String()), 1)
suite.MetricsEqual(metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel, datapb.SegmentLevel_Legacy.String(), "unsorted"), 1)
})
}

1 change: 1 addition & 0 deletions internal/datacoord/services.go
@@ -912,6 +912,7 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
InsertChannel: segment.InsertChannel,
NumOfRows: rowCount,
Level: segment.GetLevel(),
IsSorted: segment.GetIsSorted(),
})
}

7 changes: 7 additions & 0 deletions internal/datacoord/util.go
@@ -349,3 +349,10 @@ func createStorageConfig() *indexpb.StorageConfig {

return storageConfig
}

func getSortStatus(sorted bool) string {
if sorted {
return "sorted"
}
return "unsorted"
}
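
getSortStatus turns the new boolean into the third label value ("sorted"/"unsorted") that every DataCoordNumSegments call site now passes, which means the metric itself must be declared with three label names. Below is a minimal sketch of such a declaration; the metric name, help text, and label names are assumptions rather than the actual Milvus definition, and only the three-label shape is taken from this diff.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative three-label gauge analogous to DataCoordNumSegments after this
// change; the name and label names here are placeholders, not the real metric.
var numSegments = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "example_datacoord_segment_num",
		Help: "number of segments, partitioned by state, level and sort status",
	},
	[]string{"segment_state", "segment_level", "segment_sort_status"},
)

func getSortStatus(sorted bool) string {
	if sorted {
		return "sorted"
	}
	return "unsorted"
}

func main() {
	prometheus.MustRegister(numSegments)
	// Mirrors the call sites in meta.go: state, level, then sort status.
	numSegments.WithLabelValues("Flushed", "L1", getSortStatus(true)).Inc()
	fmt.Println("gauge registered with a third, sort-status label")
}
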
4 changes: 3 additions & 1 deletion internal/proto/query_coord.proto
@@ -557,6 +557,7 @@ message SegmentInfo {
bool enable_index = 16;
bool is_fake = 17;
data.SegmentLevel level = 18;
bool is_sorted = 19;
}

message CollectionInfo {
@@ -634,7 +635,8 @@ message SegmentVersionInfo {
int64 version = 5;
uint64 last_delta_timestamp = 6;
map<int64, FieldIndexInfo> index_info = 7;
data.SegmentLevel level = 8;
data.SegmentLevel level = 8;
bool is_sorted = 9;
}

message ChannelVersionInfo {
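
Both protobuf additions use fresh field numbers (19 on SegmentInfo, 9 on SegmentVersionInfo), so messages from nodes built against the older definition simply leave is_sorted unset and still decode cleanly; the generated Go getter then reports the proto3 zero value, false. The sketch below imitates that nil-safe getter pattern with a hypothetical local type, not the actual generated querypb code.

package main

import "fmt"

// Hypothetical stand-in for a generated message carrying the new field. proto3
// getters are nil-receiver safe and fall back to the zero value, so a segment
// reported by an older node (which never sets is_sorted) reads as unsorted.
type SegmentInfo struct {
	IsSorted bool
}

func (x *SegmentInfo) GetIsSorted() bool {
	if x != nil {
		return x.IsSorted
	}
	return false
}

func main() {
	var fromOldNode *SegmentInfo                              // field never populated
	fmt.Println(fromOldNode.GetIsSorted())                    // false
	fmt.Println((&SegmentInfo{IsSorted: true}).GetIsSorted()) // true
}
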
2 changes: 2 additions & 0 deletions internal/proxy/impl.go
@@ -4111,6 +4111,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G
NumRows: info.NumOfRows,
State: info.State,
Level: commonpb.SegmentLevel(info.Level),
IsSorted: info.GetIsSorted(),
}
}
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
@@ -4184,6 +4185,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue
State: info.SegmentState,
NodeIds: info.NodeIds,
Level: commonpb.SegmentLevel(info.Level),
IsSorted: info.GetIsSorted(),
}
}

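
With the proxy copying IsSorted into both GetPersistentSegmentInfo and GetQuerySegmentInfo responses, the flag becomes visible to SDK users. Below is a hedged sketch of a raw gRPC caller reading it; the endpoint and collection name are placeholders, and the milvuspb request/response fields should be checked against the go-api version pinned in go.mod above.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Placeholder endpoint; a real deployment may also require authentication.
	conn, err := grpc.Dial("localhost:19530", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := milvuspb.NewMilvusServiceClient(conn)
	resp, err := client.GetPersistentSegmentInfo(context.Background(), &milvuspb.GetPersistentSegmentInfoRequest{
		CollectionName: "my_collection", // hypothetical collection name
	})
	if err != nil {
		log.Fatal(err)
	}
	for _, info := range resp.GetInfos() {
		// IsSorted is the field this commit starts populating.
		fmt.Printf("segment %d: state=%v level=%v sorted=%v\n",
			info.GetSegmentID(), info.GetState(), info.GetLevel(), info.GetIsSorted())
	}
}
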
1 change: 1 addition & 0 deletions internal/querycoordv2/dist/dist_handler.go
@@ -146,6 +146,7 @@ func (dh *distHandler) updateSegmentsDistribution(resp *querypb.GetDataDistribut
PartitionID: s.GetPartition(),
InsertChannel: s.GetChannel(),
Level: s.GetLevel(),
IsSorted: s.GetIsSorted(),
}
}
updates = append(updates, &meta.Segment{
1 change: 1 addition & 0 deletions internal/querycoordv2/utils/types.go
@@ -45,6 +45,7 @@ func MergeMetaSegmentIntoSegmentInfo(info *querypb.SegmentInfo, segments ...*met
SegmentState: commonpb.SegmentState_Sealed,
IndexInfos: make([]*querypb.FieldIndexInfo, 0),
Level: first.Level,
IsSorted: first.GetIsSorted(),
}
for _, indexInfo := range first.IndexInfo {
info.IndexName = indexInfo.IndexName