Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay, syncer(dm): rename UUIDSuffix to RelaySubDirSuffix and others #5566

Merged
merged 8 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (w *SourceWorker) EnableRelay(startBySourceCfg bool) (err error) {
} else {
// set UUIDSuffix even not checkpoint exist
// so we will still remove relay dir
w.cfg.UUIDSuffix = binlog.MinUUIDSuffix
w.cfg.UUIDSuffix = binlog.MinRelaySubDirSuffix
}

// 2. initial relay holder, the cfg's password need decrypt
Expand Down Expand Up @@ -1000,7 +1000,7 @@ func (w *SourceWorker) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest
if !w.subTaskEnabled.Load() {
w.l.Info("worker received purge-relay but didn't handling subtasks, read global checkpoint to decided active relay log")

uuid := w.relayHolder.Status(nil).RelaySubDir
subDir := w.relayHolder.Status(nil).RelaySubDir

_, _, subTaskCfgs, _, err := w.fetchSubTasksAndAdjust()
if err != nil {
Expand All @@ -1013,9 +1013,9 @@ func (w *SourceWorker) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest
}
w.l.Info("update active relay log with",
zap.String("task name", subTaskCfg.Name),
zap.String("uuid", uuid),
zap.String("subDir", subDir),
zap.String("binlog name", loc.Position.Name))
if err3 := streamer.GetReaderHub().UpdateActiveRelayLog(subTaskCfg.Name, uuid, loc.Position.Name); err3 != nil {
if err3 := streamer.GetReaderHub().UpdateActiveRelayLog(subTaskCfg.Name, subDir, loc.Position.Name); err3 != nil {
w.l.Error("Error when update active relay log", zap.Error(err3))
}
}
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/binlog/filename.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ func ConstructFilename(baseName, seq string) string {

// ConstructFilenameWithUUIDSuffix constructs a binlog filename with UUID suffix.
func ConstructFilenameWithUUIDSuffix(originalName Filename, uuidSuffix string) string {
return fmt.Sprintf("%s%s%s%s%s", originalName.BaseName, posUUIDSuffixSeparator, uuidSuffix, binlogFilenameSep, originalName.Seq)
return fmt.Sprintf("%s%s%s%s%s", originalName.BaseName, posRelaySubDirSuffixSeparator, uuidSuffix, binlogFilenameSep, originalName.Seq)
}

// SplitFilenameWithUUIDSuffix analyzes a binlog filename with UUID suffix.
func SplitFilenameWithUUIDSuffix(filename string) (baseName, uuidSuffix, seq string, err error) {
items1 := strings.Split(filename, posUUIDSuffixSeparator)
items1 := strings.Split(filename, posRelaySubDirSuffixSeparator)
if len(items1) != 2 {
return "", "", "", terror.ErrBinlogInvalidFilenameWithUUIDSuffix.Generate(filename)
}
Expand Down
69 changes: 35 additions & 34 deletions dm/pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ import (
)

const (
// in order to differ binlog pos from multi (switched) masters, we added a UUID-suffix field into binlogPos.Name
// and we also need support: with UUIDSuffix's pos should always > without UUIDSuffix's pos, so we can update from @without to @with automatically
// conversion: originalPos.NamePrefix + posUUIDSuffixSeparator + UUIDSuffix + binlogFilenameSep + originalPos.NameSuffix => convertedPos.Name
// UUIDSuffix is the suffix of sub relay directory name, and when new sub directory created, UUIDSuffix is incremented
// eg. mysql-bin.000003 in c6ae5afe-c7a3-11e8-a19d-0242ac130006.000002 => mysql-bin|000002.000003
// where `000002` in `c6ae5afe-c7a3-11e8-a19d-0242ac130006.000002` is the UUIDSuffix.
posUUIDSuffixSeparator = "|"
// MinUUIDSuffix is same as relay.MinUUIDSuffix.
MinUUIDSuffix = 1
// in order to differ binlog position from multiple (switched) masters, we added a suffix which comes from relay log
// subdirectory into binlogPos.Name. And we also need support position with RelaySubDirSuffix should always > position
// without RelaySubDirSuffix, so we can continue from latter to former automatically.
// convertedPos.BinlogName =
// originalPos.BinlogBaseName + posRelaySubDirSuffixSeparator + RelaySubDirSuffix + binlogFilenameSep + originalPos.BinlogSeq
// eg. mysql-bin.000003 under folder c6ae5afe-c7a3-11e8-a19d-0242ac130006.000002 => mysql-bin|000002.000003
// when new relay log subdirectory is created, RelaySubDirSuffix should increase.
posRelaySubDirSuffixSeparator = "|"
// MinRelaySubDirSuffix is same as relay.MinRelaySubDirSuffix.
MinRelaySubDirSuffix = 1
// FileHeaderLen is the length of binlog file header.
FileHeaderLen = 4
)
Expand Down Expand Up @@ -87,18 +88,18 @@ func PositionFromPosStr(str string) (gmysql.Position, error) {
}

// RealMySQLPos parses a relay position and returns a mysql position and whether error occurs
// if parsed successfully and `UUIDSuffix` exists, sets position Name to
// `originalPos.NamePrefix + binlogFilenameSep + originalPos.NameSuffix`.
// if parsed successfully and `RelaySubDirSuffix` in binlog filename exists, sets position Name to
// `originalPos.BinlogBaseName + binlogFilenameSep + originalPos.BinlogSeq`.
// if parsed failed returns the given position and the traced error.
func RealMySQLPos(pos gmysql.Position) (gmysql.Position, error) {
parsed, err := ParseFilename(pos.Name)
if err != nil {
return pos, err
}

sepIdx := strings.LastIndex(parsed.BaseName, posUUIDSuffixSeparator)
if sepIdx > 0 && sepIdx+len(posUUIDSuffixSeparator) < len(parsed.BaseName) {
if !verifyUUIDSuffix(parsed.BaseName[sepIdx+len(posUUIDSuffixSeparator):]) {
sepIdx := strings.LastIndex(parsed.BaseName, posRelaySubDirSuffixSeparator)
if sepIdx > 0 && sepIdx+len(posRelaySubDirSuffixSeparator) < len(parsed.BaseName) {
if !verifyRelaySubDirSuffix(parsed.BaseName[sepIdx+len(posRelaySubDirSuffixSeparator):]) {
// NOTE: still can't handle the case where `log-bin` has the format of `mysql-bin|666888`.
return pos, nil // pos is just the real pos
}
Expand All @@ -111,27 +112,27 @@ func RealMySQLPos(pos gmysql.Position) (gmysql.Position, error) {
return pos, nil
}

// ExtractSuffix extracts uuidSuffix from input name.
// ExtractSuffix extracts RelaySubDirSuffix from input name.
func ExtractSuffix(name string) (int, error) {
if len(name) == 0 {
return MinUUIDSuffix, nil
return MinRelaySubDirSuffix, nil
}
filename, err := ParseFilename(name)
if err != nil {
return 0, err
}
sepIdx := strings.LastIndex(filename.BaseName, posUUIDSuffixSeparator)
if sepIdx > 0 && sepIdx+len(posUUIDSuffixSeparator) < len(filename.BaseName) {
suffix := filename.BaseName[sepIdx+len(posUUIDSuffixSeparator):]
sepIdx := strings.LastIndex(filename.BaseName, posRelaySubDirSuffixSeparator)
if sepIdx > 0 && sepIdx+len(posRelaySubDirSuffixSeparator) < len(filename.BaseName) {
suffix := filename.BaseName[sepIdx+len(posRelaySubDirSuffixSeparator):]
v, err := strconv.ParseInt(suffix, 10, 64)
return int(v), err
}
return MinUUIDSuffix, nil
return MinRelaySubDirSuffix, nil
}

// ExtractPos extracts (uuidWithSuffix, uuidSuffix, originalPos) from input pos (originalPos or convertedPos).
// ExtractPos extracts (uuidWithSuffix, RelaySubDirSuffix, originalPos) from input position (originalPos or convertedPos).
// nolint:nakedret
func ExtractPos(pos gmysql.Position, uuids []string) (uuidWithSuffix string, uuidSuffix string, realPos gmysql.Position, err error) {
func ExtractPos(pos gmysql.Position, uuids []string) (uuidWithSuffix string, relaySubDirSuffix string, realPos gmysql.Position, err error) {
if len(uuids) == 0 {
err = terror.ErrBinlogExtractPosition.New("empty UUIDs not valid")
return
Expand All @@ -141,46 +142,46 @@ func ExtractPos(pos gmysql.Position, uuids []string) (uuidWithSuffix string, uui
if err != nil {
return
}
sepIdx := strings.LastIndex(parsed.BaseName, posUUIDSuffixSeparator)
if sepIdx > 0 && sepIdx+len(posUUIDSuffixSeparator) < len(parsed.BaseName) {
realBaseName, masterUUIDSuffix := parsed.BaseName[:sepIdx], parsed.BaseName[sepIdx+len(posUUIDSuffixSeparator):]
if !verifyUUIDSuffix(masterUUIDSuffix) {
err = terror.ErrBinlogExtractPosition.Generatef("invalid UUID suffix %s", masterUUIDSuffix)
sepIdx := strings.LastIndex(parsed.BaseName, posRelaySubDirSuffixSeparator)
if sepIdx > 0 && sepIdx+len(posRelaySubDirSuffixSeparator) < len(parsed.BaseName) {
realBaseName, masterRelaySubDirSuffix := parsed.BaseName[:sepIdx], parsed.BaseName[sepIdx+len(posRelaySubDirSuffixSeparator):]
if !verifyRelaySubDirSuffix(masterRelaySubDirSuffix) {
err = terror.ErrBinlogExtractPosition.Generatef("invalid UUID suffix %s", masterRelaySubDirSuffix)
return
}

// NOTE: still can't handle the case where `log-bin` has the format of `mysql-bin|666888` and UUID suffix `666888` exists.
uuid := utils.GetUUIDBySuffix(uuids, masterUUIDSuffix)
uuid := utils.GetUUIDBySuffix(uuids, masterRelaySubDirSuffix)

if len(uuid) > 0 {
// valid UUID found
uuidWithSuffix = uuid
uuidSuffix = masterUUIDSuffix
relaySubDirSuffix = masterRelaySubDirSuffix
realPos = gmysql.Position{
Name: ConstructFilename(realBaseName, parsed.Seq),
Pos: pos.Pos,
}
} else {
err = terror.ErrBinlogExtractPosition.Generatef("UUID suffix %s with UUIDs %v not found", masterUUIDSuffix, uuids)
err = terror.ErrBinlogExtractPosition.Generatef("UUID suffix %s with UUIDs %v not found", masterRelaySubDirSuffix, uuids)
}
return
}

// use the latest
var suffixInt int
uuid := uuids[len(uuids)-1]
_, suffixInt, err = utils.ParseSuffixForUUID(uuid)
_, suffixInt, err = utils.ParseSuffixFromRelaySubDir(uuid)
if err != nil {
return
}
uuidWithSuffix = uuid
uuidSuffix = utils.SuffixIntToStr(suffixInt)
relaySubDirSuffix = utils.SuffixIntToStr(suffixInt)
realPos = pos // pos is realPos
return
}

// verifyUUIDSuffix verifies suffix whether is a valid UUID suffix.
func verifyUUIDSuffix(suffix string) bool {
// verifyRelaySubDirSuffix verifies suffix whether is a valid relay log subdirectory suffix.
func verifyRelaySubDirSuffix(suffix string) bool {
v, err := strconv.ParseInt(suffix, 10, 64)
if err != nil || v <= 0 {
return false
Expand Down
6 changes: 3 additions & 3 deletions dm/pkg/binlog/position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (t *testPositionSuite) TestVerifyUUIDSuffix(c *C) {
}

for _, cs := range cases {
c.Assert(verifyUUIDSuffix(cs.suffix), Equals, cs.valid)
c.Assert(verifyRelaySubDirSuffix(cs.suffix), Equals, cs.valid)
}
}

Expand Down Expand Up @@ -793,11 +793,11 @@ func (t *testPositionSuite) TestExtractSuffix(c *C) {
}{
{
"",
MinUUIDSuffix,
MinRelaySubDirSuffix,
},
{
"mysql-bin.00005",
MinUUIDSuffix,
MinRelaySubDirSuffix,
},
{
"mysql-bin|000001.000001",
Expand Down
14 changes: 7 additions & 7 deletions dm/pkg/streamer/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
// RelayLogInfo represents information for relay log.
type RelayLogInfo struct {
TaskName string
UUID string
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
SubDir string
UUIDSuffix int
Filename string
}
Expand All @@ -48,7 +48,7 @@ func (info *RelayLogInfo) Earlier(other *RelayLogInfo) bool {

// String implements Stringer.String.
func (info *RelayLogInfo) String() string {
return filepath.Join(info.UUID, info.Filename)
return filepath.Join(info.SubDir, info.Filename)
}

// relayLogInfoHub holds information for all active relay logs.
Expand All @@ -63,8 +63,8 @@ func newRelayLogInfoHub() *relayLogInfoHub {
}
}

func (h *relayLogInfoHub) update(taskName, uuid, filename string) error {
_, suffix, err := utils.ParseSuffixForUUID(uuid)
func (h *relayLogInfoHub) update(taskName, subDir, filename string) error {
_, suffix, err := utils.ParseSuffixFromRelaySubDir(subDir)
if err != nil {
return err
}
Expand All @@ -75,7 +75,7 @@ func (h *relayLogInfoHub) update(taskName, uuid, filename string) error {
defer h.mu.Unlock()
h.logs[taskName] = RelayLogInfo{
TaskName: taskName,
UUID: uuid,
SubDir: subDir,
UUIDSuffix: suffix,
Filename: filename,
}
Expand Down Expand Up @@ -123,8 +123,8 @@ func GetReaderHub() *ReaderHub {
}

// UpdateActiveRelayLog updates active relay log for taskName.
func (h *ReaderHub) UpdateActiveRelayLog(taskName, uuid, filename string) error {
return h.rlih.update(taskName, uuid, filename)
func (h *ReaderHub) UpdateActiveRelayLog(taskName, subDir, filename string) error {
return h.rlih.update(taskName, subDir, filename)
}

// RemoveActiveRelayLog removes active relay log for taskName.
Expand Down
16 changes: 8 additions & 8 deletions dm/pkg/streamer/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (t *testHubSuite) TestRelayLogInfo(c *C) {
c.Assert(rli3.Earlier(&rli1), IsFalse)

// string representation
rli3.UUID = "c6ae5afe-c7a3-11e8-a19d-0242ac130006.000001"
c.Assert(rli3.String(), Equals, filepath.Join(rli3.UUID, rli3.Filename))
rli3.SubDir = "c6ae5afe-c7a3-11e8-a19d-0242ac130006.000001"
c.Assert(rli3.String(), Equals, filepath.Join(rli3.SubDir, rli3.Filename))
}

func (t *testHubSuite) TestRelayLogInfoHub(c *C) {
Expand Down Expand Up @@ -85,7 +85,7 @@ func (t *testHubSuite) TestRelayLogInfoHub(c *C) {
c.Assert(err, IsNil)
taskName, earliest = rlih.earliest()
c.Assert(taskName, Equals, cs.taskName)
c.Assert(earliest.UUID, Equals, cs.uuid)
c.Assert(earliest.SubDir, Equals, cs.uuid)
c.Assert(earliest.Filename, Equals, cs.filename)
}
c.Assert(len(rlih.logs), Equals, 3)
Expand All @@ -98,7 +98,7 @@ func (t *testHubSuite) TestRelayLogInfoHub(c *C) {
taskName, earliest = rlih.earliest()
cs = cases[1]
c.Assert(taskName, Equals, cs.taskName)
c.Assert(earliest.UUID, Equals, cs.uuid)
c.Assert(earliest.SubDir, Equals, cs.uuid)
c.Assert(earliest.Filename, Equals, cs.filename)

// remove non-earliest
Expand All @@ -109,7 +109,7 @@ func (t *testHubSuite) TestRelayLogInfoHub(c *C) {
taskName, earliest = rlih.earliest()
cs = cases[1]
c.Assert(taskName, Equals, cs.taskName)
c.Assert(earliest.UUID, Equals, cs.uuid)
c.Assert(earliest.SubDir, Equals, cs.uuid)
c.Assert(earliest.Filename, Equals, cs.filename)

// all removed
Expand Down Expand Up @@ -141,7 +141,7 @@ func (t *testHubSuite) TestReaderHub(c *C) {
// the only one is the earliest
erli = h.EarliestActiveRelayLog()
c.Assert(erli, NotNil)
c.Assert(erli.UUID, Equals, "c6ae5afe-c7a3-11e8-a19d-0242ac130006.000004")
c.Assert(erli.SubDir, Equals, "c6ae5afe-c7a3-11e8-a19d-0242ac130006.000004")
c.Assert(erli.Filename, Equals, "mysql-bin.000001")

// update an earlier one
Expand All @@ -151,7 +151,7 @@ func (t *testHubSuite) TestReaderHub(c *C) {
// the earlier one is the earliest
erli = h.EarliestActiveRelayLog()
c.Assert(erli, NotNil)
c.Assert(erli.UUID, Equals, "c6ae5afe-c7a3-11e8-a19d-0242ac130006.000002")
c.Assert(erli.SubDir, Equals, "c6ae5afe-c7a3-11e8-a19d-0242ac130006.000002")
c.Assert(erli.Filename, Equals, "mysql-bin.000002")

// remove the earlier one
Expand All @@ -160,7 +160,7 @@ func (t *testHubSuite) TestReaderHub(c *C) {
// the only one is the earliest
erli = h.EarliestActiveRelayLog()
c.Assert(erli, NotNil)
c.Assert(erli.UUID, Equals, "c6ae5afe-c7a3-11e8-a19d-0242ac130006.000004")
c.Assert(erli.SubDir, Equals, "c6ae5afe-c7a3-11e8-a19d-0242ac130006.000004")
c.Assert(erli.Filename, Equals, "mysql-bin.000001")

// remove the only one
Expand Down
33 changes: 6 additions & 27 deletions dm/pkg/utils/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
uuidIndexSeparator = "."
)

// ParseUUIDIndex parses server-uuid.index.
// ParseUUIDIndex parses UUIDIndexFilename, return a list of relay log subdirectory names and error.
func ParseUUIDIndex(indexPath string) ([]string, error) {
fd, err := os.Open(indexPath)
if os.IsNotExist(err) {
Expand Down Expand Up @@ -67,18 +67,19 @@ func ParseUUIDIndex(indexPath string) ([]string, error) {
return uuids, nil
}

// AddSuffixForUUID adds a suffix for UUID.
// AddSuffixForUUID adds a suffix for UUID, returns the name for relay log subdirectory.
func AddSuffixForUUID(uuid string, id int) string {
return fmt.Sprintf("%s%s%06d", uuid, uuidIndexSeparator, id) // eg. 53ea0ed1-9bf8-11e6-8bea-64006a897c73.000001
}

// SuffixIntToStr convert int-represented suffix to string-represented.
// TODO: assign RelaySubDirSuffix a type and implement Stringer.
func SuffixIntToStr(id int) string {
return fmt.Sprintf("%06d", id)
}

// ParseSuffixForUUID parses UUID (with suffix) to (UUID without suffix, suffix) pair.
func ParseSuffixForUUID(uuid string) (string, int, error) {
// ParseSuffixFromRelaySubDir parses relay log subdirectory name to (server UUID, RelaySubDirSuffix) pair.
func ParseSuffixFromRelaySubDir(uuid string) (string, int, error) {
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
parts := strings.Split(uuid, uuidIndexSeparator)
if len(parts) != 2 || len(parts[1]) != 6 {
return "", 0, terror.ErrRelayParseUUIDSuffix.Generate(uuid)
Expand All @@ -90,29 +91,7 @@ func ParseSuffixForUUID(uuid string) (string, int, error) {
return parts[0], ID, nil
}

// GetSuffixUUID gets UUID (with suffix) by UUID (without suffix)
// when multi UUIDs (without suffix) are the same, the newest will be return.
func GetSuffixUUID(indexPath, uuid string) (string, error) {
uuids, err := ParseUUIDIndex(indexPath)
if err != nil {
return "", err
}

// newer is preferred
for i := len(uuids) - 1; i >= 0; i-- {
uuid2, _, err := ParseSuffixForUUID(uuids[i])
if err != nil {
return "", err
}
if uuid2 == uuid {
return uuids[i], nil
}
}

return "", terror.ErrRelayUUIDWithSuffixNotFound.Generate(uuid, indexPath, uuids)
}

// GetUUIDBySuffix gets UUID from uuids by suffix.
// GetUUIDBySuffix gets relay log subdirectory name by matching suffix.
func GetUUIDBySuffix(uuids []string, suffix string) string {
suffix2 := fmt.Sprintf("%s%s", uuidIndexSeparator, suffix)
for _, uuid := range uuids {
Expand Down
Loading