Skip to content

Commit

Permalink
config(engine): imitate adjust DM source config (#5550)
Browse files Browse the repository at this point in the history
close #5535
  • Loading branch information
lance6716 authored May 24, 2022
1 parent b160a9a commit ee57d0a
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 43 deletions.
4 changes: 2 additions & 2 deletions dm/dm/master/openapi_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *Server) getSourceStatusListFromWorker(ctx context.Context, sourceName s

func (s *Server) createSource(ctx context.Context, req openapi.CreateSourceRequest) (*openapi.Source, error) {
cfg := config.OpenAPISourceToSourceCfg(req.Source)
if err := checkAndAdjustSourceConfigFunc(ctx, cfg); err != nil {
if err := CheckAndAdjustSourceConfigFunc(ctx, cfg); err != nil {
return nil, err
}

Expand Down Expand Up @@ -148,7 +148,7 @@ func (s *Server) updateSource(ctx context.Context, sourceName string, req openap
newCfg.From.Password = oldCfg.From.Password
}

if err := checkAndAdjustSourceConfigFunc(ctx, newCfg); err != nil {
if err := CheckAndAdjustSourceConfigFunc(ctx, newCfg); err != nil {
return nil, err
}
if err := s.scheduler.UpdateSourceCfg(newCfg); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/master/openapi_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ func (s *OpenAPIControllerSuite) SetupSuite() {
s.testTask = &task

checker.CheckSyncConfigFunc = mockCheckSyncConfig
checkAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock
CheckAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock
s.Nil(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB", `return(true)`))
s.Nil(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipRemoveMetaData", `return(true)`))
}

func (s *OpenAPIControllerSuite) TearDownSuite() {
checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
CheckAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
checker.CheckSyncConfigFunc = checker.CheckSyncConfig
s.Nil(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB"))
s.Nil(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipRemoveMetaData"))
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/master/openapi_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,14 @@ func (s *OpenAPIViewSuite) SetupSuite() {

func (s *OpenAPIViewSuite) SetupTest() {
checker.CheckSyncConfigFunc = mockCheckSyncConfig
checkAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock
CheckAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock
s.NoError(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB", `return(true)`))
s.NoError(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipRemoveMetaData", `return(true)`))
}

func (s *OpenAPIViewSuite) TearDownTest() {
checker.CheckSyncConfigFunc = checker.CheckSyncConfig
checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
CheckAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
s.NoError(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB"))
s.NoError(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipRemoveMetaData"))
}
Expand Down
3 changes: 2 additions & 1 deletion dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ var (
registerOnce sync.Once
runBackgroundOnce sync.Once

// CheckAndAdjustSourceConfigFunc is exposed to dataflow engine.
// the difference of below functions is checkAndAdjustSourceConfigForDMCtlFunc will not AdjustCaseSensitive. It's a
// compatibility compromise.
// When we need to change the implementation of dmctl to OpenAPI, we should notice the user about this change.
checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
CheckAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig
checkAndAdjustSourceConfigForDMCtlFunc = checkAndAdjustSourceConfigForDMCtl
)

Expand Down
2 changes: 1 addition & 1 deletion engine/jobmaster/dm/checkpoint/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestIsFresh(t *testing.T) {
},
},
}
taskCfg := jobCfg.ToTaskConfigs()[source1]
taskCfg := jobCfg.ToTaskCfgs()[source1]
checkpointAgent := NewAgentImpl(jobCfg)

isFresh, err := checkpointAgent.IsFresh(context.Background(), lib.WorkerDMDump, &metadata.Task{Cfg: taskCfg})
Expand Down
99 changes: 70 additions & 29 deletions engine/jobmaster/dm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,62 @@
package config

import (
"context"
"os"
"time"

"github.com/pingcap/errors"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb/util/filter"
router "github.com/pingcap/tidb/util/table-router"
dmconfig "github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/dm/master"
"gopkg.in/yaml.v2"
)

// JobCfg copies from tiflow/dm/config/config.go and removes some deprecated fields.
// UpstreamCfg copies the needed fields from DM SourceCfg and MySQLInstance part
// of DM task config.
type UpstreamCfg struct {
dmconfig.MySQLInstance `yaml:",inline" toml:",inline" json:",inline"`
DBCfg *dmconfig.DBConfig `yaml:"db-config" toml:"db-config" json:"db-config"`
ServerID uint32 `yaml:"server-id" toml:"server-id" json:"server-id"`
Flavor string `yaml:"flavor" toml:"flavor" json:"flavor"`
EnableGTID bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"`
}

func (u *UpstreamCfg) fromDMSourceConfig(from *dmconfig.SourceConfig) {
u.DBCfg = from.From.Clone()
u.ServerID = from.ServerID
u.Flavor = from.Flavor
u.EnableGTID = from.EnableGTID
}

func (u *UpstreamCfg) toDMSourceConfig() *dmconfig.SourceConfig {
ret := dmconfig.NewSourceConfig()
ret.SourceID = u.SourceID
ret.From = *u.DBCfg.Clone()
ret.ServerID = u.ServerID
ret.Flavor = u.Flavor
ret.EnableGTID = u.EnableGTID

return ret
}

func (u *UpstreamCfg) adjust() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
dmSource := u.toDMSourceConfig()
err := master.CheckAndAdjustSourceConfigFunc(ctx, dmSource)
if err != nil {
return err
}
u.fromDMSourceConfig(dmSource)
return nil
}

// JobCfg copies from SubTaskConfig and removes some deprecated fields.
// It represents a DM subtask with multiple source configs embedded as Upstreams.
// DISCUSS: support command line args. e.g. --start-time.
type JobCfg struct {
Name string `yaml:"name" toml:"name" json:"name"`
Expand All @@ -46,7 +90,6 @@ type JobCfg struct {
Syncers map[string]*dmconfig.SyncerConfig `yaml:"syncers" toml:"syncers" json:"syncers"`
Routes map[string]*router.TableRule `yaml:"routes" toml:"routes" json:"routes"`
Validators map[string]*dmconfig.ValidatorConfig `yaml:"validators" toml:"validators" json:"validators"`

// remove source config, use db config instead.
Upstreams []*UpstreamCfg `yaml:"upstreams" toml:"upstreams" json:"upstreams"`

Expand Down Expand Up @@ -75,16 +118,6 @@ type JobCfg struct {
// RemoveMeta bool `yaml:"remove-meta"`
}

// TaskCfg alias JobCfg
// The difference between task configuration and job configuration is that a task has only one usptream.
type TaskCfg JobCfg

// UpstreamCfg add db-config to dmconfig.MySQLInstance, because we no need source cfg now.
type UpstreamCfg struct {
dmconfig.MySQLInstance `yaml:",inline" toml:",inline" json:",inline"`
DBCfg *dmconfig.DBConfig `yaml:"db-config" toml:"db-config" json:"db-config"`
}

// DecodeFile reads file content from a given path and decodes it.
func (c *JobCfg) DecodeFile(fpath string) error {
bs, err := os.ReadFile(fpath)
Expand All @@ -105,9 +138,8 @@ func (c *JobCfg) Decode(content []byte) error {
}

// Yaml serializes the JobCfg into a YAML document.
func (c *JobCfg) Yaml() (string, error) {
b, err := yaml.Marshal(c)
return string(b), err
func (c *JobCfg) Yaml() ([]byte, error) {
return yaml.Marshal(c)
}

// Clone returns a deep copy of JobCfg
Expand All @@ -117,46 +149,48 @@ func (c *JobCfg) Clone() (*JobCfg, error) {
return nil, err
}
clone := &JobCfg{}
err = yaml.Unmarshal([]byte(content), clone)
err = yaml.Unmarshal(content, clone)
return clone, err
}

// ToTaskConfigs converts job config to a map, mapping from upstream source id
// ToTaskCfgs converts job config to a map, mapping from upstream source id
// to task config.
func (c *JobCfg) ToTaskConfigs() map[string]*TaskCfg {
func (c *JobCfg) ToTaskCfgs() map[string]*TaskCfg {
taskCfgs := make(map[string]*TaskCfg, len(c.Upstreams))
for _, mysqlInstance := range c.Upstreams {
// nolint:errcheck
jobCfg, _ := c.Clone()
jobCfg.Upstreams = []*UpstreamCfg{mysqlInstance}

taskCfg := (*TaskCfg)(jobCfg)
taskCfg.Upstreams = []*UpstreamCfg{mysqlInstance}
taskCfgs[mysqlInstance.SourceID] = taskCfg
}
return taskCfgs
}

// toDMTaskCfg transform a jobCfg to dm TaskCfg.
func (c *JobCfg) toDMTaskCfg() (*dmconfig.TaskConfig, error) {
// toDMTaskConfig transform a jobCfg to DM TaskCfg.
func (c *JobCfg) toDMTaskConfig() (*dmconfig.TaskConfig, error) {
dmTaskCfg := &dmconfig.TaskConfig{}

// Copy all the fields contained in dmTaskCfg.
content, err := c.Yaml()
if err != nil {
return nil, err
}
if err = yaml.Unmarshal([]byte(content), dmTaskCfg); err != nil {
if err = yaml.Unmarshal(content, dmTaskCfg); err != nil {
return nil, err
}

// transform all the fields not contained in dmTaskCfg.
for _, upstream := range c.Upstreams {
if err = upstream.adjust(); err != nil {
return nil, err
}
dmTaskCfg.MySQLInstances = append(dmTaskCfg.MySQLInstances, &upstream.MySQLInstance)
}
return dmTaskCfg, nil
}

func (c *JobCfg) fromDMTaskCfg(dmTaskCfg *dmconfig.TaskConfig) error {
func (c *JobCfg) fromDMTaskConfig(dmTaskCfg *dmconfig.TaskConfig) error {
// Copy all the fields contained in jobCfg.
return yaml.Unmarshal([]byte(dmTaskCfg.String()), c)

Expand All @@ -166,16 +200,20 @@ func (c *JobCfg) fromDMTaskCfg(dmTaskCfg *dmconfig.TaskConfig) error {
}

func (c *JobCfg) adjust() error {
dmTaskCfg, err := c.toDMTaskCfg()
dmTaskCfg, err := c.toDMTaskConfig()
if err != nil {
return err
}
if err := dmTaskCfg.Adjust(); err != nil {
return err
}
return c.fromDMTaskCfg(dmTaskCfg)
return c.fromDMTaskConfig(dmTaskCfg)
}

// TaskCfg shares same struct as JobCfg, but it only serves one upstream.
// TaskCfg can be converted to an equivalent DM subtask by ToDMSubTaskCfg.
type TaskCfg JobCfg

// ToDMSubTaskCfg adapts a TaskCfg to a SubTaskCfg for worker now.
// TODO: fully support all fields
func (c *TaskCfg) ToDMSubTaskCfg() *dmconfig.SubTaskConfig {
Expand All @@ -191,14 +229,17 @@ func (c *TaskCfg) ToDMSubTaskCfg() *dmconfig.SubTaskConfig {
cfg.IgnoreCheckingItems = c.IgnoreCheckingItems
cfg.MetaSchema = c.MetaSchema
cfg.Timezone = c.Timezone
cfg.Meta = c.Upstreams[0].Meta
cfg.From = *c.Upstreams[0].DBCfg
cfg.To = *c.TargetDB
cfg.Experimental = c.Experimental
cfg.CollationCompatible = c.CollationCompatible
cfg.SourceID = c.Upstreams[0].SourceID
cfg.BAList = c.BAList[c.Upstreams[0].BAListName]

cfg.SourceID = c.Upstreams[0].SourceID
cfg.Meta = c.Upstreams[0].Meta
cfg.From = *c.Upstreams[0].DBCfg
cfg.ServerID = c.Upstreams[0].ServerID
cfg.Flavor = c.Upstreams[0].Flavor

cfg.RouteRules = make([]*router.TableRule, len(c.Upstreams[0].RouteRules))
for j, name := range c.Upstreams[0].RouteRules {
cfg.RouteRules[j] = c.Routes[name]
Expand Down
27 changes: 24 additions & 3 deletions engine/jobmaster/dm/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package config

import (
"context"
"fmt"
"testing"

"github.com/BurntSushi/toml"
dmconfig "github.com/pingcap/tiflow/dm/dm/config"
dmmaster "github.com/pingcap/tiflow/dm/dm/master"
"github.com/stretchr/testify/require"
)

Expand All @@ -27,7 +29,20 @@ const (
subtaskTemplateDir = "."
)

func checkAndNoAdjustSourceConfigMock(ctx context.Context, cfg *dmconfig.SourceConfig) error {
if _, err := cfg.Yaml(); err != nil {
return err
}
return cfg.Verify()
}

func TestJobCfg(t *testing.T) {
funcBackup := dmmaster.CheckAndAdjustSourceConfigFunc
dmmaster.CheckAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock
defer func() {
dmmaster.CheckAndAdjustSourceConfigFunc = funcBackup
}()

jobCfg := &JobCfg{}
require.NoError(t, jobCfg.DecodeFile(jobTemplatePath))
require.Equal(t, "test", jobCfg.Name)
Expand All @@ -40,9 +55,9 @@ func TestJobCfg(t *testing.T) {
require.NoError(t, err)
require.Equal(t, content2, content)

dmTaskCfg, err := clone.toDMTaskCfg()
dmTaskCfg, err := clone.toDMTaskConfig()
require.NoError(t, err)
require.NoError(t, clone.fromDMTaskCfg(dmTaskCfg))
require.NoError(t, clone.fromDMTaskConfig(dmTaskCfg))
content3, err := clone.Yaml()
require.NoError(t, err)
require.Equal(t, content3, content)
Expand All @@ -51,10 +66,16 @@ func TestJobCfg(t *testing.T) {
}

func TestTaskCfg(t *testing.T) {
funcBackup := dmmaster.CheckAndAdjustSourceConfigFunc
dmmaster.CheckAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock
defer func() {
dmmaster.CheckAndAdjustSourceConfigFunc = funcBackup
}()

jobCfg := &JobCfg{}
require.NoError(t, jobCfg.DecodeFile(jobTemplatePath))

taskCfgs := jobCfg.ToTaskConfigs()
taskCfgs := jobCfg.ToTaskCfgs()
for _, taskCfg := range taskCfgs {
subTaskCfg := taskCfg.ToDMSubTaskCfg()
expectCfg := &dmconfig.SubTaskConfig{}
Expand Down
2 changes: 1 addition & 1 deletion engine/jobmaster/dm/dm_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (jm *JobMaster) getInitStatus() ([]runtime.TaskStatus, []runtime.WorkerStat
func (jm *JobMaster) preCheck(ctx context.Context) error {
log.L().Info("start pre-checking job config", zap.String("id", jm.workerID), zap.String("jobmaster_id", jm.JobMasterID()))

taskCfgs := jm.jobCfg.ToTaskConfigs()
taskCfgs := jm.jobCfg.ToTaskCfgs()
dmSubtaskCfgs := make([]*dmconfig.SubTaskConfig, 0, len(taskCfgs))
for _, taskCfg := range taskCfgs {
dmSubtaskCfgs = append(dmSubtaskCfgs, taskCfg.ToDMSubTaskCfg())
Expand Down
15 changes: 15 additions & 0 deletions engine/jobmaster/dm/dm_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/tiflow/dm/checker"
dmconfig "github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/dm/master"
"github.com/pingcap/tiflow/dm/pkg/conn"
"github.com/pingcap/tiflow/dm/pkg/log"
resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model"
Expand Down Expand Up @@ -61,6 +62,7 @@ func TestDMJobmasterSuite(t *testing.T) {

type testDMJobmasterSuite struct {
suite.Suite
funcBackup func(ctx context.Context, cfg *dmconfig.SourceConfig) error
}

func (t *testDMJobmasterSuite) SetupSuite() {
Expand All @@ -70,6 +72,19 @@ func (t *testDMJobmasterSuite) SetupSuite() {
WorkerErrorInterval = 100 * time.Millisecond
runtime.HeartbeatInterval = 1 * time.Second
require.NoError(t.T(), log.InitLogger(&log.Config{Level: "debug"}))
t.funcBackup = master.CheckAndAdjustSourceConfigFunc
master.CheckAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock
}

func checkAndNoAdjustSourceConfigMock(ctx context.Context, cfg *dmconfig.SourceConfig) error {
if _, err := cfg.Yaml(); err != nil {
return err
}
return cfg.Verify()
}

func (t *testDMJobmasterSuite) TearDownSuite() {
master.CheckAndAdjustSourceConfigFunc = t.funcBackup
}

type masterParamListForTest struct {
Expand Down
Loading

0 comments on commit ee57d0a

Please sign in to comment.