Skip to content

Commit

Permalink
dm: reset optimistic sharding group keeper when syncer resume (#9707)
Browse files Browse the repository at this point in the history
close #9588
  • Loading branch information
GMHDBJD authored Sep 11, 2023
1 parent 85dcc86 commit 78d59fc
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 0 deletions.
8 changes: 8 additions & 0 deletions dm/syncer/opt_sharding_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,11 @@ func (k *OptShardingGroupKeeper) RemoveSchema(schema string) {
}
}
}

// Reset resets the keeper.
func (k *OptShardingGroupKeeper) Reset() {
k.Lock()
defer k.Unlock()
k.groups = make(map[string]*OptShardingGroup)
k.shardingReSyncs = make(map[string]binlog.Location)
}
5 changes: 5 additions & 0 deletions dm/syncer/opt_sharding_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (s *optShardingGroupSuite) TestLowestFirstPosInOptGroups() {
k.removeShardingReSync(&ShardingReSync{targetTable: utils.UnpackTableID(db2tbl)})
// should be pos11 now, pos21 is totally resolved
require.Equal(s.T(), pos11.Position, k.lowestFirstLocationInGroups().Position)

// reset
k.Reset()
require.Len(s.T(), k.groups, 0)
require.Len(s.T(), k.shardingReSyncs, 0)
}

func (s *optShardingGroupSuite) TestSync() {
Expand Down
1 change: 1 addition & 0 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ func (s *Syncer) reset() {
s.sgk.ResetGroups()
s.pessimist.Reset()
case config.ShardOptimistic:
s.osgk.Reset()
s.optimist.Reset()
}
}
Expand Down

0 comments on commit 78d59fc

Please sign in to comment.