Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace, apiv2: support to split keyspace group with the keyspace ID range #6646

Merged
merged 4 commits into from
Jun 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,16 @@ func (manager *Manager) allocID() (uint32, error) {
}

// PatrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups.
func (manager *Manager) PatrolKeyspaceAssignment() error {
func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID uint32) error {
if startKeyspaceID > manager.nextPatrolStartID {
manager.nextPatrolStartID = startKeyspaceID
}
if endKeyspaceID != 0 && endKeyspaceID < manager.nextPatrolStartID {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
log.Info("[keyspace] end keyspace id is smaller than the next patrol start id, skip patrol",
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("next-patrol-start-id", manager.nextPatrolStartID))
return nil
}
var (
// Some statistics info.
start = time.Now()
Expand All @@ -675,6 +684,8 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount),
zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount),
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID),
)
Expand Down Expand Up @@ -706,8 +717,8 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
currentStartID = keyspaces[0].GetId()
nextStartID = keyspaces[keyspaceNum-1].GetId() + 1
}
// If there are less than `maxEtcdTxnOps` keyspaces,
// we have reached the end of the keyspace list.
// If there are less than `maxEtcdTxnOps` keyspaces or the next start ID reaches the end,
// there is no need to patrol again.
moreToPatrol = keyspaceNum == maxEtcdTxnOps
var (
assigned = false
Expand All @@ -722,6 +733,10 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if ks == nil {
continue
}
if endKeyspaceID != 0 && ks.Id > endKeyspaceID {
moreToPatrol = false
break
}
patrolledKeyspaceCount++
manager.metaLock.Lock(ks.Id)
if ks.Config == nil {
Expand All @@ -744,6 +759,8 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID),
zap.Uint32("keyspace-id", ks.Id), zap.Error(err))
Expand All @@ -756,6 +773,8 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if err != nil {
log.Error("[keyspace] failed to save default keyspace group meta during patrol",
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID), zap.Error(err))
return err
Expand Down
49 changes: 46 additions & 3 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
re.NotNil(defaultKeyspaceGroup)
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(111))
// Patrol the keyspace assignment.
err = suite.manager.PatrolKeyspaceAssignment()
err = suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
// Check if the keyspace is attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
Expand Down Expand Up @@ -424,7 +424,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment.
err = suite.manager.PatrolKeyspaceAssignment()
err = suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
// Check if all the keyspaces are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
Expand All @@ -435,6 +435,49 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
}
}

func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
re := suite.Require()
// Create some keyspaces without any keyspace group.
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: uint32(i),
Name: strconv.Itoa(i),
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
})
re.NoError(err)
}
// Check if all the keyspaces are not attached to the default group.
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment with range [maxEtcdTxnOps/2, maxEtcdTxnOps/2+maxEtcdTxnOps+1]
// to make sure the range crossing the boundary of etcd transaction operation limit.
var (
startKeyspaceID = uint32(maxEtcdTxnOps / 2)
endKeyspaceID = startKeyspaceID + maxEtcdTxnOps + 1
)
err = suite.manager.PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID)
re.NoError(err)
// Check if only the keyspaces within the range are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
keyspaceID := uint32(i)
if keyspaceID >= startKeyspaceID && keyspaceID <= endKeyspaceID {
re.Contains(defaultKeyspaceGroup.Keyspaces, keyspaceID)
} else {
re.NotContains(defaultKeyspaceGroup.Keyspaces, keyspaceID)
}
}
}

// Benchmark the keyspace assignment patrol.
func BenchmarkPatrolKeyspaceAssignment1000(b *testing.B) {
benchmarkPatrolKeyspaceAssignmentN(1000, b)
Expand Down Expand Up @@ -471,7 +514,7 @@ func benchmarkPatrolKeyspaceAssignmentN(
// Benchmark the keyspace assignment patrol.
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := suite.manager.PatrolKeyspaceAssignment()
err := suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
}
b.StopTimer()
Expand Down
98 changes: 71 additions & 27 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,10 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse

// SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID.
// And the keyspaces in the old keyspace group will be moved to the new keyspace group.
func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint32, keyspaces []uint32) error {
func (m *GroupManager) SplitKeyspaceGroupByID(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need to support it in pd-ctl?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will implement it in a new PR after this one is merged.

splitSourceID, splitTargetID uint32,
keyspaces []uint32, keyspaceIDRange ...uint32,
) error {
var splitSourceKg, splitTargetKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
Expand Down Expand Up @@ -542,34 +545,17 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
if splitTargetKg != nil {
return ErrKeyspaceGroupExists
}
keyspaceNum := len(keyspaces)
sourceKeyspaceNum := len(splitSourceKg.Keyspaces)
// Check if the keyspaces are all in the old keyspace group.
if keyspaceNum == 0 || keyspaceNum > sourceKeyspaceNum {
return ErrKeyspaceNotInKeyspaceGroup
var startKeyspaceID, endKeyspaceID uint32
if len(keyspaceIDRange) >= 2 {
startKeyspaceID, endKeyspaceID = keyspaceIDRange[0], keyspaceIDRange[1]
}
var (
oldKeyspaceMap = make(map[uint32]struct{}, sourceKeyspaceNum)
newKeyspaceMap = make(map[uint32]struct{}, keyspaceNum)
)
for _, keyspace := range splitSourceKg.Keyspaces {
oldKeyspaceMap[keyspace] = struct{}{}
}
for _, keyspace := range keyspaces {
if _, ok := oldKeyspaceMap[keyspace]; !ok {
return ErrKeyspaceNotInKeyspaceGroup
}
newKeyspaceMap[keyspace] = struct{}{}
}
// Get the split keyspace group for the old keyspace group.
splitKeyspaces := make([]uint32, 0, sourceKeyspaceNum-keyspaceNum)
for _, keyspace := range splitSourceKg.Keyspaces {
if _, ok := newKeyspaceMap[keyspace]; !ok {
splitKeyspaces = append(splitKeyspaces, keyspace)
}
splitSourceKeyspaces, splitTargetKeyspaces, err := buildSplitKeyspaces(
splitSourceKg.Keyspaces, keyspaces, startKeyspaceID, endKeyspaceID)
if err != nil {
return err
}
// Update the old keyspace group.
splitSourceKg.Keyspaces = splitKeyspaces
splitSourceKg.Keyspaces = splitSourceKeyspaces
splitSourceKg.SplitState = &endpoint.SplitState{
SplitSource: splitSourceKg.ID,
}
Expand All @@ -581,7 +567,7 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
// Keep the same user kind and members as the old keyspace group.
UserKind: splitSourceKg.UserKind,
Members: splitSourceKg.Members,
Keyspaces: keyspaces,
Keyspaces: splitTargetKeyspaces,
SplitState: &endpoint.SplitState{
SplitSource: splitSourceKg.ID,
},
Expand All @@ -597,6 +583,64 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
return nil
}

func buildSplitKeyspaces(
// `old` is the original keyspace list which will be split out,
// `new` is the keyspace list which will be split from the old keyspace list.
old, new []uint32,
startKeyspaceID, endKeyspaceID uint32,
) ([]uint32, []uint32, error) {
oldNum, newNum := len(old), len(new)
// Split according to the new keyspace list.
if newNum != 0 {
if newNum > oldNum {
return nil, nil, ErrKeyspaceNotInKeyspaceGroup
}
var (
oldKeyspaceMap = make(map[uint32]struct{}, oldNum)
newKeyspaceMap = make(map[uint32]struct{}, newNum)
)
for _, keyspace := range old {
oldKeyspaceMap[keyspace] = struct{}{}
}
for _, keyspace := range new {
if _, ok := oldKeyspaceMap[keyspace]; !ok {
return nil, nil, ErrKeyspaceNotInKeyspaceGroup
}
newKeyspaceMap[keyspace] = struct{}{}
}
// Get the split keyspace list for the old keyspace group.
oldSplit := make([]uint32, 0, oldNum-newNum)
for _, keyspace := range old {
if _, ok := newKeyspaceMap[keyspace]; !ok {
oldSplit = append(oldSplit, keyspace)
}
}
return oldSplit, new, nil
}
// Split according to the start and end keyspace ID.
if startKeyspaceID == 0 && endKeyspaceID == 0 {
return nil, nil, ErrKeyspaceNotInKeyspaceGroup
}
var (
newSplit = make([]uint32, 0, oldNum)
newKeyspaceMap = make(map[uint32]struct{}, newNum)
)
for _, keyspace := range old {
if startKeyspaceID <= keyspace && keyspace <= endKeyspaceID {
newSplit = append(newSplit, keyspace)
newKeyspaceMap[keyspace] = struct{}{}
}
}
// Get the split keyspace list for the old keyspace group.
oldSplit := make([]uint32, 0, oldNum-len(newSplit))
for _, keyspace := range old {
if _, ok := newKeyspaceMap[keyspace]; !ok {
oldSplit = append(oldSplit, keyspace)
}
}
return oldSplit, newSplit, nil
}

// FinishSplitKeyspaceByID finishes the split keyspace group by the split target ID.
func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error {
var splitTargetKg, splitSourceKg *endpoint.KeyspaceGroup
Expand Down
Loading