diff --git a/server/cdc_impl.go b/server/cdc_impl.go index 3622057..cbcc994 100644 --- a/server/cdc_impl.go +++ b/server/cdc_impl.go @@ -1061,7 +1061,7 @@ func (e *MetaCDC) getChannelReader(info *meta.TaskInfo, replicateEntity *Replica } } else { // skip the msg when db or collection name is not matched - collectionInfos := GetCollectionInfos(info, msgDatabaseName) + collectionInfos := GetCollectionInfos(info, msgDatabaseName, msgCollectionName) if collectionInfos == nil { return true } @@ -1353,7 +1353,7 @@ func GetShouldReadFunc(taskInfo *meta.TaskInfo) cdcreader.ShouldReadFunc { log.Info("database is dropped", zap.String("database", databaseInfo.Name), zap.String("collection", currentCollectionName)) return false } - taskCollectionInfos := GetCollectionInfos(taskInfo, databaseInfo.Name) + taskCollectionInfos := GetCollectionInfos(taskInfo, databaseInfo.Name, currentCollectionName) if taskCollectionInfos == nil { return false } @@ -1361,7 +1361,7 @@ func GetShouldReadFunc(taskInfo *meta.TaskInfo) cdcreader.ShouldReadFunc { } } -func GetCollectionInfos(taskInfo *meta.TaskInfo, dbName string) []model.CollectionInfo { +func GetCollectionInfos(taskInfo *meta.TaskInfo, dbName string, collectionName string) []model.CollectionInfo { var taskCollectionInfos []model.CollectionInfo if len(taskInfo.CollectionInfos) > 0 { if dbName != cdcreader.DefaultDatabase { @@ -1373,8 +1373,8 @@ func GetCollectionInfos(taskInfo *meta.TaskInfo, dbName string) []model.Collecti taskCollectionInfos = taskInfo.DBCollections[dbName] if taskCollectionInfos == nil { isExclude := lo.ContainsBy(taskInfo.ExcludeCollections, func(s string) bool { - db, _ := getCollectionNameFromFull(s) - return db == dbName + db, collection := getCollectionNameFromFull(s) + return db == dbName && collection == collectionName }) if isExclude { return nil diff --git a/server/cdc_impl_test.go b/server/cdc_impl_test.go index 2646291..48299ad 100644 --- a/server/cdc_impl_test.go +++ b/server/cdc_impl_test.go @@ -835,6 +835,46 @@ func TestShouldReadCollection(t *testing.T) { }, })) }) + + t.Run("db collections", func(t *testing.T) { + f := GetShouldReadFunc(&meta.TaskInfo{ + DBCollections: map[string][]model.CollectionInfo{ + "*": { + { + Name: "*", + }, + }, + }, + ExcludeCollections: []string{"default.foo"}, + }) + assert.False(t, f( + &coremodel.DatabaseInfo{ + Name: cdcreader.DefaultDatabase, + }, + &pb.CollectionInfo{ + Schema: &schemapb.CollectionSchema{ + Name: "foo", + }, + })) + assert.True(t, f( + &coremodel.DatabaseInfo{ + Name: cdcreader.DefaultDatabase, + }, + &pb.CollectionInfo{ + Schema: &schemapb.CollectionSchema{ + Name: "hoo", + }, + })) + assert.True(t, f( + &coremodel.DatabaseInfo{ + Name: "kind", + }, + &pb.CollectionInfo{ + Schema: &schemapb.CollectionSchema{ + Name: "foo", + }, + })) + }) } func TestList(t *testing.T) { diff --git a/server/server.go b/server/server.go index 4717d12..118094a 100644 --- a/server/server.go +++ b/server/server.go @@ -126,6 +126,8 @@ func (c *CDCServer) handleRequest(cdcRequest *modelrequest.CDCRequest, writer ht c.handleError(writer, fmt.Sprintf("fail to decode the %s request, error: %s", requestType, err.Error()), http.StatusInternalServerError) return nil } + requestBytes, _ := json.Marshal(requestModel) + log.Info("request receive", zap.String("type", requestType), zap.String("data", string(requestBytes))) response, err := handler.handle(c.api, requestModel) if err != nil { code := http.StatusInternalServerError