Skip to content

Commit

Permalink
pkg(dm): improve the backward compatibility of “SHOW SLAVE HOSTS“ com…
Browse files Browse the repository at this point in the history
…mand (#7372)

close #5017
  • Loading branch information
lyzx2001 authored Nov 11, 2022
1 parent 8216bab commit 04173f6
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 59 deletions.
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ ErrReaderReachEndOfFile,[code=11122:class=functional:scope=internal:level=low]
ErrMetadataNoBinlogLoc,[code=11123:class=functional:scope=upstream:level=low], "Message: didn't found binlog location in dumped metadata file %s, Workaround: Please check log of dump unit, there maybe errors when read upstream binlog status"
ErrPreviousGTIDNotExist,[code=11124:class=functional:scope=internal:level=high], "Message: no previous gtid event from binlog %s"
ErrNoMasterStatus,[code=11125:class=functional:scope=upstream:level=medium], "Message: upstream returns an empty result for SHOW MASTER STATUS, Workaround: Please check the upstream settings like privileges, RDS settings to read data from SHOW MASTER STATUS."
ErrIncorrectReturnColumnsNum,[code=11130:class=functional:scope=upstream:level=medium], "Message: upstream returns incorrect number of columns for SHOW MASTER STATUS, Workaround: Please check the upstream settings like privileges, RDS settings to read data from SHOW MASTER STATUS."
ErrBinlogNotLogColumn,[code=11126:class=binlog-op:scope=upstream:level=high], "Message: upstream didn't log enough columns in binlog, Workaround: Please check if session `binlog_row_image` variable is not FULL, restart task to the location from where FULL binlog_row_image is used."
ErrShardDDLOptimismNeedSkipAndRedirect,[code=11127:class=functional:scope=internal:level=high], "Message: receive conflict DDL for the optimistic shard ddl lock %s: %s. Now DM does not support conflicting DDLs, such as 'modify column'/'rename column'/'add column not null non default'."
ErrShardDDLOptimismAddNotFullyDroppedColumn,[code=11128:class=functional:scope=internal:level=medium], "Message: fail to resolve adding not fully dropped columns for optimistic shard ddl lock %s: %s, Workaround: Please use `binlog skip` command to skip this error."
Expand Down
6 changes: 6 additions & 0 deletions dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,12 @@ description = ""
workaround = "Please manually check the error from TiDB and handle it."
tags = ["internal", "high"]

[error.DM-functional-11130]
message = "upstream returns incorrect number of columns for SHOW MASTER STATUS"
description = ""
workaround = "Please check the upstream settings like privileges, RDS settings to read data from SHOW MASTER STATUS."
tags = ["upstream", "medium"]

[error.DM-config-20001]
message = "checking item %s is not supported\n%s"
description = ""
Expand Down
24 changes: 8 additions & 16 deletions dm/pkg/binlog/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ package binlog

import (
"path"
"strconv"
"time"

gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/terror"
Expand Down Expand Up @@ -49,28 +51,18 @@ func GetBinaryLogs(ctx *tcontext.Context, db *conn.BaseDB) (FileSizes, error) {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
defer rows.Close()

rowColumns, err := rows.Columns()
files := make([]binlogSize, 0, 10)
var rowsResult [][]string
rowsResult, err = export.GetSpecifiedColumnValuesAndClose(rows, "Log_name", "File_size")
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
files := make([]binlogSize, 0, 10)
for rows.Next() {
var file string
var pos int64
var nullPtr interface{}
if len(rowColumns) == 2 {
err = rows.Scan(&file, &pos)
} else {
err = rows.Scan(&file, &pos, &nullPtr)
}
for _, rowResult := range rowsResult {
pos, err := strconv.ParseInt(rowResult[1], 10, 64)
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
files = append(files, binlogSize{name: file, size: pos})
}
if rows.Err() != nil {
return nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError)
files = append(files, binlogSize{name: rowResult[0], size: pos})
}
return files, nil
}
Expand Down
69 changes: 58 additions & 11 deletions dm/pkg/conn/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/dumpling/export"
tmysql "github.com/pingcap/tidb/parser/mysql"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/gtid"
Expand Down Expand Up @@ -117,10 +118,6 @@ func GetMasterStatus(ctx *tcontext.Context, db *BaseDB, flavor string) (
}
defer rows.Close()

if !rows.Next() {
err = terror.ErrNoMasterStatus.Generate()
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}
// Show an example.
/*
MySQL [test]> SHOW MASTER STATUS;
Expand All @@ -144,23 +141,73 @@ func GetMasterStatus(ctx *tcontext.Context, db *BaseDB, flavor string) (
| 0-1-2 |
+--------------------------+
*/

var rowsResult [][]string
if flavor == gmysql.MySQLFlavor {
err = rows.Scan(&binlogName, &pos, &binlogDoDB, &binlogIgnoreDB, &gtidStr)
rowsResult, err = export.GetSpecifiedColumnValuesAndClose(rows, "File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB", "Executed_Gtid_Set")
if err != nil {
err = terror.DBErrorAdapt(err, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}

switch {
case len(rowsResult) == 0:
err = terror.ErrNoMasterStatus.Generate()
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
case len(rowsResult[0]) != 5:
ctx.L().DPanic("The number of columns that SHOW MASTER STATUS returns for MySQL is not equal to 5, will not use the retrieved information")
err = terror.ErrIncorrectReturnColumnsNum.Generate()
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
default:
binlogName = rowsResult[0][0]
var posInt int64
posInt, err = strconv.ParseInt(rowsResult[0][1], 10, 64)
if err != nil {
err = terror.DBErrorAdapt(err, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}
pos = uint32(posInt)
binlogDoDB = rowsResult[0][2]
binlogIgnoreDB = rowsResult[0][3]
gtidStr = rowsResult[0][4]
}
} else {
err = rows.Scan(&binlogName, &pos, &binlogDoDB, &binlogIgnoreDB)
}
if err != nil {
err = terror.DBErrorAdapt(err, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
rowsResult, err = export.GetSpecifiedColumnValuesAndClose(rows, "File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB")
if err != nil {
err = terror.DBErrorAdapt(err, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}

switch {
case len(rowsResult) == 0:
err = terror.ErrNoMasterStatus.Generate()
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
case len(rowsResult[0]) != 4:
ctx.L().DPanic("The number of columns that SHOW MASTER STATUS returns for MariaDB is not equal to 4, will not use the retrieved information")
err = terror.ErrIncorrectReturnColumnsNum.Generate()
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
default:
binlogName = rowsResult[0][0]
var posInt int64
posInt, err = strconv.ParseInt(rowsResult[0][1], 10, 64)
if err != nil {
err = terror.DBErrorAdapt(err, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}
pos = uint32(posInt)
binlogDoDB = rowsResult[0][2]
binlogIgnoreDB = rowsResult[0][3]
}
}

if flavor == gmysql.MariaDBFlavor {
gtidStr, err = GetGlobalVariable(ctx, db, "gtid_binlog_pos")
if err != nil {
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}
}

if rows.Next() {
if len(rowsResult) > 1 {
ctx.L().Warn("SHOW MASTER STATUS returns more than one row, will only use first row")
}
if rows.Close() != nil {
Expand Down
65 changes: 65 additions & 0 deletions dm/pkg/conn/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

"github.com/DATA-DOG/go-sqlmock"
gmysql "github.com/go-mysql-org/go-mysql/mysql"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -59,3 +60,67 @@ func TestGetBinlogDB(t *testing.T) {
require.Equal(t, binlogIgnoreDB, "ignore_db")
require.Nil(t, mock.ExpectationsWereMet())
}

func TestGetMasterStatus(t *testing.T) {
ctx := context.Background()
tctx := tcontext.NewContext(ctx, log.L())

db, mock, err := sqlmock.New()
require.NoError(t, err)
baseDB := NewBaseDB(db)

cases := []struct {
rows *sqlmock.Rows
binlogName string
pos uint32
binlogDoDB string
binlogIgnoreDB string
gtidStr string
err error
flavor string
}{
// For MySQL
{
sqlmock.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB", "Executed_Gtid_Set"}).
AddRow("ON.000001", 4822, "", "", "85ab69d1-b21f-11e6-9c5e-64006a8978d2:1-46"),
"ON.000001",
4822,
"",
"",
"85ab69d1-b21f-11e6-9c5e-64006a8978d2:1-46",
nil,
gmysql.MySQLFlavor,
},
// For MariaDB
{
sqlmock.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB"}).
AddRow("mariadb-bin.000016", 475, "", ""),
"mariadb-bin.000016",
475,
"",
"",
"0-1-2",
nil,
gmysql.MariaDBFlavor,
},
}

for _, ca := range cases {
mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(ca.rows)
// For MariaDB
if ca.flavor == gmysql.MariaDBFlavor {
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'gtid_binlog_pos'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("gtid_binlog_pos", "0-1-2"),
)
}
binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err := GetMasterStatus(tctx, baseDB, ca.flavor)
require.NoError(t, err)
require.Equal(t, ca.binlogName, binlogName)
require.Equal(t, ca.pos, pos)
require.Equal(t, ca.binlogDoDB, binlogDoDB)
require.Equal(t, ca.binlogIgnoreDB, binlogIgnoreDB)
require.Equal(t, ca.gtidStr, gtidStr)
require.NoError(t, mock.ExpectationsWereMet())
}
}
6 changes: 5 additions & 1 deletion dm/pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ const (

// syncer.
codeSyncerCancelledDDL

// pkg/utils.
codeIncorrectReturnColumnsNum
)

// Config related error code list.
Expand Down Expand Up @@ -894,7 +897,8 @@ var (
ErrPreviousGTIDNotExist = New(codePreviousGTIDNotExist, ClassFunctional, ScopeInternal, LevelHigh, "no previous gtid event from binlog %s", "")

// pkg/utils.
ErrNoMasterStatus = New(codeNoMasterStatus, ClassFunctional, ScopeUpstream, LevelMedium, "upstream returns an empty result for SHOW MASTER STATUS", "Please check the upstream settings like privileges, RDS settings to read data from SHOW MASTER STATUS.")
ErrNoMasterStatus = New(codeNoMasterStatus, ClassFunctional, ScopeUpstream, LevelMedium, "upstream returns an empty result for SHOW MASTER STATUS", "Please check the upstream settings like privileges, RDS settings to read data from SHOW MASTER STATUS.")
ErrIncorrectReturnColumnsNum = New(codeIncorrectReturnColumnsNum, ClassFunctional, ScopeUpstream, LevelMedium, "upstream returns incorrect number of columns for SHOW MASTER STATUS", "Please check the upstream settings like privileges, RDS settings to read data from SHOW MASTER STATUS.")

// pkg/binlog.
ErrBinlogNotLogColumn = New(codeBinlogNotLogColumn, ClassBinlogOp, ScopeUpstream, LevelHigh, "upstream didn't log enough columns in binlog", "Please check if session `binlog_row_image` variable is not FULL, restart task to the location from where FULL binlog_row_image is used.")
Expand Down
41 changes: 10 additions & 31 deletions dm/pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tidb/parser"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/dbutil"
Expand Down Expand Up @@ -107,11 +108,6 @@ func GetSlaveServerID(ctx context.Context, db *sql.DB) (map[uint32]struct{}, err
}
defer rows.Close()

rowColumns, err := rows.Columns()
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}

/*
in MySQL:
mysql> SHOW SLAVE HOSTS;
Expand All @@ -132,37 +128,20 @@ func GetSlaveServerID(ctx context.Context, db *sql.DB) (map[uint32]struct{}, err
+------------+-----------+------+-----------+
*/

var (
serverID sql.NullInt64
host sql.NullString
port sql.NullInt64
masterID sql.NullInt64
slaveUUID sql.NullString
)
serverIDs := make(map[uint32]struct{})
for rows.Next() {
if len(rowColumns) == 5 {
err = rows.Scan(&serverID, &host, &port, &masterID, &slaveUUID)
} else {
err = rows.Scan(&serverID, &host, &port, &masterID)
}
var rowsResult []string
rowsResult, err = export.GetSpecifiedColumnValueAndClose(rows, "Server_id")
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
for _, serverID := range rowsResult {
// serverID will not be null
serverIDUInt, err := strconv.ParseUint(serverID, 10, 32)
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}

if serverID.Valid {
serverIDs[uint32(serverID.Int64)] = struct{}{}
} else {
// should never happened
log.L().Warn("get invalid server_id when execute `SHOW SLAVE HOSTS;`")
continue
}
}

if rows.Err() != nil {
return nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError)
serverIDs[uint32(serverIDUInt)] = struct{}{}
}

return serverIDs, nil
}

Expand Down
47 changes: 47 additions & 0 deletions dm/pkg/utils/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,50 @@ func TestCreateTableSQLToOneRow(t *testing.T) {
expected := "CREATE TABLE `t1` ( `id` bigint(20) NOT NULL, `c1` varchar(20) DEFAULT NULL, `c2` varchar(20) DEFAULT NULL, PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin"
require.Equal(t, expected, CreateTableSQLToOneRow(input))
}

func TestGetSlaveServerID(t *testing.T) {
t.Parallel()

db, mock, err := sqlmock.New()
require.NoError(t, err)

cases := []struct {
rows *sqlmock.Rows
results map[uint32]struct{}
}{
// For MySQL
{
sqlmock.NewRows([]string{"Server_id", "Host", "Port", "Master_id", "Slave_UUID"}).
AddRow(192168010, "iconnect2", 3306, 192168011, "14cb6624-7f93-11e0-b2c0-c80aa9429562").
AddRow(1921680101, "athena", 3306, 192168011, "07af4990-f41f-11df-a566-7ac56fdaf645"),
map[uint32]struct{}{
192168010: {}, 1921680101: {},
},
},
// For MariaDB
{
sqlmock.NewRows([]string{"Server_id", "Host", "Port", "Master_id"}).
AddRow(192168010, "iconnect2", 3306, 192168011).
AddRow(1921680101, "athena", 3306, 192168011),
map[uint32]struct{}{
192168010: {}, 1921680101: {},
},
},
// For MariaDB, with Server_id greater than 2^31, to test uint conversion
{
sqlmock.NewRows([]string{"Server_id", "Host", "Port", "Master_id"}).
AddRow(2147483649, "iconnect2", 3306, 192168011).
AddRow(2147483650, "athena", 3306, 192168011),
map[uint32]struct{}{
2147483649: {}, 2147483650: {},
},
},
}

for _, ca := range cases {
mock.ExpectQuery("SHOW SLAVE HOSTS").WillReturnRows(ca.rows)
results, err2 := GetSlaveServerID(context.Background(), db)
require.NoError(t, err2)
require.Equal(t, ca.results, results)
}
}

0 comments on commit 04173f6

Please sign in to comment.