Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg(dm): improve the backward compatibility of “SHOW SLAVE HOSTS“ command #7372

Merged
merged 21 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
899f734
improve backward compatibility of 'SHOW SLAVE HOSTS' command
lyzx2001 Oct 14, 2022
cc3266f
Merge branch 'master' into issue5017
lyzx2001 Oct 26, 2022
2dcd1fa
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Oct 26, 2022
49f4a58
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Oct 26, 2022
904eaf0
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Oct 26, 2022
b654076
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Oct 27, 2022
c5c9e8d
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Oct 27, 2022
1a72e6a
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Oct 28, 2022
8c3dbd1
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Nov 2, 2022
9cf3acf
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Nov 2, 2022
fd98b5d
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Nov 3, 2022
103bfd2
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Nov 3, 2022
bf3e38b
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Nov 3, 2022
c255c95
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Nov 3, 2022
4510e80
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Nov 3, 2022
cfe9a0c
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Nov 3, 2022
18519ad
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Nov 4, 2022
ceb4210
improve backward compatibility of 'SHAOW SLAVE HOSTS' command
lyzx2001 Nov 4, 2022
1c100ab
improve backward compatibility of 'SHOW SLAVE HOSTS' command
lyzx2001 Nov 10, 2022
94f530a
pkg(dm): improve backward compatibility of 'SHOW SLAVE HOSTS' command
lyzx2001 Nov 11, 2022
afa3f8f
Merge branch 'master' into issue5017
ti-chi-bot Nov 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 8 additions & 16 deletions dm/pkg/binlog/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ package binlog

import (
"path"
"strconv"
"time"

gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/terror"
Expand Down Expand Up @@ -49,28 +51,18 @@ func GetBinaryLogs(ctx *tcontext.Context, db *conn.BaseDB) (FileSizes, error) {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
defer rows.Close()

rowColumns, err := rows.Columns()
files := make([]binlogSize, 0, 10)
var rowsResult [][]string
rowsResult, err = export.GetSpecifiedColumnValuesAndClose(rows, "Log_name", "File_size")
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
files := make([]binlogSize, 0, 10)
for rows.Next() {
var file string
var pos int64
var nullPtr interface{}
if len(rowColumns) == 2 {
err = rows.Scan(&file, &pos)
} else {
err = rows.Scan(&file, &pos, &nullPtr)
}
for _, rowResult := range rowsResult {
pos, err := strconv.ParseInt(rowResult[1], 10, 64)
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
files = append(files, binlogSize{name: file, size: pos})
}
if rows.Err() != nil {
return nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError)
files = append(files, binlogSize{name: rowResult[0], size: pos})
}
return files, nil
}
Expand Down
59 changes: 48 additions & 11 deletions dm/pkg/conn/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/dumpling/export"
tmysql "github.com/pingcap/tidb/parser/mysql"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/gtid"
Expand Down Expand Up @@ -117,10 +118,6 @@ func GetMasterStatus(ctx *tcontext.Context, db *BaseDB, flavor string) (
}
defer rows.Close()

if !rows.Next() {
err = terror.ErrNoMasterStatus.Generate()
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}
// Show an example.
/*
MySQL [test]> SHOW MASTER STATUS;
Expand All @@ -144,23 +141,63 @@ func GetMasterStatus(ctx *tcontext.Context, db *BaseDB, flavor string) (
| 0-1-2 |
+--------------------------+
*/

var rowsResult [][]string
if flavor == gmysql.MySQLFlavor {
err = rows.Scan(&binlogName, &pos, &binlogDoDB, &binlogIgnoreDB, &gtidStr)
rowsResult, err = export.GetSpecifiedColumnValuesAndClose(rows, "File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB", "Executed_Gtid_Set")
if err != nil {
err = terror.DBErrorAdapt(err, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}

if len(rowsResult) == 0 {
err = terror.ErrNoMasterStatus.Generate()
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
} else if len(rowsResult[0]) != 0 {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
binlogName = rowsResult[0][0]
var posInt int
posInt, err = strconv.Atoi(rowsResult[0][1])
if err != nil {
err = terror.DBErrorAdapt(err, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}
pos = uint32(posInt)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
binlogDoDB = rowsResult[0][2]
binlogIgnoreDB = rowsResult[0][3]
gtidStr = rowsResult[0][4]
}
} else {
err = rows.Scan(&binlogName, &pos, &binlogDoDB, &binlogIgnoreDB)
}
if err != nil {
err = terror.DBErrorAdapt(err, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
rowsResult, err = export.GetSpecifiedColumnValuesAndClose(rows, "File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB")
if err != nil {
err = terror.DBErrorAdapt(err, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}

if len(rowsResult) == 0 {
err = terror.ErrNoMasterStatus.Generate()
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
} else if len(rowsResult[0]) != 0 {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
binlogName = rowsResult[0][0]
var posInt int
posInt, err = strconv.Atoi(rowsResult[0][1])
if err != nil {
err = terror.DBErrorAdapt(err, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}
pos = uint32(posInt)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
binlogDoDB = rowsResult[0][2]
binlogIgnoreDB = rowsResult[0][3]
}
}

if flavor == gmysql.MariaDBFlavor {
gtidStr, err = GetGlobalVariable(ctx, db, "gtid_binlog_pos")
if err != nil {
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}
}

if rows.Next() {
if len(rowsResult) > 1 {
ctx.L().Warn("SHOW MASTER STATUS returns more than one row, will only use first row")
}
if rows.Close() != nil {
Expand Down
69 changes: 69 additions & 0 deletions dm/pkg/conn/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

"github.com/DATA-DOG/go-sqlmock"
gmysql "github.com/go-mysql-org/go-mysql/mysql"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -59,3 +60,71 @@ func TestGetBinlogDB(t *testing.T) {
require.Equal(t, binlogIgnoreDB, "ignore_db")
require.Nil(t, mock.ExpectationsWereMet())
}

func TestGetMasterStatus(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), DefaultDBTimeout)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()

tctx := tcontext.NewContext(ctx, log.L())

db, mock, err := sqlmock.New()
require.NoError(t, err)
baseDB := NewBaseDB(db)

cases := []struct {
rows *sqlmock.Rows
binlogName string
pos uint32
binlogDoDB string
binlogIgnoreDB string
gtidStr string
err error
flavor string
}{
// For MySQL
{
sqlmock.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB", "Executed_Gtid_Set"}).
AddRow("ON.000001", 4822, "", "", "85ab69d1-b21f-11e6-9c5e-64006a8978d2:1-46"),
"ON.000001",
4822,
"",
"",
"85ab69d1-b21f-11e6-9c5e-64006a8978d2:1-46",
nil,
gmysql.MySQLFlavor,
},
// For MariaDB
{
sqlmock.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB"}).
AddRow("mariadb-bin.000016", 475, "", ""),
"mariadb-bin.000016",
475,
"",
"",
"0-1-2",
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
nil,
gmysql.MariaDBFlavor,
},
}

for _, ca := range cases {
mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(ca.rows)
// For MariaDB
if ca.flavor == gmysql.MariaDBFlavor {
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'gtid_binlog_pos'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("gtid_binlog_pos", "0-1-2"),
)
}
binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err := GetMasterStatus(tctx, baseDB, ca.flavor)
require.NoError(t, err)
require.Equal(t, ca.binlogName, binlogName)
require.Equal(t, ca.pos, pos)
require.Equal(t, ca.binlogDoDB, binlogDoDB)
require.Equal(t, ca.binlogIgnoreDB, binlogIgnoreDB)
require.Equal(t, ca.gtidStr, gtidStr)
require.NoError(t, mock.ExpectationsWereMet())
}

require.NoError(t, mock.ExpectationsWereMet())
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}
41 changes: 10 additions & 31 deletions dm/pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tidb/parser"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/dbutil"
Expand Down Expand Up @@ -107,11 +108,6 @@ func GetSlaveServerID(ctx context.Context, db *sql.DB) (map[uint32]struct{}, err
}
defer rows.Close()

rowColumns, err := rows.Columns()
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}

/*
in MySQL:
mysql> SHOW SLAVE HOSTS;
Expand All @@ -132,37 +128,20 @@ func GetSlaveServerID(ctx context.Context, db *sql.DB) (map[uint32]struct{}, err
+------------+-----------+------+-----------+
*/

var (
serverID sql.NullInt64
host sql.NullString
port sql.NullInt64
masterID sql.NullInt64
slaveUUID sql.NullString
)
serverIDs := make(map[uint32]struct{})
for rows.Next() {
if len(rowColumns) == 5 {
err = rows.Scan(&serverID, &host, &port, &masterID, &slaveUUID)
} else {
err = rows.Scan(&serverID, &host, &port, &masterID)
}
var rowsResult []string
rowsResult, err = export.GetSpecifiedColumnValueAndClose(rows, "Server_id")
if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
for _, serverID := range rowsResult {
// serverID will not be null
serverIDInt, err := strconv.Atoi(serverID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may raise error when int is 32-bit but serverID is in the upper-half of uint32 values?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use https://pkg.go.dev/strconv#ParseUint, don't use int because its range is undetermined

if err != nil {
return nil, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}

if serverID.Valid {
serverIDs[uint32(serverID.Int64)] = struct{}{}
} else {
// should never happened
log.L().Warn("get invalid server_id when execute `SHOW SLAVE HOSTS;`")
continue
}
}

if rows.Err() != nil {
return nil, terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError)
serverIDs[uint32(serverIDInt)] = struct{}{}
}

return serverIDs, nil
}

Expand Down
39 changes: 39 additions & 0 deletions dm/pkg/utils/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,42 @@ func TestCreateTableSQLToOneRow(t *testing.T) {
expected := "CREATE TABLE `t1` ( `id` bigint(20) NOT NULL, `c1` varchar(20) DEFAULT NULL, `c2` varchar(20) DEFAULT NULL, PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin"
require.Equal(t, expected, CreateTableSQLToOneRow(input))
}

func TestGetSlaveServerID(t *testing.T) {
t.Parallel()

db, mock, err := sqlmock.New()
require.NoError(t, err)

cases := []struct {
rows *sqlmock.Rows
results map[uint32]struct{}
}{
{
sqlmock.NewRows([]string{"Server_id", "Host", "Port", "Master_id", "Slave_UUID"}).
AddRow(192168010, "iconnect2", 3306, 192168011, "14cb6624-7f93-11e0-b2c0-c80aa9429562").
AddRow(1921680101, "athena", 3306, 192168011, "07af4990-f41f-11df-a566-7ac56fdaf645"),
map[uint32]struct{}{
192168010: {}, 1921680101: {},
},
},
{
sqlmock.NewRows([]string{"Server_id", "Host", "Port", "Master_id"}).
AddRow(192168010, "iconnect2", 3306, 192168011).
AddRow(1921680101, "athena", 3306, 192168011),
map[uint32]struct{}{
192168010: {}, 1921680101: {},
},
},
}

for _, ca := range cases {
mock.ExpectQuery("SHOW SLAVE HOSTS").WillReturnRows(ca.rows)
results, err2 := GetSlaveServerID(context.Background(), db)
require.NoError(t, err2)
require.Equal(t, ca.results, results)
require.NoError(t, mock.ExpectationsWereMet())
}

require.NoError(t, mock.ExpectationsWereMet())
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}