Skip to content

Commit

Permalink
dm: reduce gSet.String() usage by using zeroGSet for checking empty (#…
Browse files Browse the repository at this point in the history
…9944)

close #9676
  • Loading branch information
lichunzhu authored Nov 7, 2023
1 parent 1397f3f commit 955296d
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 20 deletions.
3 changes: 2 additions & 1 deletion dm/pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

Expand Down Expand Up @@ -172,7 +173,7 @@ func GTIDIncrease(flavor string, gSet gmysql.GTIDSet) (gmysql.GTIDSet, error) {

// verifySingleGTID verifies gSet whether only containing a single valid GTID.
func verifySingleGTID(flavor string, gSet gmysql.GTIDSet) (interface{}, error) {
if gSet == nil || len(gSet.String()) == 0 {
if gtid.CheckGTIDSetEmpty(gSet) {
return nil, terror.ErrBinlogEmptyGTID.Generate()
}

Expand Down
3 changes: 2 additions & 1 deletion dm/pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

Expand Down Expand Up @@ -717,7 +718,7 @@ func GenXIDEvent(header *replication.EventHeader, latestPos uint32, xid uint64)
// GenMariaDBGTIDListEvent generates a MariadbGTIDListEvent.
// ref: https://mariadb.com/kb/en/library/gtid_list_event/
func GenMariaDBGTIDListEvent(header *replication.EventHeader, latestPos uint32, gSet gmysql.GTIDSet) (*replication.BinlogEvent, error) {
if gSet == nil || len(gSet.String()) == 0 {
if gtid.CheckGTIDSetEmpty(gSet) {
return nil, terror.ErrBinlogEmptyGTID.Generate()
}

Expand Down
13 changes: 2 additions & 11 deletions dm/pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,18 +365,9 @@ func IsFreshPosition(location Location, flavor string, cmpGTID bool) bool {
// -1, true if gSet1 is less than gSet2
//
// but if can't compare gSet1 and gSet2, will returns 0, false.
var (
emptyMySQLGTIDSet, _ = gmysql.ParseMysqlGTIDSet("")
emptyMariaDBGTIDSet, _ = gmysql.ParseMariadbGTIDSet("")
)

func CheckGTIDSetEmpty(gSet gmysql.GTIDSet) bool {
return gSet == nil || gSet.Equal(emptyMySQLGTIDSet) || gSet.Equal(emptyMariaDBGTIDSet)
}

func CompareGTID(gSet1, gSet2 gmysql.GTIDSet) (int, bool) {
gSetIsEmpty1 := CheckGTIDSetEmpty(gSet1)
gSetIsEmpty2 := CheckGTIDSetEmpty(gSet2)
gSetIsEmpty1 := gtid.CheckGTIDSetEmpty(gSet1)
gSetIsEmpty2 := gtid.CheckGTIDSetEmpty(gSet2)

switch {
case gSetIsEmpty1 && gSetIsEmpty2:
Expand Down
39 changes: 35 additions & 4 deletions dm/pkg/gtid/gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ import (
"github.com/pingcap/tiflow/dm/pkg/terror"
)

var (
emptyMySQLGTIDSet, _ = mysql.ParseMysqlGTIDSet("")
emptyMariaDBGTIDSet, _ = mysql.ParseMariadbGTIDSet("")
)

// CheckGTIDSetEmpty is used to check whether a GTID set is zero.
func CheckGTIDSetEmpty(gSet mysql.GTIDSet) bool {
return gSet == nil || gSet.Equal(emptyMySQLGTIDSet) || gSet.Equal(emptyMariaDBGTIDSet)
}

// ParserGTID parses GTID from string. If the flavor is not specified, it will
// try mysql GTID first and then MariaDB GTID.
func ParserGTID(flavor, gtidStr string) (mysql.GTIDSet, error) {
Expand All @@ -42,7 +52,11 @@ func ParserGTID(flavor, gtidStr string) (mysql.GTIDSet, error) {
gtid, err = mysql.ParseGTIDSet(fla, gtidStr)
}
case mysql.MariaDBFlavor:
gtid, err = mysql.ParseGTIDSet(fla, gtidStr)
if IsZeroMariaDBGTIDSet(gtidStr) {
gtid, err = mysql.ParseGTIDSet(fla, "")
} else {
gtid, err = mysql.ParseGTIDSet(fla, gtidStr)
}
case "":
fla = mysql.MySQLFlavor
gtid, err = mysql.ParseGTIDSet(fla, gtidStr)
Expand Down Expand Up @@ -74,9 +88,6 @@ func MustZeroGTIDSet(flavor string) mysql.GTIDSet {
// IsZeroMySQLGTIDSet is used to meet this usage: when user wants to start binlog
// replication from scratch, a "uuid:0" (MySQL flavor) or "0-0-0" (mariaDB) GTID
// set must be written, in order to distinguish that user forgets to write it.
//
// For above two flavor, only "uuid:0" is illegal, so we use IsZeroMySQLGTIDSet
// to handle it.
func IsZeroMySQLGTIDSet(gStr string) bool {
sp := strings.Split(gStr, ",")
if len(sp) != 1 {
Expand All @@ -90,3 +101,23 @@ func IsZeroMySQLGTIDSet(gStr string) bool {
interval := strings.TrimSpace(sep[1])
return interval == "0"
}

// IsZeroMariaDBGTIDSet is used to meet this usage: when user wants to start binlog
// replication from scratch, a "uuid:0" (MySQL flavor) or "0-0-0" (mariaDB) GTID
// set must be written, in order to distinguish that user forgets to write it.
//
// For MariaDB, the GTID set like "0-0-0" will confuse IsZeroGTIDSet function,
// so we also need to check the interval part.
func IsZeroMariaDBGTIDSet(gStr string) bool {
sp := strings.Split(gStr, ",")
if len(sp) != 1 {
return false
}

sep := strings.Split(sp[0], "-")
if len(sep) != 3 {
return false
}
interval := strings.TrimSpace(sep[2])
return interval == "0"
}
60 changes: 59 additions & 1 deletion dm/pkg/gtid/gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestParseGTIDNoFlavor(t *testing.T) {
require.Error(t, err)
}

func TestIsNilGTIDSet(t *testing.T) {
func TestIsNilMySQLGTIDSet(t *testing.T) {
t.Parallel()

require.False(t, IsZeroMySQLGTIDSet(""))
Expand All @@ -79,6 +79,17 @@ func TestIsNilGTIDSet(t *testing.T) {
require.True(t, IsZeroMySQLGTIDSet(" xxxxx:0 "))
}

func TestIsNilMariaDBGTIDSet(t *testing.T) {
t.Parallel()

require.False(t, IsZeroMariaDBGTIDSet(""))
require.False(t, IsZeroMariaDBGTIDSet("xxxxx"))
require.False(t, IsZeroMariaDBGTIDSet("a-b-0,c-d:0"))
require.False(t, IsZeroMariaDBGTIDSet("xxxxx:1"))
require.True(t, IsZeroMariaDBGTIDSet("x-y-0"))
require.True(t, IsZeroMariaDBGTIDSet(" x-y-0 "))
}

func TestParseZeroAsEmptyGTIDSet(t *testing.T) {
t.Parallel()

Expand All @@ -94,3 +105,50 @@ func TestParseZeroAsEmptyGTIDSet(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "", gset.String())
}

func TestCheckGTIDSetEmpty(t *testing.T) {
t.Parallel()

testCases := []struct {
gsetStr string
isEmpty bool
flavor string
}{
{
"",
true,
mysql.MySQLFlavor,
},
{
"",
true,
mysql.MariaDBFlavor,
},
{
"3ccc475b-2343-11e7-be21-6c0b84d59f30:0",
true,
mysql.MySQLFlavor,
},
{
"0-0-0",
true,
mysql.MariaDBFlavor,
},
{
"3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14",
false,
mysql.MySQLFlavor,
},
{
"1-2-3",
false,
mysql.MariaDBFlavor,
},
}
for i, testCase := range testCases {
t.Logf("test case %d", i)
gset, err := ParserGTID(testCase.flavor, testCase.gsetStr)
require.NoError(t, err)
require.Equal(t, testCase.isEmpty, CheckGTIDSetEmpty(gset))
}
}
2 changes: 1 addition & 1 deletion dm/syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func (cp *RemoteCheckPoint) IsOlderThanTablePoint(table *filter.Table, location
}
oldLocation := point.MySQLLocation()
// if we update enable-gtid = false to true, we need to compare binlog position instead of GTID before we save table point
cmpGTID := cp.cfg.EnableGTID && !(binlog.CheckGTIDSetEmpty(oldLocation.GetGTID()) && binlog.ComparePosition(oldLocation.Position, binlog.MinPosition) > 0)
cmpGTID := cp.cfg.EnableGTID && !(gtid.CheckGTIDSetEmpty(oldLocation.GetGTID()) && binlog.ComparePosition(oldLocation.Position, binlog.MinPosition) > 0)
cp.logCtx.L().Debug("compare table location whether is newer", zap.Stringer("location", location), zap.Stringer("old location", oldLocation), zap.Bool("cmpGTID", cmpGTID))

return binlog.CompareLocation(location, oldLocation, cmpGTID) <= 0
Expand Down
3 changes: 2 additions & 1 deletion dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
"github.com/pingcap/tiflow/dm/pkg/gtid"
"github.com/pingcap/tiflow/dm/pkg/ha"
"github.com/pingcap/tiflow/dm/pkg/log"
parserpkg "github.com/pingcap/tiflow/dm/pkg/parser"
Expand Down Expand Up @@ -3497,7 +3498,7 @@ func (s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) {
// 2. location already has GTID position
// 3. location is totally new, has no position info
// 4. location is too early thus not a COMMIT location, which happens when it's reset by other logic
if !s.cfg.EnableGTID || !binlog.CheckGTIDSetEmpty(location.GetGTID()) || location.Position.Name == "" || location.Position.Pos == 4 {
if !s.cfg.EnableGTID || !gtid.CheckGTIDSetEmpty(location.GetGTID()) || location.Position.Name == "" || location.Position.Pos == 4 {
return false, nil
}
// set enableGTID to false for new streamerController
Expand Down

0 comments on commit 955296d

Please sign in to comment.