Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ ErrSyncerOperatorNotExist,[code=36065:class=sync-unit:scope=internal:level=low],
ErrSyncerEventNotExist,[code=36066:class=sync-unit:scope=internal:level=high], "Message: replace or inject event not exist, location: %s"
ErrSyncerParseDDL,[code=36067:class=sync-unit:scope=internal:level=high], "Message: parse DDL: %s, Workaround: Please confirm your DDL statement is correct and needed. For TiDB compatible DDL, see https://docs.pingcap.com/tidb/stable/mysql-compatibility#ddl. You can use `handle-error` command to skip or replace the DDL or add a binlog filter rule to ignore it if the DDL is not needed."
ErrSyncerUnsupportedStmt,[code=36068:class=sync-unit:scope=internal:level=high], "Message: `%s` statement not supported in %s mode"
ErrSyncerGetEvent,[code=36069:class=sync-unit:scope=upstream:level=high], "Message: get binlog event error: %v, Workaround: Please check if the binlog file could be parsed by `mysqlbinlog`."
ErrSyncerGetEvent,[code=36069:class=sync-unit:scope=upstream:level=high], "Message: get binlog event error: %v, Workaround: Please check if the binlog file could be parsed by `mysqlbinlog` or `mariadb-binlog`."
ErrSyncerDownstreamTableNotFound,[code=36070:class=sync-unit:scope=internal:level=high], "Message: downstream table %s not found"
ErrSyncerCancelledDDL,[code=11129:class=sync-unit:scope=internal:level=high], "Message: DDL %s executed in background and met error, Workaround: Please manually check the error from TiDB and handle it."
ErrSyncerReprocessWithSafeModeFail,[code=36071:class=sync-unit:scope=internal:level=medium], "Message: your `safe-mode-duration` in task.yaml is set to 0s, the task can't be re-processed without safe mode currently, Workaround: Please stop and re-start this task. If you want to start task successfully, you need set `safe-mode-duration` greater than `0s`."
Expand Down
6 changes: 6 additions & 0 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,12 @@ func (c *Checker) Init(ctx context.Context) (err error) {
if _, ok := c.checkingItems[config.BinlogDBChecking]; ok {
c.checkList = append(c.checkList, checker.NewBinlogDBChecker(instance.sourceDB, instance.sourceDBinfo, info.sourceID2InterestedDB[i], instance.cfg.CaseSensitive))
}

if strings.Contains(instance.sourceDB.Version, "MariaDB") {
if _, ok := c.checkingItems[config.BinlogLegacyEventPosChecking]; ok {
c.checkList = append(c.checkList, checker.NewMariaDBBinlogLegacyEventPosChecker(instance.sourceDB.DB, instance.sourceDBinfo))
}
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions dm/config/checking_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
BinlogEnableChecking = "binlog_enable"
BinlogFormatChecking = "binlog_format"
BinlogRowImageChecking = "binlog_row_image"
BinlogLegacyEventPosChecking = "binlog_legacy_event_pos"
TableSchemaChecking = "table_schema"
ShardTableSchemaChecking = "schema_of_shard_tables"
ShardAutoIncrementIDChecking = "auto_increment_ID"
Expand Down Expand Up @@ -63,6 +64,7 @@ var AllCheckingItems = map[string]string{
BinlogEnableChecking: "binlog enable checking item",
BinlogFormatChecking: "binlog format checking item",
BinlogRowImageChecking: "binlog row image checking item",
BinlogLegacyEventPosChecking: "binlog legacy event pos checking item",
TableSchemaChecking: "table schema compatibility checking item",
ShardTableSchemaChecking: "consistent schema of shard tables checking item",
ShardAutoIncrementIDChecking: "conflict auto increment ID of shard tables checking item",
Expand Down
2 changes: 1 addition & 1 deletion dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2173,7 +2173,7 @@ tags = ["internal", "high"]
[error.DM-sync-unit-36069]
message = "get binlog event error: %v"
description = ""
workaround = "Please check if the binlog file could be parsed by `mysqlbinlog`."
workaround = "Please check if the binlog file could be parsed by `mysqlbinlog` or `mariadb-binlog`."
tags = ["upstream", "high"]

[error.DM-sync-unit-36070]
Expand Down
67 changes: 66 additions & 1 deletion dm/pkg/checker/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (pc *MySQLBinlogFormatChecker) Check(ctx context.Context) *Result {
}
if strings.ToUpper(value) != "ROW" {
result.Errors = append(result.Errors, NewError("binlog_format is %s, and should be ROW", value))
result.Instruction = "MySQL as source: please execute 'set global binlog_format=ROW;'; AWS Aurora (MySQL)/RDS MySQL as source: please refer to the document to create a new DB parameter group and set the binlog_format=row: https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/USER_WorkingWithDBInstanceParamGroups.html. Then modify the instance to use the new DB parameter group and restart the instance to take effect."
result.Instruction = "MySQL or MariaDB as source: please execute 'SET GLOBAL binlog_format=ROW;'; AWS Aurora (MySQL)/RDS MySQL as source: please refer to the document to create a new DB parameter group and set the binlog_format=row: https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/USER_WorkingWithDBInstanceParamGroups.html. Then modify the instance to use the new DB parameter group and restart the instance to take effect."
return result
}
result.State = StateSuccess
Expand Down Expand Up @@ -195,6 +195,71 @@ func (pc *MySQLBinlogRowImageChecker) Name() string {
return "mysql_binlog_row_image"
}

/*****************************************************/

var mariaDBBinlogLegacyEventPosCheckingRequired MySQLVersion = [3]uint{11, 4, 0}

// MariaDBBinlogLegacyEventPosChecker checks mysql binlog_format.
type MariaDBBinlogLegacyEventPosChecker struct {
db *sql.DB
dbinfo *dbutil.DBConfig
}

// NewMariaDBBinlogLegacyEventPosChecker returns a RealChecker.
func NewMariaDBBinlogLegacyEventPosChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker {
return &MariaDBBinlogLegacyEventPosChecker{db: db, dbinfo: dbinfo}
}

// Check implements the RealChecker interface.
func (pc *MariaDBBinlogLegacyEventPosChecker) Check(ctx context.Context) *Result {
result := &Result{
Name: pc.Name(),
Desc: "check whether mariadb binlog_legacy_event_pos is ON",
State: StateFailure,
Extra: fmt.Sprintf("address of db instance - %s:%d", pc.dbinfo.Host, pc.dbinfo.Port),
}

// check version firstly
value, err := dbutil.ShowVersion(ctx, pc.db)
if err != nil {
markCheckError(result, err)
return result
}

version, err := toMySQLVersion(value)
if err != nil {
markCheckError(result, err)
return result
}

// for mariadb.version < 11.4, we don't need to check binlog_legacy_event_pos
if conn.IsMariaDB(value) && !version.Ge(mariaDBBinlogLegacyEventPosCheckingRequired) {
result.State = StateSuccess
return result
}

value, err = dbutil.ShowMySQLVariable(ctx, pc.db, "binlog_legacy_event_pos")
if err != nil {
markCheckError(result, err)
return result
}
if strings.ToUpper(value) != "ON" {
result.Errors = append(result.Errors, NewError("binlog_legacy_event_pos is %s, and should be ON", value))
result.Instruction = "MariaDB 11.4 and newer as source: please execute 'SET GLOBAL binlog_legacy_event_pos=ON;' or update the configuration and apply the configuration. You also need to start replicating from a binlog position that was created after chaning this setting."
return result
}
result.State = StateSuccess

return result
}

// Name implements the RealChecker interface.
func (pc *MariaDBBinlogLegacyEventPosChecker) Name() string {
return "mariadb_binlog_legacy_event_pos"
}

/*****************************************************/

// BinlogDBChecker checks if migrated dbs are in binlog_do_db or binlog_ignore_db.
type BinlogDBChecker struct {
db *conn.BaseDB
Expand Down
8 changes: 4 additions & 4 deletions dm/pkg/conn/basedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ type BaseDB struct {
doNotClose bool

// SELECT VERSION()
version string
Version string
}

// NewBaseDB returns *BaseDB object for test.
Expand All @@ -213,7 +213,7 @@ func NewBaseDB(db *sql.DB, scope terror.ErrScope, version string, doFuncInClose
Retry: &retry.FiniteRetryStrategy{},
Scope: scope,
doFuncInClose: doFuncInClose,
version: version,
Version: version,
}
}

Expand Down Expand Up @@ -385,15 +385,15 @@ func (d *BaseDB) needsModernTerminology() bool {
// - https://mariadb.com/docs/server/reference/sql-statements/administrative-sql-statements/show/show-replica-hosts
//
// Old syntax is still accepted.
if strings.Contains(d.version, "MariaDB") {
if strings.Contains(d.Version, "MariaDB") {
return false
}

// https://dev.mysql.com/doc/relnotes/mysql/8.4/en/news-8-4-0.html#mysqld-8-4-0-deprecation-removal
// MySQL 8.4 removed `SHOW MASTER STATUS`.
minVer := semver.New("8.4.0")

v, err := semver.NewVersion(d.version)
v, err := semver.NewVersion(d.Version)
if err != nil {
return false
}
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/conn/basedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestNeedsModernTerminology(t *testing.T) {
}

for _, tc := range cases {
b.version = tc.version
require.Equal(t, tc.modern, b.needsModernTerminology(), b.version)
b.Version = tc.version
require.Equal(t, tc.modern, b.needsModernTerminology(), b.Version)
}
}
2 changes: 1 addition & 1 deletion dm/pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,7 @@ var (
ErrSyncerEventNotExist = New(codeSyncerEventNotExist, ClassSyncUnit, ScopeInternal, LevelHigh, "replace or inject event not exist, location: %s", "")
ErrSyncerParseDDL = New(codeSyncerParseDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "parse DDL: %s", "Please confirm your DDL statement is correct and needed. For TiDB compatible DDL, see https://docs.pingcap.com/tidb/stable/mysql-compatibility#ddl. You can use `handle-error` command to skip or replace the DDL or add a binlog filter rule to ignore it if the DDL is not needed.")
ErrSyncerUnsupportedStmt = New(codeSyncerUnsupportedStmt, ClassSyncUnit, ScopeInternal, LevelHigh, "`%s` statement not supported in %s mode", "")
ErrSyncerGetEvent = New(codeSyncerGetEvent, ClassSyncUnit, ScopeUpstream, LevelHigh, "get binlog event error: %v", "Please check if the binlog file could be parsed by `mysqlbinlog`.")
ErrSyncerGetEvent = New(codeSyncerGetEvent, ClassSyncUnit, ScopeUpstream, LevelHigh, "get binlog event error: %v", "Please check if the binlog file could be parsed by `mysqlbinlog` or `mariadb-binlog`.")
ErrSyncerDownstreamTableNotFound = New(codeSyncerDownstreamTableNotFound, ClassSyncUnit, ScopeInternal, LevelHigh, "downstream table %s not found", "")
ErrSyncerCancelledDDL = New(codeSyncerCancelledDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "DDL %s executed in background and met error", "Please manually check the error from TiDB and handle it.")
ErrSyncerReprocessWithSafeModeFail = New(codeSyncerReprocessWithSafeModeFail, ClassSyncUnit, ScopeInternal, LevelMedium, "your `safe-mode-duration` in task.yaml is set to 0s, the task can't be re-processed without safe mode currently", "Please stop and re-start this task. If you want to start task successfully, you need set `safe-mode-duration` greater than `0s`.")
Expand Down