diff --git a/dm/_utils/terror_gen/errors_release.txt b/dm/_utils/terror_gen/errors_release.txt index 1b169d96ec5..af03c3b1fce 100644 --- a/dm/_utils/terror_gen/errors_release.txt +++ b/dm/_utils/terror_gen/errors_release.txt @@ -129,6 +129,7 @@ ErrReaderReachEndOfFile,[code=11122:class=functional:scope=internal:level=low] ErrMetadataNoBinlogLoc,[code=11123:class=functional:scope=upstream:level=low], "Message: didn't found binlog location in dumped metadata file %s, Workaround: Please check log of dump unit, there maybe errors when read upstream binlog status" ErrPreviousGTIDNotExist,[code=11124:class=functional:scope=internal:level=high], "Message: no previous gtid event from binlog %s" ErrNoMasterStatus,[code=11125:class=functional:scope=upstream:level=medium], "Message: upstream returns an empty result for SHOW MASTER STATUS, Workaround: Please check the upstream settings like privileges, RDS settings to read data from SHOW MASTER STATUS." +ErrIncorrectReturnColumnsNum,[code=11130:class=functional:scope=upstream:level=medium], "Message: upstream returns incorrect number of columns for SHOW MASTER STATUS, Workaround: Please check the upstream settings like privileges, RDS settings to read data from SHOW MASTER STATUS." ErrBinlogNotLogColumn,[code=11126:class=binlog-op:scope=upstream:level=high], "Message: upstream didn't log enough columns in binlog, Workaround: Please check if session `binlog_row_image` variable is not FULL, restart task to the location from where FULL binlog_row_image is used." ErrShardDDLOptimismNeedSkipAndRedirect,[code=11127:class=functional:scope=internal:level=high], "Message: receive conflict DDL for the optimistic shard ddl lock %s: %s. Now DM does not support conflicting DDLs, such as 'modify column'/'rename column'/'add column not null non default'." ErrShardDDLOptimismAddNotFullyDroppedColumn,[code=11128:class=functional:scope=internal:level=medium], "Message: fail to resolve adding not fully dropped columns for optimistic shard ddl lock %s: %s, Workaround: Please use `binlog skip` command to skip this error." diff --git a/dm/errors.toml b/dm/errors.toml index 497f9209195..5cab806bf21 100644 --- a/dm/errors.toml +++ b/dm/errors.toml @@ -808,6 +808,12 @@ description = "" workaround = "Please manually check the error from TiDB and handle it." tags = ["internal", "high"] +[error.DM-functional-11130] +message = "upstream returns incorrect number of columns for SHOW MASTER STATUS" +description = "" +workaround = "Please check the upstream settings like privileges, RDS settings to read data from SHOW MASTER STATUS." +tags = ["upstream", "medium"] + [error.DM-config-20001] message = "checking item %s is not supported\n%s" description = "" diff --git a/dm/pkg/binlog/status.go b/dm/pkg/binlog/status.go index 2d9e6dce371..bf9f178bb59 100644 --- a/dm/pkg/binlog/status.go +++ b/dm/pkg/binlog/status.go @@ -15,9 +15,11 @@ package binlog import ( "path" + "strconv" "time" gmysql "github.com/go-mysql-org/go-mysql/mysql" + "github.com/pingcap/tidb/dumpling/export" "github.com/pingcap/tiflow/dm/pkg/conn" tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/terror" @@ -49,28 +51,18 @@ func GetBinaryLogs(ctx *tcontext.Context, db *conn.BaseDB) (FileSizes, error) { return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) } defer rows.Close() - - rowColumns, err := rows.Columns() + files := make([]binlogSize, 0, 10) + var rowsResult [][]string + rowsResult, err = export.GetSpecifiedColumnValuesAndClose(rows, "Log_name", "File_size") if err != nil { return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) } - files := make([]binlogSize, 0, 10) - for rows.Next() { - var file string - var pos int64 - var nullPtr interface{} - if len(rowColumns) == 2 { - err = rows.Scan(&file, &pos) - } else { - err = rows.Scan(&file, &pos, &nullPtr) - } + for _, rowResult := range rowsResult { + pos, err := strconv.ParseInt(rowResult[1], 10, 64) if err != nil { return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) } - files = append(files, binlogSize{name: file, size: pos}) - } - if rows.Err() != nil { - return nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError) + files = append(files, binlogSize{name: rowResult[0], size: pos}) } return files, nil } diff --git a/dm/pkg/conn/utils.go b/dm/pkg/conn/utils.go index dc6acfb0d62..35576b68a9f 100644 --- a/dm/pkg/conn/utils.go +++ b/dm/pkg/conn/utils.go @@ -21,6 +21,7 @@ import ( gmysql "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/dumpling/export" tmysql "github.com/pingcap/tidb/parser/mysql" tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/gtid" @@ -117,10 +118,6 @@ func GetMasterStatus(ctx *tcontext.Context, db *BaseDB, flavor string) ( } defer rows.Close() - if !rows.Next() { - err = terror.ErrNoMasterStatus.Generate() - return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err - } // Show an example. /* MySQL [test]> SHOW MASTER STATUS; @@ -144,15 +141,65 @@ func GetMasterStatus(ctx *tcontext.Context, db *BaseDB, flavor string) ( | 0-1-2 | +--------------------------+ */ + + var rowsResult [][]string if flavor == gmysql.MySQLFlavor { - err = rows.Scan(&binlogName, &pos, &binlogDoDB, &binlogIgnoreDB, >idStr) + rowsResult, err = export.GetSpecifiedColumnValuesAndClose(rows, "File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB", "Executed_Gtid_Set") + if err != nil { + err = terror.DBErrorAdapt(err, terror.ErrDBDriverError) + return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err + } + + switch { + case len(rowsResult) == 0: + err = terror.ErrNoMasterStatus.Generate() + return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err + case len(rowsResult[0]) != 5: + ctx.L().DPanic("The number of columns that SHOW MASTER STATUS returns for MySQL is not equal to 5, will not use the retrieved information") + err = terror.ErrIncorrectReturnColumnsNum.Generate() + return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err + default: + binlogName = rowsResult[0][0] + var posInt int64 + posInt, err = strconv.ParseInt(rowsResult[0][1], 10, 64) + if err != nil { + err = terror.DBErrorAdapt(err, terror.ErrDBDriverError) + return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err + } + pos = uint32(posInt) + binlogDoDB = rowsResult[0][2] + binlogIgnoreDB = rowsResult[0][3] + gtidStr = rowsResult[0][4] + } } else { - err = rows.Scan(&binlogName, &pos, &binlogDoDB, &binlogIgnoreDB) - } - if err != nil { - err = terror.DBErrorAdapt(err, terror.ErrDBDriverError) - return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err + rowsResult, err = export.GetSpecifiedColumnValuesAndClose(rows, "File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB") + if err != nil { + err = terror.DBErrorAdapt(err, terror.ErrDBDriverError) + return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err + } + + switch { + case len(rowsResult) == 0: + err = terror.ErrNoMasterStatus.Generate() + return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err + case len(rowsResult[0]) != 4: + ctx.L().DPanic("The number of columns that SHOW MASTER STATUS returns for MariaDB is not equal to 4, will not use the retrieved information") + err = terror.ErrIncorrectReturnColumnsNum.Generate() + return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err + default: + binlogName = rowsResult[0][0] + var posInt int64 + posInt, err = strconv.ParseInt(rowsResult[0][1], 10, 64) + if err != nil { + err = terror.DBErrorAdapt(err, terror.ErrDBDriverError) + return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err + } + pos = uint32(posInt) + binlogDoDB = rowsResult[0][2] + binlogIgnoreDB = rowsResult[0][3] + } } + if flavor == gmysql.MariaDBFlavor { gtidStr, err = GetGlobalVariable(ctx, db, "gtid_binlog_pos") if err != nil { @@ -160,7 +207,7 @@ func GetMasterStatus(ctx *tcontext.Context, db *BaseDB, flavor string) ( } } - if rows.Next() { + if len(rowsResult) > 1 { ctx.L().Warn("SHOW MASTER STATUS returns more than one row, will only use first row") } if rows.Close() != nil { diff --git a/dm/pkg/conn/utils_test.go b/dm/pkg/conn/utils_test.go index af00418674d..318812d23ea 100644 --- a/dm/pkg/conn/utils_test.go +++ b/dm/pkg/conn/utils_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/DATA-DOG/go-sqlmock" + gmysql "github.com/go-mysql-org/go-mysql/mysql" tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/stretchr/testify/require" @@ -59,3 +60,67 @@ func TestGetBinlogDB(t *testing.T) { require.Equal(t, binlogIgnoreDB, "ignore_db") require.Nil(t, mock.ExpectationsWereMet()) } + +func TestGetMasterStatus(t *testing.T) { + ctx := context.Background() + tctx := tcontext.NewContext(ctx, log.L()) + + db, mock, err := sqlmock.New() + require.NoError(t, err) + baseDB := NewBaseDB(db) + + cases := []struct { + rows *sqlmock.Rows + binlogName string + pos uint32 + binlogDoDB string + binlogIgnoreDB string + gtidStr string + err error + flavor string + }{ + // For MySQL + { + sqlmock.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB", "Executed_Gtid_Set"}). + AddRow("ON.000001", 4822, "", "", "85ab69d1-b21f-11e6-9c5e-64006a8978d2:1-46"), + "ON.000001", + 4822, + "", + "", + "85ab69d1-b21f-11e6-9c5e-64006a8978d2:1-46", + nil, + gmysql.MySQLFlavor, + }, + // For MariaDB + { + sqlmock.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB"}). + AddRow("mariadb-bin.000016", 475, "", ""), + "mariadb-bin.000016", + 475, + "", + "", + "0-1-2", + nil, + gmysql.MariaDBFlavor, + }, + } + + for _, ca := range cases { + mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(ca.rows) + // For MariaDB + if ca.flavor == gmysql.MariaDBFlavor { + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'gtid_binlog_pos'").WillReturnRows( + sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("gtid_binlog_pos", "0-1-2"), + ) + } + binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err := GetMasterStatus(tctx, baseDB, ca.flavor) + require.NoError(t, err) + require.Equal(t, ca.binlogName, binlogName) + require.Equal(t, ca.pos, pos) + require.Equal(t, ca.binlogDoDB, binlogDoDB) + require.Equal(t, ca.binlogIgnoreDB, binlogIgnoreDB) + require.Equal(t, ca.gtidStr, gtidStr) + require.NoError(t, mock.ExpectationsWereMet()) + } +} diff --git a/dm/pkg/terror/error_list.go b/dm/pkg/terror/error_list.go index 6a312dba353..687908228b1 100644 --- a/dm/pkg/terror/error_list.go +++ b/dm/pkg/terror/error_list.go @@ -198,6 +198,9 @@ const ( // syncer. codeSyncerCancelledDDL + + // pkg/utils. + codeIncorrectReturnColumnsNum ) // Config related error code list. @@ -894,7 +897,8 @@ var ( ErrPreviousGTIDNotExist = New(codePreviousGTIDNotExist, ClassFunctional, ScopeInternal, LevelHigh, "no previous gtid event from binlog %s", "") // pkg/utils. - ErrNoMasterStatus = New(codeNoMasterStatus, ClassFunctional, ScopeUpstream, LevelMedium, "upstream returns an empty result for SHOW MASTER STATUS", "Please check the upstream settings like privileges, RDS settings to read data from SHOW MASTER STATUS.") + ErrNoMasterStatus = New(codeNoMasterStatus, ClassFunctional, ScopeUpstream, LevelMedium, "upstream returns an empty result for SHOW MASTER STATUS", "Please check the upstream settings like privileges, RDS settings to read data from SHOW MASTER STATUS.") + ErrIncorrectReturnColumnsNum = New(codeIncorrectReturnColumnsNum, ClassFunctional, ScopeUpstream, LevelMedium, "upstream returns incorrect number of columns for SHOW MASTER STATUS", "Please check the upstream settings like privileges, RDS settings to read data from SHOW MASTER STATUS.") // pkg/binlog. ErrBinlogNotLogColumn = New(codeBinlogNotLogColumn, ClassBinlogOp, ScopeUpstream, LevelHigh, "upstream didn't log enough columns in binlog", "Please check if session `binlog_row_image` variable is not FULL, restart task to the location from where FULL binlog_row_image is used.") diff --git a/dm/pkg/utils/db.go b/dm/pkg/utils/db.go index 8a0d24b5aee..875d3808b25 100644 --- a/dm/pkg/utils/db.go +++ b/dm/pkg/utils/db.go @@ -28,6 +28,7 @@ import ( "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/dumpling/export" "github.com/pingcap/tidb/parser" tmysql "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/util/dbutil" @@ -107,11 +108,6 @@ func GetSlaveServerID(ctx context.Context, db *sql.DB) (map[uint32]struct{}, err } defer rows.Close() - rowColumns, err := rows.Columns() - if err != nil { - return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) - } - /* in MySQL: mysql> SHOW SLAVE HOSTS; @@ -132,37 +128,20 @@ func GetSlaveServerID(ctx context.Context, db *sql.DB) (map[uint32]struct{}, err +------------+-----------+------+-----------+ */ - var ( - serverID sql.NullInt64 - host sql.NullString - port sql.NullInt64 - masterID sql.NullInt64 - slaveUUID sql.NullString - ) serverIDs := make(map[uint32]struct{}) - for rows.Next() { - if len(rowColumns) == 5 { - err = rows.Scan(&serverID, &host, &port, &masterID, &slaveUUID) - } else { - err = rows.Scan(&serverID, &host, &port, &masterID) - } + var rowsResult []string + rowsResult, err = export.GetSpecifiedColumnValueAndClose(rows, "Server_id") + if err != nil { + return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) + } + for _, serverID := range rowsResult { + // serverID will not be null + serverIDUInt, err := strconv.ParseUint(serverID, 10, 32) if err != nil { return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError) } - - if serverID.Valid { - serverIDs[uint32(serverID.Int64)] = struct{}{} - } else { - // should never happened - log.L().Warn("get invalid server_id when execute `SHOW SLAVE HOSTS;`") - continue - } - } - - if rows.Err() != nil { - return nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError) + serverIDs[uint32(serverIDUInt)] = struct{}{} } - return serverIDs, nil } diff --git a/dm/pkg/utils/db_test.go b/dm/pkg/utils/db_test.go index 20357f1ff46..fc655d62b84 100644 --- a/dm/pkg/utils/db_test.go +++ b/dm/pkg/utils/db_test.go @@ -389,3 +389,50 @@ func TestCreateTableSQLToOneRow(t *testing.T) { expected := "CREATE TABLE `t1` ( `id` bigint(20) NOT NULL, `c1` varchar(20) DEFAULT NULL, `c2` varchar(20) DEFAULT NULL, PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin" require.Equal(t, expected, CreateTableSQLToOneRow(input)) } + +func TestGetSlaveServerID(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + + cases := []struct { + rows *sqlmock.Rows + results map[uint32]struct{} + }{ + // For MySQL + { + sqlmock.NewRows([]string{"Server_id", "Host", "Port", "Master_id", "Slave_UUID"}). + AddRow(192168010, "iconnect2", 3306, 192168011, "14cb6624-7f93-11e0-b2c0-c80aa9429562"). + AddRow(1921680101, "athena", 3306, 192168011, "07af4990-f41f-11df-a566-7ac56fdaf645"), + map[uint32]struct{}{ + 192168010: {}, 1921680101: {}, + }, + }, + // For MariaDB + { + sqlmock.NewRows([]string{"Server_id", "Host", "Port", "Master_id"}). + AddRow(192168010, "iconnect2", 3306, 192168011). + AddRow(1921680101, "athena", 3306, 192168011), + map[uint32]struct{}{ + 192168010: {}, 1921680101: {}, + }, + }, + // For MariaDB, with Server_id greater than 2^31, to test uint conversion + { + sqlmock.NewRows([]string{"Server_id", "Host", "Port", "Master_id"}). + AddRow(2147483649, "iconnect2", 3306, 192168011). + AddRow(2147483650, "athena", 3306, 192168011), + map[uint32]struct{}{ + 2147483649: {}, 2147483650: {}, + }, + }, + } + + for _, ca := range cases { + mock.ExpectQuery("SHOW SLAVE HOSTS").WillReturnRows(ca.rows) + results, err2 := GetSlaveServerID(context.Background(), db) + require.NoError(t, err2) + require.Equal(t, ca.results, results) + } +}