From ee57d0a7a46e3f6fa70d8b54e77b579d11aa1c2f Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 24 May 2022 19:50:46 +0800 Subject: [PATCH] config(engine): imitate adjust DM source config (#5550) close pingcap/tiflow#5535 --- dm/dm/master/openapi_controller.go | 4 +- dm/dm/master/openapi_controller_test.go | 4 +- dm/dm/master/openapi_view_test.go | 4 +- dm/dm/master/server.go | 3 +- engine/jobmaster/dm/checkpoint/agent_test.go | 2 +- engine/jobmaster/dm/config/config.go | 99 ++++++++++++++------ engine/jobmaster/dm/config/config_test.go | 27 +++++- engine/jobmaster/dm/dm_jobmaster.go | 2 +- engine/jobmaster/dm/dm_jobmaster_test.go | 15 +++ engine/jobmaster/dm/message_agent_test.go | 9 +- engine/jobmaster/dm/metadata/job.go | 2 +- engine/jobmaster/dm/metadata/job_test.go | 15 +++ 12 files changed, 143 insertions(+), 43 deletions(-) diff --git a/dm/dm/master/openapi_controller.go b/dm/dm/master/openapi_controller.go index f00d21af8f3..20662881e48 100644 --- a/dm/dm/master/openapi_controller.go +++ b/dm/dm/master/openapi_controller.go @@ -115,7 +115,7 @@ func (s *Server) getSourceStatusListFromWorker(ctx context.Context, sourceName s func (s *Server) createSource(ctx context.Context, req openapi.CreateSourceRequest) (*openapi.Source, error) { cfg := config.OpenAPISourceToSourceCfg(req.Source) - if err := checkAndAdjustSourceConfigFunc(ctx, cfg); err != nil { + if err := CheckAndAdjustSourceConfigFunc(ctx, cfg); err != nil { return nil, err } @@ -148,7 +148,7 @@ func (s *Server) updateSource(ctx context.Context, sourceName string, req openap newCfg.From.Password = oldCfg.From.Password } - if err := checkAndAdjustSourceConfigFunc(ctx, newCfg); err != nil { + if err := CheckAndAdjustSourceConfigFunc(ctx, newCfg); err != nil { return nil, err } if err := s.scheduler.UpdateSourceCfg(newCfg); err != nil { diff --git a/dm/dm/master/openapi_controller_test.go b/dm/dm/master/openapi_controller_test.go index 7925366b6fc..83a2b27c83a 100644 --- a/dm/dm/master/openapi_controller_test.go +++ b/dm/dm/master/openapi_controller_test.go @@ -62,13 +62,13 @@ func (s *OpenAPIControllerSuite) SetupSuite() { s.testTask = &task checker.CheckSyncConfigFunc = mockCheckSyncConfig - checkAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock + CheckAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock s.Nil(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB", `return(true)`)) s.Nil(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipRemoveMetaData", `return(true)`)) } func (s *OpenAPIControllerSuite) TearDownSuite() { - checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig + CheckAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig checker.CheckSyncConfigFunc = checker.CheckSyncConfig s.Nil(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB")) s.Nil(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipRemoveMetaData")) diff --git a/dm/dm/master/openapi_view_test.go b/dm/dm/master/openapi_view_test.go index 16abd4d399b..2ba781d9d44 100644 --- a/dm/dm/master/openapi_view_test.go +++ b/dm/dm/master/openapi_view_test.go @@ -163,14 +163,14 @@ func (s *OpenAPIViewSuite) SetupSuite() { func (s *OpenAPIViewSuite) SetupTest() { checker.CheckSyncConfigFunc = mockCheckSyncConfig - checkAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock + CheckAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock s.NoError(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB", `return(true)`)) s.NoError(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipRemoveMetaData", `return(true)`)) } func (s *OpenAPIViewSuite) TearDownTest() { checker.CheckSyncConfigFunc = checker.CheckSyncConfig - checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig + CheckAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig s.NoError(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB")) s.NoError(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipRemoveMetaData")) } diff --git a/dm/dm/master/server.go b/dm/dm/master/server.go index dda598ffc71..2f088b7e75d 100644 --- a/dm/dm/master/server.go +++ b/dm/dm/master/server.go @@ -87,10 +87,11 @@ var ( registerOnce sync.Once runBackgroundOnce sync.Once + // CheckAndAdjustSourceConfigFunc is exposed to dataflow engine. // the difference of below functions is checkAndAdjustSourceConfigForDMCtlFunc will not AdjustCaseSensitive. It's a // compatibility compromise. // When we need to change the implementation of dmctl to OpenAPI, we should notice the user about this change. - checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig + CheckAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig checkAndAdjustSourceConfigForDMCtlFunc = checkAndAdjustSourceConfigForDMCtl ) diff --git a/engine/jobmaster/dm/checkpoint/agent_test.go b/engine/jobmaster/dm/checkpoint/agent_test.go index e70d8f874ea..f326bc438ff 100644 --- a/engine/jobmaster/dm/checkpoint/agent_test.go +++ b/engine/jobmaster/dm/checkpoint/agent_test.go @@ -194,7 +194,7 @@ func TestIsFresh(t *testing.T) { }, }, } - taskCfg := jobCfg.ToTaskConfigs()[source1] + taskCfg := jobCfg.ToTaskCfgs()[source1] checkpointAgent := NewAgentImpl(jobCfg) isFresh, err := checkpointAgent.IsFresh(context.Background(), lib.WorkerDMDump, &metadata.Task{Cfg: taskCfg}) diff --git a/engine/jobmaster/dm/config/config.go b/engine/jobmaster/dm/config/config.go index ca125c81190..6bfc8eb053f 100644 --- a/engine/jobmaster/dm/config/config.go +++ b/engine/jobmaster/dm/config/config.go @@ -14,7 +14,9 @@ package config import ( + "context" "os" + "time" "github.com/pingcap/errors" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" @@ -22,10 +24,52 @@ import ( "github.com/pingcap/tidb/util/filter" router "github.com/pingcap/tidb/util/table-router" dmconfig "github.com/pingcap/tiflow/dm/dm/config" + "github.com/pingcap/tiflow/dm/dm/master" "gopkg.in/yaml.v2" ) -// JobCfg copies from tiflow/dm/config/config.go and removes some deprecated fields. +// UpstreamCfg copies the needed fields from DM SourceCfg and MySQLInstance part +// of DM task config. +type UpstreamCfg struct { + dmconfig.MySQLInstance `yaml:",inline" toml:",inline" json:",inline"` + DBCfg *dmconfig.DBConfig `yaml:"db-config" toml:"db-config" json:"db-config"` + ServerID uint32 `yaml:"server-id" toml:"server-id" json:"server-id"` + Flavor string `yaml:"flavor" toml:"flavor" json:"flavor"` + EnableGTID bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"` +} + +func (u *UpstreamCfg) fromDMSourceConfig(from *dmconfig.SourceConfig) { + u.DBCfg = from.From.Clone() + u.ServerID = from.ServerID + u.Flavor = from.Flavor + u.EnableGTID = from.EnableGTID +} + +func (u *UpstreamCfg) toDMSourceConfig() *dmconfig.SourceConfig { + ret := dmconfig.NewSourceConfig() + ret.SourceID = u.SourceID + ret.From = *u.DBCfg.Clone() + ret.ServerID = u.ServerID + ret.Flavor = u.Flavor + ret.EnableGTID = u.EnableGTID + + return ret +} + +func (u *UpstreamCfg) adjust() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + dmSource := u.toDMSourceConfig() + err := master.CheckAndAdjustSourceConfigFunc(ctx, dmSource) + if err != nil { + return err + } + u.fromDMSourceConfig(dmSource) + return nil +} + +// JobCfg copies from SubTaskConfig and removes some deprecated fields. +// It represents a DM subtask with multiple source configs embedded as Upstreams. // DISCUSS: support command line args. e.g. --start-time. type JobCfg struct { Name string `yaml:"name" toml:"name" json:"name"` @@ -46,7 +90,6 @@ type JobCfg struct { Syncers map[string]*dmconfig.SyncerConfig `yaml:"syncers" toml:"syncers" json:"syncers"` Routes map[string]*router.TableRule `yaml:"routes" toml:"routes" json:"routes"` Validators map[string]*dmconfig.ValidatorConfig `yaml:"validators" toml:"validators" json:"validators"` - // remove source config, use db config instead. Upstreams []*UpstreamCfg `yaml:"upstreams" toml:"upstreams" json:"upstreams"` @@ -75,16 +118,6 @@ type JobCfg struct { // RemoveMeta bool `yaml:"remove-meta"` } -// TaskCfg alias JobCfg -// The difference between task configuration and job configuration is that a task has only one usptream. -type TaskCfg JobCfg - -// UpstreamCfg add db-config to dmconfig.MySQLInstance, because we no need source cfg now. -type UpstreamCfg struct { - dmconfig.MySQLInstance `yaml:",inline" toml:",inline" json:",inline"` - DBCfg *dmconfig.DBConfig `yaml:"db-config" toml:"db-config" json:"db-config"` -} - // DecodeFile reads file content from a given path and decodes it. func (c *JobCfg) DecodeFile(fpath string) error { bs, err := os.ReadFile(fpath) @@ -105,9 +138,8 @@ func (c *JobCfg) Decode(content []byte) error { } // Yaml serializes the JobCfg into a YAML document. -func (c *JobCfg) Yaml() (string, error) { - b, err := yaml.Marshal(c) - return string(b), err +func (c *JobCfg) Yaml() ([]byte, error) { + return yaml.Marshal(c) } // Clone returns a deep copy of JobCfg @@ -117,27 +149,26 @@ func (c *JobCfg) Clone() (*JobCfg, error) { return nil, err } clone := &JobCfg{} - err = yaml.Unmarshal([]byte(content), clone) + err = yaml.Unmarshal(content, clone) return clone, err } -// ToTaskConfigs converts job config to a map, mapping from upstream source id +// ToTaskCfgs converts job config to a map, mapping from upstream source id // to task config. -func (c *JobCfg) ToTaskConfigs() map[string]*TaskCfg { +func (c *JobCfg) ToTaskCfgs() map[string]*TaskCfg { taskCfgs := make(map[string]*TaskCfg, len(c.Upstreams)) for _, mysqlInstance := range c.Upstreams { // nolint:errcheck jobCfg, _ := c.Clone() - jobCfg.Upstreams = []*UpstreamCfg{mysqlInstance} - taskCfg := (*TaskCfg)(jobCfg) + taskCfg.Upstreams = []*UpstreamCfg{mysqlInstance} taskCfgs[mysqlInstance.SourceID] = taskCfg } return taskCfgs } -// toDMTaskCfg transform a jobCfg to dm TaskCfg. -func (c *JobCfg) toDMTaskCfg() (*dmconfig.TaskConfig, error) { +// toDMTaskConfig transform a jobCfg to DM TaskCfg. +func (c *JobCfg) toDMTaskConfig() (*dmconfig.TaskConfig, error) { dmTaskCfg := &dmconfig.TaskConfig{} // Copy all the fields contained in dmTaskCfg. @@ -145,18 +176,21 @@ func (c *JobCfg) toDMTaskCfg() (*dmconfig.TaskConfig, error) { if err != nil { return nil, err } - if err = yaml.Unmarshal([]byte(content), dmTaskCfg); err != nil { + if err = yaml.Unmarshal(content, dmTaskCfg); err != nil { return nil, err } // transform all the fields not contained in dmTaskCfg. for _, upstream := range c.Upstreams { + if err = upstream.adjust(); err != nil { + return nil, err + } dmTaskCfg.MySQLInstances = append(dmTaskCfg.MySQLInstances, &upstream.MySQLInstance) } return dmTaskCfg, nil } -func (c *JobCfg) fromDMTaskCfg(dmTaskCfg *dmconfig.TaskConfig) error { +func (c *JobCfg) fromDMTaskConfig(dmTaskCfg *dmconfig.TaskConfig) error { // Copy all the fields contained in jobCfg. return yaml.Unmarshal([]byte(dmTaskCfg.String()), c) @@ -166,16 +200,20 @@ func (c *JobCfg) fromDMTaskCfg(dmTaskCfg *dmconfig.TaskConfig) error { } func (c *JobCfg) adjust() error { - dmTaskCfg, err := c.toDMTaskCfg() + dmTaskCfg, err := c.toDMTaskConfig() if err != nil { return err } if err := dmTaskCfg.Adjust(); err != nil { return err } - return c.fromDMTaskCfg(dmTaskCfg) + return c.fromDMTaskConfig(dmTaskCfg) } +// TaskCfg shares same struct as JobCfg, but it only serves one upstream. +// TaskCfg can be converted to an equivalent DM subtask by ToDMSubTaskCfg. +type TaskCfg JobCfg + // ToDMSubTaskCfg adapts a TaskCfg to a SubTaskCfg for worker now. // TODO: fully support all fields func (c *TaskCfg) ToDMSubTaskCfg() *dmconfig.SubTaskConfig { @@ -191,14 +229,17 @@ func (c *TaskCfg) ToDMSubTaskCfg() *dmconfig.SubTaskConfig { cfg.IgnoreCheckingItems = c.IgnoreCheckingItems cfg.MetaSchema = c.MetaSchema cfg.Timezone = c.Timezone - cfg.Meta = c.Upstreams[0].Meta - cfg.From = *c.Upstreams[0].DBCfg cfg.To = *c.TargetDB cfg.Experimental = c.Experimental cfg.CollationCompatible = c.CollationCompatible - cfg.SourceID = c.Upstreams[0].SourceID cfg.BAList = c.BAList[c.Upstreams[0].BAListName] + cfg.SourceID = c.Upstreams[0].SourceID + cfg.Meta = c.Upstreams[0].Meta + cfg.From = *c.Upstreams[0].DBCfg + cfg.ServerID = c.Upstreams[0].ServerID + cfg.Flavor = c.Upstreams[0].Flavor + cfg.RouteRules = make([]*router.TableRule, len(c.Upstreams[0].RouteRules)) for j, name := range c.Upstreams[0].RouteRules { cfg.RouteRules[j] = c.Routes[name] diff --git a/engine/jobmaster/dm/config/config_test.go b/engine/jobmaster/dm/config/config_test.go index dd410436352..026ed1dec0d 100644 --- a/engine/jobmaster/dm/config/config_test.go +++ b/engine/jobmaster/dm/config/config_test.go @@ -14,11 +14,13 @@ package config import ( + "context" "fmt" "testing" "github.com/BurntSushi/toml" dmconfig "github.com/pingcap/tiflow/dm/dm/config" + dmmaster "github.com/pingcap/tiflow/dm/dm/master" "github.com/stretchr/testify/require" ) @@ -27,7 +29,20 @@ const ( subtaskTemplateDir = "." ) +func checkAndNoAdjustSourceConfigMock(ctx context.Context, cfg *dmconfig.SourceConfig) error { + if _, err := cfg.Yaml(); err != nil { + return err + } + return cfg.Verify() +} + func TestJobCfg(t *testing.T) { + funcBackup := dmmaster.CheckAndAdjustSourceConfigFunc + dmmaster.CheckAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock + defer func() { + dmmaster.CheckAndAdjustSourceConfigFunc = funcBackup + }() + jobCfg := &JobCfg{} require.NoError(t, jobCfg.DecodeFile(jobTemplatePath)) require.Equal(t, "test", jobCfg.Name) @@ -40,9 +55,9 @@ func TestJobCfg(t *testing.T) { require.NoError(t, err) require.Equal(t, content2, content) - dmTaskCfg, err := clone.toDMTaskCfg() + dmTaskCfg, err := clone.toDMTaskConfig() require.NoError(t, err) - require.NoError(t, clone.fromDMTaskCfg(dmTaskCfg)) + require.NoError(t, clone.fromDMTaskConfig(dmTaskCfg)) content3, err := clone.Yaml() require.NoError(t, err) require.Equal(t, content3, content) @@ -51,10 +66,16 @@ func TestJobCfg(t *testing.T) { } func TestTaskCfg(t *testing.T) { + funcBackup := dmmaster.CheckAndAdjustSourceConfigFunc + dmmaster.CheckAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock + defer func() { + dmmaster.CheckAndAdjustSourceConfigFunc = funcBackup + }() + jobCfg := &JobCfg{} require.NoError(t, jobCfg.DecodeFile(jobTemplatePath)) - taskCfgs := jobCfg.ToTaskConfigs() + taskCfgs := jobCfg.ToTaskCfgs() for _, taskCfg := range taskCfgs { subTaskCfg := taskCfg.ToDMSubTaskCfg() expectCfg := &dmconfig.SubTaskConfig{} diff --git a/engine/jobmaster/dm/dm_jobmaster.go b/engine/jobmaster/dm/dm_jobmaster.go index 43106ebcc31..904fcecd6f7 100644 --- a/engine/jobmaster/dm/dm_jobmaster.go +++ b/engine/jobmaster/dm/dm_jobmaster.go @@ -302,7 +302,7 @@ func (jm *JobMaster) getInitStatus() ([]runtime.TaskStatus, []runtime.WorkerStat func (jm *JobMaster) preCheck(ctx context.Context) error { log.L().Info("start pre-checking job config", zap.String("id", jm.workerID), zap.String("jobmaster_id", jm.JobMasterID())) - taskCfgs := jm.jobCfg.ToTaskConfigs() + taskCfgs := jm.jobCfg.ToTaskCfgs() dmSubtaskCfgs := make([]*dmconfig.SubTaskConfig, 0, len(taskCfgs)) for _, taskCfg := range taskCfgs { dmSubtaskCfgs = append(dmSubtaskCfgs, taskCfg.ToDMSubTaskCfg()) diff --git a/engine/jobmaster/dm/dm_jobmaster_test.go b/engine/jobmaster/dm/dm_jobmaster_test.go index 1e770ade8ed..23d76dff974 100644 --- a/engine/jobmaster/dm/dm_jobmaster_test.go +++ b/engine/jobmaster/dm/dm_jobmaster_test.go @@ -25,6 +25,7 @@ import ( "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/tiflow/dm/checker" dmconfig "github.com/pingcap/tiflow/dm/dm/config" + "github.com/pingcap/tiflow/dm/dm/master" "github.com/pingcap/tiflow/dm/pkg/conn" "github.com/pingcap/tiflow/dm/pkg/log" resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" @@ -61,6 +62,7 @@ func TestDMJobmasterSuite(t *testing.T) { type testDMJobmasterSuite struct { suite.Suite + funcBackup func(ctx context.Context, cfg *dmconfig.SourceConfig) error } func (t *testDMJobmasterSuite) SetupSuite() { @@ -70,6 +72,19 @@ func (t *testDMJobmasterSuite) SetupSuite() { WorkerErrorInterval = 100 * time.Millisecond runtime.HeartbeatInterval = 1 * time.Second require.NoError(t.T(), log.InitLogger(&log.Config{Level: "debug"})) + t.funcBackup = master.CheckAndAdjustSourceConfigFunc + master.CheckAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock +} + +func checkAndNoAdjustSourceConfigMock(ctx context.Context, cfg *dmconfig.SourceConfig) error { + if _, err := cfg.Yaml(); err != nil { + return err + } + return cfg.Verify() +} + +func (t *testDMJobmasterSuite) TearDownSuite() { + master.CheckAndAdjustSourceConfigFunc = t.funcBackup } type masterParamListForTest struct { diff --git a/engine/jobmaster/dm/message_agent_test.go b/engine/jobmaster/dm/message_agent_test.go index 6d1dc11dcff..63a094d0bf7 100644 --- a/engine/jobmaster/dm/message_agent_test.go +++ b/engine/jobmaster/dm/message_agent_test.go @@ -19,6 +19,7 @@ import ( "sync" "testing" + dmmaster "github.com/pingcap/tiflow/dm/dm/master" "github.com/pingcap/tiflow/engine/lib/master" resourcemeta "github.com/pingcap/tiflow/engine/pkg/externalresource/resourcemeta/model" @@ -82,13 +83,19 @@ func TestUpdateWorkerHandle(t *testing.T) { } func TestOperateWorker(t *testing.T) { + funcBackup := dmmaster.CheckAndAdjustSourceConfigFunc + dmmaster.CheckAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock + defer func() { + dmmaster.CheckAndAdjustSourceConfigFunc = funcBackup + }() + mockMasterImpl := &MockMaster{} messageAgent := NewMessageAgent(nil, "mock-jobmaster", mockMasterImpl) task1 := "task1" worker1 := "worker1" jobCfg := &config.JobCfg{} require.NoError(t, jobCfg.DecodeFile(jobTemplatePath)) - taskCfgs := jobCfg.ToTaskConfigs() + taskCfgs := jobCfg.ToTaskCfgs() taskCfg := taskCfgs[jobCfg.Upstreams[0].SourceID] // create worker diff --git a/engine/jobmaster/dm/metadata/job.go b/engine/jobmaster/dm/metadata/job.go index ca5bc3741cc..4f52f1a39b2 100644 --- a/engine/jobmaster/dm/metadata/job.go +++ b/engine/jobmaster/dm/metadata/job.go @@ -49,7 +49,7 @@ type Job struct { // NewJob creates a new Job instance func NewJob(jobCfg *config.JobCfg) *Job { - taskCfgs := jobCfg.ToTaskConfigs() + taskCfgs := jobCfg.ToTaskCfgs() job := &Job{ Tasks: make(map[string]*Task, len(taskCfgs)), } diff --git a/engine/jobmaster/dm/metadata/job_test.go b/engine/jobmaster/dm/metadata/job_test.go index 8348c5245dc..c89d4ea9938 100644 --- a/engine/jobmaster/dm/metadata/job_test.go +++ b/engine/jobmaster/dm/metadata/job_test.go @@ -17,6 +17,8 @@ import ( "context" "testing" + dmconfig "github.com/pingcap/tiflow/dm/dm/config" + dmmaster "github.com/pingcap/tiflow/dm/dm/master" "github.com/stretchr/testify/require" "github.com/pingcap/tiflow/engine/jobmaster/dm/config" @@ -28,7 +30,20 @@ const ( jobTemplatePath = "../config/job_template.yaml" ) +func checkAndNoAdjustSourceConfigMock(ctx context.Context, cfg *dmconfig.SourceConfig) error { + if _, err := cfg.Yaml(); err != nil { + return err + } + return cfg.Verify() +} + func TestJobStore(t *testing.T) { + funcBackup := dmmaster.CheckAndAdjustSourceConfigFunc + dmmaster.CheckAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock + defer func() { + dmmaster.CheckAndAdjustSourceConfigFunc = funcBackup + }() + var ( source1 = "mysql-replica-01" source2 = "mysql-replica-02"