Skip to content

Commit

Permalink
keyspace, api2: fix the keyspace assignment patrol consistency (tikv#…
Browse files Browse the repository at this point in the history
…6397)

ref tikv#6232

Fix the keyspace assignment patrol consistency.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: Ryan Leung <rleungx@gmail.com>
Signed-off-by: zeminzhou <zhouzemin@pingcap.com>
  • Loading branch information
2 people authored and zeminzhou committed May 10, 2023
1 parent 583fa11 commit 6853a69
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 102 deletions.
88 changes: 84 additions & 4 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package keyspace

import (
"context"
"strconv"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/slice"
Expand Down Expand Up @@ -72,7 +74,8 @@ type Manager struct {
ctx context.Context
// config is the configurations of the manager.
config Config
kgm *GroupManager
// kgm is the keyspace group manager of the server.
kgm *GroupManager
}

// CreateKeyspaceRequest represents necessary arguments to create a keyspace.
Expand All @@ -86,7 +89,9 @@ type CreateKeyspaceRequest struct {
}

// NewKeyspaceManager creates a Manager of keyspace related data.
func NewKeyspaceManager(store endpoint.KeyspaceStorage,
func NewKeyspaceManager(
ctx context.Context,
store endpoint.KeyspaceStorage,
cluster schedule.Cluster,
idAllocator id.Allocator,
config Config,
Expand All @@ -97,7 +102,7 @@ func NewKeyspaceManager(store endpoint.KeyspaceStorage,
idAllocator: idAllocator,
store: store,
cluster: cluster,
ctx: context.TODO(),
ctx: ctx,
config: config,
kgm: kgm,
}
Expand Down Expand Up @@ -540,7 +545,18 @@ func (manager *Manager) LoadRangeKeyspace(startID uint32, limit int) ([]*keyspac
if startID > spaceIDMax {
return nil, errors.Errorf("startID of the scan %d exceeds spaceID Max %d", startID, spaceIDMax)
}
return manager.store.LoadRangeKeyspace(startID, limit)
var (
keyspaces []*keyspacepb.KeyspaceMeta
err error
)
err = manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
keyspaces, err = manager.store.LoadRangeKeyspace(txn, startID, limit)
return err
})
if err != nil {
return nil, err
}
return keyspaces, nil
}

// allocID allocate a new keyspace id.
Expand All @@ -555,3 +571,67 @@ func (manager *Manager) allocID() (uint32, error) {
}
return id32, nil
}

// PatrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups.
func (manager *Manager) PatrolKeyspaceAssignment() error {
// TODO: since the number of keyspaces might be large, we should consider to assign them in batches.
return manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
defaultKeyspaceGroup, err := manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID)
if err != nil {
return err
}
if defaultKeyspaceGroup == nil {
return errors.Errorf("default keyspace group %d not found", utils.DefaultKeyspaceGroupID)
}
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, DefaultKeyspaceID, 0)
if err != nil {
return err
}
var (
assigned = false
keyspaceIDsToUnlock = make([]uint32, 0, len(keyspaces))
)
defer func() {
for _, id := range keyspaceIDsToUnlock {
manager.metaLock.Unlock(id)
}
}()
for _, ks := range keyspaces {
if ks == nil {
continue
}
manager.metaLock.Lock(ks.Id)
if ks.Config == nil {
ks.Config = make(map[string]string, 1)
} else {
// If the keyspace already has a group ID, skip it.
_, ok := ks.Config[TSOKeyspaceGroupIDKey]
if ok {
manager.metaLock.Unlock(ks.Id)
continue
}
}
// Unlock the keyspace meta lock after the whole txn.
keyspaceIDsToUnlock = append(keyspaceIDsToUnlock, ks.Id)
// If the keyspace doesn't have a group ID, assign it to the default keyspace group.
if !slice.Contains(defaultKeyspaceGroup.Keyspaces, ks.Id) {
defaultKeyspaceGroup.Keyspaces = append(defaultKeyspaceGroup.Keyspaces, ks.Id)
assigned = true
}
ks.Config[TSOKeyspaceGroupIDKey] = strconv.FormatUint(uint64(utils.DefaultKeyspaceGroupID), 10)
err = manager.store.SaveKeyspaceMeta(txn, ks)
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Uint32("ID", ks.Id), zap.Error(err))
return err
}
}
if assigned {
return manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup)
}
return nil
})
}
30 changes: 29 additions & 1 deletion pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
Expand Down Expand Up @@ -60,7 +61,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
allocator := mockid.NewIDAllocator()
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
suite.manager = NewKeyspaceManager(store, nil, allocator, &mockConfig{}, kgm)
suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm)
suite.NoError(kgm.Bootstrap())
suite.NoError(suite.manager.Bootstrap())
}
Expand Down Expand Up @@ -354,3 +355,30 @@ func updateKeyspaceConfig(re *require.Assertions, manager *Manager, name string,
oldMeta = updatedMeta
}
}

func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
re := suite.Require()
// Create a keyspace without any keyspace group.
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: 111,
Name: "111",
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
})
re.NoError(err)
// Check if the keyspace is not attached to the default group.
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(111))
// Patrol the keyspace assignment.
err = suite.manager.PatrolKeyspaceAssignment()
re.NoError(err)
// Check if the keyspace is attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(111))
}
51 changes: 3 additions & 48 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,9 @@ type GroupManager struct {
// groups is the cache of keyspace group related information.
// user kind -> keyspace group
groups map[endpoint.UserKind]*indexedHeap
// patrolKeyspaceAssignmentOnce is used to indicate whether we have patrolled all keyspaces
// and assign them to the keyspace groups.
patrolKeyspaceAssignmentOnce bool

// store is the storage for keyspace group related information.
store interface {
endpoint.KeyspaceGroupStorage
endpoint.KeyspaceStorage
}
store endpoint.KeyspaceGroupStorage

client *clientv3.Client

Expand All @@ -88,10 +82,7 @@ type GroupManager struct {
// NewKeyspaceGroupManager creates a Manager of keyspace group related data.
func NewKeyspaceGroupManager(
ctx context.Context,
store interface {
endpoint.KeyspaceGroupStorage
endpoint.KeyspaceStorage
},
store endpoint.KeyspaceGroupStorage,
client *clientv3.Client,
clusterID uint64,
) *GroupManager {
Expand Down Expand Up @@ -170,38 +161,6 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

// patrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups.
func (m *GroupManager) patrolKeyspaceAssignment() error {
m.Lock()
defer m.Unlock()
if m.patrolKeyspaceAssignmentOnce {
return nil
}
keyspaces, err := m.store.LoadRangeKeyspace(utils.DefaultKeyspaceID, 0)
if err != nil {
return err
}
config, err := m.getKeyspaceConfigByKindLocked(endpoint.Basic)
if err != nil {
return err
}
for _, ks := range keyspaces {
if ks == nil {
continue
}
groupID, err := strconv.ParseUint(config[TSOKeyspaceGroupIDKey], 10, 64)
if err != nil {
return err
}
err = m.updateKeyspaceForGroupLocked(endpoint.Basic, groupID, ks.GetId(), opAdd)
if err != nil {
return err
}
}
m.patrolKeyspaceAssignmentOnce = true
return nil
}

func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
defer logutil.LogPanic()
defer m.wg.Done()
Expand Down Expand Up @@ -602,14 +561,10 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse
// SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID.
// And the keyspaces in the old keyspace group will be moved to the new keyspace group.
func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint32, keyspaces []uint32) error {
err := m.patrolKeyspaceAssignment()
if err != nil {
return err
}
var splitSourceKg, splitTargetKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
if err = m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load the old keyspace group first.
splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitSourceID)
if err != nil {
Expand Down
31 changes: 1 addition & 30 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"
"time"

"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mock/mockcluster"
Expand Down Expand Up @@ -48,7 +47,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
suite.kgm = NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
idAllocator := mockid.NewIDAllocator()
cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions())
suite.kg = NewKeyspaceManager(store, cluster, idAllocator, &mockConfig{}, suite.kgm)
suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, idAllocator, &mockConfig{}, suite.kgm)
suite.NoError(suite.kgm.Bootstrap())
}

Expand Down Expand Up @@ -321,31 +320,3 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
err = suite.kgm.SplitKeyspaceGroupByID(2, 5, []uint32{111, 222, 444})
re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup)
}

func (suite *keyspaceGroupTestSuite) TestPatrolKeyspaceAssignment() {
re := suite.Require()
// Force the patrol to run once.
suite.kgm.patrolKeyspaceAssignmentOnce = false
// Create a keyspace group without any keyspace.
err := suite.kgm.CreateKeyspaceGroups([]*endpoint.KeyspaceGroup{
{
ID: uint32(1),
UserKind: endpoint.Basic.String(),
Members: make([]endpoint.KeyspaceGroupMember, 2),
},
})
re.NoError(err)
// Create a keyspace without any keyspace group.
now := time.Now().Unix()
err = suite.kg.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: 111,
Name: "111",
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
})
re.NoError(err)
// Split to see if the keyspace is attached to the group.
err = suite.kgm.SplitKeyspaceGroupByID(1, 2, []uint32{111})
re.NoError(err)
}
6 changes: 3 additions & 3 deletions pkg/storage/endpoint/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type KeyspaceStorage interface {
SaveKeyspaceID(txn kv.Txn, id uint32, name string) error
LoadKeyspaceID(txn kv.Txn, name string) (bool, uint32, error)
// LoadRangeKeyspace loads no more than limit keyspaces starting at startID.
LoadRangeKeyspace(startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error)
LoadRangeKeyspace(txn kv.Txn, startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error)
RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error
}

Expand Down Expand Up @@ -104,10 +104,10 @@ func (se *StorageEndpoint) RunInTxn(ctx context.Context, f func(txn kv.Txn) erro

// LoadRangeKeyspace loads keyspaces starting at startID.
// limit specifies the limit of loaded keyspaces.
func (se *StorageEndpoint) LoadRangeKeyspace(startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error) {
func (se *StorageEndpoint) LoadRangeKeyspace(txn kv.Txn, startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error) {
startKey := KeyspaceMetaPath(startID)
endKey := clientv3.GetPrefixRangeEnd(KeyspaceMetaPrefix())
keys, values, err := se.LoadRange(startKey, endKey, limit)
keys, values, err := txn.LoadRange(startKey, endKey, limit)
if err != nil {
return nil, err
}
Expand Down
30 changes: 17 additions & 13 deletions pkg/storage/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,31 +72,35 @@ func TestSaveLoadKeyspace(t *testing.T) {
func TestLoadRangeKeyspaces(t *testing.T) {
re := require.New(t)
storage := NewStorageWithMemoryBackend()

// Store test keyspace meta.
keyspaces := makeTestKeyspaces()
// Store test keyspace meta.
err := storage.RunInTxn(context.TODO(), func(txn kv.Txn) error {
for _, keyspace := range keyspaces {
re.NoError(storage.SaveKeyspaceMeta(txn, keyspace))
}
return nil
})
re.NoError(err)
// Test load range keyspaces.
err = storage.RunInTxn(context.TODO(), func(txn kv.Txn) error {
// Load all keyspaces.
loadedKeyspaces, err := storage.LoadRangeKeyspace(txn, keyspaces[0].GetId(), 0)
re.NoError(err)
re.ElementsMatch(keyspaces, loadedKeyspaces)

// Load all keyspaces.
loadedKeyspaces, err := storage.LoadRangeKeyspace(keyspaces[0].GetId(), 0)
re.NoError(err)
re.ElementsMatch(keyspaces, loadedKeyspaces)
// Load keyspaces with id >= second test keyspace's id.
loadedKeyspaces2, err := storage.LoadRangeKeyspace(txn, keyspaces[1].GetId(), 0)
re.NoError(err)
re.ElementsMatch(keyspaces[1:], loadedKeyspaces2)

// Load keyspaces with id >= second test keyspace's id.
loadedKeyspaces2, err := storage.LoadRangeKeyspace(keyspaces[1].GetId(), 0)
re.NoError(err)
re.ElementsMatch(keyspaces[1:], loadedKeyspaces2)
// Load keyspace with the smallest id.
loadedKeyspace3, err := storage.LoadRangeKeyspace(txn, 1, 1)
re.NoError(err)
re.ElementsMatch(keyspaces[:1], loadedKeyspace3)

// Load keyspace with the smallest id.
loadedKeyspace3, err := storage.LoadRangeKeyspace(1, 1)
return nil
})
re.NoError(err)
re.ElementsMatch(keyspaces[:1], loadedKeyspace3)
}

func makeTestKeyspaces() []*keyspacepb.KeyspaceMeta {
Expand Down
Loading

0 comments on commit 6853a69

Please sign in to comment.