Skip to content

Commit

Permalink
*: bootstrap keyspace group when server is in API mode (#6308)
Browse files Browse the repository at this point in the history
ref #6231

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
rleungx and ti-chi-bot authored Apr 13, 2023
1 parent 643b842 commit 4bd23b2
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 54 deletions.
53 changes: 26 additions & 27 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,50 +110,46 @@ func (manager *Manager) Bootstrap() error {
return err
}
now := time.Now().Unix()
id, err := manager.kgm.GetAvailableKeyspaceGroupIDByKind(endpoint.Basic)
if err != nil {
return err
}
defaultKeyspace := &keyspacepb.KeyspaceMeta{
defaultKeyspaceMata := &keyspacepb.KeyspaceMeta{
Id: DefaultKeyspaceID,
Name: DefaultKeyspaceName,
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
Config: map[string]string{
UserKindKey: endpoint.Basic.String(),
TSOKeyspaceGroupIDKey: id,
},
}
err = manager.saveNewKeyspace(defaultKeyspace)

config, err := manager.kgm.GetKeyspaceConfigByKind(endpoint.Basic)
if err != nil {
return err
}
defaultKeyspaceMata.Config = config
err = manager.saveNewKeyspace(defaultKeyspaceMata)
// It's possible that default keyspace already exists in the storage (e.g. PD restart/recover),
// so we ignore the keyspaceExists error.
if err != nil && err != ErrKeyspaceExists {
return err
}
if err := manager.kgm.UpdateKeyspaceForGroup(endpoint.Basic, id, defaultKeyspace.GetId(), opAdd); err != nil {
if err := manager.kgm.UpdateKeyspaceForGroup(endpoint.Basic, config[TSOKeyspaceGroupIDKey], defaultKeyspaceMata.GetId(), opAdd); err != nil {
return err
}
// Initialize pre-alloc keyspace.
preAlloc := manager.config.GetPreAlloc()
for _, keyspaceName := range preAlloc {
id, err := manager.kgm.GetAvailableKeyspaceGroupIDByKind(endpoint.Basic)
config, err := manager.kgm.GetKeyspaceConfigByKind(endpoint.Basic)
if err != nil {
return err
}
keyspace, err := manager.CreateKeyspace(&CreateKeyspaceRequest{
Name: keyspaceName,
Now: now,
Config: map[string]string{
UserKindKey: endpoint.Basic.String(),
TSOKeyspaceGroupIDKey: id,
},
})
req := &CreateKeyspaceRequest{
Name: keyspaceName,
Now: now,
Config: config,
}
keyspace, err := manager.CreateKeyspace(req)
// Ignore the keyspaceExists error for the same reason as saving default keyspace.
if err != nil && err != ErrKeyspaceExists {
return err
}
if err := manager.kgm.UpdateKeyspaceForGroup(endpoint.Basic, id, keyspace.GetId(), opAdd); err != nil {
if err := manager.kgm.UpdateKeyspaceForGroup(endpoint.Basic, config[TSOKeyspaceGroupIDKey], keyspace.GetId(), opAdd); err != nil {
return err
}
}
Expand All @@ -177,15 +173,18 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
return nil, err
}
userKind := endpoint.StringUserKind(request.Config[UserKindKey])
id, err := manager.kgm.GetAvailableKeyspaceGroupIDByKind(userKind)
config, err := manager.kgm.GetKeyspaceConfigByKind(userKind)
if err != nil {
return nil, err
}
if request.Config == nil {
request.Config = make(map[string]string)
if len(config) != 0 {
if request.Config == nil {
request.Config = config
} else {
request.Config[TSOKeyspaceGroupIDKey] = config[TSOKeyspaceGroupIDKey]
request.Config[UserKindKey] = config[UserKindKey]
}
}
request.Config[TSOKeyspaceGroupIDKey] = id
request.Config[UserKindKey] = userKind.String()
// Create and save keyspace metadata.
keyspace := &keyspacepb.KeyspaceMeta{
Id: newID,
Expand All @@ -204,7 +203,7 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
)
return nil, err
}
if err := manager.kgm.UpdateKeyspaceForGroup(userKind, id, keyspace.GetId(), opAdd); err != nil {
if err := manager.kgm.UpdateKeyspaceForGroup(userKind, config[TSOKeyspaceGroupIDKey], keyspace.GetId(), opAdd); err != nil {
return nil, err
}
log.Info("[keyspace] keyspace created",
Expand Down
32 changes: 21 additions & 11 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ func (m *GroupManager) Bootstrap() error {
}

userKind := endpoint.StringUserKind(defaultKeyspaceGroup.UserKind)
// If the group for the userKind does not exist, create a new one.
if _, ok := m.groups[userKind]; !ok {
m.groups[userKind] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse))
}
m.groups[userKind].Put(defaultKeyspaceGroup)

// Load all the keyspace groups from the storage and add to the respective userKind groups.
Expand All @@ -85,9 +81,6 @@ func (m *GroupManager) Bootstrap() error {
}
for _, group := range groups {
userKind := endpoint.StringUserKind(group.UserKind)
if _, ok := m.groups[userKind]; !ok {
m.groups[userKind] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse))
}
m.groups[userKind].Put(group)
}

Expand Down Expand Up @@ -191,20 +184,33 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
})
}

// GetAvailableKeyspaceGroupIDByKind returns the available keyspace group ID by user kind.
func (m *GroupManager) GetAvailableKeyspaceGroupIDByKind(userKind endpoint.UserKind) (string, error) {
// GetKeyspaceConfigByKind returns the keyspace config for the given user kind.
func (m *GroupManager) GetKeyspaceConfigByKind(userKind endpoint.UserKind) (map[string]string, error) {
// when server is not in API mode, we don't need to return the keyspace config
if m == nil {
return map[string]string{}, nil
}
m.RLock()
defer m.RUnlock()
groups, ok := m.groups[userKind]
if !ok {
return "", errors.Errorf("user kind %s not found", userKind)
return map[string]string{}, errors.Errorf("user kind %s not found", userKind)
}
kg := groups.Top()
return strconv.FormatUint(uint64(kg.ID), 10), nil
id := strconv.FormatUint(uint64(kg.ID), 10)
config := map[string]string{
UserKindKey: userKind.String(),
TSOKeyspaceGroupIDKey: id,
}
return config, nil
}

// UpdateKeyspaceForGroup updates the keyspace field for the keyspace group.
func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupID string, keyspaceID uint32, mutation int) error {
// when server is not in API mode, we don't need to update the keyspace for keyspace group
if m == nil {
return nil
}
id, err := strconv.ParseUint(groupID, 10, 64)
if err != nil {
return err
Expand Down Expand Up @@ -236,6 +242,10 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI

// UpdateKeyspaceGroup updates the keyspace group.
func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUserKind, newUserKind endpoint.UserKind, keyspaceID uint32) error {
// when server is not in API mode, we don't need to update the keyspace group
if m == nil {
return nil
}
oldID, err := strconv.ParseUint(oldGroupID, 10, 64)
if err != nil {
return err
Expand Down
16 changes: 14 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/gctuner"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/memory"
"github.com/tikv/pd/pkg/progress"
"github.com/tikv/pd/pkg/schedule"
Expand Down Expand Up @@ -107,6 +108,8 @@ type Server interface {
GetBasicCluster() *core.BasicCluster
GetMembers() ([]*pdpb.Member, error)
ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
GetKeyspaceGroupManager() *keyspace.GroupManager
IsAPIServiceMode() bool
}

// RaftCluster is used for cluster config management.
Expand Down Expand Up @@ -156,6 +159,7 @@ type RaftCluster struct {
progressManager *progress.Manager
regionSyncer *syncer.RegionSyncer
changedRegions chan *core.RegionInfo
keyspaceGroupManager *keyspace.GroupManager
}

// Status saves some state information.
Expand Down Expand Up @@ -233,7 +237,8 @@ func (c *RaftCluster) InitCluster(
id id.Allocator,
opt *config.PersistOptions,
storage storage.Storage,
basicCluster *core.BasicCluster) {
basicCluster *core.BasicCluster,
keyspaceGroupManager *keyspace.GroupManager) {
c.core, c.opt, c.storage, c.id = basicCluster, opt, storage, id
c.ctx, c.cancel = context.WithCancel(c.serverCtx)
c.labelLevelStats = statistics.NewLabelStatistics()
Expand All @@ -244,6 +249,7 @@ func (c *RaftCluster) InitCluster(
c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
c.prevStoreLimit = make(map[uint64]map[storelimit.Type]float64)
c.unsafeRecoveryController = newUnsafeRecoveryController(c)
c.keyspaceGroupManager = keyspaceGroupManager
}

// Start starts a cluster.
Expand All @@ -256,14 +262,20 @@ func (c *RaftCluster) Start(s Server) error {
c.Lock()
defer c.Unlock()

c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster())
c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetKeyspaceGroupManager())
cluster, err := c.LoadClusterInfo()
if err != nil {
return err
}
if cluster == nil {
return nil
}
if s.IsAPIServiceMode() {
err = c.keyspaceGroupManager.Bootstrap()
if err != nil {
return err
}
}
c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts())
if c.opt.IsPlacementRulesEnabled() {
err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels())
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1879,7 +1879,7 @@ func newTestRaftCluster(
basicCluster *core.BasicCluster,
) *RaftCluster {
rc := &RaftCluster{serverCtx: ctx}
rc.InitCluster(id, opt, s, basicCluster)
rc.InitCluster(id, opt, s, basicCluster, nil)
rc.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), rc, opt)
if opt.IsPlacementRulesEnabled() {
err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels())
Expand Down
8 changes: 3 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,9 @@ func (s *Server) startServer(ctx context.Context) error {
Member: s.member.MemberValue(),
Step: keyspace.AllocStep,
})
s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage)
if s.IsAPIServiceMode() {
s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage)
}
s.keyspaceManager = keyspace.NewKeyspaceManager(s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager)
s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, s.cluster)
// initial hot_region_storage in here.
Expand Down Expand Up @@ -679,10 +681,6 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe
return nil, err
}

if err := s.GetKeyspaceGroupManager().Bootstrap(); err != nil {
log.Warn("bootstrapping keyspace group manager failed", errs.ZapError(err))
}

if err = s.GetKeyspaceManager().Bootstrap(); err != nil {
log.Warn("bootstrapping keyspace manager failed", errs.ZapError(err))
}
Expand Down
4 changes: 0 additions & 4 deletions tests/server/apiv2/handlers/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/tests"
Expand Down Expand Up @@ -219,9 +218,6 @@ func mustCreateKeyspace(re *require.Assertions, server *tests.TestServer, reques
re.NoError(err)
meta := &handlers.KeyspaceMeta{}
re.NoError(json.Unmarshal(data, meta))
// When creating a keyspace, it will be assigned a keyspace group id.
request.Config[keyspace.TSOKeyspaceGroupIDKey] = "0"
request.Config[keyspace.UserKindKey] = endpoint.Basic.String()
checkCreateRequest(re, request, meta.KeyspaceMeta)
return meta.KeyspaceMeta
}
Expand Down
2 changes: 1 addition & 1 deletion tests/server/apiv2/handlers/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestKeyspaceGroupTestSuite(t *testing.T) {

func (suite *keyspaceGroupTestSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
cluster, err := tests.NewTestCluster(suite.ctx, 1)
cluster, err := tests.NewTestAPICluster(suite.ctx, 1)
suite.cluster = cluster
suite.NoError(err)
suite.NoError(cluster.RunInitialServers())
Expand Down
6 changes: 3 additions & 3 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ func TestLoadClusterInfo(t *testing.T) {
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())

// Cluster is not bootstrapped.
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster())
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster(), svr.GetKeyspaceGroupManager())
raftCluster, err := rc.LoadClusterInfo()
re.NoError(err)
re.Nil(raftCluster)
Expand Down Expand Up @@ -851,7 +851,7 @@ func TestLoadClusterInfo(t *testing.T) {
re.NoError(testStorage.Flush())

raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), testStorage, basicCluster)
raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), testStorage, basicCluster, svr.GetKeyspaceGroupManager())
raftCluster, err = raftCluster.LoadClusterInfo()
re.NoError(err)
re.NotNil(raftCluster)
Expand Down Expand Up @@ -1438,7 +1438,7 @@ func TestTransferLeaderBack(t *testing.T) {
leaderServer := tc.GetServer(tc.GetLeader())
svr := leaderServer.GetServer()
rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient())
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster())
rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster(), svr.GetKeyspaceGroupManager())
storage := rc.GetStorage()
meta := &metapb.Cluster{Id: 123}
re.NoError(storage.SaveMeta(meta))
Expand Down

0 comments on commit 4bd23b2

Please sign in to comment.