Skip to content

Commit

Permalink
Support to split keyspace group with the keyspace ID range
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Jun 20, 2023
1 parent 77083e6 commit ae44c86
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 37 deletions.
24 changes: 20 additions & 4 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,13 @@ func (manager *Manager) allocID() (uint32, error) {
}

// PatrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups.
func (manager *Manager) PatrolKeyspaceAssignment() error {
func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID uint32) error {
if startKeyspaceID > manager.nextPatrolStartID {
manager.nextPatrolStartID = startKeyspaceID
}
if endKeyspaceID != 0 && endKeyspaceID < manager.nextPatrolStartID {
return nil
}
var (
// Some statistics info.
start = time.Now()
Expand All @@ -675,6 +681,8 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount),
zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount),
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID),
)
Expand Down Expand Up @@ -706,9 +714,9 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
currentStartID = keyspaces[0].GetId()
nextStartID = keyspaces[keyspaceNum-1].GetId() + 1
}
// If there are less than `maxEtcdTxnOps` keyspaces,
// we have reached the end of the keyspace list.
moreToPatrol = keyspaceNum == maxEtcdTxnOps
// If there are less than `maxEtcdTxnOps` keyspaces or the next start ID reaches the end,
// there is no need to patrol again.
moreToPatrol = keyspaceNum == maxEtcdTxnOps && endKeyspaceID != 0 && nextStartID < endKeyspaceID
var (
assigned = false
keyspaceIDsToUnlock = make([]uint32, 0, keyspaceNum)
Expand All @@ -722,6 +730,10 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if ks == nil {
continue
}
if endKeyspaceID != 0 && ks.Id > endKeyspaceID {
moreToPatrol = false
break
}
patrolledKeyspaceCount++
manager.metaLock.Lock(ks.Id)
if ks.Config == nil {
Expand All @@ -744,6 +756,8 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID),
zap.Uint32("keyspace-id", ks.Id), zap.Error(err))
Expand All @@ -756,6 +770,8 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if err != nil {
log.Error("[keyspace] failed to save default keyspace group meta during patrol",
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID), zap.Error(err))
return err
Expand Down
43 changes: 40 additions & 3 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
re.NotNil(defaultKeyspaceGroup)
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(111))
// Patrol the keyspace assignment.
err = suite.manager.PatrolKeyspaceAssignment()
err = suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
// Check if the keyspace is attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
Expand Down Expand Up @@ -424,7 +424,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment.
err = suite.manager.PatrolKeyspaceAssignment()
err = suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
// Check if all the keyspaces are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
Expand All @@ -435,6 +435,43 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
}
}

func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
re := suite.Require()
// Create some keyspaces without any keyspace group.
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: uint32(i),
Name: strconv.Itoa(i),
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
})
re.NoError(err)
}
// Check if all the keyspaces are not attached to the default group.
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment with range [10, 20]
err = suite.manager.PatrolKeyspaceAssignment(10, 20)
re.NoError(err)
// Check if only the keyspaces within the range are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
if i >= 10 && i <= 20 {
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i))
} else {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
}
}

// Benchmark the keyspace assignment patrol.
func BenchmarkPatrolKeyspaceAssignment1000(b *testing.B) {
benchmarkPatrolKeyspaceAssignmentN(1000, b)
Expand Down Expand Up @@ -471,7 +508,7 @@ func benchmarkPatrolKeyspaceAssignmentN(
// Benchmark the keyspace assignment patrol.
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := suite.manager.PatrolKeyspaceAssignment()
err := suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
}
b.StopTimer()
Expand Down
98 changes: 71 additions & 27 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,10 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse

// SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID.
// And the keyspaces in the old keyspace group will be moved to the new keyspace group.
func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint32, keyspaces []uint32) error {
func (m *GroupManager) SplitKeyspaceGroupByID(
splitSourceID, splitTargetID uint32,
keyspaces []uint32, keyspaceIDRange ...uint32,
) error {
var splitSourceKg, splitTargetKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
Expand Down Expand Up @@ -542,34 +545,17 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
if splitTargetKg != nil {
return ErrKeyspaceGroupExists
}
keyspaceNum := len(keyspaces)
sourceKeyspaceNum := len(splitSourceKg.Keyspaces)
// Check if the keyspaces are all in the old keyspace group.
if keyspaceNum == 0 || keyspaceNum > sourceKeyspaceNum {
return ErrKeyspaceNotInKeyspaceGroup
var startKeyspaceID, endKeyspaceID uint32
if len(keyspaceIDRange) >= 2 {
startKeyspaceID, endKeyspaceID = keyspaceIDRange[0], keyspaceIDRange[1]
}
var (
oldKeyspaceMap = make(map[uint32]struct{}, sourceKeyspaceNum)
newKeyspaceMap = make(map[uint32]struct{}, keyspaceNum)
)
for _, keyspace := range splitSourceKg.Keyspaces {
oldKeyspaceMap[keyspace] = struct{}{}
}
for _, keyspace := range keyspaces {
if _, ok := oldKeyspaceMap[keyspace]; !ok {
return ErrKeyspaceNotInKeyspaceGroup
}
newKeyspaceMap[keyspace] = struct{}{}
}
// Get the split keyspace group for the old keyspace group.
splitKeyspaces := make([]uint32, 0, sourceKeyspaceNum-keyspaceNum)
for _, keyspace := range splitSourceKg.Keyspaces {
if _, ok := newKeyspaceMap[keyspace]; !ok {
splitKeyspaces = append(splitKeyspaces, keyspace)
}
splitSourceKeyspaces, splitTargetKeyspaces, err := buildSplitKeyspaces(
splitSourceKg.Keyspaces, keyspaces, startKeyspaceID, endKeyspaceID)
if err != nil {
return err
}
// Update the old keyspace group.
splitSourceKg.Keyspaces = splitKeyspaces
splitSourceKg.Keyspaces = splitSourceKeyspaces
splitSourceKg.SplitState = &endpoint.SplitState{
SplitSource: splitSourceKg.ID,
}
Expand All @@ -581,7 +567,7 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
// Keep the same user kind and members as the old keyspace group.
UserKind: splitSourceKg.UserKind,
Members: splitSourceKg.Members,
Keyspaces: keyspaces,
Keyspaces: splitTargetKeyspaces,
SplitState: &endpoint.SplitState{
SplitSource: splitSourceKg.ID,
},
Expand All @@ -597,6 +583,64 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
return nil
}

func buildSplitKeyspaces(
// `old` is the original keyspace list which will be split out,
// `new` is the keyspace list which will be split from the old keyspace list.
old, new []uint32,
startKeyspaceID, endKeyspaceID uint32,
) ([]uint32, []uint32, error) {
oldNum, newNum := len(old), len(new)
// Split according to the new keyspace list.
if newNum != 0 {
if newNum > oldNum {
return nil, nil, ErrKeyspaceNotInKeyspaceGroup
}
var (
oldKeyspaceMap = make(map[uint32]struct{}, oldNum)
newKeyspaceMap = make(map[uint32]struct{}, newNum)
)
for _, keyspace := range old {
oldKeyspaceMap[keyspace] = struct{}{}
}
for _, keyspace := range new {
if _, ok := oldKeyspaceMap[keyspace]; !ok {
return nil, nil, ErrKeyspaceNotInKeyspaceGroup
}
newKeyspaceMap[keyspace] = struct{}{}
}
// Get the split keyspace list for the old keyspace group.
oldSplit := make([]uint32, 0, oldNum-newNum)
for _, keyspace := range old {
if _, ok := newKeyspaceMap[keyspace]; !ok {
oldSplit = append(oldSplit, keyspace)
}
}
return oldSplit, new, nil
}
// Split according to the start and end keyspace ID.
if startKeyspaceID == 0 && endKeyspaceID == 0 {
return nil, nil, ErrKeyspaceNotInKeyspaceGroup
}
var (
newSplit = make([]uint32, 0, oldNum)
newKeyspaceMap = make(map[uint32]struct{}, newNum)
)
for _, keyspace := range old {
if startKeyspaceID <= keyspace && keyspace <= endKeyspaceID {
newSplit = append(newSplit, keyspace)
newKeyspaceMap[keyspace] = struct{}{}
}
}
// Get the split keyspace list for the old keyspace group.
oldSplit := make([]uint32, 0, oldNum-len(newSplit))
for _, keyspace := range old {
if _, ok := newKeyspaceMap[keyspace]; !ok {
oldSplit = append(oldSplit, keyspace)
}
}
return oldSplit, newSplit, nil
}

// FinishSplitKeyspaceByID finishes the split keyspace group by the split target ID.
func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error {
var splitTargetKg, splitSourceKg *endpoint.KeyspaceGroup
Expand Down
Loading

0 comments on commit ae44c86

Please sign in to comment.