diff --git a/client/client.go b/client/client.go index 22b037d1ede..249caa4d149 100644 --- a/client/client.go +++ b/client/client.go @@ -1167,7 +1167,9 @@ func IsLeaderChange(err error) bool { return true } errMsg := err.Error() - return strings.Contains(errMsg, errs.NotLeaderErr) || strings.Contains(errMsg, errs.MismatchLeaderErr) + return strings.Contains(errMsg, errs.NotLeaderErr) || + strings.Contains(errMsg, errs.MismatchLeaderErr) || + strings.Contains(errMsg, errs.NotServedErr) } func trimHTTPPrefix(str string) string { diff --git a/client/errs/errno.go b/client/errs/errno.go index 43c08396f1e..9ed860681be 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -21,11 +21,19 @@ import ( ) const ( - // NotLeaderErr indicates the the non-leader member received the requests which should be received by leader. + // NotLeaderErr indicates the non-leader member received the requests which should be received by leader. + // Note: keep the same as the ones defined on the server side, because the client side checks if an error message + // contains this string to judge whether the leader is changed. NotLeaderErr = "is not leader" - // MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader. + // MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader. + // Note: keep the same as the ones defined on the server side, because the client side checks if an error message + // contains this string to judge whether the leader is changed. MismatchLeaderErr = "mismatch leader id" - RetryTimeoutErr = "retry timeout" + // NotServedErr indicates a tso node/pod received the requests for the keyspace groups which are not served by it. + // Note: keep the same as the ones defined on the server side, because the client side checks if an error message + // contains this string to judge whether the leader is changed. 
+ NotServedErr = "is not served" + RetryTimeoutErr = "retry timeout" ) // client errors diff --git a/errors.toml b/errors.toml index 440015b7bf5..5b9ecd0a345 100644 --- a/errors.toml +++ b/errors.toml @@ -1,6 +1,16 @@ # AUTOGENERATED BY github.com/pingcap/errors/errdoc-gen # YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER. +["ErrLoadKeyspaceGroupsTerminated"] +error = ''' +load keyspace groups terminated +''' + +["ErrLoadKeyspaceGroupsTimeout"] +error = ''' +load keyspace groups timeout +''' + ["PD:ErrEncryptionKMS"] error = ''' KMS error @@ -731,11 +741,21 @@ error = ''' get allocator failed, %s ''' +["PD:tso:ErrGetAllocatorManager"] +error = ''' +get allocator manager failed, %s +''' + ["PD:tso:ErrGetLocalAllocator"] error = ''' get local allocator failed, %s ''' +["PD:tso:ErrKeyspaceGroupIDInvalid"] +error = ''' +the keyspace group id is invalid, %s +''' + ["PD:tso:ErrLogicOverflow"] error = ''' logic part overflow diff --git a/go.mod b/go.mod index 4c4090694b9..fd7a07c0d53 100644 --- a/go.mod +++ b/go.mod @@ -111,7 +111,7 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/google/uuid v1.3.0 github.com/gorilla/websocket v1.4.2 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 84793917ca9..1deb285df94 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -17,10 +17,18 @@ package errs import "github.com/pingcap/errors" const ( - // NotLeaderErr indicates the the non-leader member received the requests which should be received by leader. + // NotLeaderErr indicates the non-leader member received the requests which should be received by leader. 
+ // Note: keep the same as the ones defined on the client side, because the client side checks if an error message + // contains this string to judge whether the leader is changed. NotLeaderErr = "is not leader" - // MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader. + // MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader. + // Note: keep the same as the ones defined on the client side, because the client side checks if an error message + // contains this string to judge whether the leader is changed. MismatchLeaderErr = "mismatch leader id" + // NotServedErr indicates a tso node/pod received the requests for the keyspace groups which are not served by it. + // Note: keep the same as the ones defined on the client side, because the client side checks if an error message + // contains this string to judge whether the leader is changed. + NotServedErr = "is not served" ) // common error in multiple packages @@ -31,14 +39,18 @@ var ( // tso errors var ( - ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig")) - ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator")) - ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator")) - ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS")) - ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp")) - ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp")) - ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow")) - ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", 
errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout")) + ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig")) + ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator")) + ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator")) + ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS")) + ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp")) + ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp")) + ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow")) + ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout")) + ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid")) + ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager")) + ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout")) + ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated")) ) // member errors diff --git a/pkg/mcs/tso/server/handler.go b/pkg/mcs/tso/server/handler.go index 40ddd2f726b..14f83bccc61 100644 --- a/pkg/mcs/tso/server/handler.go +++ b/pkg/mcs/tso/server/handler.go @@ -17,6 +17,7 @@ package server import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/tso" "go.uber.org/zap" ) @@ -31,12 +32,18 @@ func newHandler(s *Server) *Handler { 
} // ResetTS resets the ts with specified tso. +// TODO: Support multiple keyspace groups. func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error { log.Info("reset-ts", zap.Uint64("new-ts", ts), zap.Bool("ignore-smaller", ignoreSmaller), zap.Bool("skip-upper-bound-check", skipUpperBoundCheck)) - tsoAllocator, err := h.s.GetTSOAllocatorManager().GetAllocator(tso.GlobalDCLocation) + tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeySpaceGroupID) + if err != nil { + log.Error("failed to get allocator manager", errs.ZapError(err)) + return err + } + tsoAllocator, err := tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation) if err != nil { return err } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index db8e8a239f2..642952b16e4 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -111,7 +111,10 @@ type Server struct { // Callback functions for different stages // startCallbacks will be called after the server is started. - startCallbacks []func() + startCallbacks []func() + + // for service registry + serviceID *discovery.ServiceRegistryEntry serviceRegister *discovery.ServiceRegister } @@ -199,14 +202,30 @@ func (s *Server) AddStartCallback(callbacks ...func()) { // IsServing implements basicserver. It returns whether the server is the leader // if there is embedded etcd, or the primary otherwise. 
+// TODO: support multiple keyspace groups func (s *Server) IsServing() bool { - return atomic.LoadInt64(&s.isRunning) == 1 && s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID).IsLeader() + if atomic.LoadInt64(&s.isRunning) == 0 { + return false + } + + member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID) + if err != nil { + log.Error("failed to get election member", errs.ZapError(err)) + return false + } + return member.IsLeader() } // GetLeaderListenUrls gets service endpoints from the leader in election group. // The entry at the index 0 is the primary's service endpoint. func (s *Server) GetLeaderListenUrls() []string { - return s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID).GetLeaderListenUrls() + member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID) + if err != nil { + log.Error("failed to get election member", errs.ZapError(err)) + return nil + } + + return member.GetLeaderListenUrls() } // AddServiceReadyCallback implements basicserver. @@ -229,8 +248,8 @@ func (s *Server) IsClosed() bool { } // GetTSOAllocatorManager returns the manager of TSO Allocator. 
-func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager { - return s.keyspaceGroupManager.GetAllocatorManager(mcsutils.DefaultKeySpaceGroupID) +func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorManager, error) { + return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID) } // IsLocalRequest checks if the forwarded host is the current host @@ -416,11 +435,16 @@ func (s *Server) startServer() (err error) { } s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) - defaultKsgStorageTSRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10)) + legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10)) tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID) + s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( - s.serverLoopCtx, s.etcdClient, s.listenURL.Host, defaultKsgStorageTSRootPath, tsoSvcRootPath, s.cfg) - s.keyspaceGroupManager.Initialize() + s.serverLoopCtx, s.serviceID, s.etcdClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg) + // The param `false` means that we don't initialize the keyspace group manager + // by loading the keyspace group meta from etcd. + if err := s.keyspaceGroupManager.Initialize(false); err != nil { + return err + } s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.service = &Service{Server: s} @@ -448,8 +472,7 @@ func (s *Server) startServer() (err error) { } // Server has started. 
- entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} - serializedEntry, err := entry.Serialize() + serializedEntry, err := s.serviceID.Serialize() if err != nil { return err } diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 1cf258c08f9..f84459fbd7d 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -17,6 +17,7 @@ package endpoint import ( "fmt" "path" + "regexp" "strconv" "strings" @@ -239,6 +240,22 @@ func KeyspaceGroupIDPath(id uint32) string { return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id)) } +// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains +// the pattern of `tso/keyspace_groups/membership/(\d{5})$`. +func ExtractKeyspaceGroupIDFromPath(path string) (uint32, error) { + pattern := strings.Join([]string{KeyspaceGroupIDPrefix(), `(\d{5})$`}, "/") + re := regexp.MustCompile(pattern) + match := re.FindStringSubmatch(path) + if match == nil { + return 0, fmt.Errorf("invalid keyspace group id path: %s", path) + } + id, err := strconv.ParseUint(match[1], 10, 32) + if err != nil { + return 0, fmt.Errorf("failed to parse keyspace group ID: %v", err) + } + return uint32(id), nil +} + // encodeKeyspaceGroupID from uint32 to string. 
func encodeKeyspaceGroupID(groupID uint32) string { return fmt.Sprintf("%05d", groupID) diff --git a/pkg/storage/endpoint/key_path_test.go b/pkg/storage/endpoint/key_path_test.go index d6ef584105a..270d1e266fe 100644 --- a/pkg/storage/endpoint/key_path_test.go +++ b/pkg/storage/endpoint/key_path_test.go @@ -27,3 +27,51 @@ func BenchmarkRegionPath(b *testing.B) { _ = RegionPath(uint64(i)) } } + +func TestExtractKeyspaceGroupIDFromPath(t *testing.T) { + re := require.New(t) + + rightCases := []struct { + path string + id uint32 + }{ + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999}, + {path: "tso/keyspace_groups/membership/00000", id: 0}, + {path: "tso/keyspace_groups/membership/00001", id: 1}, + {path: "tso/keyspace_groups/membership/12345", id: 12345}, + {path: "tso/keyspace_groups/membership/99999", id: 99999}, + } + + for _, tt := range rightCases { + id, err := ExtractKeyspaceGroupIDFromPath(tt.path) + re.Equal(tt.id, id) + re.NoError(err) + } + + wrongCases := []struct { + path string + }{ + {path: ""}, + {path: "00001"}, + {path: "xxx/keyspace_groups/membership/00001"}, + {path: "tso/xxxxxxxxxxxxxxx/membership/00001"}, + {path: "tso/keyspace_groups/xxxxxxxxxx/00001"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"}, + {path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"}, + {path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"}, + {path: 
"/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"}, + } + + for _, tt := range wrongCases { + _, err := ExtractKeyspaceGroupIDFromPath(tt.path) + re.Error(err) + } +} diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go index 2d65dfee28a..3e4b5f2235e 100644 --- a/pkg/storage/endpoint/tso_keyspace_group.go +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -22,11 +22,19 @@ import ( "go.etcd.io/etcd/clientv3" ) +// KeyspaceGroupMember defines an election member which campaigns for the primary of the keyspace group. +type KeyspaceGroupMember struct { + Address string `json:"address"` +} + // KeyspaceGroup is the keyspace group. type KeyspaceGroup struct { ID uint32 `json:"id"` UserKind string `json:"user-kind"` - // TODO: add `Members` field + // Members are the election members which campaign for the primary of the keyspace group. + Members []KeyspaceGroupMember `json:"members"` + // Keyspaces are the keyspace IDs which belong to the keyspace group. + Keyspaces []uint32 `json:"keyspaces"` } // KeyspaceGroupStorage is the interface for keyspace group storage. 
diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 362ff6763b4..c074e25a6d9 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -294,8 +294,8 @@ func (am *AllocatorManager) close() { log.Info("closed the allocator manager") } -func (am *AllocatorManager) getMember() *ElectionMember { - return &am.member +func (am *AllocatorManager) getMember() ElectionMember { + return am.member } // SetLocalTSOConfig receives the zone label of this PD server and write it into etcd as dc-location @@ -1072,6 +1072,7 @@ func (am *AllocatorManager) HandleRequest(dcLocation string, count uint32) (pdpb err := errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found, generate timestamp failed", dcLocation)) return pdpb.Timestamp{}, err } + return allocatorGroup.allocator.GenerateTSO(count) } diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 09febeccc3b..d79a9b2f0b7 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -193,7 +193,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) // Have dc-locations configured in the cluster, use the Global TSO generation way. // (whit synchronization with other Local TSO Allocators) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(gta.ctx) defer cancel() for i := 0; i < maxRetryCount; i++ { var ( @@ -237,7 +237,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) skipCheck = true goto SETTING_PHASE } - // Is skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valide. + // If skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valid. 
if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) == 0 { tsoCounter.WithLabelValues("global_tso_estimate", gta.timestampOracle.dcLocation).Inc() } @@ -309,7 +309,7 @@ type syncResp struct { // SyncMaxTS is used to sync MaxTS with all Local TSO Allocator leaders in dcLocationMap. // If maxTSO is the biggest TSO among all Local TSO Allocators, it will be written into -// each allocator and remines the same after the synchronization. +// each allocator and remains the same after the synchronization. // If not, it will be replaced with the new max Local TSO and return. func (gta *GlobalTSOAllocator) SyncMaxTS( ctx context.Context, diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 2598da4a6b0..e2cfba24658 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -16,53 +16,81 @@ package tso import ( "context" + "encoding/json" "fmt" "path" + "strings" + "sync" + "sync/atomic" + "time" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/memberutil" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" ) -// primaryElectionSuffix is the suffix of the key for keyspace group primary election -const primaryElectionSuffix = "primary" +const ( + // primaryElectionSuffix is the suffix of the key for keyspace group primary election + primaryElectionSuffix = "primary" + // defaultLoadKeyspaceGroupsTimeout is the default timeout for loading the initial + // keyspace group assignment + defaultLoadKeyspaceGroupsTimeout = 30 * time.Second + defaultLoadKeyspaceGroupsBatchSize = int64(400) + loadFromEtcdMaxRetryTimes = 6 + loadFromEtcdRetryInterval = 500 * 
time.Millisecond + watchKEtcdChangeRetryInterval = 1 * time.Second +) -// KeyspaceGroupManager manages the primary/secondaries of the keyspace groups -// assigned to this host. The primaries provide the tso service for the corresponding +// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host. +// The replicas campaign for the leaders which provide the tso service for the corresponding // keyspace groups. type KeyspaceGroupManager struct { - // ksgAllocatorManagers[i] stores the AllocatorManager of the keyspace group i. + // ams stores the allocator managers of the keyspace groups. Each keyspace group is assigned + // with an allocator manager managing its global/local tso allocators. // Use a fixed size array to maximize the efficiency of concurrent access to // different keyspace groups for tso service. - // TODO: change item type to atomic.Value stored as *AllocatorManager after we - // support online keyspace group assignment. - ksgAllocatorManagers [mcsutils.MaxKeyspaceGroupCountInUse]*AllocatorManager + ams [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[AllocatorManager] + // ksgs stores the keyspace groups' membership/distribution meta. 
+ ksgs [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[endpoint.KeyspaceGroup] + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc - etcdClient *clientv3.Client + // tsoServiceID is the service ID of the TSO service, registered in the service discovery + tsoServiceID *discovery.ServiceRegistryEntry + etcdClient *clientv3.Client // electionNamePrefix is the name prefix to generate the unique name of a participant, // which participate in the election of its keyspace group's primary, in the format of // "electionNamePrefix:keyspace-group-id" electionNamePrefix string - // defaultKsgStorageTSRootPath is the root path of the default keyspace group in the - // storage endpoiont which is used for LoadTimestamp/SaveTimestamp. - // This is the legacy root path in the format of "/pd/{cluster_id}". - // Below is the entire path of in the legacy format (used by the default keyspace group) - // Key: /pd/{cluster_id}/timestamp - // Value: ts(time.Time) - // Key: /pd/{cluster_id}/lta/{dc-location}/timestamp - // Value: ts(time.Time) - defaultKsgStorageTSRootPath string - // tsoSvcRootPath defines the root path for all etcd paths used for different purposes. + // legacySvcRootPath defines the legacy root path for all etcd paths which derives from + // the PD/API service. It's in the format of "/pd/{cluster_id}". + // The main paths for different usages include: + // 1. The path, used by the default keyspace group, for LoadTimestamp/SaveTimestamp in the + // storage endpoint. + // Key: /pd/{cluster_id}/timestamp + // Value: ts(time.Time) + // Key: /pd/{cluster_id}/lta/{dc-location}/timestamp + // Value: ts(time.Time) + // 2. The path for storing keyspace group membership/distribution metadata. + // Key: /pd/{cluster_id}/tso/keyspace_groups/membership/{group} + // Value: endpoint.KeyspaceGroup + // Note: The {group} is 5 digits integer with leading zeros. 
+ legacySvcRootPath string + // tsoSvcRootPath defines the root path for all etcd paths used in the tso microservices. // It is in the format of "/ms//tso". - // The main paths for different usages in the tso microservice include: + // The main paths for different usages include: // 1. The path for keyspace group primary election. Format: "/ms/{cluster_id}/tso/{group}/primary" // 2. The path for LoadTimestamp/SaveTimestamp in the storage endpoint for all the non-default // keyspace groups. @@ -72,16 +100,24 @@ type KeyspaceGroupManager struct { // Value: ts(time.Time) // Note: The {group} is 5 digits integer with leading zeros. tsoSvcRootPath string + // legacySvcStorage is storage with legacySvcRootPath. + legacySvcStorage *endpoint.StorageEndpoint + // tsoSvcStorage is storage with tsoSvcRootPath. + tsoSvcStorage *endpoint.StorageEndpoint // cfg is the TSO config cfg ServiceConfig + // loadKeyspaceGroupsTimeout is the timeout for loading the initial keyspace group assignment. + loadKeyspaceGroupsTimeout time.Duration + loadKeyspaceGroupsBatchSize int64 } // NewKeyspaceGroupManager creates a new Keyspace Group Manager. 
func NewKeyspaceGroupManager( ctx context.Context, + tsoServiceID *discovery.ServiceRegistryEntry, etcdClient *clientv3.Client, electionNamePrefix string, - defaultKsgStorageTSRootPath string, + legacySvcRootPath string, tsoSvcRootPath string, cfg ServiceConfig, ) *KeyspaceGroupManager { @@ -92,60 +128,380 @@ func NewKeyspaceGroupManager( } ctx, cancel := context.WithCancel(ctx) - ksgMgr := &KeyspaceGroupManager{ + kgm := &KeyspaceGroupManager{ ctx: ctx, cancel: cancel, + tsoServiceID: tsoServiceID, etcdClient: etcdClient, electionNamePrefix: electionNamePrefix, - defaultKsgStorageTSRootPath: defaultKsgStorageTSRootPath, + legacySvcRootPath: legacySvcRootPath, tsoSvcRootPath: tsoSvcRootPath, cfg: cfg, + loadKeyspaceGroupsTimeout: defaultLoadKeyspaceGroupsTimeout, + loadKeyspaceGroupsBatchSize: defaultLoadKeyspaceGroupsBatchSize, } - return ksgMgr + kgm.legacySvcStorage = endpoint.NewStorageEndpoint( + kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil) + kgm.tsoSvcStorage = endpoint.NewStorageEndpoint( + kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil) + return kgm } // Initialize this KeyspaceGroupManager -func (kgm *KeyspaceGroupManager) Initialize() { - // TODO: dynamically load keyspace group assignment from the persistent storage and add watch for the assignment change - kgm.initDefaultKeyspaceGroup() +func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error { + // Initialize the default keyspace group if not loading from storage + if !loadFromStorage { + group := &endpoint.KeyspaceGroup{ + ID: mcsutils.DefaultKeySpaceGroupID, + Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}}, + Keyspaces: []uint32{mcsutils.DefaultKeyspaceID}, + } + kgm.updateKeyspaceGroup(group) + return nil + } + + // Load the initial keyspace group assignment from storage with time limit + done := make(chan struct{}, 1) + ctx, cancel := context.WithCancel(kgm.ctx) + go kgm.checkInitProgress(ctx, cancel, done) + 
watchStartRevision, err := kgm.initAssignment(ctx) + done <- struct{}{} + if err != nil { + log.Error("failed to initialize keyspace group manager", errs.ZapError(err)) + // We might have partially loaded/initialized the keyspace groups. Close the manager to clean up. + kgm.Close() + return err + } + + // Watch/apply keyspace group membership/distribution meta changes dynamically. + kgm.wg.Add(1) + go kgm.startKeyspaceGroupsMetaWatchLoop(watchStartRevision) + + return nil +} + +// Close this KeyspaceGroupManager +func (kgm *KeyspaceGroupManager) Close() { + log.Info("closing keyspace group manager") + + // Note: don't change the order. We need to cancel all service loops in the keyspace group manager + // before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups + // during critical periods such as service shutdown and online keyspace group, while the former requires + // snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is + // added/initialized after that. 
+ kgm.cancel() + kgm.wg.Wait() + kgm.closeKeyspaceGroups() + + log.Info("keyspace group manager closed") +} + +func (kgm *KeyspaceGroupManager) closeKeyspaceGroups() { + log.Info("closing all keyspace groups") + + wg := sync.WaitGroup{} + for i := range kgm.ams { + if am := kgm.ams[i].Load(); am != nil { + wg.Add(1) + go func(am *AllocatorManager) { + defer wg.Done() + am.close() + log.Info("keyspace group closed", zap.Uint32("keyspace-group-id", am.ksgID)) + }(am) + } + } + wg.Wait() + + log.Info("All keyspace groups closed") +} + +func (kgm *KeyspaceGroupManager) checkInitProgress(ctx context.Context, cancel context.CancelFunc, done chan struct{}) { + select { + case <-done: + return + case <-time.After(kgm.loadKeyspaceGroupsTimeout): + log.Error("failed to initialize keyspace group manager", + zap.Any("timeout-setting", kgm.loadKeyspaceGroupsTimeout), + errs.ZapError(errs.ErrLoadKeyspaceGroupsTimeout)) + cancel() + case <-ctx.Done(): + } + <-done } -// Initialize this the default keyspace group -func (kgm *KeyspaceGroupManager) initDefaultKeyspaceGroup() { - uniqueName := fmt.Sprintf("%s-%05d", kgm.electionNamePrefix, mcsutils.DefaultKeySpaceGroupID) - uniqueID := memberutil.GenerateUniqueID(uniqueName) - log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) +// initAssignment loads initial keyspace group assignment from storage and initialize the group manager. +func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, error) { + var ( + // The start revision for watching keyspace group membership/distribution change + watchStartRevision int64 + groups []*endpoint.KeyspaceGroup + more bool + err error + keyspaceGroupsLoaded uint32 + revision int64 + ) + + // Load all keyspace groups from etcd and apply the ones assigned to this tso service. 
+ for { + revision, groups, more, err = kgm.loadKeyspaceGroups(ctx, keyspaceGroupsLoaded, kgm.loadKeyspaceGroupsBatchSize) + if err != nil { + return 0, err + } + + keyspaceGroupsLoaded += uint32(len(groups)) + + if watchStartRevision == 0 || revision < watchStartRevision { + watchStartRevision = revision + } + + // Update the keyspace groups + for _, group := range groups { + select { + case <-ctx.Done(): + return watchStartRevision, errs.ErrLoadKeyspaceGroupsTerminated + default: + } - participant := member.NewParticipant(kgm.etcdClient) - participant.InitInfo(uniqueName, uniqueID, path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", mcsutils.DefaultKeySpaceGroupID)), - primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr()) + kgm.updateKeyspaceGroup(group) + } - defaultKsgGroupStorage := endpoint.NewStorageEndpoint(kv.NewEtcdKVBase(kgm.etcdClient, kgm.defaultKsgStorageTSRootPath), nil) - kgm.ksgAllocatorManagers[mcsutils.DefaultKeySpaceGroupID] = - NewAllocatorManager( - kgm.ctx, mcsutils.DefaultKeySpaceGroupID, participant, - kgm.defaultKsgStorageTSRootPath, defaultKsgGroupStorage, - kgm.cfg, true) + if !more { + break + } + } + + log.Info("loaded keyspace groups", zap.Uint32("keyspace-groups-loaded", keyspaceGroupsLoaded)) + return watchStartRevision, nil +} + +// loadKeyspaceGroups loads keyspace groups from the start ID with limit. +// If limit is 0, it will load all keyspace groups from the start ID. 
+func (kgm *KeyspaceGroupManager) loadKeyspaceGroups( + ctx context.Context, startID uint32, limit int64, +) (revison int64, ksgs []*endpoint.KeyspaceGroup, more bool, err error) { + rootPath := kgm.legacySvcRootPath + startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(startID)}, "/") + endKey := strings.Join( + []string{rootPath, clientv3.GetPrefixRangeEnd(endpoint.KeyspaceGroupIDPrefix())}, "/") + opOption := []clientv3.OpOption{clientv3.WithRange(endKey), clientv3.WithLimit(limit)} + + var resp *clientv3.GetResponse + for i := 0; i < loadFromEtcdMaxRetryTimes; i++ { + resp, err = etcdutil.EtcdKVGet(kgm.etcdClient, startKey, opOption...) + if err == nil && resp != nil { + break + } + select { + case <-ctx.Done(): + return 0, []*endpoint.KeyspaceGroup{}, false, errs.ErrLoadKeyspaceGroupsTerminated + case <-time.After(loadFromEtcdRetryInterval): + } + } + + kgs := make([]*endpoint.KeyspaceGroup, 0, len(resp.Kvs)) + for _, item := range resp.Kvs { + kg := &endpoint.KeyspaceGroup{} + if err = json.Unmarshal(item.Value, kg); err != nil { + return 0, nil, false, err + } + kgs = append(kgs, kg) + } + + if resp.Header != nil { + revison = resp.Header.Revision + } + + return revison, kgs, resp.More, nil +} + +// startKeyspaceGroupsMetaWatchLoop Repeatedly watches any change in keyspace group membership/distribution +// and apply the change dynamically. +func (kgm *KeyspaceGroupManager) startKeyspaceGroupsMetaWatchLoop(revision int64) { + defer logutil.LogPanic() + defer kgm.wg.Done() + + // Repeatedly watch/apply keyspace group membership/distribution changes until the context is canceled. + for { + select { + case <-kgm.ctx.Done(): + return + default: + } + + nextRevision, err := kgm.watchKeyspaceGroupsMetaChange(revision) + if err != nil { + log.Error("watcher canceled unexpectedly. 
Will start a new watcher after a while", + zap.Int64("next-revision", nextRevision), + zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)), + zap.Error(err)) + time.Sleep(watchKEtcdChangeRetryInterval) + } + } +} + +// watchKeyspaceGroupsMetaChange watches any change in keyspace group membership/distribution +// and apply the change dynamically. +func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (int64, error) { + watcher := clientv3.NewWatcher(kgm.etcdClient) + defer watcher.Close() + + ksgPrefix := strings.Join([]string{kgm.legacySvcRootPath, endpoint.KeyspaceGroupIDPrefix()}, "/") + + for { + watchChan := watcher.Watch(kgm.ctx, ksgPrefix, clientv3.WithPrefix(), clientv3.WithRev(revision)) + for wresp := range watchChan { + if wresp.CompactRevision != 0 { + log.Warn("Required revision has been compacted, the watcher will watch again with the compact revision", + zap.Int64("required-revision", revision), + zap.Int64("compact-revision", wresp.CompactRevision)) + revision = wresp.CompactRevision + break + } + if wresp.Err() != nil { + log.Error("watch is canceled or closed", + zap.Int64("required-revision", revision), + errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err())) + return revision, wresp.Err() + } + for _, event := range wresp.Events { + id, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key)) + if err != nil { + log.Warn("failed to extract keyspace group ID from the key path", + zap.String("key-path", string(event.Kv.Key)), zap.Error(err)) + continue + } + + switch event.Type { + case clientv3.EventTypePut: + group := &endpoint.KeyspaceGroup{} + if err := json.Unmarshal(event.Kv.Value, group); err != nil { + log.Warn("failed to unmarshal keyspace group", + zap.Uint32("keysapce-group-id", id), + zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause())) + } else { + kgm.updateKeyspaceGroup(group) + } + case clientv3.EventTypeDelete: + kgm.deleteKeyspaceGroup(id) + } + } + revision = 
wresp.Header.Revision + } + + select { + case <-kgm.ctx.Done(): + return revision, nil + default: + } + } +} + +func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool { + for _, member := range group.Members { + if member.Address == kgm.tsoServiceID.ServiceAddr { + return true + } + } + return false +} + +// updateKeyspaceGroup applies the given keyspace group. If the keyspace group is just assigned to +// this host/pod, it will join the primary election. +func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGroup) { + if group.ID >= uint32(len(kgm.ams)) { + log.Warn("keyspace group ID is out of range, ignore it", + zap.Uint32("keyspace-group-id", group.ID), zap.Int("max-keyspace-group-id", len(kgm.ams)-1)) + return + } + + assignedToMe := kgm.isAssignedToMe(group) + if assignedToMe { + if kgm.ams[group.ID].Load() != nil { + log.Info("keyspace group already initialized, so update meta only", + zap.Uint32("keyspace-group-id", group.ID)) + kgm.ksgs[group.ID].Store(group) + return + } + + uniqueName := fmt.Sprintf("%s-%05d", kgm.electionNamePrefix, group.ID) + uniqueID := memberutil.GenerateUniqueID(uniqueName) + log.Info("joining primary election", + zap.Uint32("keyspace-group-id", group.ID), + zap.String("participant-name", uniqueName), + zap.Uint64("participant-id", uniqueID)) + + participant := member.NewParticipant(kgm.etcdClient) + participant.InitInfo( + uniqueName, uniqueID, path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", group.ID)), + primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr()) + + // Only the default keyspace group uses the legacy service root path for LoadTimestamp/SyncTimestamp. 
+ var ( + tsRootPath string + storage *endpoint.StorageEndpoint + ) + if group.ID == mcsutils.DefaultKeySpaceGroupID { + tsRootPath = kgm.legacySvcRootPath + storage = kgm.legacySvcStorage + } else { + tsRootPath = kgm.tsoSvcRootPath + storage = kgm.tsoSvcStorage + } + + kgm.ams[group.ID].Store(NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true)) + kgm.ksgs[group.ID].Store(group) + } else { + // Not assigned to me. If this host/pod owns this keyspace group, it should resign. + kgm.deleteKeyspaceGroup(group.ID) + } +} + +// deleteKeyspaceGroup deletes the given keyspace group. +func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(id uint32) { + kgm.ksgs[id].Store(nil) + am := kgm.ams[id].Swap(nil) + if am == nil { + return + } + am.close() + log.Info("deleted keyspace group", zap.Uint32("keyspace-group-id", id)) } // GetAllocatorManager returns the AllocatorManager of the given keyspace group -func (kgm *KeyspaceGroupManager) GetAllocatorManager(keyspaceGroupID uint32) *AllocatorManager { - return kgm.ksgAllocatorManagers[keyspaceGroupID] +func (kgm *KeyspaceGroupManager) GetAllocatorManager(id uint32) (*AllocatorManager, error) { + if err := kgm.checkKeySpaceGroupID(id); err != nil { + return nil, err + } + if am := kgm.ams[id].Load(); am != nil { + return am, nil + } + return nil, errs.ErrGetAllocatorManager.FastGenByArgs( + fmt.Sprintf("requested keyspace group with id %d %s by this host/pod", id, errs.NotServedErr)) } // GetElectionMember returns the election member of the given keyspace group -func (kgm *KeyspaceGroupManager) GetElectionMember(keyspaceGroupID uint32) ElectionMember { - return *kgm.ksgAllocatorManagers[keyspaceGroupID].getMember() +func (kgm *KeyspaceGroupManager) GetElectionMember(id uint32) (ElectionMember, error) { + am, err := kgm.GetAllocatorManager(id) + if err != nil { + return nil, err + } + return am.getMember(), nil } // HandleTSORequest forwards TSO allocation requests to correct TSO Allocators of 
the given keyspace group. -func (kgm *KeyspaceGroupManager) HandleTSORequest(keyspaceGroupID uint32, dcLocation string, count uint32) (pdpb.Timestamp, error) { - return kgm.ksgAllocatorManagers[keyspaceGroupID].HandleRequest(dcLocation, count) +func (kgm *KeyspaceGroupManager) HandleTSORequest(id uint32, dcLocation string, count uint32) (pdpb.Timestamp, error) { + am, err := kgm.GetAllocatorManager(id) + if err != nil { + return pdpb.Timestamp{}, err + } + return am.HandleRequest(dcLocation, count) } -// Close this KeyspaceGroupManager -func (kgm *KeyspaceGroupManager) Close() { - kgm.cancel() - kgm.ksgAllocatorManagers[mcsutils.DefaultKeySpaceGroupID].close() +func (kgm *KeyspaceGroupManager) checkKeySpaceGroupID(id uint32) error { + if id < mcsutils.MaxKeyspaceGroupCountInUse { + return nil + } + return errs.ErrKeyspaceGroupIDInvalid.FastGenByArgs( + fmt.Sprintf("invalid keyspace group id %d which shouldn't >= %d", id, mcsutils.MaxKeyspaceGroupCountInUse)) } diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index c8f7f1a81c2..8b9a0b3d6c5 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -16,56 +16,414 @@ package tso import ( "context" + "encoding/json" + "fmt" + "math/rand" "path" + "reflect" + "sort" + "strconv" + "strings" + "sync" "testing" "time" + "github.com/google/uuid" "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/mcs/utils" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/mcs/discovery" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/memberutil" + "github.com/tikv/pd/pkg/utils/testutil" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/mvcc/mvccpb" + "go.uber.org/goleak" ) -func TestNewKeyspaceGroupManager(t *testing.T) { - re := require.New(t) - backendpoints, etcdClient, clean := startEmbeddedEtcd(t) - defer clean() +func TestMain(m *testing.M) { + 
goleak.VerifyTestMain(m, testutil.LeakOptions...) +} + +type keyspaceGroupManagerTestSuite struct { + suite.Suite + ctx context.Context + cancel context.CancelFunc + backendEndpoints string + etcdClient *clientv3.Client + clean func() + cfg *TestServiceConfig +} + +func TestKeyspaceGroupManagerTestSuite(t *testing.T) { + suite.Run(t, new(keyspaceGroupManagerTestSuite)) +} - cfg := &TestServiceConfig{ +func (suite *keyspaceGroupManagerTestSuite) SetupSuite() { + t := suite.T() + suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.backendEndpoints, suite.etcdClient, suite.clean = startEmbeddedEtcd(t) + + suite.cfg = &TestServiceConfig{ Name: "tso-test-name", - BackendEndpoints: backendpoints, + BackendEndpoints: suite.backendEndpoints, ListenAddr: "http://127.0.0.1:3379", AdvertiseListenAddr: "http://127.0.0.1:3379", - LeaderLease: utils.DefaultLeaderLease, + LeaderLease: mcsutils.DefaultLeaderLease, LocalTSOEnabled: false, TSOUpdatePhysicalInterval: 50 * time.Millisecond, - TSOSaveInterval: time.Duration(utils.DefaultLeaderLease) * time.Second, + TSOSaveInterval: time.Duration(mcsutils.DefaultLeaderLease) * time.Second, MaxResetTSGap: time.Hour * 24, TLSConfig: nil, } +} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - defaultKsgStorageTSRootPath := path.Join("/pd/1") - tsoSvcRootPath := "/ms/1/tso" - electionNamePrefix := "tso-server-1" +func (suite *keyspaceGroupManagerTestSuite) TearDownSuite() { + suite.clean() + suite.cancel() +} - keyspaceGroupManager := NewKeyspaceGroupManager( - ctx, etcdClient, electionNamePrefix, defaultKsgStorageTSRootPath, tsoSvcRootPath, cfg) - keyspaceGroupManager.Initialize() +// TestNewKeyspaceGroupManager tests the initialization of KeyspaceGroupManager. +// It should initialize the allocator manager with the desired configurations and parameters. 
+func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { + re := suite.Require() - re.Equal(etcdClient, keyspaceGroupManager.etcdClient) - re.Equal(electionNamePrefix, keyspaceGroupManager.electionNamePrefix) - re.Equal(defaultKsgStorageTSRootPath, keyspaceGroupManager.defaultKsgStorageTSRootPath) - re.Equal(tsoSvcRootPath, keyspaceGroupManager.tsoSvcRootPath) - re.Equal(cfg, keyspaceGroupManager.cfg) + tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr} + guid := uuid.New().String() + legacySvcRootPath := path.Join("/pd", guid) + tsoSvcRootPath := path.Join("/ms", guid, "tso") + electionNamePrefix := "tso-server-" + guid - am := keyspaceGroupManager.GetAllocatorManager(utils.DefaultKeySpaceGroupID) + ksgMgr := NewKeyspaceGroupManager( + suite.ctx, tsoServiceID, suite.etcdClient, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath, suite.cfg) + err := ksgMgr.Initialize(false) + re.NoError(err) + + re.Equal(tsoServiceID, ksgMgr.tsoServiceID) + re.Equal(suite.etcdClient, ksgMgr.etcdClient) + re.Equal(electionNamePrefix, ksgMgr.electionNamePrefix) + re.Equal(legacySvcRootPath, ksgMgr.legacySvcRootPath) + re.Equal(tsoSvcRootPath, ksgMgr.tsoSvcRootPath) + re.Equal(suite.cfg, ksgMgr.cfg) + re.Equal(defaultLoadKeyspaceGroupsBatchSize, ksgMgr.loadKeyspaceGroupsBatchSize) + re.Equal(defaultLoadKeyspaceGroupsTimeout, ksgMgr.loadKeyspaceGroupsTimeout) + + am, err := ksgMgr.GetAllocatorManager(mcsutils.DefaultKeySpaceGroupID) + re.NoError(err) re.False(am.enableLocalTSO) - re.Equal(utils.DefaultKeySpaceGroupID, am.ksgID) - re.Equal(utils.DefaultLeaderLease, am.leaderLease) + re.Equal(mcsutils.DefaultKeySpaceGroupID, am.ksgID) + re.Equal(mcsutils.DefaultLeaderLease, am.leaderLease) re.Equal(time.Hour*24, am.maxResetTSGap()) - re.Equal(defaultKsgStorageTSRootPath, am.rootPath) - re.Equal(time.Duration(utils.DefaultLeaderLease)*time.Second, am.saveInterval) + re.Equal(legacySvcRootPath, am.rootPath) + 
re.Equal(time.Duration(mcsutils.DefaultLeaderLease)*time.Second, am.saveInterval) re.Equal(time.Duration(50)*time.Millisecond, am.updatePhysicalInterval) - keyspaceGroupManager.Close() + ksgMgr.Close() +} + +// TestLoadKeyspaceGroupsAssignment tests the loading of the keyspace group assignment. +func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsAssignment() { + re := suite.Require() + maxCountInUse := int(mcsutils.MaxKeyspaceGroupCountInUse) + // Test loading of empty keyspace group assignment. + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 0, 0, 100) + // Test loading of single keyspace group assignment. + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 1, 0, 100) + // Test loading of multiple keyspace group assignment. + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 3, 0, 100) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, maxCountInUse-1, 0, 10) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, maxCountInUse, 0, 10) + // Test loading of the keyspace group assignment which exceeds the maximum + // keyspace group count. In this case, the manager should only load/serve the + // first MaxKeyspaceGroupCountInUse keyspace groups and ignore the rest + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, maxCountInUse+1, 0, 10) +} + +// TestLoadWithDifferentBatchSize tests the loading of the keyspace group assignment with the different batch size. 
+func (suite *keyspaceGroupManagerTestSuite) TestLoadWithDifferentBatchSize() { + re := suite.Require() + + batchSize := int64(17) + maxCount := uint32(1024) + params := []struct { + batchSize int64 + count int + probabilityAssignToMe int // percentage of assigning keyspace groups to this host/pod + }{ + {batchSize: 1, count: 1, probabilityAssignToMe: 100}, + {batchSize: 2, count: int(maxCount / 10), probabilityAssignToMe: 100}, + {batchSize: 7, count: int(maxCount / 10), probabilityAssignToMe: 100}, + {batchSize: batchSize, count: int(batchSize), probabilityAssignToMe: 50}, + {batchSize: int64(maxCount / 13), count: int(maxCount / 13), probabilityAssignToMe: 50}, + {batchSize: int64(maxCount), count: int(maxCount / 13), probabilityAssignToMe: 10}, + } + + for _, param := range params { + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, + param.count-1, param.batchSize, param.probabilityAssignToMe) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, + param.count, param.batchSize, param.probabilityAssignToMe) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, + param.count+1, param.batchSize, param.probabilityAssignToMe) + } +} + +// TestWatchAndDynamicallyApplyChanges tests the keyspace group manager watch and dynamically apply +// keyspace groups' membership/distribution meta changes. +func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges() { + re := suite.Require() + + // Start with the empty keyspace group assignment. + mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0, 0) + re.NotNil(mgr) + defer mgr.Close() + err := mgr.Initialize(true) + re.NoError(err) + + rootPath := mgr.legacySvcRootPath + svcAddr := mgr.tsoServiceID.ServiceAddr + + // Initialize PUT/DELETE events + events := []*etcdEvent{} + // Assign keyspace group 0 to this host/pod/keyspace-group-manager. 
+ // final result: [0] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 0, []string{svcAddr}) + // Assign keyspace group 1 to this host/pod/keyspace-group-manager. + // final result: [0,1] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 1, []string{"unknown", svcAddr}) + // Assign keyspace group 2 to other host/pod/keyspace-group-manager. + // final result: [0,1] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 2, []string{"unknown"}) + // Assign keyspace group 3 to this host/pod/keyspace-group-manager. + // final result: [0,1,3] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 3, []string{svcAddr}) + // Delete keyspace group 0 + // final result: [1,3] + events = generateKeyspaceGroupEvent(events, mvccpb.DELETE, 0, []string{}) + // Put keyspace group 4 which doesn't belong to anyone. + // final result: [1,3] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 4, []string{}) + // Put keyspace group 5 which doesn't belong to anyone. + // final result: [1,3] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 5, []string{}) + // Assign keyspace group 2 to this host/pod/keyspace-group-manager. + // final result: [1,2,3] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 2, []string{svcAddr}) + // Reassign keyspace group 3 to no one. + // final result: [1,2] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 3, []string{}) + // Reassign keyspace group 4 to this host/pod/keyspace-group-manager. + // final result: [1,2,4] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 4, []string{svcAddr}) + + // Eventually, this keyspace groups manager is expected to serve the following keyspace groups. + idsExpected := []int{1, 2, 4} + + // Apply the keyspace group assignment change events to etcd. 
+ for _, event := range events { + switch event.eventType { + case mvccpb.PUT: + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + re.NoError(err) + case mvccpb.DELETE: + err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID) + re.NoError(err) + } + } + + // Verify the keyspace group assignment. + testutil.Eventually(re, func() bool { + idsAssigned := collectAssignedKeyspaceGroupIDs(re, mgr) + return reflect.DeepEqual(idsExpected, idsAssigned) + }) +} + +type etcdEvent struct { + eventType mvccpb.Event_EventType + ksg *endpoint.KeyspaceGroup +} + +func generateKeyspaceGroupEvent( + events []*etcdEvent, eventType mvccpb.Event_EventType, id uint32, addrs []string, +) []*etcdEvent { + members := []endpoint.KeyspaceGroupMember{} + for _, addr := range addrs { + members = append(members, endpoint.KeyspaceGroupMember{Address: addr}) + } + + return append(events, + &etcdEvent{ + eventType: eventType, + ksg: &endpoint.KeyspaceGroup{ + ID: id, + Members: members, + Keyspaces: []uint32{id}, + }, + }, + ) +} + +// runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment. 
+func runTestLoadKeyspaceGroupsAssignment( + ctx context.Context, + re *require.Assertions, + etcdClient *clientv3.Client, + cfg *TestServiceConfig, + numberOfKeypaceGroupsToAdd int, + loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value + probabilityAssignToMe int, // percentage of assigning keyspace groups to this host/pod +) { + idsExpected := []int{} + mgr := newUniqueKeyspaceGroupManager(ctx, etcdClient, cfg, 0, loadKeyspaceGroupsBatchSize) + re.NotNil(mgr) + defer mgr.Close() + + step := 30 + mux := sync.Mutex{} + wg := sync.WaitGroup{} + for i := 0; i < numberOfKeypaceGroupsToAdd; i += step { + wg.Add(1) + go func(startID int) { + defer wg.Done() + + endID := startID + step + if endID > numberOfKeypaceGroupsToAdd { + endID = numberOfKeypaceGroupsToAdd + } + + randomGen := rand.New(rand.NewSource(time.Now().UnixNano())) + for j := startID; j < endID; j++ { + assignToMe := false + // Assign the keyspace group to this host/pod with the given probability, + // and the keyspace group manager only loads the keyspace groups with id + // less than len(mgr.ams). + if j < len(mgr.ams) && randomGen.Intn(100) < probabilityAssignToMe { + assignToMe = true + mux.Lock() + idsExpected = append(idsExpected, j) + mux.Unlock() + } + addKeyspaceGroupAssignment( + ctx, etcdClient, assignToMe, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(j)) + } + }(i) + } + wg.Wait() + + err := mgr.Initialize(true) + re.NoError(err) + + // Verify the keyspace group assignment. 
+ sort.Ints(idsExpected) + idsAssigned := collectAssignedKeyspaceGroupIDs(re, mgr) + re.Equal(idsExpected, idsAssigned) +} + +func newUniqueKeyspaceGroupManager( + ctx context.Context, etcdClient *clientv3.Client, cfg *TestServiceConfig, + loadKeyspaceGroupsTimeout time.Duration, // set to 0 to use the default value + loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value +) *KeyspaceGroupManager { + tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.AdvertiseListenAddr} + uniqueID := memberutil.GenerateUniqueID(uuid.New().String()) + uniqueStr := strconv.FormatUint(uniqueID, 10) + legacySvcRootPath := path.Join("/pd", uniqueStr) + tsoSvcRootPath := path.Join("/ms", uniqueStr, "tso") + electionNamePrefix := "kgm-test-" + uniqueStr + + keyspaceGroupManager := NewKeyspaceGroupManager( + ctx, tsoServiceID, etcdClient, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath, cfg) + if loadKeyspaceGroupsTimeout != 0 { + keyspaceGroupManager.loadKeyspaceGroupsTimeout = loadKeyspaceGroupsTimeout + } + if loadKeyspaceGroupsBatchSize != 0 { + keyspaceGroupManager.loadKeyspaceGroupsBatchSize = loadKeyspaceGroupsBatchSize + } + return keyspaceGroupManager +} + +// putKeyspaceGroupToEtcd puts a keyspace group to etcd. +func putKeyspaceGroupToEtcd( + ctx context.Context, etcdClient *clientv3.Client, + rootPath string, group *endpoint.KeyspaceGroup, +) error { + key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(group.ID)}, "/") + value, err := json.Marshal(group) + if err != nil { + return err + } + + if _, err := etcdClient.Put(ctx, key, string(value)); err != nil { + return err + } + + return nil +} + +// deleteKeyspaceGroupInEtcd deletes a keyspace group in etcd. 
+func deleteKeyspaceGroupInEtcd( + ctx context.Context, etcdClient *clientv3.Client, + rootPath string, id uint32, +) error { + key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(id)}, "/") + + if _, err := etcdClient.Delete(ctx, key); err != nil { + return err + } + + return nil +} + +// addKeyspaceGroupAssignment adds a keyspace group assignment to etcd. +func addKeyspaceGroupAssignment( + ctx context.Context, etcdClient *clientv3.Client, + assignToMe bool, rootPath, svcAddr string, id uint32, +) error { + var location string + if assignToMe { + location = svcAddr + } else { + location = uuid.NewString() + } + group := &endpoint.KeyspaceGroup{ + ID: id, + Members: []endpoint.KeyspaceGroupMember{{Address: location}}, + Keyspaces: []uint32{id}, + } + + key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(id)}, "/") + value, err := json.Marshal(group) + if err != nil { + return err + } + + if _, err := etcdClient.Put(ctx, key, string(value)); err != nil { + return err + } + + return nil +} + +func collectAssignedKeyspaceGroupIDs(re *require.Assertions, ksgMgr *KeyspaceGroupManager) []int { + ids := []int{} + for i := 0; i < len(ksgMgr.ksgs); i++ { + ksg := ksgMgr.ksgs[i].Load() + if ksg == nil { + re.Nil(ksgMgr.ams[i].Load(), fmt.Sprintf("ksg is nil but am is not nil for id %d", i)) + } else { + am := ksgMgr.ams[i].Load() + re.NotNil(am, fmt.Sprintf("ksg is not nil but am is nil for id %d", i)) + re.Equal(i, int(am.ksgID)) + re.Equal(i, int(ksg.ID)) + for _, m := range ksg.Members { + if m.Address == ksgMgr.tsoServiceID.ServiceAddr { + ids = append(ids, i) + break + } + } + } + } + + return ids } diff --git a/server/server.go b/server/server.go index b2573032d7f..b85d8f026e4 100644 --- a/server/server.go +++ b/server/server.go @@ -1299,7 +1299,7 @@ func (s *Server) GetServiceRateLimiter() *ratelimit.Limiter { return s.serviceRateLimiter } -// IsInRateLimitAllowList returns whethis given service label is in allow lost +// 
IsInRateLimitAllowList returns whether the given service label is in the allow list
 func (s *Server) IsInRateLimitAllowList(serviceLabel string) bool {
 	return s.serviceRateLimiter.IsInAllowList(serviceLabel)
 }