Skip to content

Commit

Permalink
webui(dm): add many fix of webui (#4946) (#5024)
Browse files Browse the repository at this point in the history
close #4956, close #4993
  • Loading branch information
ti-chi-bot authored Mar 25, 2022
1 parent 1e29b06 commit fb721c1
Show file tree
Hide file tree
Showing 22 changed files with 383 additions and 255 deletions.
2 changes: 1 addition & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ ErrSchedulerRelayWorkersWrongBound,[code=46021:class=scheduler:scope=internal:le
ErrSchedulerRelayWorkersWrongRelay,[code=46022:class=scheduler:scope=internal:level=high], "Message: these workers %s have started relay for another sources %s respectively, Workaround: Please correct sources in `stop-relay`."
ErrSchedulerSourceOpRelayExist,[code=46023:class=scheduler:scope=internal:level=high], "Message: source with name %s need to operate has existing relay workers %s, Workaround: Please `stop-relay` first."
ErrSchedulerLatchInUse,[code=46024:class=scheduler:scope=internal:level=low], "Message: when %s, resource %s is in use by other client, Workaround: Please try again later"
ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source can only update when not enable relay and no running tasks for now"
ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source %s can only be updated when relay is disabled and no tasks are running for now"
ErrSchedulerWrongWorkerInput,[code=46026:class=scheduler:scope=internal:level=medium], "Message: require DM master to modify worker [%s] with source [%s], but currently the worker is bound to source [%s]"
ErrSchedulerBoundDiffWithStartedRelay,[code=46027:class=scheduler:scope=internal:level=medium], "Message: require DM worker [%s] to be bound to source [%s], but it has been started relay for source [%s], Workaround: If you intend to bind the source with worker, you can stop-relay for current source."
ErrSchedulerStartRelayOnSpecified,[code=46028:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` with worker name for workers %v, so it can't `start-relay` without worker name now, Workaround: Please stop all relay workers first, or specify worker name for `start-relay`."
Expand Down
70 changes: 47 additions & 23 deletions dm/dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
subTaskCfg.Name = task.Name
subTaskCfg.Mode = string(task.TaskMode)
// set task meta
subTaskCfg.MetaFile = *task.MetaSchema
subTaskCfg.MetaSchema = *task.MetaSchema
// add binlog meta
if sourceCfg.BinlogGtid != nil || sourceCfg.BinlogName != nil || sourceCfg.BinlogPos != nil {
meta := &Meta{}
Expand Down Expand Up @@ -186,7 +186,6 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
subTaskCfg.From = sourceCfgMap[sourceCfg.SourceName].From
// set target db config
subTaskCfg.To = *toDBCfg.Clone()
// TODO set meet error policy
// TODO ExprFilter
// set full unit config
subTaskCfg.MydumperConfig = DefaultMydumperConfig()
Expand Down Expand Up @@ -218,13 +217,11 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
}
subTaskCfg.ValidatorCfg = defaultValidatorConfig()
// set route,blockAllowList,filter config
doCnt := len(tableMigrateRuleMap[sourceCfg.SourceName])
doDBs := make([]string, doCnt)
doTables := make([]*filter.Table, doCnt)

doDBs := []string{}
doTables := []*filter.Table{}
routeRules := []*router.TableRule{}
filterRules := []*bf.BinlogEventRule{}
for j, rule := range tableMigrateRuleMap[sourceCfg.SourceName] {
for _, rule := range tableMigrateRuleMap[sourceCfg.SourceName] {
// route
if rule.Target != nil && (rule.Target.Schema != nil || rule.Target.Table != nil) {
tableRule := &router.TableRule{SchemaPattern: rule.Source.Schema, TablePattern: rule.Source.Table}
Expand All @@ -244,17 +241,31 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
return nil, terror.ErrOpenAPICommonError.Generatef("filter rule name %s not found.", name)
}
filterRule.SchemaPattern = rule.Source.Schema
filterRule.TablePattern = rule.Source.Table
if rule.Source.Table != "" {
filterRule.TablePattern = rule.Source.Table
}
filterRules = append(filterRules, &filterRule)
}
}
// BlockAllowList
doDBs[j] = rule.Source.Schema
doTables[j] = &filter.Table{Schema: rule.Source.Schema, Name: rule.Source.Table}
if rule.Source.Table != "" {
doTables = append(doTables, &filter.Table{Schema: rule.Source.Schema, Name: rule.Source.Table})
} else {
doDBs = append(doDBs, rule.Source.Schema)
}
}
subTaskCfg.RouteRules = routeRules
subTaskCfg.FilterRules = filterRules
subTaskCfg.BAList = &filter.Rules{DoDBs: removeDuplication(doDBs), DoTables: doTables}
if len(doDBs) > 0 || len(doTables) > 0 {
bAList := &filter.Rules{}
if len(doDBs) > 0 {
bAList.DoDBs = removeDuplication(doDBs)
}
if len(doTables) > 0 {
bAList.DoTables = doTables
}
subTaskCfg.BAList = bAList
}
// adjust sub task config
if err := subTaskCfg.Adjust(true); err != nil {
return nil, terror.Annotatef(err, "source name %s", sourceCfg.SourceName)
Expand Down Expand Up @@ -513,6 +524,8 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
}
// set table migrate rules
tableMigrateRuleList := []openapi.TaskTableMigrateRule{}
// used to remove repeated rules
ruleMap := map[string]struct{}{}
appendOneRule := func(sourceName, schemaPattern, tablePattern, targetSchema, targetTable string) {
tableMigrateRule := openapi.TaskTableMigrateRule{
Source: struct {
Expand All @@ -525,13 +538,15 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
Table: tablePattern,
},
}
if targetSchema != "" || targetTable != "" {
if targetSchema != "" {
tableMigrateRule.Target = &struct {
Schema *string `json:"schema,omitempty"`
Table *string `json:"table,omitempty"`
}{
Schema: &targetSchema,
Table: &targetTable,
}
if targetTable != "" {
tableMigrateRule.Target.Table = &targetTable
}
}
if filterRuleList, ok := filterMap[sourceName]; ok {
Expand All @@ -541,26 +556,35 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
}
tableMigrateRule.BinlogFilterRule = &ruleNameList
}
ruleKey := strings.Join([]string{sourceName, schemaPattern, tablePattern}, "-")
if _, ok := ruleMap[ruleKey]; ok {
return
}
ruleMap[ruleKey] = struct{}{}
tableMigrateRuleList = append(tableMigrateRuleList, tableMigrateRule)
}

// gen migrate rules by route
for sourceName, ruleList := range routeMap {
for _, rule := range ruleList {
appendOneRule(sourceName, rule.SchemaPattern, rule.TablePattern, rule.TargetSchema, rule.TargetTable)
}
}
// for user only set BlockAllowList without route rules, this means keep same with upstream db and table
if len(tableMigrateRuleList) == 0 {
for _, cfg := range subTaskConfigList {
if cfg.BAList != nil {
for idx := range cfg.BAList.DoTables {
schemaPattern := cfg.BAList.DoTables[idx].Schema
tablePattern := cfg.BAList.DoTables[idx].Name
appendOneRule(cfg.SourceID, schemaPattern, tablePattern, "", "")
}

// gen migrate rules by BAList
for _, cfg := range subTaskConfigList {
if cfg.BAList != nil {
for idx := range cfg.BAList.DoDBs {
schemaPattern := cfg.BAList.DoDBs[idx]
appendOneRule(cfg.SourceID, schemaPattern, "", "", "")
}
for idx := range cfg.BAList.DoTables {
schemaPattern := cfg.BAList.DoTables[idx].Schema
tablePattern := cfg.BAList.DoTables[idx].Name
appendOneRule(cfg.SourceID, schemaPattern, tablePattern, "", "")
}
}
}

// set basic global config
task := openapi.Task{
Name: oneSubtaskConfig.Name,
Expand Down
Loading

0 comments on commit fb721c1

Please sign in to comment.