Skip to content

Commit

Permalink
Fix to fail to get the collection name sometimes
Browse files Browse the repository at this point in the history
  • Loading branch information
SimFG committed Mar 25, 2023
1 parent 1fda2e0 commit 457e216
Showing 1 changed file with 35 additions and 5 deletions.
40 changes: 35 additions & 5 deletions core/reader/milvus_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,13 @@ func (reader *MilvusCollectionReader) watchPartition(watchCtx context.Context) {
info.PartitionName != reader.etcdConfig.DefaultPartitionName {
collectionName, ok := reader.collectionID2Name.Load(id)
if !ok {
// TODO get etcd sometimes can't get the collection
log.Warn("not found the collection", zap.Int64("collection_id", id),
zap.Int64("partition_id", info.PartitionID),
zap.String("partition_name", info.PartitionName))
return
collectionName = reader.getCollectionNameByID(id)
if collectionName == "" {
log.Warn("not found the collection", zap.Int64("collection_id", id),
zap.Int64("partition_id", info.PartitionID),
zap.String("partition_name", info.PartitionName))
return
}
}
data := &model.CDCData{
Msg: &msgstream.CreatePartitionMsg{
Expand All @@ -239,6 +241,34 @@ func (reader *MilvusCollectionReader) watchPartition(watchCtx context.Context) {
}
}

func (reader *MilvusCollectionReader) getCollectionNameByID(collectionID int64) string {
resp, err := util.EtcdGet(reader.etcdCli, path.Join(reader.collectionPrefix(), strconv.FormatInt(collectionID, 10)))
if err != nil {
log.Warn("fail to get all collection data", zap.Int64("collection_id", collectionID), zap.Error(err))
return ""
}
if len(resp.Kvs) == 0 {
log.Warn("the collection isn't existed", zap.Int64("collection_id", collectionID))
return ""
}
info := &pb.CollectionInfo{}
err = proto.Unmarshal(resp.Kvs[0].Value, info)
if err != nil {
log.Warn("fail to unmarshal collection info, maybe it's a deleted collection",
zap.Int64("collection_id", collectionID),
zap.String("value", util.Base64Encode(resp.Kvs[0].Value)),
zap.Error(err))
return ""
}
collectionName := reader.collectionName(info)
if reader.shouldReadFunc(info) {
reader.collectionID2Name.Store(collectionID, collectionName)
return collectionName
}
log.Warn("the collection can't be read", zap.Int64("id", collectionID), zap.String("name", collectionName))
return ""
}

func (reader *MilvusCollectionReader) getAllCollections() {
var (
existedCollectionInfos []*pb.CollectionInfo
Expand Down

0 comments on commit 457e216

Please sign in to comment.