Skip to content

Commit

Permalink
migration(ticdc): fix backup key issue (#6232)
Browse files Browse the repository at this point in the history
ref #5301
  • Loading branch information
sdojjy authored Jul 8, 2022
1 parent 9d283fc commit b4259d5
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
3 changes: 3 additions & 0 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func GetEtcdKeyJob(clusterID string, changeFeedID model.ChangeFeedID) string {

// MigrateBackupKey is the key of backup data during a migration.
func MigrateBackupKey(version int, backupKey string) string {
if strings.HasPrefix(backupKey, "/") {
return fmt.Sprintf("%s/%d%s", migrateBackupPrefix, version, backupKey)
}
return fmt.Sprintf("%s/%d/%s", migrateBackupPrefix, version, backupKey)
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,10 @@ func TestExtractKeySuffix(t *testing.T) {
}
}
}

func TestMigrateBackupKey(t *testing.T) {
key := MigrateBackupKey(1, "/tidb/cdc/capture/abcd")
require.Equal(t, "/tidb/cdc/__backup__/1/tidb/cdc/capture/abcd", key)
key = MigrateBackupKey(1, "abcdc")
require.Equal(t, "/tidb/cdc/__backup__/1/abcdc", key)
}
14 changes: 7 additions & 7 deletions pkg/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func createPDClient(ctx context.Context,
// 1. check and put metaVersion
// 2. campaign old owner
// 3. update keys
// 4. check meta data consistency
// 4. check metadata consistency
// 5. update metaVersion
func (m *migrator) migrate(ctx context.Context, etcdNoMetaVersion bool, oldVersion int) error {
pdClient, err := m.createPDClientFunc(ctx,
Expand Down Expand Up @@ -292,7 +292,7 @@ func cleanOldData(ctx context.Context, client *etcd.Client) {
if strings.HasPrefix(key, oldChangefeedPrefix) {
value = maskChangefeedInfo(kvPair.Value)
}
// 0 is the backup verion. For now, we only support verion 0
// 0 is the backup version. For now, we only support version 0
newKey := etcd.MigrateBackupKey(0, key)
log.Info("renaming old etcd data",
zap.String("key", key),
Expand All @@ -312,7 +312,7 @@ func cleanOldData(ctx context.Context, client *etcd.Client) {
}
}

// old key prefix that should be remove
// old key prefix that should be removed
var oldKeyPrefix = []string{
"/tidb/cdc/changefeed/info",
"/tidb/cdc/job",
Expand Down Expand Up @@ -469,7 +469,7 @@ func (m *migrator) Migrate(ctx context.Context) error {
return m.migrate(ctx, version == noMetaVersion, oldVersion)
}

// ShouldMigrate checks if we should migrate etcd meta data
// ShouldMigrate checks if we should migrate etcd metadata
func (m *migrator) ShouldMigrate(ctx context.Context) (bool, error) {
version, err := getMetaVersion(ctx, m.cli.Client, m.cli.ClusterID)
if err != nil {
Expand Down Expand Up @@ -568,17 +568,17 @@ func getMetaVersion(ctx context.Context, cli *etcd.Client, clusterID string) (in
type NoOpMigrator struct{}

// ShouldMigrate checks if we need to migrate metadata
func (f *NoOpMigrator) ShouldMigrate(ctx context.Context) (bool, error) {
func (f *NoOpMigrator) ShouldMigrate(_ context.Context) (bool, error) {
return false, nil
}

// Migrate migrates the cdc metadata
func (f *NoOpMigrator) Migrate(ctx context.Context) error {
func (f *NoOpMigrator) Migrate(_ context.Context) error {
return nil
}

// WaitMetaVersionMatched wait util migration is done
func (f *NoOpMigrator) WaitMetaVersionMatched(ctx context.Context) error {
func (f *NoOpMigrator) WaitMetaVersionMatched(_ context.Context) error {
return nil
}

Expand Down

0 comments on commit b4259d5

Please sign in to comment.