Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: move keyspace group primary path code to key_path.go #6755

Merged
merged 7 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/mcs/discovery/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@ package discovery
import (
"strconv"
"strings"

"github.com/tikv/pd/pkg/mcs/utils"
)

const (
registryPrefix = "/ms"
registryKey = "registry"
registryKey = "registry"
)

// RegistryPath returns the full path to store microservice addresses.
func RegistryPath(clusterID, serviceName, serviceAddr string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey, serviceAddr}, "/")
return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey, serviceAddr}, "/")
}

// ServicePath returns the path to store microservice addresses.
func ServicePath(clusterID, serviceName string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/")
return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey}, "/")
}

// TSOPath returns the path to store TSO addresses.
Expand Down
5 changes: 3 additions & 2 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
Expand Down Expand Up @@ -366,10 +367,10 @@ func (s *Server) startServer() (err error) {
uniqueName := s.cfg.ListenAddr
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
resourceManagerPrimaryPrefix := fmt.Sprintf("/ms/%d/resource_manager", s.clusterID)
resourceManagerPrimaryPrefix := endpoint.ResourceManagerSvcRootPath(s.clusterID)
s.participant = member.NewParticipant(s.etcdClient)
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)),
"primary", "keyspace group primary election", s.cfg.AdvertiseListenAddr)
utils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", s.cfg.AdvertiseListenAddr)

s.service = &Service{
ctx: s.ctx,
Expand Down
12 changes: 3 additions & 9 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net/url"
"os"
"os/signal"
"path"
"strconv"
"strings"
"sync"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/tikv/pd/pkg/mcs/discovery"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/systimemon"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/etcdutil"
Expand All @@ -63,12 +63,6 @@ import (
)

const (
// pdRootPath is the old path for storing the tso related root path.
pdRootPath = "/pd"
msServiceRootPath = "/ms"
// tsoSvcRootPathFormat defines the root path for all etcd paths used for different purposes.
// format: "/ms/{cluster_id}/tso".
tsoSvcRootPathFormat = msServiceRootPath + "/%d/" + mcsutils.TSOServiceName
// maxRetryTimesWaitAPIService is the max retry times for initializing the cluster ID.
maxRetryTimesWaitAPIService = 360
// retryIntervalWaitAPIService is the interval to retry.
Expand Down Expand Up @@ -535,8 +529,8 @@ func (s *Server) startServer() (err error) {

// Initialize the TSO service.
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
legacySvcRootPath := endpoint.LegacyRootPath(s.clusterID)
tsoSvcRootPath := endpoint.TSOSvcRootPath(s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr,
Expand Down
6 changes: 4 additions & 2 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ const (
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeyspaceGroupID = uint32(0)

// MicroserviceKey is the key of microservice.
MicroserviceKey = "ms"
// MicroserviceRootPath is the root path of microservice in etcd.
MicroserviceRootPath = "/ms"
// APIServiceName is the name of api server.
APIServiceName = "api"
// TSOServiceName is the name of tso server.
Expand All @@ -59,6 +59,8 @@ const (
ResourceManagerServiceName = "resource_manager"
// KeyspaceGroupsKey is the path component of keyspace groups.
KeyspaceGroupsKey = "keyspace_groups"
// KeyspaceGroupsPrimaryKey is the path component of primary for keyspace groups.
KeyspaceGroupsPrimaryKey = "primary"

// MaxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso
// is the sharding unit, i.e., by the definition here, the max count of the shards
Expand Down
80 changes: 72 additions & 8 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

const (
pdRootPath = "/pd"
clusterPath = "raft"
configPath = "config"
serviceMiddlewarePath = "service_middleware"
Expand Down Expand Up @@ -58,8 +59,9 @@ const (
// TimestampKey is the key of timestamp oracle used for the suffix.
TimestampKey = "timestamp"

tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupMembershipKey = "membership"
tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupsMembershipKey = "membership"
keyspaceGroupsElectionKey = "election"

// we use uint64 to represent ID, the max length of uint64 is 20.
keyLen = 20
Expand Down Expand Up @@ -230,13 +232,13 @@ func EncodeKeyspaceID(spaceID uint32) string {
// KeyspaceGroupIDPrefix returns the prefix of keyspace group id.
// Path: tso/keyspace_groups/membership
func KeyspaceGroupIDPrefix() string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey)
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey)
}

// KeyspaceGroupIDPath returns the path to keyspace id from the given name.
// Path: tso/keyspace_groups/membership/{id}
func KeyspaceGroupIDPath(id uint32) string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id))
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey, encodeKeyspaceGroupID(id))
}

// GetCompiledKeyspaceGroupIDRegexp returns the compiled regular expression for matching keyspace group id.
Expand All @@ -245,6 +247,54 @@ func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp {
return regexp.MustCompile(pattern)
}

// ResourceManagerSvcRootPath returns the root path of resource manager service.
// Path: /ms/{cluster_id}/resource_manager
func ResourceManagerSvcRootPath(clusterID uint64) string {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
return svcRootPath(clusterID, utils.ResourceManagerServiceName)
}

// TSOSvcRootPath returns the root path of tso service.
// Path: /ms/{cluster_id}/tso
func TSOSvcRootPath(clusterID uint64) string {
return svcRootPath(clusterID, utils.TSOServiceName)
}

func svcRootPath(clusterID uint64, svcName string) string {
c := strconv.FormatUint(clusterID, 10)
return path.Join(utils.MicroserviceRootPath, c, svcName)
}

// LegacyRootPath returns the root path of legacy pd service.
// Path: /pd/{cluster_id}
func LegacyRootPath(clusterID uint64) string {
return path.Join(pdRootPath, strconv.FormatUint(clusterID, 10))
}

// KeyspaceGroupPrimaryPath returns the path of keyspace group primary.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string {
electionPath := KeyspaceGroupsElectionPath(rootPath, keyspaceGroupID)
return path.Join(electionPath, utils.KeyspaceGroupsPrimaryKey)
}

// KeyspaceGroupsElectionPath returns the path of keyspace groups election.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
func KeyspaceGroupsElectionPath(rootPath string, keyspaceGroupID uint32) string {
if keyspaceGroupID == utils.DefaultKeyspaceGroupID {
return path.Join(rootPath, "00000")
}
return path.Join(rootPath, utils.KeyspaceGroupsKey, keyspaceGroupsElectionKey, fmt.Sprintf("%05d", keyspaceGroupID))
}

// GetCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id.
func GetCompiledNonDefaultIDRegexp(clusterID uint64) *regexp.Regexp {
rootPath := TSOSvcRootPath(clusterID)
pattern := strings.Join([]string{rootPath, utils.KeyspaceGroupsKey, keyspaceGroupsElectionKey, `(\d{5})`, utils.KeyspaceGroupsPrimaryKey + `$`}, "/")
return regexp.MustCompile(pattern)
}

// encodeKeyspaceGroupID from uint32 to string.
func encodeKeyspaceGroupID(groupID uint32) string {
return fmt.Sprintf("%05d", groupID)
Expand All @@ -264,19 +314,33 @@ func buildPath(withSuffix bool, str ...string) string {
return sb.String()
}

// GetKeyspaceGroupTSPath constructs the timestampOracle path prefix, which is:
// KeyspaceGroupTSPath constructs the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
func GetKeyspaceGroupTSPath(groupID uint32) string {
func KeyspaceGroupTSPath(groupID uint32) string {
if groupID == utils.DefaultKeyspaceGroupID {
return ""
}
return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix)
}

// GetTimestampPath returns the timestamp path for the given timestamp oracle path prefix.
func GetTimestampPath(tsPath string) string {
// TimestampPath returns the timestamp path for the given timestamp oracle path prefix.
func TimestampPath(tsPath string) string {
return path.Join(tsPath, TimestampKey)
}

// FullTimestampPath returns the full timestamp path.
// 1. for the default keyspace group:
// /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// /ms/{cluster_id}/tso/{group}/gta/timestamp
func FullTimestampPath(clusterID uint64, groupID uint32) string {
rootPath := TSOSvcRootPath(clusterID)
tsPath := TimestampPath(KeyspaceGroupTSPath(groupID))
if groupID == utils.DefaultKeyspaceGroupID {
rootPath = LegacyRootPath(clusterID)
}
return path.Join(rootPath, tsPath)
}
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func NewGlobalTSOAllocator(
member: am.member,
timestampOracle: &timestampOracle{
client: am.member.GetLeadership().GetClient(),
tsPath: endpoint.GetKeyspaceGroupTSPath(am.kgID),
tsPath: endpoint.KeyspaceGroupTSPath(am.kgID),
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
Expand Down
47 changes: 6 additions & 41 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"math"
"net/http"
"path"
"regexp"
"sort"
"strings"
Expand Down Expand Up @@ -51,9 +50,6 @@ import (
)

const (
keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election"
// primaryKey is the key for keyspace group primary election.
primaryKey = "primary"
// mergingCheckInterval is the interval for merging check to see if the keyspace groups
// merging process could be moved forward.
mergingCheckInterval = 5 * time.Second
Expand Down Expand Up @@ -256,32 +252,6 @@ func (s *state) getNextPrimaryToReset(
return nil, nil, 0, groupID
}

// kgPrimaryPathBuilder builds the path for keyspace group primary election.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
type kgPrimaryPathBuilder struct {
// rootPath is "/ms/{cluster_id}/tso".
rootPath string
// defaultKeyspaceGroupIDPath is "/ms/{cluster_id}/tso/00000".
defaultKeyspaceGroupIDPath string
}

// getKeyspaceGroupIDPath returns the keyspace group primary ID path.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
func (p *kgPrimaryPathBuilder) getKeyspaceGroupIDPath(keyspaceGroupID uint32) string {
if keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID {
return p.defaultKeyspaceGroupIDPath
}
return path.Join(p.rootPath, keyspaceGroupsElectionPath, fmt.Sprintf("%05d", keyspaceGroupID))
}

// getCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id.
func (p *kgPrimaryPathBuilder) getCompiledNonDefaultIDRegexp() *regexp.Regexp {
pattern := strings.Join([]string{p.rootPath, keyspaceGroupsElectionPath, `(\d{5})`, primaryKey + `$`}, "/")
return regexp.MustCompile(pattern)
}

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
// The replicas campaign for the leaders which provide the tso service for the corresponding
// keyspace groups.
Expand Down Expand Up @@ -355,7 +325,6 @@ type KeyspaceGroupManager struct {
// mergeCheckerCancelMap is the cancel function map for the merge checker of each keyspace group.
mergeCheckerCancelMap sync.Map // GroupID -> context.CancelFunc

primaryPathBuilder *kgPrimaryPathBuilder
primaryPriorityCheckInterval time.Duration

// tsoNodes is the registered tso servers.
Expand Down Expand Up @@ -406,10 +375,6 @@ func NewKeyspaceGroupManager(
kgm.tsoSvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil)
kgm.compiledKGMembershipIDRegexp = endpoint.GetCompiledKeyspaceGroupIDRegexp()
kgm.primaryPathBuilder = &kgPrimaryPathBuilder{
rootPath: kgm.tsoSvcRootPath,
defaultKeyspaceGroupIDPath: path.Join(kgm.tsoSvcRootPath, "00000"),
}
kgm.state.initialize()
return kgm
}
Expand Down Expand Up @@ -718,8 +683,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
// Initialize the participant info to join the primary election.
participant := member.NewParticipant(kgm.etcdClient)
participant.InitInfo(
uniqueName, uniqueID, kgm.primaryPathBuilder.getKeyspaceGroupIDPath(group.ID),
primaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
uniqueName, uniqueID, endpoint.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID),
mcsutils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
// If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group
// is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot
// be broken until the entire split process is completed.
Expand Down Expand Up @@ -1276,7 +1241,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
// Check if the keyspace group primaries in the merge map are all gone.
if len(mergeMap) != 0 {
for id := range mergeMap {
leaderPath := path.Join(kgm.primaryPathBuilder.getKeyspaceGroupIDPath(id), primaryKey)
leaderPath := endpoint.KeyspaceGroupPrimaryPath(kgm.tsoSvcRootPath, id)
val, err := kgm.tsoSvcStorage.Load(leaderPath)
if err != nil {
log.Error("failed to check if the keyspace group primary in the merge list has gone",
Expand Down Expand Up @@ -1305,7 +1270,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
// calculate the newly merged TSO to make sure it is greater than the original ones.
var mergedTS time.Time
for _, id := range mergeList {
ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(id))
ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(id))
if err != nil {
log.Error("failed to load the keyspace group TSO",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
Expand Down Expand Up @@ -1455,8 +1420,8 @@ func (kgm *KeyspaceGroupManager) deletedGroupCleaner() {
// Clean up the remaining TSO keys.
// TODO: support the Local TSO Allocator clean up.
err := kgm.tsoSvcStorage.DeleteTimestamp(
endpoint.GetTimestampPath(
endpoint.GetKeyspaceGroupTSPath(groupID),
endpoint.TimestampPath(
endpoint.KeyspaceGroupTSPath(groupID),
),
)
if err != nil {
Expand Down
Loading