Skip to content

Commit

Permalink
improve to check the duplicate collection and replicate user and role…
Browse files Browse the repository at this point in the history
… api

Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Oct 18, 2024
1 parent 684fc67 commit 4b0d962
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 51 deletions.
15 changes: 15 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ jobs:
"enable_tls": false,
"connect_timeout": 120
},
"extra_info": {
"enable_user_role": true
},
"collection_infos": [
{
"name": "*"
Expand Down Expand Up @@ -253,6 +256,9 @@ jobs:
"token": "root:Milvus",
"connect_timeout": 120
},
"extra_info": {
"enable_user_role": true
},
"collection_infos": [
{
"name": "*"
Expand Down Expand Up @@ -390,6 +396,9 @@ jobs:
"connect_timeout": 120,
"channel_num": 16
},
"extra_info": {
"enable_user_role": true
},
"collection_infos": [
{
"name": "*"
Expand Down Expand Up @@ -527,6 +536,9 @@ jobs:
"connect_timeout": 120,
"channel_num": 8
},
"extra_info": {
"enable_user_role": true
},
"collection_infos": [
{
"name": "*"
Expand Down Expand Up @@ -662,6 +674,9 @@ jobs:
"token": "root:Milvus",
"connect_timeout": 120
},
"extra_info": {
"enable_user_role": true
},
"collection_infos": [
{
"name": "*"
Expand Down
4 changes: 3 additions & 1 deletion core/reader/collection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ import (
)

const (
AllCollection = "*"
AllCollection = "*"
AllDatabase = "*"
DefaultDatabase = "default"
)

type CollectionInfo struct {
Expand Down
15 changes: 15 additions & 0 deletions core/util/msgpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"bytes"
"sync"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/requestutil"
)
Expand Down Expand Up @@ -75,3 +76,17 @@ func GetCollectionIDFromMsgPack(msgPack *msgstream.MsgPack) int64 {
collectionID, _ := GetCollectionIDFromRequest(firstMsg)
return collectionID
}

func IsUserRoleMessage(msgPack *msgstream.MsgPack) bool {
if len(msgPack.Msgs) == 0 {
return false
}
msgType := msgPack.Msgs[0].Type()
return msgType == commonpb.MsgType_CreateCredential ||
msgType == commonpb.MsgType_DeleteCredential ||
msgType == commonpb.MsgType_UpdateCredential ||
msgType == commonpb.MsgType_CreateRole ||
msgType == commonpb.MsgType_DropRole ||
msgType == commonpb.MsgType_OperateUserRole ||
msgType == commonpb.MsgType_OperatePrivilege
}
171 changes: 121 additions & 50 deletions server/cdc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type MetaCDC struct {
sync.RWMutex
data map[string][]string
excludeData map[string][]string
extraInfos map[string]model.ExtraInfo
}
cdcTasks struct {
sync.RWMutex
Expand Down Expand Up @@ -142,6 +143,7 @@ func NewMetaCDC(serverConfig *CDCServerConfig) *MetaCDC {

cdc.collectionNames.data = make(map[string][]string)
cdc.collectionNames.excludeData = make(map[string][]string)
cdc.collectionNames.extraInfos = make(map[string]model.ExtraInfo)
cdc.cdcTasks.data = make(map[string]*meta.TaskInfo)
cdc.replicateEntityMap.data = make(map[string]*ReplicateEntity)
return cdc
Expand Down Expand Up @@ -176,6 +178,7 @@ func (e *MetaCDC) ReloadTask() {
})
e.collectionNames.data[uKey] = append(e.collectionNames.data[uKey], newCollectionNames...)
e.collectionNames.excludeData[uKey] = append(e.collectionNames.excludeData[uKey], taskInfo.ExcludeCollections...)
e.collectionNames.excludeData[uKey] = lo.Uniq(e.collectionNames.excludeData[uKey])
e.cdcTasks.Lock()
e.cdcTasks.data[taskInfo.TaskID] = taskInfo
e.cdcTasks.Unlock()
Expand Down Expand Up @@ -231,6 +234,99 @@ func getTaskUniqueIDFromReq(req *request.CreateRequest) string {
panic("fail to get the task unique id")
}

func getDatabaseName(i any) string {
switch r := i.(type) {
case *meta.TaskInfo:
if r.DatabaseInfo.Name != "" {
return r.DatabaseInfo.Name
}
return cdcreader.DefaultDatabase
case *request.CreateRequest:
if r.DatabaseInfo.Name != "" {
return r.DatabaseInfo.Name
}
return cdcreader.DefaultDatabase
default:
panic("invalid type")
}
}

func getFullCollectionName(collectionName string, databaseName string) string {
return fmt.Sprintf("%s.%s", databaseName, collectionName)
}

func getCollectionNameFromFull(fullName string) (string, string) {
names := strings.Split(fullName, ".")
if len(names) != 2 {
panic("invalid full collection name")
}
return names[0], names[1]
}

func matchCollectionName(sampleCollection, targetCollection string) (bool, bool) {
db1, collection1 := getCollectionNameFromFull(sampleCollection)
db2, collection2 := getCollectionNameFromFull(targetCollection)
return (db1 == db2 || db1 == cdcreader.AllDatabase) &&
(collection1 == collection2 || collection1 == cdcreader.AllCollection),
db1 == cdcreader.AllDatabase || collection1 == cdcreader.AllCollection
}

func (e *MetaCDC) checkDuplicateCollection(uKey string, newCollectionNames []string, extraInfo model.ExtraInfo) ([]string, error) {
e.collectionNames.Lock()
defer e.collectionNames.Unlock()
existExtraInfo := e.collectionNames.extraInfos[uKey]
if existExtraInfo.EnableUserRole && extraInfo.EnableUserRole {
return nil, servererror.NewClientError(fmt.Sprintf("the enable user role param is duplicate"))

Check failure on line 279 in server/cdc_impl.go

View workflow job for this annotation

GitHub Actions / lint

S1039: unnecessary use of fmt.Sprintf (gosimple)
}
if names, ok := e.collectionNames.data[uKey]; ok {
var duplicateCollections []string
containsAny := false
for _, name := range names {
d, c := getCollectionNameFromFull(name)
if d == cdcreader.AllDatabase || c == cdcreader.AllCollection {
containsAny = true
}
}
for _, newCollectionName := range newCollectionNames {
if lo.Contains(names, newCollectionName) {
duplicateCollections = append(duplicateCollections, newCollectionName)
continue
}
nd, nc := getCollectionNameFromFull(newCollectionName)
if nd == cdcreader.AllDatabase && nc == cdcreader.AllCollection {
continue
}
if containsAny && !lo.Contains(e.collectionNames.excludeData[uKey], newCollectionName) {
duplicateCollections = append(duplicateCollections, newCollectionName)
continue
}
}
if len(duplicateCollections) > 0 {
log.Info("duplicate collections",
zap.Strings("request_collections", newCollectionNames),
zap.Strings("exist_collections", names),
zap.Strings("exclude_collections", e.collectionNames.excludeData[uKey]),
zap.Strings("duplicate_collections", duplicateCollections))
return nil, servererror.NewClientError(fmt.Sprintf("the collection name is duplicate with existing task, %v", duplicateCollections))
}
}
// release lock early to accept other requests
var excludeCollectionNames []string
for _, newCollectionName := range newCollectionNames {
for _, existCollectionName := range e.collectionNames.data[uKey] {
if match, _ := matchCollectionName(newCollectionName, existCollectionName); match {
excludeCollectionNames = append(excludeCollectionNames, existCollectionName)
}
}
}
e.collectionNames.excludeData[uKey] = append(e.collectionNames.excludeData[uKey], excludeCollectionNames...)
e.collectionNames.data[uKey] = append(e.collectionNames.data[uKey], newCollectionNames...)
e.collectionNames.extraInfos[uKey] = model.ExtraInfo{
EnableUserRole: existExtraInfo.EnableUserRole || extraInfo.EnableUserRole,
}
return excludeCollectionNames, nil
}

func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateResponse, err error) {
defer func() {
log.Info("create request done")
Expand All @@ -242,53 +338,19 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
return nil, err
}
uKey := getTaskUniqueIDFromReq(req)
databaseName := getDatabaseName(req)
newCollectionNames := lo.Map(req.CollectionInfos, func(t model.CollectionInfo, _ int) string {
return t.Name
return getFullCollectionName(t.Name, databaseName)
})
e.collectionNames.Lock()
if names, ok := e.collectionNames.data[uKey]; ok {
existAll := lo.Contains(names, cdcreader.AllCollection)
duplicateCollections := lo.Filter(req.CollectionInfos, func(info model.CollectionInfo, _ int) bool {
return (!existAll && lo.Contains(names, info.Name)) || (existAll && info.Name == cdcreader.AllCollection)
})
if len(duplicateCollections) > 0 {
e.collectionNames.Unlock()
return nil, servererror.NewClientError(fmt.Sprintf("some collections are duplicate with existing tasks, %v", lo.Map(duplicateCollections, func(t model.CollectionInfo, i int) string {
return t.Name
})))
}
if existAll {
excludeCollectionNames := lo.Filter(e.collectionNames.excludeData[uKey], func(s string, _ int) bool {
return !lo.Contains(names, s)
})
duplicateCollections = lo.Filter(req.CollectionInfos, func(info model.CollectionInfo, _ int) bool {
return !lo.Contains(excludeCollectionNames, info.Name)
})
if len(duplicateCollections) > 0 {
e.collectionNames.Unlock()
return nil, servererror.NewClientError(fmt.Sprintf("some collections are duplicate with existing tasks, check the `*` collection task and other tasks, %v", lo.Map(duplicateCollections, func(t model.CollectionInfo, i int) string {
return t.Name
})))
}
}
}
// release lock early to accept other requests
var excludeCollectionNames []string
if newCollectionNames[0] == cdcreader.AllCollection {
existCollectionNames := e.collectionNames.data[uKey]
excludeCollectionNames = make([]string, len(existCollectionNames))
copy(excludeCollectionNames, existCollectionNames)
e.collectionNames.excludeData[uKey] = excludeCollectionNames
excludeCollectionNames, err := e.checkDuplicateCollection(uKey, newCollectionNames, req.ExtraInfo)
if err != nil {
return nil, err
}
e.collectionNames.data[uKey] = append(e.collectionNames.data[uKey], newCollectionNames...)
e.collectionNames.Unlock()

revertCollectionNames := func() {
e.collectionNames.Lock()
defer e.collectionNames.Unlock()
if newCollectionNames[0] == cdcreader.AllCollection {
e.collectionNames.excludeData[uKey] = []string{}
}
e.collectionNames.excludeData[uKey] = lo.Without(e.collectionNames.excludeData[uKey], excludeCollectionNames...)
e.collectionNames.data[uKey] = lo.Without(e.collectionNames.data[uKey], newCollectionNames...)
}

Expand All @@ -308,6 +370,7 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
DatabaseInfo: req.DatabaseInfo,
CollectionInfos: req.CollectionInfos,
RPCRequestChannelInfo: req.RPCChannelInfo,
ExtraInfo: req.ExtraInfo,
ExcludeCollections: excludeCollectionNames,
WriterCacheConfig: req.BufferConfig,
State: meta.TaskStateInitial,
Expand Down Expand Up @@ -954,9 +1017,9 @@ func replicateMetric(info *meta.TaskInfo, channelName string, msgPack *msgstream
func (e *MetaCDC) getChannelReader(info *meta.TaskInfo, replicateEntity *ReplicateEntity, channelName, channelPosition string) (api.Reader, error) {
taskLog := log.With(zap.String("task_id", info.TaskID))
collectionName := info.CollectionNames()[0]
databaseName := info.DatabaseInfo.Name
databaseName := getDatabaseName(info)
isAnyCollection := collectionName == cdcreader.AllCollection
isAnyDatabase := databaseName == ""
isAnyDatabase := databaseName == cdcreader.AllDatabase
// isTmpCollection := collectionName == model.TmpCollectionName

dataHandleFunc := func(funcCtx context.Context, pack *msgstream.MsgPack) bool {
Expand All @@ -974,7 +1037,15 @@ func (e *MetaCDC) getChannelReader(info *meta.TaskInfo, replicateEntity *Replica
msgDatabaseName := util.GetDatabaseNameFromMsgPack(pack)
// TODO it should be changed if replicate the user and role info or multi collection
// TODO how to handle it when there are "*" and "foo" collection names in the task list
if (!isAnyCollection && msgCollectionName != collectionName) ||
if msgCollectionName == "" && msgDatabaseName == "" {
extraSkip := true
if info.ExtraInfo.EnableUserRole && util.IsUserRoleMessage(pack) {
extraSkip = false
}
if extraSkip {
return true
}
} else if (!isAnyCollection && msgCollectionName != collectionName) ||
(!isAnyDatabase && msgDatabaseName != databaseName) {
// skip the message if the collection name is not equal to the task collection name
return true
Expand Down Expand Up @@ -1101,9 +1172,7 @@ func (e *MetaCDC) delete(taskID string) error {
uKey = milvusURI + kafkaAddress
collectionNames := info.CollectionNames()
e.collectionNames.Lock()
if collectionNames[0] == cdcreader.AllCollection {
e.collectionNames.excludeData[uKey] = []string{}
}
e.collectionNames.excludeData[uKey] = lo.Without(e.collectionNames.excludeData[uKey], info.ExcludeCollections...)
e.collectionNames.data[uKey] = lo.Without(e.collectionNames.data[uKey], collectionNames...)
e.collectionNames.Unlock()

Expand Down Expand Up @@ -1259,21 +1328,23 @@ func (e *MetaCDC) Maintenance(req *request.MaintenanceRequest) (*request.Mainten
}

func GetShouldReadFunc(taskInfo *meta.TaskInfo) cdcreader.ShouldReadFunc {
isAll := taskInfo.CollectionInfos[0].Name == cdcreader.AllCollection
isAllCollection := taskInfo.CollectionInfos[0].Name == cdcreader.AllCollection
databaseName := getDatabaseName(taskInfo)
isAllDataBase := databaseName == cdcreader.AllDatabase
return func(databaseInfo *coremodel.DatabaseInfo, collectionInfo *pb.CollectionInfo) bool {
currentCollectionName := collectionInfo.Schema.Name
if databaseInfo.Dropped {
log.Info("database is dropped", zap.String("database", databaseInfo.Name), zap.String("collection", currentCollectionName))
return false
}

notStarContains := !isAll && lo.ContainsBy(taskInfo.CollectionInfos, func(taskCollectionInfo model.CollectionInfo) bool {
notStarContains := !isAllCollection && lo.ContainsBy(taskInfo.CollectionInfos, func(taskCollectionInfo model.CollectionInfo) bool {
return taskCollectionInfo.Name == currentCollectionName
})
starContains := isAll && !lo.ContainsBy(taskInfo.ExcludeCollections, func(s string) bool {
starContains := isAllCollection && !lo.ContainsBy(taskInfo.ExcludeCollections, func(s string) bool {
return s == currentCollectionName
})
dbMatch := taskInfo.DatabaseInfo.Name == "" ||
dbMatch := isAllDataBase ||
taskInfo.DatabaseInfo.Name == databaseInfo.Name

return (notStarContains || starContains) && dbMatch
Expand Down
Loading

0 comments on commit 4b0d962

Please sign in to comment.