Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace, tso, apiv2: impl the interface to merge all keyspace groups into the default #6757

Merged
merged 7 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ const (
UserKindKey = "user_kind"
// TSOKeyspaceGroupIDKey is the key for tso keyspace group id in keyspace config.
TSOKeyspaceGroupIDKey = "tso_keyspace_group_id"
// maxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128.
// MaxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128.
// We use 120 here to leave some space for other operations.
// See: https://github.com/etcd-io/etcd/blob/d3e43d4de6f6d9575b489dd7850a85e37e0f6b6c/server/embed/config.go#L61
maxEtcdTxnOps = 120
MaxEtcdTxnOps = 120
)

// Config is the interface for keyspace config.
Expand Down Expand Up @@ -681,7 +681,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
zap.Duration("cost", time.Since(start)),
zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount),
zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount),
zap.Int("batch-size", maxEtcdTxnOps),
zap.Int("batch-size", MaxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
Expand All @@ -705,7 +705,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
if defaultKeyspaceGroup.IsMerging() {
return ErrKeyspaceGroupInMerging(utils.DefaultKeyspaceGroupID)
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, maxEtcdTxnOps)
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, MaxEtcdTxnOps)
if err != nil {
return err
}
Expand All @@ -715,9 +715,9 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
currentStartID = keyspaces[0].GetId()
nextStartID = keyspaces[keyspaceNum-1].GetId() + 1
}
// If there are less than `maxEtcdTxnOps` keyspaces or the next start ID reaches the end,
// If there are less than `MaxEtcdTxnOps` keyspaces or the next start ID reaches the end,
// there is no need to patrol again.
moreToPatrol = keyspaceNum == maxEtcdTxnOps
moreToPatrol = keyspaceNum == MaxEtcdTxnOps
var (
assigned = false
keyspaceIDsToUnlock = make([]uint32, 0, keyspaceNum)
Expand Down Expand Up @@ -756,7 +756,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
err = manager.store.SaveKeyspaceMeta(txn, ks)
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Int("batch-size", maxEtcdTxnOps),
zap.Int("batch-size", MaxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
Expand All @@ -770,7 +770,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup)
if err != nil {
log.Error("[keyspace] failed to save default keyspace group meta during patrol",
zap.Int("batch-size", maxEtcdTxnOps),
zap.Int("batch-size", MaxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
Expand Down
18 changes: 9 additions & 9 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
re := suite.Require()
// Create some keyspaces without any keyspace group.
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: uint32(i),
Expand All @@ -420,7 +420,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment.
Expand All @@ -430,15 +430,15 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
}

func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
re := suite.Require()
// Create some keyspaces without any keyspace group.
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: uint32(i),
Expand All @@ -453,22 +453,22 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment with range [maxEtcdTxnOps/2, maxEtcdTxnOps/2+maxEtcdTxnOps+1]
// Patrol the keyspace assignment with range [MaxEtcdTxnOps/2, MaxEtcdTxnOps/2+MaxEtcdTxnOps+1]
// to make sure the range crossing the boundary of etcd transaction operation limit.
var (
startKeyspaceID = uint32(maxEtcdTxnOps / 2)
endKeyspaceID = startKeyspaceID + maxEtcdTxnOps + 1
startKeyspaceID = uint32(MaxEtcdTxnOps / 2)
endKeyspaceID = startKeyspaceID + MaxEtcdTxnOps + 1
)
err = suite.manager.PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID)
re.NoError(err)
// Check if only the keyspaces within the range are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
keyspaceID := uint32(i)
if keyspaceID >= startKeyspaceID && keyspaceID <= endKeyspaceID {
re.Contains(defaultKeyspaceGroup.Keyspaces, keyspaceID)
Expand Down
101 changes: 100 additions & 1 deletion pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin
// - Load and delete the keyspace groups in the merge list.
// - Load and update the target keyspace group.
// So we pre-check the number of operations to avoid exceeding the maximum number of etcd transaction.
if (mergeListNum+1)*2 > maxEtcdTxnOps {
if (mergeListNum+1)*2 > MaxEtcdTxnOps {
return ErrExceedMaxEtcdTxnOps
}
if slice.Contains(mergeList, utils.DefaultKeyspaceGroupID) {
Expand Down Expand Up @@ -1013,6 +1013,105 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error {
return nil
}

// MergeAllIntoDefaultKeyspaceGroup merges all other keyspace groups into the default keyspace group.
func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error {
defer logutil.LogPanic()
// Since we don't take the default keyspace group into account,
// the number of unmerged keyspace groups is -1.
unmergedGroupNum := -1
// Calculate the total number of keyspace groups to merge.
for _, groups := range m.groups {
unmergedGroupNum += groups.Len()
}
mergedGroupNum := 0
// Start to merge all keyspace groups into the default one.
for userKind, groups := range m.groups {
mergeNum := groups.Len()
log.Info("start to merge all keyspace groups into the default one",
zap.Stringer("user-kind", userKind),
zap.Int("merge-num", mergeNum),
zap.Int("merged-group-num", mergedGroupNum),
zap.Int("unmerged-group-num", unmergedGroupNum))
if mergeNum == 0 {
continue
}
var (
maxBatchSize = MaxEtcdTxnOps/2 - 1
groupsToMerge = make([]uint32, 0, maxBatchSize)
)
for idx, group := range groups.GetAll() {
if group.ID == utils.DefaultKeyspaceGroupID {
continue
}
groupsToMerge = append(groupsToMerge, group.ID)
if len(groupsToMerge) < maxBatchSize && idx < mergeNum-1 {
continue
}
log.Info("merge keyspace groups into the default one",
zap.Int("index", idx),
zap.Int("batch-size", len(groupsToMerge)),
zap.Int("merge-num", mergeNum),
zap.Int("merged-group-num", mergedGroupNum),
zap.Int("unmerged-group-num", unmergedGroupNum))
// Reach the batch size, merge them into the default keyspace group.
if err := m.MergeKeyspaceGroups(utils.DefaultKeyspaceGroupID, groupsToMerge); err != nil {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
log.Error("failed to merge all keyspace groups into the default one",
zap.Int("index", idx),
zap.Int("batch-size", len(groupsToMerge)),
zap.Int("merge-num", mergeNum),
zap.Int("merged-group-num", mergedGroupNum),
zap.Int("unmerged-group-num", unmergedGroupNum),
zap.Error(err))
return err
}
// Wait for the merge to finish.
ctx, cancel := context.WithTimeout(m.ctx, time.Minute)
ticker := time.NewTicker(time.Second)
checkLoop:
for {
select {
case <-ctx.Done():
log.Info("cancel merging all keyspace groups into the default one",
zap.Int("index", idx),
zap.Int("batch-size", len(groupsToMerge)),
zap.Int("merge-num", mergeNum),
zap.Int("merged-group-num", mergedGroupNum),
zap.Int("unmerged-group-num", unmergedGroupNum))
cancel()
ticker.Stop()
return nil
case <-ticker.C:
kg, err := m.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
if err != nil {
log.Error("failed to check the default keyspace group merge state",
zap.Int("index", idx),
zap.Int("batch-size", len(groupsToMerge)),
zap.Int("merge-num", mergeNum),
zap.Int("merged-group-num", mergedGroupNum),
zap.Int("unmerged-group-num", unmergedGroupNum),
zap.Error(err))
cancel()
ticker.Stop()
return err
}
if !kg.IsMergeTarget() {
break checkLoop
}
}
}
cancel()
ticker.Stop()
mergedGroupNum += len(groupsToMerge)
unmergedGroupNum -= len(groupsToMerge)
groupsToMerge = groupsToMerge[:0]
}
}
log.Info("finish merging all keyspace groups into the default one",
zap.Int("merged-group-num", mergedGroupNum),
zap.Int("unmerged-group-num", unmergedGroupNum))
return nil
}

// GetKeyspaceGroupPrimaryByID returns the primary node of the keyspace group by ID.
func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
// check if the keyspace group exists
Expand Down
2 changes: 1 addition & 1 deletion pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() {
err = suite.kgm.MergeKeyspaceGroups(4, []uint32{5})
re.ErrorContains(err, ErrKeyspaceGroupNotExists(5).Error())
// merge with the number of keyspace groups exceeds the limit
err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, maxEtcdTxnOps/2))
err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, MaxEtcdTxnOps/2))
re.ErrorIs(err, ErrExceedMaxEtcdTxnOps)
// merge the default keyspace group
err = suite.kgm.MergeKeyspaceGroups(1, []uint32{utils.DefaultKeyspaceGroupID})
Expand Down
15 changes: 12 additions & 3 deletions server/apiv2/handlers/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ func FinishSplitKeyspaceByID(c *gin.Context) {

// MergeKeyspaceGroupsParams defines the params for merging the keyspace groups.
type MergeKeyspaceGroupsParams struct {
MergeList []uint32 `json:"merge-list"`
MergeList []uint32 `json:"merge-list"`
MergeAllIntoDefault bool `json:"merge-all-into-default"`
}

// MergeKeyspaceGroups merges the keyspace groups in the merge list into the target keyspace group.
Expand All @@ -292,10 +293,14 @@ func MergeKeyspaceGroups(c *gin.Context) {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
if len(mergeParams.MergeList) == 0 {
if len(mergeParams.MergeList) == 0 && !mergeParams.MergeAllIntoDefault {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid empty merge list")
return
}
if len(mergeParams.MergeList) > 0 && mergeParams.MergeAllIntoDefault {
c.AbortWithStatusJSON(http.StatusBadRequest, "non-empty merge list when merge all into default")
return
}
for _, mergeID := range mergeParams.MergeList {
if !isValid(mergeID) {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
Expand All @@ -310,7 +315,11 @@ func MergeKeyspaceGroups(c *gin.Context) {
return
}
// Merge keyspace group.
err = groupManager.MergeKeyspaceGroups(id, mergeParams.MergeList)
if mergeParams.MergeAllIntoDefault {
err = groupManager.MergeAllIntoDefaultKeyspaceGroup()
} else {
err = groupManager.MergeKeyspaceGroups(id, mergeParams.MergeList)
}
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
Expand Down
70 changes: 57 additions & 13 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tso

import (
"context"
"math/rand"
"strings"
"sync"
"testing"
Expand All @@ -28,6 +29,7 @@ import (
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/keyspace"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand Down Expand Up @@ -465,15 +467,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)"))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0)
re.Equal(uint32(0), kg.ID)
re.Equal([]uint32{0}, kg.Keyspaces)
re.False(kg.IsSplitting())
// wait for finishing alloc nodes
testutil.Eventually(re, func() bool {
kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0)
return len(kg.Members) == 2
})
waitFinishAllocNodes(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID)
testConfig := map[string]string{
"config": "1",
"tso_keyspace_group_id": "0",
Expand All @@ -483,15 +478,19 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() {
Name: "test_keyspace",
Config: testConfig,
})
kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0)
testutil.Eventually(re, func() bool {
kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0)
return len(kg.Members) == 2
})
waitFinishAllocNodes(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

func waitFinishAllocNodes(re *require.Assertions, server *tests.TestServer, groupID uint32) {
testutil.Eventually(re, func() bool {
kg := handlersutil.MustLoadKeyspaceGroupByID(re, server, groupID)
re.Equal(groupID, kg.ID)
return len(kg.Members) == mcsutils.DefaultKeyspaceGroupReplicaCount
})
}

func TestTwiceSplitKeyspaceGroup(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -749,3 +748,48 @@ func TestGetTSOImmediately(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller"))
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))

var (
keyspaceGroupNum = keyspace.MaxEtcdTxnOps
keyspaceGroups = make([]*endpoint.KeyspaceGroup, 0, keyspaceGroupNum)
keyspaces = make([]uint32, 0, keyspaceGroupNum)
)
for i := 1; i <= keyspaceGroupNum; i++ {
keyspaceGroups = append(keyspaceGroups, &endpoint.KeyspaceGroup{
ID: uint32(i),
UserKind: endpoint.UserKind(rand.Intn(int(endpoint.UserKindCount))).String(),
Keyspaces: []uint32{uint32(i)},
})
keyspaces = append(keyspaces, uint32(i))
if len(keyspaceGroups) < keyspace.MaxEtcdTxnOps/2 && i != keyspaceGroupNum {
continue
}
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: keyspaceGroups,
})
keyspaceGroups = keyspaceGroups[:0]
}
// Check if all the keyspace groups are created.
groups := handlersutil.MustLoadKeyspaceGroups(re, suite.pdLeaderServer, "0", "0")
re.Len(groups, keyspaceGroupNum+1)
// Wait for all the keyspace groups to be served.
svr := suite.tsoCluster.WaitForDefaultPrimaryServing(re)
re.NotNil(svr)
svr = suite.tsoCluster.WaitForPrimaryServing(re, uint32(keyspaceGroupNum), uint32(keyspaceGroupNum))
re.NotNil(svr)
// Merge all the keyspace groups into the default keyspace group.
handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{
MergeAllIntoDefault: true,
})
// Wait for all the keyspace groups to be merged.
waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, keyspaces)
// Check if all the keyspace groups are merged.
groups = handlersutil.MustLoadKeyspaceGroups(re, suite.pdLeaderServer, "0", "0")
re.Len(groups, 1)

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}