Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg(dm): modify the type of pos returned by function GetMasterStatus (#7815) #7842

Open
wants to merge 1 commit into
base: release-6.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 244 additions & 0 deletions dm/pkg/conn/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,255 @@ package conn

import (
"context"
<<<<<<< HEAD

"github.com/pingcap/errors"
"github.com/pingcap/tidb/util/dbutil"

"github.com/pingcap/tiflow/dm/dm/config"
=======
"fmt"
"math"
"strconv"
"strings"

gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/dumpling/export"
tmysql "github.com/pingcap/tidb/parser/mysql"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/terror"
"go.uber.org/zap"
)

// GetGlobalVariable gets server's global variable.
func GetGlobalVariable(ctx *tcontext.Context, db *BaseDB, variable string) (value string, err error) {
failpoint.Inject("GetGlobalVariableFailed", func(val failpoint.Value) {
items := strings.Split(val.(string), ",")
if len(items) != 2 {
ctx.L().Fatal("failpoint GetGlobalVariableFailed's value is invalid", zap.String("val", val.(string)))
}
variableName := items[0]
errCode, err1 := strconv.ParseUint(items[1], 10, 16)
if err1 != nil {
ctx.L().Fatal("failpoint GetGlobalVariableFailed's value is invalid", zap.String("val", val.(string)))
}
if variable == variableName {
err = tmysql.NewErr(uint16(errCode))
ctx.L().Warn("GetGlobalVariable failed", zap.String("variable", variable), zap.String("failpoint", "GetGlobalVariableFailed"), zap.Error(err))
failpoint.Return("", terror.DBErrorAdapt(err, db.Scope, terror.ErrDBDriverError))
}
})

conn, err := db.GetBaseConn(ctx.Context())
if err != nil {
return "", err
}
defer db.CloseConnWithoutErr(conn)
return getVariable(ctx, conn, variable, true)
}

func getVariable(ctx *tcontext.Context, conn *BaseConn, variable string, isGlobal bool) (value string, err error) {
var template string
if isGlobal {
template = "SHOW GLOBAL VARIABLES LIKE '%s'"
} else {
template = "SHOW VARIABLES LIKE '%s'"
}
query := fmt.Sprintf(template, variable)
row, err := conn.QuerySQL(ctx, query)
if err != nil {
return "", err
}
defer row.Close()

// Show an example.
/*
mysql> SHOW GLOBAL VARIABLES LIKE "binlog_format";
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
*/

if !row.Next() {
return "", terror.WithScope(terror.ErrDBDriverError.Generatef("variable %s not found", variable), conn.Scope)
}

err = row.Scan(&variable, &value)
if err != nil {
return "", terror.DBErrorAdapt(err, conn.Scope, terror.ErrDBDriverError)
}
return value, nil
}

// GetMasterStatus gets status from master.
// When the returned error is nil, the gtid must be not nil.
func GetMasterStatus(ctx *tcontext.Context, db *BaseDB, flavor string) (
string, uint64, string, string, string, error,
) {
var (
binlogName string
pos uint64
binlogDoDB string
binlogIgnoreDB string
gtidStr string
err error
)
// need REPLICATION SLAVE privilege
rows, err := db.QueryContext(ctx, `SHOW MASTER STATUS`)
if err != nil {
err = terror.DBErrorAdapt(err, db.Scope, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}
defer rows.Close()

// Show an example.
/*
MySQL [test]> SHOW MASTER STATUS;
+-----------+----------+--------------+------------------+--------------------------------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+-----------+----------+--------------+------------------+--------------------------------------------+
| ON.000001 | 4822 | | | 85ab69d1-b21f-11e6-9c5e-64006a8978d2:1-46
+-----------+----------+--------------+------------------+--------------------------------------------+
*/
/*
For MariaDB,SHOW MASTER STATUS:
+--------------------+----------+--------------+------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB |
+--------------------+----------+--------------+------------------+
| mariadb-bin.000016 | 475 | | |
+--------------------+----------+--------------+------------------+
SELECT @@global.gtid_binlog_pos;
+--------------------------+
| @@global.gtid_binlog_pos |
+--------------------------+
| 0-1-2 |
+--------------------------+
*/

var rowsResult [][]string
if flavor == gmysql.MySQLFlavor {
rowsResult, err = export.GetSpecifiedColumnValuesAndClose(rows, "File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB", "Executed_Gtid_Set")
if err != nil {
err = terror.DBErrorAdapt(err, db.Scope, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}

switch {
case len(rowsResult) == 0:
err = terror.ErrNoMasterStatus.Generate()
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
case len(rowsResult[0]) != 5:
ctx.L().DPanic("The number of columns that SHOW MASTER STATUS returns for MySQL is not equal to 5, will not use the retrieved information")
err = terror.ErrIncorrectReturnColumnsNum.Generate()
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
default:
binlogName = rowsResult[0][0]
var posInt uint64
posInt, err = strconv.ParseUint(rowsResult[0][1], 10, 64)
if err != nil {
err = terror.DBErrorAdapt(err, db.Scope, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}
pos = posInt
binlogDoDB = rowsResult[0][2]
binlogIgnoreDB = rowsResult[0][3]
gtidStr = rowsResult[0][4]
}
} else {
rowsResult, err = export.GetSpecifiedColumnValuesAndClose(rows, "File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB")
if err != nil {
err = terror.DBErrorAdapt(err, db.Scope, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}

switch {
case len(rowsResult) == 0:
err = terror.ErrNoMasterStatus.Generate()
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
case len(rowsResult[0]) != 4:
ctx.L().DPanic("The number of columns that SHOW MASTER STATUS returns for MariaDB is not equal to 4, will not use the retrieved information")
err = terror.ErrIncorrectReturnColumnsNum.Generate()
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
default:
binlogName = rowsResult[0][0]
var posInt uint64
posInt, err = strconv.ParseUint(rowsResult[0][1], 10, 64)
if err != nil {
err = terror.DBErrorAdapt(err, db.Scope, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}
pos = posInt
binlogDoDB = rowsResult[0][2]
binlogIgnoreDB = rowsResult[0][3]
}
}

if flavor == gmysql.MariaDBFlavor {
gtidStr, err = GetGlobalVariable(ctx, db, "gtid_binlog_pos")
if err != nil {
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}
}

if len(rowsResult) > 1 {
ctx.L().Warn("SHOW MASTER STATUS returns more than one row, will only use first row")
}
if rows.Close() != nil {
err = terror.DBErrorAdapt(rows.Err(), db.Scope, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}
if rows.Err() != nil {
err = terror.DBErrorAdapt(rows.Err(), db.Scope, terror.ErrDBDriverError)
return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}

return binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err
}

// GetPosAndGs get binlog position and gmysql.GTIDSet from `show master status`.
func GetPosAndGs(ctx *tcontext.Context, db *BaseDB, flavor string) (
binlogPos gmysql.Position,
gs gmysql.GTIDSet,
err error,
) {
binlogName, pos, _, _, gtidStr, err := GetMasterStatus(ctx, db, flavor)
if err != nil {
return
}
if pos > math.MaxUint32 {
ctx.L().Warn("the pos returned by GetMasterStatus beyonds the range of uint32")
}
binlogPos = gmysql.Position{
Name: binlogName,
Pos: uint32(pos),
}

gs, err = gtid.ParserGTID(flavor, gtidStr)
return
}

// GetBinlogDB get binlog_do_db and binlog_ignore_db from `show master status`.
func GetBinlogDB(ctx *tcontext.Context, db *BaseDB, flavor string) (string, string, error) {
// nolint:dogsled
_, _, binlogDoDB, binlogIgnoreDB, _, err := GetMasterStatus(ctx, db, flavor)
return binlogDoDB, binlogIgnoreDB, err
}

// LowerCaseTableNamesFlavor represents the type of db `lower_case_table_names` settings.
type LowerCaseTableNamesFlavor uint8

const (
// LCTableNamesSensitive represent lower_case_table_names = 0, case sensitive.
LCTableNamesSensitive LowerCaseTableNamesFlavor = 0
// LCTableNamesInsensitive represent lower_case_table_names = 1, case insensitive.
LCTableNamesInsensitive = 1
// LCTableNamesMixed represent lower_case_table_names = 2, table names are case-sensitive, but case-insensitive in usage.
LCTableNamesMixed = 2
>>>>>>> 195353e150 (pkg(dm): modify the type of pos returned by function GetMasterStatus (#7815))
)

// FetchTimeZoneSetting fetch target db global time_zone setting.
Expand Down
69 changes: 69 additions & 0 deletions dm/pkg/conn/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var _ = Suite(testUtilSuite{})

type testUtilSuite struct{}

<<<<<<< HEAD
func (s testUtilSuite) TestFetchTZSetting(c *C) {
m := InitMockDB(c)

Expand All @@ -33,4 +34,72 @@ func (s testUtilSuite) TestFetchTZSetting(c *C) {
tz, err := FetchTimeZoneSetting(context.Background(), &config.DBConfig{})
c.Assert(err, IsNil)
c.Assert(tz, Equals, "+01:00")
=======
cases := []struct {
rows *sqlmock.Rows
binlogName string
pos uint64
binlogDoDB string
binlogIgnoreDB string
gtidStr string
err error
flavor string
}{
// For MySQL
{
sqlmock.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB", "Executed_Gtid_Set"}).
AddRow("ON.000001", 4822, "", "", "85ab69d1-b21f-11e6-9c5e-64006a8978d2:1-46"),
"ON.000001",
4822,
"",
"",
"85ab69d1-b21f-11e6-9c5e-64006a8978d2:1-46",
nil,
gmysql.MySQLFlavor,
},
// test unit64 position for MySQL
{
sqlmock.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB", "Executed_Gtid_Set"}).
AddRow("ON.000002", 429496729500, "", "", "85ab69d1-b21f-11e6-9c5e-64006a8978d2:1-500"),
"ON.000002",
429496729500,
"",
"",
"85ab69d1-b21f-11e6-9c5e-64006a8978d2:1-500",
nil,
gmysql.MySQLFlavor,
},
// For MariaDB
{
sqlmock.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB"}).
AddRow("mariadb-bin.000016", 475, "", ""),
"mariadb-bin.000016",
475,
"",
"",
"0-1-2",
nil,
gmysql.MariaDBFlavor,
},
}
for _, ca := range cases {
mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(ca.rows)
// For MariaDB
if ca.flavor == gmysql.MariaDBFlavor {
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'gtid_binlog_pos'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("gtid_binlog_pos", "0-1-2"),
)
}
binlogName, pos, binlogDoDB, binlogIgnoreDB, gtidStr, err := GetMasterStatus(tctx, baseDB, ca.flavor)
require.IsType(t, uint64(0), pos)
require.NoError(t, err)
require.Equal(t, ca.binlogName, binlogName)
require.Equal(t, ca.pos, pos)
require.Equal(t, ca.binlogDoDB, binlogDoDB)
require.Equal(t, ca.binlogIgnoreDB, binlogIgnoreDB)
require.Equal(t, ca.gtidStr, gtidStr)
require.NoError(t, mock.ExpectationsWereMet())
}
>>>>>>> 195353e150 (pkg(dm): modify the type of pos returned by function GetMasterStatus (#7815))
}