From 4bd23b282a76514d52a8f3856677b239fe71dfd9 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 13 Apr 2023 12:25:00 +0800 Subject: [PATCH] *: bootstrap keyspace group when server is in API mode (#6308) ref tikv/pd#6231 Signed-off-by: Ryan Leung Co-authored-by: Ti Chi Robot --- pkg/keyspace/keyspace.go | 53 +++++++++---------- pkg/keyspace/tso_keyspace_group.go | 32 +++++++---- server/cluster/cluster.go | 16 +++++- server/cluster/cluster_test.go | 2 +- server/server.go | 8 ++- tests/server/apiv2/handlers/keyspace_test.go | 4 -- .../apiv2/handlers/tso_keyspace_group_test.go | 2 +- tests/server/cluster/cluster_test.go | 6 +-- 8 files changed, 69 insertions(+), 54 deletions(-) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 28fa95dc5c0..d3383a7cebf 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -110,50 +110,46 @@ func (manager *Manager) Bootstrap() error { return err } now := time.Now().Unix() - id, err := manager.kgm.GetAvailableKeyspaceGroupIDByKind(endpoint.Basic) - if err != nil { - return err - } - defaultKeyspace := &keyspacepb.KeyspaceMeta{ + defaultKeyspaceMata := &keyspacepb.KeyspaceMeta{ Id: DefaultKeyspaceID, Name: DefaultKeyspaceName, State: keyspacepb.KeyspaceState_ENABLED, CreatedAt: now, StateChangedAt: now, - Config: map[string]string{ - UserKindKey: endpoint.Basic.String(), - TSOKeyspaceGroupIDKey: id, - }, } - err = manager.saveNewKeyspace(defaultKeyspace) + + config, err := manager.kgm.GetKeyspaceConfigByKind(endpoint.Basic) + if err != nil { + return err + } + defaultKeyspaceMata.Config = config + err = manager.saveNewKeyspace(defaultKeyspaceMata) // It's possible that default keyspace already exists in the storage (e.g. PD restart/recover), // so we ignore the keyspaceExists error. if err != nil && err != ErrKeyspaceExists { return err } - if err := manager.kgm.UpdateKeyspaceForGroup(endpoint.Basic, id, defaultKeyspace.GetId(), opAdd); err != nil { + if err := manager.kgm.UpdateKeyspaceForGroup(endpoint.Basic, config[TSOKeyspaceGroupIDKey], defaultKeyspaceMata.GetId(), opAdd); err != nil { return err } // Initialize pre-alloc keyspace. preAlloc := manager.config.GetPreAlloc() for _, keyspaceName := range preAlloc { - id, err := manager.kgm.GetAvailableKeyspaceGroupIDByKind(endpoint.Basic) + config, err := manager.kgm.GetKeyspaceConfigByKind(endpoint.Basic) if err != nil { return err } - keyspace, err := manager.CreateKeyspace(&CreateKeyspaceRequest{ - Name: keyspaceName, - Now: now, - Config: map[string]string{ - UserKindKey: endpoint.Basic.String(), - TSOKeyspaceGroupIDKey: id, - }, - }) + req := &CreateKeyspaceRequest{ + Name: keyspaceName, + Now: now, + Config: config, + } + keyspace, err := manager.CreateKeyspace(req) // Ignore the keyspaceExists error for the same reason as saving default keyspace. if err != nil && err != ErrKeyspaceExists { return err } - if err := manager.kgm.UpdateKeyspaceForGroup(endpoint.Basic, id, keyspace.GetId(), opAdd); err != nil { + if err := manager.kgm.UpdateKeyspaceForGroup(endpoint.Basic, config[TSOKeyspaceGroupIDKey], keyspace.GetId(), opAdd); err != nil { return err } } @@ -177,15 +173,18 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac return nil, err } userKind := endpoint.StringUserKind(request.Config[UserKindKey]) - id, err := manager.kgm.GetAvailableKeyspaceGroupIDByKind(userKind) + config, err := manager.kgm.GetKeyspaceConfigByKind(userKind) if err != nil { return nil, err } - if request.Config == nil { - request.Config = make(map[string]string) + if len(config) != 0 { + if request.Config == nil { + request.Config = config + } else { + request.Config[TSOKeyspaceGroupIDKey] = config[TSOKeyspaceGroupIDKey] + request.Config[UserKindKey] = config[UserKindKey] + } } - request.Config[TSOKeyspaceGroupIDKey] = id - request.Config[UserKindKey] = userKind.String() // Create and save keyspace metadata. keyspace := &keyspacepb.KeyspaceMeta{ Id: newID, @@ -204,7 +203,7 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac ) return nil, err } - if err := manager.kgm.UpdateKeyspaceForGroup(userKind, id, keyspace.GetId(), opAdd); err != nil { + if err := manager.kgm.UpdateKeyspaceForGroup(userKind, config[TSOKeyspaceGroupIDKey], keyspace.GetId(), opAdd); err != nil { return nil, err } log.Info("[keyspace] keyspace created", diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 4c69704d731..cfe4308ff48 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -72,10 +72,6 @@ func (m *GroupManager) Bootstrap() error { } userKind := endpoint.StringUserKind(defaultKeyspaceGroup.UserKind) - // If the group for the userKind does not exist, create a new one. - if _, ok := m.groups[userKind]; !ok { - m.groups[userKind] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse)) - } m.groups[userKind].Put(defaultKeyspaceGroup) // Load all the keyspace groups from the storage and add to the respective userKind groups. @@ -85,9 +81,6 @@ func (m *GroupManager) Bootstrap() error { } for _, group := range groups { userKind := endpoint.StringUserKind(group.UserKind) - if _, ok := m.groups[userKind]; !ok { - m.groups[userKind] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse)) - } m.groups[userKind].Put(group) } @@ -191,20 +184,33 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro }) } -// GetAvailableKeyspaceGroupIDByKind returns the available keyspace group ID by user kind. -func (m *GroupManager) GetAvailableKeyspaceGroupIDByKind(userKind endpoint.UserKind) (string, error) { +// GetKeyspaceConfigByKind returns the keyspace config for the given user kind. +func (m *GroupManager) GetKeyspaceConfigByKind(userKind endpoint.UserKind) (map[string]string, error) { + // when server is not in API mode, we don't need to return the keyspace config + if m == nil { + return map[string]string{}, nil + } m.RLock() defer m.RUnlock() groups, ok := m.groups[userKind] if !ok { - return "", errors.Errorf("user kind %s not found", userKind) + return map[string]string{}, errors.Errorf("user kind %s not found", userKind) } kg := groups.Top() - return strconv.FormatUint(uint64(kg.ID), 10), nil + id := strconv.FormatUint(uint64(kg.ID), 10) + config := map[string]string{ + UserKindKey: userKind.String(), + TSOKeyspaceGroupIDKey: id, + } + return config, nil } // UpdateKeyspaceForGroup updates the keyspace field for the keyspace group. func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupID string, keyspaceID uint32, mutation int) error { + // when server is not in API mode, we don't need to update the keyspace for keyspace group + if m == nil { + return nil + } id, err := strconv.ParseUint(groupID, 10, 64) if err != nil { return err @@ -236,6 +242,10 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI // UpdateKeyspaceGroup updates the keyspace group. func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUserKind, newUserKind endpoint.UserKind, keyspaceID uint32) error { + // when server is not in API mode, we don't need to update the keyspace group + if m == nil { + return nil + } oldID, err := strconv.ParseUint(oldGroupID, 10, 64) if err != nil { return err diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c4b9db33ea5..92ad05176ab 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -36,6 +36,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/gctuner" "github.com/tikv/pd/pkg/id" + "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/memory" "github.com/tikv/pd/pkg/progress" "github.com/tikv/pd/pkg/schedule" @@ -107,6 +108,8 @@ type Server interface { GetBasicCluster() *core.BasicCluster GetMembers() ([]*pdpb.Member, error) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error + GetKeyspaceGroupManager() *keyspace.GroupManager + IsAPIServiceMode() bool } // RaftCluster is used for cluster config management. @@ -156,6 +159,7 @@ type RaftCluster struct { progressManager *progress.Manager regionSyncer *syncer.RegionSyncer changedRegions chan *core.RegionInfo + keyspaceGroupManager *keyspace.GroupManager } // Status saves some state information. @@ -233,7 +237,8 @@ func (c *RaftCluster) InitCluster( id id.Allocator, opt *config.PersistOptions, storage storage.Storage, - basicCluster *core.BasicCluster) { + basicCluster *core.BasicCluster, + keyspaceGroupManager *keyspace.GroupManager) { c.core, c.opt, c.storage, c.id = basicCluster, opt, storage, id c.ctx, c.cancel = context.WithCancel(c.serverCtx) c.labelLevelStats = statistics.NewLabelStatistics() @@ -244,6 +249,7 @@ func (c *RaftCluster) InitCluster( c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit) c.prevStoreLimit = make(map[uint64]map[storelimit.Type]float64) c.unsafeRecoveryController = newUnsafeRecoveryController(c) + c.keyspaceGroupManager = keyspaceGroupManager } // Start starts a cluster. @@ -256,7 +262,7 @@ func (c *RaftCluster) Start(s Server) error { c.Lock() defer c.Unlock() - c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster()) + c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetKeyspaceGroupManager()) cluster, err := c.LoadClusterInfo() if err != nil { return err @@ -264,6 +270,12 @@ func (c *RaftCluster) Start(s Server) error { if cluster == nil { return nil } + if s.IsAPIServiceMode() { + err = c.keyspaceGroupManager.Bootstrap() + if err != nil { + return err + } + } c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts()) if c.opt.IsPlacementRulesEnabled() { err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels()) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index fa7a8c973ab..e6165eac07c 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1879,7 +1879,7 @@ func newTestRaftCluster( basicCluster *core.BasicCluster, ) *RaftCluster { rc := &RaftCluster{serverCtx: ctx} - rc.InitCluster(id, opt, s, basicCluster) + rc.InitCluster(id, opt, s, basicCluster, nil) rc.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), rc, opt) if opt.IsPlacementRulesEnabled() { err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) diff --git a/server/server.go b/server/server.go index 06ad2198288..77554605914 100644 --- a/server/server.go +++ b/server/server.go @@ -435,7 +435,9 @@ func (s *Server) startServer(ctx context.Context) error { Member: s.member.MemberValue(), Step: keyspace.AllocStep, }) - s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage) + if s.IsAPIServiceMode() { + s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage) + } s.keyspaceManager = keyspace.NewKeyspaceManager(s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager) s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, s.cluster) // initial hot_region_storage in here. @@ -679,10 +681,6 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe return nil, err } - if err := s.GetKeyspaceGroupManager().Bootstrap(); err != nil { - log.Warn("bootstrapping keyspace group manager failed", errs.ZapError(err)) - } - if err = s.GetKeyspaceManager().Bootstrap(); err != nil { log.Warn("bootstrapping keyspace manager failed", errs.ZapError(err)) } diff --git a/tests/server/apiv2/handlers/keyspace_test.go b/tests/server/apiv2/handlers/keyspace_test.go index 2a3a197c740..b9b9742b2dc 100644 --- a/tests/server/apiv2/handlers/keyspace_test.go +++ b/tests/server/apiv2/handlers/keyspace_test.go @@ -27,7 +27,6 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/keyspace" - "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/apiv2/handlers" "github.com/tikv/pd/tests" @@ -219,9 +218,6 @@ func mustCreateKeyspace(re *require.Assertions, server *tests.TestServer, reques re.NoError(err) meta := &handlers.KeyspaceMeta{} re.NoError(json.Unmarshal(data, meta)) - // When creating a keyspace, it will be assigned a keyspace group id. - request.Config[keyspace.TSOKeyspaceGroupIDKey] = "0" - request.Config[keyspace.UserKindKey] = endpoint.Basic.String() checkCreateRequest(re, request, meta.KeyspaceMeta) return meta.KeyspaceMeta } diff --git a/tests/server/apiv2/handlers/tso_keyspace_group_test.go b/tests/server/apiv2/handlers/tso_keyspace_group_test.go index c3ee1e06a3b..d256e0abc00 100644 --- a/tests/server/apiv2/handlers/tso_keyspace_group_test.go +++ b/tests/server/apiv2/handlers/tso_keyspace_group_test.go @@ -46,7 +46,7 @@ func TestKeyspaceGroupTestSuite(t *testing.T) { func (suite *keyspaceGroupTestSuite) SetupTest() { suite.ctx, suite.cancel = context.WithCancel(context.Background()) - cluster, err := tests.NewTestCluster(suite.ctx, 1) + cluster, err := tests.NewTestAPICluster(suite.ctx, 1) suite.cluster = cluster suite.NoError(err) suite.NoError(cluster.RunInitialServers()) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 9761ee3415d..b576ef08b64 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -813,7 +813,7 @@ func TestLoadClusterInfo(t *testing.T) { rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) // Cluster is not bootstrapped. - rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster()) + rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster(), svr.GetKeyspaceGroupManager()) raftCluster, err := rc.LoadClusterInfo() re.NoError(err) re.Nil(raftCluster) @@ -851,7 +851,7 @@ func TestLoadClusterInfo(t *testing.T) { re.NoError(testStorage.Flush()) raftCluster = cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) - raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), testStorage, basicCluster) + raftCluster.InitCluster(mockid.NewIDAllocator(), svr.GetPersistOptions(), testStorage, basicCluster, svr.GetKeyspaceGroupManager()) raftCluster, err = raftCluster.LoadClusterInfo() re.NoError(err) re.NotNil(raftCluster) @@ -1438,7 +1438,7 @@ func TestTransferLeaderBack(t *testing.T) { leaderServer := tc.GetServer(tc.GetLeader()) svr := leaderServer.GetServer() rc := cluster.NewRaftCluster(ctx, svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient(), svr.GetHTTPClient()) - rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster()) + rc.InitCluster(svr.GetAllocator(), svr.GetPersistOptions(), svr.GetStorage(), svr.GetBasicCluster(), svr.GetKeyspaceGroupManager()) storage := rc.GetStorage() meta := &metapb.Cluster{Id: 123} re.NoError(storage.SaveMeta(meta))