Skip to content

Commit

Permalink
allow to set DestConnectionConfig at src #592-1
Browse files Browse the repository at this point in the history
  • Loading branch information
ffffwh committed Jul 18, 2022
1 parent 9f145f3 commit f3affd3
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 30 deletions.
8 changes: 4 additions & 4 deletions api/handler/v2/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,10 +1011,10 @@ func buildSrcTaskDetail(taskName string, internalTaskConfig common.DtleTaskConfi

func buildMysqlDestTaskDetail(taskName string, internalTaskConfig common.DtleTaskConfig, allocsFromNomad []nomadApi.Allocation) (destTaskDetail models.MysqlDestTaskDetail) {
mysqlConnectionConfig := &models.DatabaseConnectionConfig{
Host: internalTaskConfig.ConnectionConfig.Host,
Port: internalTaskConfig.ConnectionConfig.Port,
User: internalTaskConfig.ConnectionConfig.User,
Password: internalTaskConfig.ConnectionConfig.Password,
Host: internalTaskConfig.DestConnectionConfig.Host,
Port: internalTaskConfig.DestConnectionConfig.Port,
User: internalTaskConfig.DestConnectionConfig.User,
Password: internalTaskConfig.DestConnectionConfig.Password,
DatabaseType: "MySQL",
}
mysqlDestTaskConfig := &models.MysqlDestTaskConfig{
Expand Down
24 changes: 24 additions & 0 deletions driver/common/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,22 @@ func (sm *StoreManager) GetGtidForJob(jobName string) (string, error) {
return string(p.Value), nil
}

func (sm *StoreManager) GetConfig(jobName string) (*MySQLDriverConfig, error) {
key := fmt.Sprintf("dtle/%v/Config", jobName)

kv, err := sm.consulStore.Get(key)
if err != nil {
return nil, err
}

config := &MySQLDriverConfig{}
err = json.Unmarshal(kv.Value, config)
if err != nil {
return nil, err
}
return config, nil
}

func (sm *StoreManager) GetSourceType(jobName string) (string, error) {
sm.logger.Debug("GetSourceType")

Expand Down Expand Up @@ -256,6 +272,14 @@ func (sm *StoreManager) SrcWatchNats(jobName string, stopCh chan struct{},
return natsAddr, nil
}

func (sm *StoreManager) PutConfig(subject string, config *MySQLDriverConfig) error {
url := fmt.Sprintf("dtle/%v/Config", subject)
bs, err := json.Marshal(config)
if err != nil {
return err
}
return sm.consulStore.Put(url, bs, nil)
}
func (sm *StoreManager) PutKey(subject string, key string, value []byte) error {
url := fmt.Sprintf("dtle/%v/%v", subject, key)
return sm.consulStore.Put(url, value, nil)
Expand Down
19 changes: 7 additions & 12 deletions driver/common/taskconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TaskTypeFromString(s string) string {
}

type DtleTaskConfig struct {
GetConfigFrom string `codec:"GetConfigFrom"`
//Ref:http://dev.mysql.com/doc/refman/5.7/en/replication-options-slave.html#option_mysqld_replicate-do-table
ReplicateDoDb []*DataSource `codec:"ReplicateDoDb"`
ReplicateIgnoreDb []*DataSource `codec:"ReplicateIgnoreDb"`
Expand Down Expand Up @@ -63,11 +64,12 @@ type DtleTaskConfig struct {
UseMySQLDependency bool `codec:"UseMySQLDependency"`
ForeignKeyChecks bool `codec:"ForeignKeyChecks"`

SkipCreateDbTable bool `codec:"SkipCreateDbTable"`
SkipPrivilegeCheck bool `codec:"SkipPrivilegeCheck"`
SkipIncrementalCopy bool `codec:"SkipIncrementalCopy"`
ConnectionConfig *mysqlconfig.ConnectionConfig `codec:"ConnectionConfig"`
KafkaConfig *KafkaConfig `codec:"KafkaConfig"`
SkipCreateDbTable bool `codec:"SkipCreateDbTable"`
SkipPrivilegeCheck bool `codec:"SkipPrivilegeCheck"`
SkipIncrementalCopy bool `codec:"SkipIncrementalCopy"`
ConnectionConfig *mysqlconfig.ConnectionConfig `codec:"ConnectionConfig"`
DestConnectionConfig *mysqlconfig.ConnectionConfig `codec:"DestConnectionConfig"`
KafkaConfig *KafkaConfig `codec:"KafkaConfig"`
// support oracle extractor/applier
OracleConfig *config.OracleConfig `codec:"OracleConfig"`
}
Expand All @@ -89,13 +91,6 @@ func (d *DtleTaskConfig) SetDefaultForEmpty() {
d.GroupTimeout = DefaultSrcGroupTimeout
}

if d.ConnectionConfig == nil {
d.ConnectionConfig = &mysqlconfig.ConnectionConfig{}
}
if d.ConnectionConfig.Charset == "" {
d.ConnectionConfig.Charset = "utf8mb4"
}

if d.KafkaConfig != nil {
if d.KafkaConfig.MessageGroupMaxSize == 0 {
d.KafkaConfig.MessageGroupMaxSize = DefaultKafkaMessageGroupMaxSize
Expand Down
26 changes: 15 additions & 11 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ var (
"big_tx_max_jobs": hclspec.NewAttr("big_tx_max_jobs", "number", false),
})

connectionConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"Host": hclspec.NewAttr("Host", "string", true),
"Port": hclspec.NewAttr("Port", "number", true),
"User": hclspec.NewAttr("User", "string", true),
"Password": hclspec.NewAttr("Password", "string", true),
"Charset": hclspec.NewDefault(hclspec.NewAttr("Charset", "string", false),
hclspec.NewLiteral(`"utf8mb4"`)),
})

// taskConfigSpec is the hcl specification for the driver config section of
// a taskConfig within a job. It is returned in the TaskConfigSchema RPC
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
Expand Down Expand Up @@ -131,14 +140,8 @@ var (
"SkipIncrementalCopy": hclspec.NewAttr("SkipIncrementalCopy", "bool", false),
"SlaveNetWriteTimeout": hclspec.NewDefault(hclspec.NewAttr("SlaveNetWriteTimeout", "number", false),
hclspec.NewLiteral(`28800`)), // 8 hours
"ConnectionConfig": hclspec.NewBlock("ConnectionConfig", false, hclspec.NewObject(map[string]*hclspec.Spec{
"Host": hclspec.NewAttr("Host", "string", true),
"Port": hclspec.NewAttr("Port", "number", true),
"User": hclspec.NewAttr("User", "string", true),
"Password": hclspec.NewAttr("Password", "string", true),
"Charset": hclspec.NewDefault(hclspec.NewAttr("Charset", "string", false),
hclspec.NewLiteral(`"utf8mb4"`)),
})),
"ConnectionConfig": hclspec.NewBlock("ConnectionConfig", false, connectionConfigSpec),
"DestConnectionConfig": hclspec.NewBlock("DestConnectionConfig", false, connectionConfigSpec),
"WaitOnJob": hclspec.NewAttr("WaitOnJob", "string", false),
"BulkInsert1": hclspec.NewDefault(hclspec.NewAttr("BulkInsert1", "number", false),
hclspec.NewLiteral(`4`)),
Expand Down Expand Up @@ -172,6 +175,7 @@ var (
"Password": hclspec.NewAttr("Password", "string", true),
"Scn": hclspec.NewAttr("Scn", "number", true),
})),
"GetConfigFrom": hclspec.NewAttr("GetConfigFrom", "string", false),
})

// capabilities is returned by the Capabilities RPC and indicates what
Expand Down Expand Up @@ -577,13 +581,13 @@ func (d *Driver) verifyDriverConfig(config common.DtleTaskConfig) error {
}

connectConfigNum := 0
for _, notNil := range []bool{config.ConnectionConfig != nil, config.KafkaConfig != nil, config.OracleConfig != nil} {
for _, notNil := range []bool{config.DestConnectionConfig != nil, config.KafkaConfig != nil, config.OracleConfig != nil} {
if notNil {
connectConfigNum += 1
}
}
if connectConfigNum != 1 {
addErrMsgs("one and only one of connection config should be set")
if connectConfigNum > 1 {
addErrMsgs("only one dest connection config should be set")
}

for _, doDb := range config.ReplicateDoDb {
Expand Down
21 changes: 18 additions & 3 deletions driver/mysql/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,21 @@ func (a *Applier) Run() {
return
}

switch a.mysqlContext.GetConfigFrom {
case "":
a.mysqlContext.DestConnectionConfig = a.mysqlContext.ConnectionConfig
a.logger.Info("get job config from nomad config")
case "consul":
a.logger.Info("get job config from consul")
a.mysqlContext, err = a.storeManager.GetConfig(a.subject)
if err != nil {
a.onError(common.TaskStateDead, errors.Wrap(err, "GetConfig"))
return
}
default:
a.onError(common.TaskStateDead, fmt.Errorf("unrecognized GetConfigFrom %v", a.mysqlContext.GetConfigFrom))
}

sourceType, err := a.storeManager.GetSourceType(a.subject)
if err != nil {
a.onError(common.TaskStateDead, errors.Wrap(err, "watchSourceType"))
Expand Down Expand Up @@ -655,7 +670,7 @@ func (a *Applier) publishProgress() {
}

func (a *Applier) InitDB() (err error) {
applierUri := a.mysqlContext.ConnectionConfig.GetDBUri()
applierUri := a.mysqlContext.DestConnectionConfig.GetDBUri()
if a.db, err = sql.CreateDB(applierUri); err != nil {
return err
}
Expand All @@ -678,7 +693,7 @@ func (a *Applier) initDBConnections() (err error) {
return someSysVars.Err
}
a.logger.Debug("Connection validated", "on",
hclog.Fmt("%s:%d", a.mysqlContext.ConnectionConfig.Host, a.mysqlContext.ConnectionConfig.Port))
hclog.Fmt("%s:%d", a.mysqlContext.DestConnectionConfig.Host, a.mysqlContext.DestConnectionConfig.Port))

a.MySQLVersion = someSysVars.Version
a.lowerCaseTableNames = someSysVars.LowerCaseTableNames
Expand All @@ -694,7 +709,7 @@ func (a *Applier) initDBConnections() (err error) {
}
a.logger.Debug("after ValidateGrants")

a.logger.Info("Initiated", "mysql", a.mysqlContext.ConnectionConfig.GetAddr(), "version", a.MySQLVersion)
a.logger.Info("Initiated", "mysql", a.mysqlContext.DestConnectionConfig.GetAddr(), "version", a.MySQLVersion)

return nil
}
Expand Down
5 changes: 5 additions & 0 deletions driver/mysql/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ func (e *Extractor) Run() {
return
}

err = e.storeManager.PutConfig(e.subject, e.mysqlContext)
if err != nil {
e.onError(common.TaskStateDead, errors.Wrap(err, "PutKey config"))
}

{
jobStatus, _ := e.storeManager.GetJobStatus(e.subject)
if jobStatus == common.TargetGtidFinished {
Expand Down

0 comments on commit f3affd3

Please sign in to comment.