diff --git a/dm/pkg/binlog/common/replication.go b/dm/pkg/binlog/common/replication.go index 7e1b53c5b8c..3cd20911fa5 100644 --- a/dm/pkg/binlog/common/replication.go +++ b/dm/pkg/binlog/common/replication.go @@ -22,7 +22,7 @@ import ( var ( // MaxBinlogSyncerReconnect is the max reconnection times for binlog syncer in go-mysql. MaxBinlogSyncerReconnect = 60 - // SlaveReadTimeout is slave read binlog data timeout, ref: https://dev.mysql.com/doc/refman/8.0/en/replication-options-slave.html#sysvar_slave_net_timeout + // SlaveReadTimeout is slave read binlog data timeout, ref: https://dev.mysql.com/doc/refman/8.0/en/replication-options-replica.html#sysvar_slave_net_timeout SlaveReadTimeout = 1 * time.Minute // MasterHeartbeatPeriod is the master server send heartbeat period, ref: `MASTER_HEARTBEAT_PERIOD` in https://dev.mysql.com/doc/refman/8.0/en/change-master-to.html MasterHeartbeatPeriod = 30 * time.Second diff --git a/dm/pkg/binlog/event/common.go b/dm/pkg/binlog/event/common.go index 5437ec4f7fc..57f6b306b13 100644 --- a/dm/pkg/binlog/event/common.go +++ b/dm/pkg/binlog/event/common.go @@ -163,7 +163,11 @@ func GTIDIncrease(flavor string, gSet gmysql.GTIDSet) (gmysql.GTIDSet, error) { mariaGTID := singleGTID.(*gmysql.MariadbGTID) mariaGTID.SequenceNumber++ gtidSet := new(gmysql.MariadbGTIDSet) - gtidSet.Sets = map[uint32]*gmysql.MariadbGTID{mariaGTID.DomainID: mariaGTID} + gtidSet.Sets = map[uint32]map[uint32]*gmysql.MariadbGTID{ + mariaGTID.DomainID: { + mariaGTID.ServerID: mariaGTID, + }, + } clone = gtidSet default: err = terror.ErrBinlogGTIDSetNotValid.Generate(gSet, flavor) @@ -203,11 +207,15 @@ func verifySingleGTID(flavor string, gSet gmysql.GTIDSet) (interface{}, error) { if !ok { return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(gSet) } - if len(mariaGTIDs.Sets) != 1 { - return nil, terror.ErrBinlogOnlyOneGTIDSupport.Generate(len(mariaGTIDs.Sets), gSet) - } + gtidCount := 0 var mariaGTID *gmysql.MariadbGTID - for _, mariaGTID = range mariaGTIDs.Sets { + for _, set := range mariaGTIDs.Sets { + gtidCount += len(set) + for _, mariaGTID = range set { + } + } + if gtidCount != 1 { + return nil, terror.ErrBinlogOnlyOneGTIDSupport.Generate(gtidCount, gSet) } return mariaGTID, nil default: diff --git a/dm/pkg/binlog/event/event.go b/dm/pkg/binlog/event/event.go index 1d8ca835dd1..f7b62536a93 100644 --- a/dm/pkg/binlog/event/event.go +++ b/dm/pkg/binlog/event/event.go @@ -736,21 +736,23 @@ func GenMariaDBGTIDListEvent(header *replication.EventHeader, latestPos uint32, return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Number of GTIDs %d", numOfGTIDs) } - for _, mGTID := range mariaDBGSet.Sets { - // Replication Domain ID, 4 bytes - err = binary.Write(payload, binary.LittleEndian, mGTID.DomainID) - if err != nil { - return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Replication Domain ID %d", mGTID.DomainID) - } - // Server_ID, 4 bytes - err = binary.Write(payload, binary.LittleEndian, mGTID.ServerID) - if err != nil { - return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Server_ID %d", mGTID.ServerID) - } - // GTID sequence, 8 bytes - err = binary.Write(payload, binary.LittleEndian, mGTID.SequenceNumber) - if err != nil { - return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write GTID sequence %d", mGTID.SequenceNumber) + for _, set := range mariaDBGSet.Sets { + for _, mGTID := range set { + // Replication Domain ID, 4 bytes + err = binary.Write(payload, binary.LittleEndian, mGTID.DomainID) + if err != nil { + return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Replication Domain ID %d", mGTID.DomainID) + } + // Server_ID, 4 bytes + err = binary.Write(payload, binary.LittleEndian, mGTID.ServerID) + if err != nil { + return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Server_ID %d", mGTID.ServerID) + } + // GTID sequence, 8 bytes + err = binary.Write(payload, binary.LittleEndian, mGTID.SequenceNumber) + if err != nil { + return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write GTID sequence %d", mGTID.SequenceNumber) + } } } diff --git a/dm/pkg/binlog/event/event_test.go b/dm/pkg/binlog/event/event_test.go index 7fa80e5af14..ad7a24de0ad 100644 --- a/dm/pkg/binlog/event/event_test.go +++ b/dm/pkg/binlog/event/event_test.go @@ -519,7 +519,6 @@ func TestGenRowsEvent(t *testing.T) { require.Equal(t, tableID, rowsEvBody.TableID) require.Equal(t, uint64(len(rows[0])), rowsEvBody.ColumnCount) require.Equal(t, 0, rowsEvBody.Version) // WRITE_ROWS_EVENTv0 - require.Nil(t, rowsEvBody.ExtraData) require.Equal(t, rows, rowsEvBody.Rows) // multi rows, with different length, invalid @@ -664,7 +663,7 @@ func TestGenMariaDBGTIDListEvent(t *testing.T) { require.True(t, ok) require.NotNil(t, gtidListEvBody) require.Len(t, gtidListEvBody.GTIDs, 1) - require.Equal(t, *mGSet.Sets[gtidListEvBody.GTIDs[0].DomainID], gtidListEvBody.GTIDs[0]) + require.Equal(t, *mGSet.Sets[gtidListEvBody.GTIDs[0].DomainID][gtidListEvBody.GTIDs[0].ServerID], gtidListEvBody.GTIDs[0]) // valid gSet with multi GTIDs gSet, err = gtid.ParserGTID(gmysql.MariaDBFlavor, "1-2-12,2-2-3,3-3-8,4-4-4") @@ -684,7 +683,9 @@ func TestGenMariaDBGTIDListEvent(t *testing.T) { require.NotNil(t, gtidListEvBody) require.Len(t, gtidListEvBody.GTIDs, 4) for _, mGTID := range gtidListEvBody.GTIDs { - mGTID2, ok := mGSet.Sets[mGTID.DomainID] + set, ok := mGSet.Sets[mGTID.DomainID] + require.True(t, ok) + mGTID2, ok := set[mGTID.ServerID] require.True(t, ok) require.Equal(t, *mGTID2, mGTID) } diff --git a/dm/pkg/binlog/event/generator.go b/dm/pkg/binlog/event/generator.go index a2a4bb2b95c..ac7dca4605c 100644 --- a/dm/pkg/binlog/event/generator.go +++ b/dm/pkg/binlog/event/generator.go @@ -86,7 +86,11 @@ func newGenerator(flavor, version string, serverID uint32, latestPos uint32, lat if !ok || prevGSet == nil { return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(previousGTIDs) } - prevGTID, ok := prevGSet.Sets[mariaGTID.DomainID] + set, ok := prevGSet.Sets[mariaGTID.DomainID] + if !ok { + return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs) + } + prevGTID, ok := set[mariaGTID.ServerID] if !ok || prevGTID.ServerID != mariaGTID.ServerID || prevGTID.SequenceNumber != mariaGTID.SequenceNumber { return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs) } diff --git a/go.mod b/go.mod index 6c178eea201..d9d7f4b27ea 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/gin-gonic/gin v1.9.1 github.com/glebarez/go-sqlite v1.21.2 github.com/glebarez/sqlite v1.7.0 - github.com/go-mysql-org/go-mysql v1.7.1-0.20230619063055-fd67d94318fd + github.com/go-mysql-org/go-mysql v1.7.1-0.20240507075657-2bd4573edde2 github.com/go-ozzo/ozzo-validation/v4 v4.3.0 github.com/go-sql-driver/mysql v1.7.1 github.com/goccy/go-json v0.10.2 diff --git a/go.sum b/go.sum index 3e19a919235..3ad070d1b09 100644 --- a/go.sum +++ b/go.sum @@ -364,8 +364,8 @@ github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ4 github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8= -github.com/go-mysql-org/go-mysql v1.7.1-0.20230619063055-fd67d94318fd h1:lqWdv8GEYqF1deivEmnSx81GfcAUZ/FoxilGxm/kwWs= -github.com/go-mysql-org/go-mysql v1.7.1-0.20230619063055-fd67d94318fd/go.mod h1:kOk/pFv3q5EPspyQfDRGLmEA6wfMvIeV4DmThwzkNzs= +github.com/go-mysql-org/go-mysql v1.7.1-0.20240507075657-2bd4573edde2 h1:DU4x9rUmzjuOnXg5hgX5kUqj1HmKzN+rU3bsIy7oWok= +github.com/go-mysql-org/go-mysql v1.7.1-0.20240507075657-2bd4573edde2/go.mod h1:kwbF156Z9Sy8amP3E1SZp7/s/0PuJj/xKaOWToQiq0Y= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=