Skip to content

Commit

Permalink
Merge branch 'two-way-sync' #592-3
Browse files Browse the repository at this point in the history
  • Loading branch information
ffffwh committed Oct 27, 2022
2 parents 7d10d8b + b75b2a4 commit ed41bb4
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 167 deletions.
92 changes: 45 additions & 47 deletions api/handler/v2/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,9 @@ func createOrUpdateJob(logger g.LoggerType, jobParam *models.CreateOrUpdateMysql
}

func convertJobToNomadJob(failover bool, jobParams *models.CreateOrUpdateMysqlToMysqlJobParamV2) (*nomadApi.Job, error) {
srcTask, srcDataCenter, err := buildNomadTaskGroupItem(buildDatabaseSrcTaskConfigMap(jobParams.SrcTask), jobParams.SrcTask.TaskName, jobParams.SrcTask.NodeId, failover, jobParams.Retry)
srcTask, srcDataCenter, err := buildNomadTaskGroupItem(
buildDatabaseSrcTaskConfigMap(jobParams.SrcTask, jobParams.DestTask, nil),
jobParams.SrcTask.TaskName, jobParams.SrcTask.NodeId, failover, jobParams.Retry)
if nil != err {
return nil, fmt.Errorf("build src task failed: %v", err)
}
Expand Down Expand Up @@ -561,21 +563,13 @@ func buildRestartPolicy(RestartAttempts int) (*nomadApi.ReschedulePolicy, *nomad

func buildDatabaseDestTaskConfigMap(config *models.DestTaskConfig) map[string]interface{} {
taskConfigInNomadFormat := make(map[string]interface{})
if config.MysqlDestTaskConfig == nil {
return taskConfigInNomadFormat
}
addNotRequiredParamToMap(taskConfigInNomadFormat, config.MysqlDestTaskConfig.ParallelWorkers, "ParallelWorkers")
addNotRequiredParamToMap(taskConfigInNomadFormat, config.MysqlDestTaskConfig.UseMySQLDependency, "UseMySQLDependency")
addNotRequiredParamToMap(taskConfigInNomadFormat, config.MysqlDestTaskConfig.DependencyHistorySize, "DependencyHistorySize")
addNotRequiredParamToMap(taskConfigInNomadFormat, config.MysqlDestTaskConfig.BulkInsert1, "BulkInsert1")
addNotRequiredParamToMap(taskConfigInNomadFormat, config.MysqlDestTaskConfig.BulkInsert2, "BulkInsert2")
addNotRequiredParamToMap(taskConfigInNomadFormat, config.MysqlDestTaskConfig.SetGtidNext, "SetGtidNext")
taskConfigInNomadFormat["ConnectionConfig"] = buildMysqlConnectionConfigMap(config.ConnectionConfig)
taskConfigInNomadFormat["DestType"] = "mysql"

return taskConfigInNomadFormat
}

func buildDatabaseSrcTaskConfigMap(config *models.SrcTaskConfig) map[string]interface{} {
func buildDatabaseSrcTaskConfigMap(config *models.SrcTaskConfig, destConfig *models.DestTaskConfig,
kafkaConfig *models.KafkaDestTaskConfig) map[string]interface{} {
taskConfigInNomadFormat := make(map[string]interface{})

addNotRequiredParamToMap(taskConfigInNomadFormat, config.DropTableIfExists, "DropTableIfExists")
Expand All @@ -594,7 +588,7 @@ func buildDatabaseSrcTaskConfigMap(config *models.SrcTaskConfig) map[string]inte
addNotRequiredParamToMap(taskConfigInNomadFormat, config.MysqlSrcTaskConfig.Gtid, "Gtid")
addNotRequiredParamToMap(taskConfigInNomadFormat, config.MysqlSrcTaskConfig.ExpandSyntaxSupport, "ExpandSyntaxSupport")
addNotRequiredParamToMap(taskConfigInNomadFormat, config.MysqlSrcTaskConfig.DumpEntryLimit, "DumpEntryLimit")
taskConfigInNomadFormat["ConnectionConfig"] = buildMysqlConnectionConfigMap(config.ConnectionConfig)
taskConfigInNomadFormat["SrcConnectionConfig"] = buildMysqlConnectionConfigMap(config.ConnectionConfig)
}
// for Oracle
if config.OracleSrcTaskConfig != nil {
Expand All @@ -611,6 +605,24 @@ func buildDatabaseSrcTaskConfigMap(config *models.SrcTaskConfig) map[string]inte
taskConfigInNomadFormat["ReplicateDoDb"] = buildMysqlDataSourceConfigMap(config.ReplicateDoDb)
taskConfigInNomadFormat["ReplicateIgnoreDb"] = buildMysqlDataSourceConfigMap(config.ReplicateIgnoreDb)

if destConfig != nil {
addNotRequiredParamToMap(taskConfigInNomadFormat, destConfig.MysqlDestTaskConfig.ParallelWorkers, "ParallelWorkers")
addNotRequiredParamToMap(taskConfigInNomadFormat, destConfig.MysqlDestTaskConfig.UseMySQLDependency, "UseMySQLDependency")
addNotRequiredParamToMap(taskConfigInNomadFormat, destConfig.MysqlDestTaskConfig.DependencyHistorySize, "DependencyHistorySize")
addNotRequiredParamToMap(taskConfigInNomadFormat, destConfig.MysqlDestTaskConfig.BulkInsert1, "BulkInsert1")
addNotRequiredParamToMap(taskConfigInNomadFormat, destConfig.MysqlDestTaskConfig.BulkInsert2, "BulkInsert2")
addNotRequiredParamToMap(taskConfigInNomadFormat, destConfig.MysqlDestTaskConfig.SetGtidNext, "SetGtidNext")
taskConfigInNomadFormat["DestConnectionConfig"] = buildMysqlConnectionConfigMap(destConfig.ConnectionConfig)
} else if kafkaConfig != nil {
kafkaMap := make(map[string]interface{})
kafkaMap["Brokers"] = kafkaConfig.BrokerAddrs
kafkaMap["Topic"] = kafkaConfig.Topic
kafkaMap["MessageGroupMaxSize"] = kafkaConfig.MessageGroupMaxSize
kafkaMap["MessageGroupTimeout"] = kafkaConfig.MessageGroupTimeout
kafkaMap["Converter"] = kafka.CONVERTER_JSON
taskConfigInNomadFormat["KafkaConfig"] = kafkaMap
}

return taskConfigInNomadFormat
}

Expand Down Expand Up @@ -997,12 +1009,12 @@ func buildSrcTaskDetail(taskName string, internalTaskConfig common.DtleTaskConfi
connectionConfig.User = internalTaskConfig.OracleConfig.User
connectionConfig.Password = internalTaskConfig.OracleConfig.Password
connectionConfig.ServiceName = internalTaskConfig.OracleConfig.ServiceName
} else if internalTaskConfig.ConnectionConfig != nil {
} else if internalTaskConfig.SrcConnectionConfig != nil {
connectionConfig.DatabaseType = "MySQL"
connectionConfig.Host = internalTaskConfig.ConnectionConfig.Host
connectionConfig.Port = internalTaskConfig.ConnectionConfig.Port
connectionConfig.User = internalTaskConfig.ConnectionConfig.User
connectionConfig.Password = internalTaskConfig.ConnectionConfig.Password
connectionConfig.Host = internalTaskConfig.SrcConnectionConfig.Host
connectionConfig.Port = internalTaskConfig.SrcConnectionConfig.Port
connectionConfig.User = internalTaskConfig.SrcConnectionConfig.User
connectionConfig.Password = internalTaskConfig.SrcConnectionConfig.Password
srcTaskDetail.TaskConfig.MysqlSrcTaskConfig = &models.MysqlSrcTaskConfig{
ExpandSyntaxSupport: internalTaskConfig.ExpandSyntaxSupport,
AutoGtid: internalTaskConfig.AutoGtid,
Expand All @@ -1025,10 +1037,10 @@ func buildSrcTaskDetail(taskName string, internalTaskConfig common.DtleTaskConfi

func buildMysqlDestTaskDetail(taskName string, internalTaskConfig common.DtleTaskConfig, allocsFromNomad []nomadApi.Allocation) (destTaskDetail models.MysqlDestTaskDetail) {
mysqlConnectionConfig := &models.DatabaseConnectionConfig{
Host: internalTaskConfig.ConnectionConfig.Host,
Port: internalTaskConfig.ConnectionConfig.Port,
User: internalTaskConfig.ConnectionConfig.User,
Password: internalTaskConfig.ConnectionConfig.Password,
Host: internalTaskConfig.DestConnectionConfig.Host,
Port: internalTaskConfig.DestConnectionConfig.Port,
User: internalTaskConfig.DestConnectionConfig.User,
Password: internalTaskConfig.DestConnectionConfig.Password,
DatabaseType: "MySQL",
}
mysqlDestTaskConfig := &models.MysqlDestTaskConfig{
Expand Down Expand Up @@ -1292,7 +1304,7 @@ func createOrUpdateDBToKafkaJob(c echo.Context, logger g.LoggerType, jobType Dtl
}

func convertDBToKafkaJobToNomadJob(failover bool, apiJobParams *models.CreateOrUpdateMysqlToKafkaJobParamV2) (*nomadApi.Job, error) {
srcTask, srcDataCenter, err := buildNomadTaskGroupItem(buildDatabaseSrcTaskConfigMap(apiJobParams.SrcTask), apiJobParams.SrcTask.TaskName, apiJobParams.SrcTask.NodeId, failover, apiJobParams.Retry)
srcTask, srcDataCenter, err := buildNomadTaskGroupItem(buildDatabaseSrcTaskConfigMap(apiJobParams.SrcTask, nil, apiJobParams.DestTask), apiJobParams.SrcTask.TaskName, apiJobParams.SrcTask.NodeId, failover, apiJobParams.Retry)
if nil != err {
return nil, fmt.Errorf("build src task failed: %v", err)
}
Expand All @@ -1313,15 +1325,8 @@ func convertDBToKafkaJobToNomadJob(failover bool, apiJobParams *models.CreateOrU
}

func buildKafkaDestTaskConfigMap(config *models.KafkaDestTaskConfig) map[string]interface{} {
kafkaConfig := make(map[string]interface{})
taskConfigInNomadFormat := make(map[string]interface{})

kafkaConfig["Brokers"] = config.BrokerAddrs
kafkaConfig["Topic"] = config.Topic
kafkaConfig["MessageGroupMaxSize"] = config.MessageGroupMaxSize
kafkaConfig["MessageGroupTimeout"] = config.MessageGroupTimeout
kafkaConfig["Converter"] = kafka.CONVERTER_JSON
taskConfigInNomadFormat["KafkaConfig"] = kafkaConfig
taskConfigInNomadFormat["DestType"] = "kafka"

return taskConfigInNomadFormat
}
Expand Down Expand Up @@ -1750,24 +1755,17 @@ func GetJobPositionV2(c echo.Context) error {
if err != nil {
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("consul_addr=%v; connect to consul failed: %v", handler.ConsulAddr, err)))
}
sourceType, err := storeManager.GetSourceType(reqParam.JobId)
if err != nil {
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("consul_addr=%v; get job src source type failed: %v", handler.ConsulAddr, err)))
}
var position string
switch sourceType {
case "oracle":
scn, _, err := storeManager.GetOracleSCNPosForJob(reqParam.JobId)
if nil != err {
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("consul_addr=%v ; get scn failed: %v", handler.ConsulAddr, err)))
}
position = fmt.Sprintf("%d", scn)
case "mysql":
position, err = storeManager.GetGtidForJob(reqParam.JobId)
if nil != err {
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("consul_addr=%v ; get gtid failed: %v", handler.ConsulAddr, err)))
}

// TODO use 1 key for either task types
//scn, _, err := storeManager.GetOracleSCNPosForJob(reqParam.JobId)
//if nil != err {
// return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("consul_addr=%v ; get scn failed: %v", handler.ConsulAddr, err)))
//}
//position = fmt.Sprintf("%d", scn)
position, err = storeManager.GetGtidForJob(reqParam.JobId)
if nil != err {
return c.JSON(http.StatusInternalServerError, models.BuildBaseResp(fmt.Errorf("consul_addr=%v ; get gtid failed: %v", handler.ConsulAddr, err)))
}

return c.JSON(http.StatusOK, &models.JobPositionResp{
Expand Down
12 changes: 3 additions & 9 deletions api/handler/v2/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ func decryptMySQLPwd(apiSrcTask *models.SrcTaskConfig, apiDestTask *models.DestT

func validateTaskConfig(apiSrcTask *models.SrcTaskConfig, apiDestTask *models.DestTaskConfig) ([]*models.MysqlTaskValidationReport, error) {
taskValidationRes := []*models.MysqlTaskValidationReport{}
srcTaskConfig := common.DtleTaskConfig{}
// validate src task
if apiSrcTask.MysqlSrcTaskConfig != nil {
srcTaskConfig := common.DtleTaskConfig{}
srcTaskMap := buildDatabaseSrcTaskConfigMap(apiSrcTask)
srcTaskMap := buildDatabaseSrcTaskConfigMap(apiSrcTask, apiDestTask, nil)
if err := mapstructure.WeakDecode(srcTaskMap, &srcTaskConfig); err != nil {
return nil, fmt.Errorf("convert src task config failed: %v", err)
}
Expand Down Expand Up @@ -151,19 +151,13 @@ func validateTaskConfig(apiSrcTask *models.SrcTaskConfig, apiDestTask *models.De
}
// validate dest task
{
destTaskConfig := common.DtleTaskConfig{}
destTaskMap := buildDatabaseDestTaskConfigMap(apiDestTask)
if err := mapstructure.WeakDecode(destTaskMap, &destTaskConfig); err != nil {
return nil, fmt.Errorf("convert dest task config failed: %v", err)
}

validationRes := &models.MysqlTaskValidationReport{
TaskName: apiDestTask.TaskName,
}
destTaskInspector, err := mysql.NewApplier(
&common.ExecContext{},
&common.MySQLDriverConfig{
DtleTaskConfig: destTaskConfig,
DtleTaskConfig: srcTaskConfig, // #592: src sends the config to dest.
},
g.Logger.Named("http api: validateTaskConfig"),
nil, "", nil, nil, nil, context.Background())
Expand Down
25 changes: 0 additions & 25 deletions driver/common/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,31 +117,6 @@ func (sm *StoreManager) GetConfig(jobName string) (*MySQLDriverConfig, error) {
return config, nil
}

func (sm *StoreManager) GetSourceType(jobName string) (string, error) {
sm.logger.Debug("GetSourceType")

var err error
key := fmt.Sprintf("dtle/%v/SourceType", jobName)

kv, err := sm.consulStore.Get(key)
if err != nil {
return "", err
}
return string(kv.Value), nil
}

func (sm *StoreManager) PutSourceType(jobName, sourceType string) error {
sm.logger.Debug("PutSourceType")

var err error
key := fmt.Sprintf("dtle/%v/SourceType", jobName)
err = sm.consulStore.Put(key, []byte(sourceType), nil)
if err != nil {
return err
}
return nil
}

func (sm *StoreManager) GetNatsIfExist(jobName string) (string, bool, error) {
natsKey := fmt.Sprintf("dtle/%v/NatsAddr", jobName)
kv, err := sm.consulStore.Get(natsKey)
Expand Down
5 changes: 4 additions & 1 deletion driver/common/taskconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type DtleTaskConfig struct {
BulkInsert3 int `codec:"BulkInsert3"`
SlaveNetWriteTimeout int `codec:"SlaveNetWriteTimeout"`
BigTxSrcQueue int32 `codec:"BigTxSrcQueue"`
TwoWaySync bool `codec:"TwoWaySync"`

ParallelWorkers int `codec:"ParallelWorkers"`
DependencyHistorySize int `codec:"DependencyHistorySize"`
Expand All @@ -70,8 +71,10 @@ type DtleTaskConfig struct {
SkipCreateDbTable bool `codec:"SkipCreateDbTable"`
SkipPrivilegeCheck bool `codec:"SkipPrivilegeCheck"`
SkipIncrementalCopy bool `codec:"SkipIncrementalCopy"`
ConnectionConfig *mysqlconfig.ConnectionConfig `codec:"ConnectionConfig"`
SrcConnectionConfig *mysqlconfig.ConnectionConfig `codec:"SrcConnectionConfig"`
DestConnectionConfig *mysqlconfig.ConnectionConfig `codec:"DestConnectionConfig"`
KafkaConfig *KafkaConfig `codec:"KafkaConfig"`
DestType string `codec:"DestType"`
// support oracle extractor/applier
OracleConfig *config.OracleConfig `codec:"OracleConfig"`
}
Expand Down
28 changes: 13 additions & 15 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ var (
hclspec.NewLiteral(`"/var/log/dtle"`)),
})

connectionConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"Host": hclspec.NewAttr("Host", "string", true),
"Port": hclspec.NewAttr("Port", "number", true),
"User": hclspec.NewAttr("User", "string", true),
"Password": hclspec.NewAttr("Password", "string", true),
"Charset": hclspec.NewDefault(hclspec.NewAttr("Charset", "string", false),
hclspec.NewLiteral(`"utf8mb4"`)),
})
// taskConfigSpec is the hcl specification for the driver config section of
// a taskConfig within a job. It is returned in the TaskConfigSchema RPC
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
Expand Down Expand Up @@ -135,15 +143,11 @@ var (
"SkipIncrementalCopy": hclspec.NewAttr("SkipIncrementalCopy", "bool", false),
"SlaveNetWriteTimeout": hclspec.NewDefault(hclspec.NewAttr("SlaveNetWriteTimeout", "number", false),
hclspec.NewLiteral(`28800`)), // 8 hours
"ConnectionConfig": hclspec.NewBlock("ConnectionConfig", false, hclspec.NewObject(map[string]*hclspec.Spec{
"Host": hclspec.NewAttr("Host", "string", true),
"Port": hclspec.NewAttr("Port", "number", true),
"User": hclspec.NewAttr("User", "string", true),
"Password": hclspec.NewAttr("Password", "string", true),
"Charset": hclspec.NewDefault(hclspec.NewAttr("Charset", "string", false),
hclspec.NewLiteral(`"utf8mb4"`)),
})),
"SrcConnectionConfig": hclspec.NewBlock("SrcConnectionConfig", false, connectionConfigSpec),
"DestConnectionConfig": hclspec.NewBlock("DestConnectionConfig", false, connectionConfigSpec),
"WaitOnJob": hclspec.NewAttr("WaitOnJob", "string", false),
"TwoWaySync": hclspec.NewDefault(hclspec.NewAttr("TwoWaySync", "bool", false),
hclspec.NewLiteral(`false`)),
"BulkInsert1": hclspec.NewDefault(hclspec.NewAttr("BulkInsert1", "number", false),
hclspec.NewLiteral(`4`)),
"BulkInsert2": hclspec.NewDefault(hclspec.NewAttr("BulkInsert2", "number", false),
Expand Down Expand Up @@ -178,6 +182,7 @@ var (
hclspec.NewLiteral(`67108864`)),
"SetGtidNext": hclspec.NewDefault(hclspec.NewAttr("SetGtidNext", "bool", false),
hclspec.NewLiteral(`false`)),
"DestType": hclspec.NewAttr("DestType", "string", false),
"OracleConfig": hclspec.NewBlock("OracleConfig", false, hclspec.NewObject(map[string]*hclspec.Spec{
"ServiceName": hclspec.NewAttr("ServiceName", "string", true),
"Host": hclspec.NewAttr("Host", "string", true),
Expand Down Expand Up @@ -605,13 +610,6 @@ func (d *Driver) verifyDriverConfig(config common.DtleTaskConfig) error {
return fmt.Errorf("expect 1 <= BulkInsert1 <= BulkInsert2. %v %v", config.BulkInsert1, config.BulkInsert2)
}

if config.ConnectionConfig != nil && config.OracleConfig != nil {
addErrMsgs("only one src connection config should be set")
}
if config.ConnectionConfig != nil && config.KafkaConfig != nil {
addErrMsgs("only one dest connection config should be set")
}

for _, doDb := range config.ReplicateDoDb {
if doDb.TableSchema == "" && doDb.TableSchemaRegex == "" {
addErrMsgs("TableSchema and TableSchemaRegex in ReplicateDoDb cannot both be blank")
Expand Down
28 changes: 24 additions & 4 deletions driver/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mysql
import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -188,26 +189,45 @@ func (h *taskHandle) NewRunner(d *Driver) (runner DriverHandle, err error) {
return nil, errors.Wrap(err, "NewExtractor")
}
} else {
runner, err = mysql.NewExtractor(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh, h.ctx)
e, err := mysql.NewExtractor(ctx, h.driverConfig, h.logger, d.storeManager, h.waitCh, h.ctx)
if err != nil {
return nil, errors.Wrap(err, "NewOracleExtractor")
}
runner = e
if h.driverConfig.TwoWaySync {
ctx2 := &common.ExecContext{
Subject: ctx.Subject + "_dtrev",
StateDir: d.config.DataDir,
}
cfg2 := &common.MySQLDriverConfig{
DtleTaskConfig: common.DtleTaskConfig{
DestType: "mysql",
},
}
e.RevApplier, err = mysql.NewApplier(ctx2, cfg2, h.logger, d.storeManager, d.config.NatsAdvertise,
h.waitCh, d.eventer, h.taskConfig, h.ctx)
}

}
case common.TaskTypeDest:
h.logger.Debug("found dest", "allConfig", h.driverConfig)
if h.driverConfig.KafkaConfig != nil {
h.logger.Debug("found kafka", "KafkaConfig", h.driverConfig.KafkaConfig)
switch strings.ToLower(h.driverConfig.DestType) {
case "kafka":
runner, err = kafka.NewKafkaRunner(ctx, h.driverConfig.KafkaConfig, h.logger,
d.storeManager, d.config.NatsAdvertise, h.waitCh, h.ctx)
if err != nil {
return nil, errors.Wrap(err, "NewKafkaRunner")
}
} else {
case "mysql":
runner, err = mysql.NewApplier(ctx, h.driverConfig, h.logger, d.storeManager,
d.config.NatsAdvertise, h.waitCh, d.eventer, h.taskConfig, h.ctx)
if err != nil {
return nil, errors.Wrap(err, "NewApplier")
}
case "":
return nil, fmt.Errorf("DestType for dest task is empty")
default:
return nil, fmt.Errorf("unknown DestType for dest task")
}
case common.TaskTypeUnknown:
return nil, fmt.Errorf("unknown processor type: %+v", h.taskConfig.Name)
Expand Down
Loading

0 comments on commit ed41bb4

Please sign in to comment.