From de484bc6b1d9bfd201b745c8a406a965055cfc21 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Thu, 6 Apr 2023 00:30:40 -0700 Subject: [PATCH] Add filter and batch handling when loading keyspace groups meta from storage Fix root path for LoadTimestamp/SaveTimestamp Separate initial keyspace groups assignment loading from online dynamic change. Implement loadKeyspaceGroup() which also returns revision. Changed the timeout mechanism for watching the progress of loading and initializing the keyspace groups assignment Watch keyspace group membership/distribution meta change and dynamically apply the change Add TestLoadInitialKeyspaceGroupsAssignment and TestExtractKeyspaceGroupIDFromPath Add test cases for loading keyspace group assignment Add retry if there is temporary failure when loading keyspace group assignment from etcd Removed the mutex. The original purpose was to prevent concurrent addition/removal of keyspace groups during critical periods such as service shutdown and online keyspace group, while the former requires snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is added/initialized after that. Now we have ensured that's the case by always cancelling the watch loop before closing keyspace groups in Close(). 
Added unit test for watching and dynamically applying keyspace group meta changes Signed-off-by: Bin Shi --- client/client.go | 4 +- client/errs/errno.go | 14 +- errors.toml | 20 + go.mod | 2 +- pkg/errs/errno.go | 32 +- pkg/mcs/tso/server/handler.go | 9 +- pkg/mcs/tso/server/server.go | 43 +- pkg/storage/endpoint/key_path.go | 17 + pkg/storage/endpoint/key_path_test.go | 48 +++ pkg/storage/endpoint/tso_keyspace_group.go | 10 +- pkg/tso/allocator_manager.go | 5 +- pkg/tso/global_allocator.go | 6 +- pkg/tso/keyspace_group_manager.go | 462 ++++++++++++++++++--- pkg/tso/keyspace_group_manager_test.go | 414 ++++++++++++++++-- server/server.go | 2 +- 15 files changed, 974 insertions(+), 114 deletions(-) diff --git a/client/client.go b/client/client.go index 22b037d1ede..249caa4d149 100644 --- a/client/client.go +++ b/client/client.go @@ -1167,7 +1167,9 @@ func IsLeaderChange(err error) bool { return true } errMsg := err.Error() - return strings.Contains(errMsg, errs.NotLeaderErr) || strings.Contains(errMsg, errs.MismatchLeaderErr) + return strings.Contains(errMsg, errs.NotLeaderErr) || + strings.Contains(errMsg, errs.MismatchLeaderErr) || + strings.Contains(errMsg, errs.NotServedErr) } func trimHTTPPrefix(str string) string { diff --git a/client/errs/errno.go b/client/errs/errno.go index 43c08396f1e..9ed860681be 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -21,11 +21,19 @@ import ( ) const ( - // NotLeaderErr indicates the the non-leader member received the requests which should be received by leader. + // NotLeaderErr indicates the non-leader member received the requests which should be received by leader. + // Note: keep the same as the ones defined on the server side, because the client side checks if an error message + // contains this string to judge whether the leader is changed. NotLeaderErr = "is not leader" - // MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader. 
+ // MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader. + // Note: keep the same as the ones defined on the server side, because the client side checks if an error message + // contains this string to judge whether the leader is changed. MismatchLeaderErr = "mismatch leader id" - RetryTimeoutErr = "retry timeout" + // NotServedErr indicates a tso node/pod received the requests for the keyspace groups which are not served by it. + // Note: keep the same as the ones defined on the server side, because the client side checks if an error message + // contains this string to judge whether the leader is changed. + NotServedErr = "is not served" + RetryTimeoutErr = "retry timeout" ) // client errors diff --git a/errors.toml b/errors.toml index 440015b7bf5..5b9ecd0a345 100644 --- a/errors.toml +++ b/errors.toml @@ -1,6 +1,16 @@ # AUTOGENERATED BY github.com/pingcap/errors/errdoc-gen # YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER. 
+["ErrLoadKeyspaceGroupsTerminated"] +error = ''' +load keyspace groups terminated +''' + +["ErrLoadKeyspaceGroupsTimeout"] +error = ''' +load keyspace groups timeout +''' + ["PD:ErrEncryptionKMS"] error = ''' KMS error @@ -731,11 +741,21 @@ error = ''' get allocator failed, %s ''' +["PD:tso:ErrGetAllocatorManager"] +error = ''' +get allocator manager failed, %s +''' + ["PD:tso:ErrGetLocalAllocator"] error = ''' get local allocator failed, %s ''' +["PD:tso:ErrKeyspaceGroupIDInvalid"] +error = ''' +the keyspace group id is invalid, %s +''' + ["PD:tso:ErrLogicOverflow"] error = ''' logic part overflow diff --git a/go.mod b/go.mod index 4c4090694b9..fd7a07c0d53 100644 --- a/go.mod +++ b/go.mod @@ -111,7 +111,7 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/google/uuid v1.3.0 github.com/gorilla/websocket v1.4.2 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 84793917ca9..1deb285df94 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -17,10 +17,18 @@ package errs import "github.com/pingcap/errors" const ( - // NotLeaderErr indicates the the non-leader member received the requests which should be received by leader. + // NotLeaderErr indicates the non-leader member received the requests which should be received by leader. + // Note: keep the same as the ones defined on the client side, because the client side checks if an error message + // contains this string to judge whether the leader is changed. NotLeaderErr = "is not leader" - // MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader. 
+ // MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader. + // Note: keep the same as the ones defined on the client side, because the client side checks if an error message + // contains this string to judge whether the leader is changed. MismatchLeaderErr = "mismatch leader id" + // NotServedErr indicates a tso node/pod received the requests for the keyspace groups which are not served by it. + // Note: keep the same as the ones defined on the client side, because the client side checks if an error message + // contains this string to judge whether the leader is changed. + NotServedErr = "is not served" ) // common error in multiple packages @@ -31,14 +39,18 @@ var ( // tso errors var ( - ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig")) - ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator")) - ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator")) - ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS")) - ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp")) - ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp")) - ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow")) - ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout")) + ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig")) + ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator")) + ErrGetLocalAllocator = errors.Normalize("get local allocator 
failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator")) + ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS")) + ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp")) + ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp")) + ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow")) + ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout")) + ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid")) + ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager")) + ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout")) + ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated")) ) // member errors diff --git a/pkg/mcs/tso/server/handler.go b/pkg/mcs/tso/server/handler.go index 40ddd2f726b..14f83bccc61 100644 --- a/pkg/mcs/tso/server/handler.go +++ b/pkg/mcs/tso/server/handler.go @@ -17,6 +17,7 @@ package server import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/tso" "go.uber.org/zap" ) @@ -31,12 +32,18 @@ func newHandler(s *Server) *Handler { } // ResetTS resets the ts with specified tso. +// TODO: Support multiple keyspace groups. 
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error { log.Info("reset-ts", zap.Uint64("new-ts", ts), zap.Bool("ignore-smaller", ignoreSmaller), zap.Bool("skip-upper-bound-check", skipUpperBoundCheck)) - tsoAllocator, err := h.s.GetTSOAllocatorManager().GetAllocator(tso.GlobalDCLocation) + tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeySpaceGroupID) + if err != nil { + log.Error("failed to get allocator manager", errs.ZapError(err)) + return err + } + tsoAllocator, err := tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation) if err != nil { return err } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index db8e8a239f2..642952b16e4 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -111,7 +111,10 @@ type Server struct { // Callback functions for different stages // startCallbacks will be called after the server is started. - startCallbacks []func() + startCallbacks []func() + + // for service registry + serviceID *discovery.ServiceRegistryEntry serviceRegister *discovery.ServiceRegister } @@ -199,14 +202,30 @@ func (s *Server) AddStartCallback(callbacks ...func()) { // IsServing implements basicserver. It returns whether the server is the leader // if there is embedded etcd, or the primary otherwise. +// TODO: support multiple keyspace groups func (s *Server) IsServing() bool { - return atomic.LoadInt64(&s.isRunning) == 1 && s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID).IsLeader() + if atomic.LoadInt64(&s.isRunning) == 0 { + return false + } + + member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID) + if err != nil { + log.Error("failed to get election member", errs.ZapError(err)) + return false + } + return member.IsLeader() } // GetLeaderListenUrls gets service endpoints from the leader in election group. // The entry at the index 0 is the primary's service endpoint. 
func (s *Server) GetLeaderListenUrls() []string { - return s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID).GetLeaderListenUrls() + member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID) + if err != nil { + log.Error("failed to get election member", errs.ZapError(err)) + return nil + } + + return member.GetLeaderListenUrls() } // AddServiceReadyCallback implements basicserver. @@ -229,8 +248,8 @@ func (s *Server) IsClosed() bool { } // GetTSOAllocatorManager returns the manager of TSO Allocator. -func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager { - return s.keyspaceGroupManager.GetAllocatorManager(mcsutils.DefaultKeySpaceGroupID) +func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorManager, error) { + return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID) } // IsLocalRequest checks if the forwarded host is the current host @@ -416,11 +435,16 @@ func (s *Server) startServer() (err error) { } s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) - defaultKsgStorageTSRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10)) + legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10)) tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID) + s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( - s.serverLoopCtx, s.etcdClient, s.listenURL.Host, defaultKsgStorageTSRootPath, tsoSvcRootPath, s.cfg) - s.keyspaceGroupManager.Initialize() + s.serverLoopCtx, s.serviceID, s.etcdClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg) + // The param `false` means that we don't initialize the keyspace group manager + // by loading the keyspace group meta from etcd. 
+ if err := s.keyspaceGroupManager.Initialize(false); err != nil { + return err + } s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.service = &Service{Server: s} @@ -448,8 +472,7 @@ func (s *Server) startServer() (err error) { } // Server has started. - entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} - serializedEntry, err := entry.Serialize() + serializedEntry, err := s.serviceID.Serialize() if err != nil { return err } diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 1cf258c08f9..f84459fbd7d 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -17,6 +17,7 @@ package endpoint import ( "fmt" "path" + "regexp" "strconv" "strings" @@ -239,6 +240,22 @@ func KeyspaceGroupIDPath(id uint32) string { return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id)) } +// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains +// the pattern of `tso/keyspace_groups/membership/(\d{5})$`. +func ExtractKeyspaceGroupIDFromPath(path string) (uint32, error) { + pattern := strings.Join([]string{KeyspaceGroupIDPrefix(), `(\d{5})$`}, "/") + re := regexp.MustCompile(pattern) + match := re.FindStringSubmatch(path) + if match == nil { + return 0, fmt.Errorf("invalid keyspace group id path: %s", path) + } + id, err := strconv.ParseUint(match[1], 10, 32) + if err != nil { + return 0, fmt.Errorf("failed to parse keyspace group ID: %v", err) + } + return uint32(id), nil +} + // encodeKeyspaceGroupID from uint32 to string. 
func encodeKeyspaceGroupID(groupID uint32) string { return fmt.Sprintf("%05d", groupID) diff --git a/pkg/storage/endpoint/key_path_test.go b/pkg/storage/endpoint/key_path_test.go index d6ef584105a..270d1e266fe 100644 --- a/pkg/storage/endpoint/key_path_test.go +++ b/pkg/storage/endpoint/key_path_test.go @@ -27,3 +27,51 @@ func BenchmarkRegionPath(b *testing.B) { _ = RegionPath(uint64(i)) } } + +func TestExtractKeyspaceGroupIDFromPath(t *testing.T) { + re := require.New(t) + + rightCases := []struct { + path string + id uint32 + }{ + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999}, + {path: "tso/keyspace_groups/membership/00000", id: 0}, + {path: "tso/keyspace_groups/membership/00001", id: 1}, + {path: "tso/keyspace_groups/membership/12345", id: 12345}, + {path: "tso/keyspace_groups/membership/99999", id: 99999}, + } + + for _, tt := range rightCases { + id, err := ExtractKeyspaceGroupIDFromPath(tt.path) + re.Equal(tt.id, id) + re.NoError(err) + } + + wrongCases := []struct { + path string + }{ + {path: ""}, + {path: "00001"}, + {path: "xxx/keyspace_groups/membership/00001"}, + {path: "tso/xxxxxxxxxxxxxxx/membership/00001"}, + {path: "tso/keyspace_groups/xxxxxxxxxx/00001"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"}, + {path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"}, + {path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"}, + {path: 
"/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"}, + } + + for _, tt := range wrongCases { + _, err := ExtractKeyspaceGroupIDFromPath(tt.path) + re.Error(err) + } +} diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go index 2d65dfee28a..3e4b5f2235e 100644 --- a/pkg/storage/endpoint/tso_keyspace_group.go +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -22,11 +22,19 @@ import ( "go.etcd.io/etcd/clientv3" ) +// KeyspaceGroupMember defines an election member which campaigns for the primary of the keyspace group. +type KeyspaceGroupMember struct { + Address string `json:"address"` +} + // KeyspaceGroup is the keyspace group. type KeyspaceGroup struct { ID uint32 `json:"id"` UserKind string `json:"user-kind"` - // TODO: add `Members` field + // Members are the election members which campaign for the primary of the keyspace group. + Members []KeyspaceGroupMember `json:"members"` + // Keyspaces are the keyspace IDs which belong to the keyspace group. + Keyspaces []uint32 `json:"keyspaces"` } // KeyspaceGroupStorage is the interface for keyspace group storage. 
diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 362ff6763b4..c074e25a6d9 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -294,8 +294,8 @@ func (am *AllocatorManager) close() { log.Info("closed the allocator manager") } -func (am *AllocatorManager) getMember() *ElectionMember { - return &am.member +func (am *AllocatorManager) getMember() ElectionMember { + return am.member } // SetLocalTSOConfig receives the zone label of this PD server and write it into etcd as dc-location @@ -1072,6 +1072,7 @@ func (am *AllocatorManager) HandleRequest(dcLocation string, count uint32) (pdpb err := errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found, generate timestamp failed", dcLocation)) return pdpb.Timestamp{}, err } + return allocatorGroup.allocator.GenerateTSO(count) } diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 09febeccc3b..d79a9b2f0b7 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -193,7 +193,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) // Have dc-locations configured in the cluster, use the Global TSO generation way. // (whit synchronization with other Local TSO Allocators) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(gta.ctx) defer cancel() for i := 0; i < maxRetryCount; i++ { var ( @@ -237,7 +237,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) skipCheck = true goto SETTING_PHASE } - // Is skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valide. + // If skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valid. 
if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) == 0 { tsoCounter.WithLabelValues("global_tso_estimate", gta.timestampOracle.dcLocation).Inc() } @@ -309,7 +309,7 @@ type syncResp struct { // SyncMaxTS is used to sync MaxTS with all Local TSO Allocator leaders in dcLocationMap. // If maxTSO is the biggest TSO among all Local TSO Allocators, it will be written into -// each allocator and remines the same after the synchronization. +// each allocator and remains the same after the synchronization. // If not, it will be replaced with the new max Local TSO and return. func (gta *GlobalTSOAllocator) SyncMaxTS( ctx context.Context, diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 2598da4a6b0..e2cfba24658 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -16,53 +16,81 @@ package tso import ( "context" + "encoding/json" "fmt" "path" + "strings" + "sync" + "sync/atomic" + "time" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/memberutil" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" ) -// primaryElectionSuffix is the suffix of the key for keyspace group primary election -const primaryElectionSuffix = "primary" +const ( + // primaryElectionSuffix is the suffix of the key for keyspace group primary election + primaryElectionSuffix = "primary" + // defaultLoadKeyspaceGroupsTimeout is the default timeout for loading the initial + // keyspace group assignment + defaultLoadKeyspaceGroupsTimeout = 30 * time.Second + defaultLoadKeyspaceGroupsBatchSize = int64(400) + loadFromEtcdMaxRetryTimes = 6 + loadFromEtcdRetryInterval = 500 * 
time.Millisecond + watchKEtcdChangeRetryInterval = 1 * time.Second +) -// KeyspaceGroupManager manages the primary/secondaries of the keyspace groups -// assigned to this host. The primaries provide the tso service for the corresponding +// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host. +// The replicas campaign for the leaders which provide the tso service for the corresponding // keyspace groups. type KeyspaceGroupManager struct { - // ksgAllocatorManagers[i] stores the AllocatorManager of the keyspace group i. + // ams stores the allocator managers of the keyspace groups. Each keyspace group is assigned + // with an allocator manager managing its global/local tso allocators. // Use a fixed size array to maximize the efficiency of concurrent access to // different keyspace groups for tso service. - // TODO: change item type to atomic.Value stored as *AllocatorManager after we - // support online keyspace group assignment. - ksgAllocatorManagers [mcsutils.MaxKeyspaceGroupCountInUse]*AllocatorManager + ams [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[AllocatorManager] + // ksgs stores the keyspace groups' membership/distribution meta. 
+ ksgs [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[endpoint.KeyspaceGroup] + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc - etcdClient *clientv3.Client + // tsoServiceID is the service ID of the TSO service, registered in the service discovery + tsoServiceID *discovery.ServiceRegistryEntry + etcdClient *clientv3.Client // electionNamePrefix is the name prefix to generate the unique name of a participant, // which participate in the election of its keyspace group's primary, in the format of // "electionNamePrefix:keyspace-group-id" electionNamePrefix string - // defaultKsgStorageTSRootPath is the root path of the default keyspace group in the - // storage endpoiont which is used for LoadTimestamp/SaveTimestamp. - // This is the legacy root path in the format of "/pd/{cluster_id}". - // Below is the entire path of in the legacy format (used by the default keyspace group) - // Key: /pd/{cluster_id}/timestamp - // Value: ts(time.Time) - // Key: /pd/{cluster_id}/lta/{dc-location}/timestamp - // Value: ts(time.Time) - defaultKsgStorageTSRootPath string - // tsoSvcRootPath defines the root path for all etcd paths used for different purposes. + // legacySvcRootPath defines the legacy root path for all etcd paths which derives from + // the PD/API service. It's in the format of "/pd/{cluster_id}". + // The main paths for different usages include: + // 1. The path, used by the default keyspace group, for LoadTimestamp/SaveTimestamp in the + // storage endpoint. + // Key: /pd/{cluster_id}/timestamp + // Value: ts(time.Time) + // Key: /pd/{cluster_id}/lta/{dc-location}/timestamp + // Value: ts(time.Time) + // 2. The path for storing keyspace group membership/distribution metadata. + // Key: /pd/{cluster_id}/tso/keyspace_groups/membership/{group} + // Value: endpoint.KeyspaceGroup + // Note: The {group} is 5 digits integer with leading zeros. 
+ legacySvcRootPath string + // tsoSvcRootPath defines the root path for all etcd paths used in the tso microservices. // It is in the format of "/ms//tso". - // The main paths for different usages in the tso microservice include: + // The main paths for different usages include: // 1. The path for keyspace group primary election. Format: "/ms/{cluster_id}/tso/{group}/primary" // 2. The path for LoadTimestamp/SaveTimestamp in the storage endpoint for all the non-default // keyspace groups. @@ -72,16 +100,24 @@ type KeyspaceGroupManager struct { // Value: ts(time.Time) // Note: The {group} is 5 digits integer with leading zeros. tsoSvcRootPath string + // legacySvcStorage is storage with legacySvcRootPath. + legacySvcStorage *endpoint.StorageEndpoint + // tsoSvcStorage is storage with tsoSvcRootPath. + tsoSvcStorage *endpoint.StorageEndpoint // cfg is the TSO config cfg ServiceConfig + // loadKeyspaceGroupsTimeout is the timeout for loading the initial keyspace group assignment. + loadKeyspaceGroupsTimeout time.Duration + loadKeyspaceGroupsBatchSize int64 } // NewKeyspaceGroupManager creates a new Keyspace Group Manager. 
func NewKeyspaceGroupManager( ctx context.Context, + tsoServiceID *discovery.ServiceRegistryEntry, etcdClient *clientv3.Client, electionNamePrefix string, - defaultKsgStorageTSRootPath string, + legacySvcRootPath string, tsoSvcRootPath string, cfg ServiceConfig, ) *KeyspaceGroupManager { @@ -92,60 +128,380 @@ func NewKeyspaceGroupManager( } ctx, cancel := context.WithCancel(ctx) - ksgMgr := &KeyspaceGroupManager{ + kgm := &KeyspaceGroupManager{ ctx: ctx, cancel: cancel, + tsoServiceID: tsoServiceID, etcdClient: etcdClient, electionNamePrefix: electionNamePrefix, - defaultKsgStorageTSRootPath: defaultKsgStorageTSRootPath, + legacySvcRootPath: legacySvcRootPath, tsoSvcRootPath: tsoSvcRootPath, cfg: cfg, + loadKeyspaceGroupsTimeout: defaultLoadKeyspaceGroupsTimeout, + loadKeyspaceGroupsBatchSize: defaultLoadKeyspaceGroupsBatchSize, } - return ksgMgr + kgm.legacySvcStorage = endpoint.NewStorageEndpoint( + kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil) + kgm.tsoSvcStorage = endpoint.NewStorageEndpoint( + kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil) + return kgm } // Initialize this KeyspaceGroupManager -func (kgm *KeyspaceGroupManager) Initialize() { - // TODO: dynamically load keyspace group assignment from the persistent storage and add watch for the assignment change - kgm.initDefaultKeyspaceGroup() +func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error { + // Initialize the default keyspace group if not loading from storage + if !loadFromStorage { + group := &endpoint.KeyspaceGroup{ + ID: mcsutils.DefaultKeySpaceGroupID, + Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}}, + Keyspaces: []uint32{mcsutils.DefaultKeyspaceID}, + } + kgm.updateKeyspaceGroup(group) + return nil + } + + // Load the initial keyspace group assignment from storage with time limit + done := make(chan struct{}, 1) + ctx, cancel := context.WithCancel(kgm.ctx) + go kgm.checkInitProgress(ctx, cancel, done) + 
watchStartRevision, err := kgm.initAssignment(ctx) + done <- struct{}{} + if err != nil { + log.Error("failed to initialize keyspace group manager", errs.ZapError(err)) + // We might have partially loaded/initialized the keyspace groups. Close the manager to clean up. + kgm.Close() + return err + } + + // Watch/apply keyspace group membership/distribution meta changes dynamically. + kgm.wg.Add(1) + go kgm.startKeyspaceGroupsMetaWatchLoop(watchStartRevision) + + return nil +} + +// Close this KeyspaceGroupManager +func (kgm *KeyspaceGroupManager) Close() { + log.Info("closing keyspace group manager") + + // Note: don't change the order. We need to cancel all service loops in the keyspace group manager + // before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups + // during critical periods such as service shutdown and online keyspace group, while the former requires + // snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is + // added/initialized after that. 
+ kgm.cancel() + kgm.wg.Wait() + kgm.closeKeyspaceGroups() + + log.Info("keyspace group manager closed") +} + +func (kgm *KeyspaceGroupManager) closeKeyspaceGroups() { + log.Info("closing all keyspace groups") + + wg := sync.WaitGroup{} + for i := range kgm.ams { + if am := kgm.ams[i].Load(); am != nil { + wg.Add(1) + go func(am *AllocatorManager) { + defer wg.Done() + am.close() + log.Info("keyspace group closed", zap.Uint32("keyspace-group-id", am.ksgID)) + }(am) + } + } + wg.Wait() + + log.Info("All keyspace groups closed") +} + +func (kgm *KeyspaceGroupManager) checkInitProgress(ctx context.Context, cancel context.CancelFunc, done chan struct{}) { + select { + case <-done: + return + case <-time.After(kgm.loadKeyspaceGroupsTimeout): + log.Error("failed to initialize keyspace group manager", + zap.Any("timeout-setting", kgm.loadKeyspaceGroupsTimeout), + errs.ZapError(errs.ErrLoadKeyspaceGroupsTimeout)) + cancel() + case <-ctx.Done(): + } + <-done } -// Initialize this the default keyspace group -func (kgm *KeyspaceGroupManager) initDefaultKeyspaceGroup() { - uniqueName := fmt.Sprintf("%s-%05d", kgm.electionNamePrefix, mcsutils.DefaultKeySpaceGroupID) - uniqueID := memberutil.GenerateUniqueID(uniqueName) - log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) +// initAssignment loads initial keyspace group assignment from storage and initialize the group manager. +func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, error) { + var ( + // The start revision for watching keyspace group membership/distribution change + watchStartRevision int64 + groups []*endpoint.KeyspaceGroup + more bool + err error + keyspaceGroupsLoaded uint32 + revision int64 + ) + + // Load all keyspace groups from etcd and apply the ones assigned to this tso service. 
+ for { + revision, groups, more, err = kgm.loadKeyspaceGroups(ctx, keyspaceGroupsLoaded, kgm.loadKeyspaceGroupsBatchSize) + if err != nil { + return 0, err + } + + keyspaceGroupsLoaded += uint32(len(groups)) + + if watchStartRevision == 0 || revision < watchStartRevision { + watchStartRevision = revision + } + + // Update the keyspace groups + for _, group := range groups { + select { + case <-ctx.Done(): + return watchStartRevision, errs.ErrLoadKeyspaceGroupsTerminated + default: + } - participant := member.NewParticipant(kgm.etcdClient) - participant.InitInfo(uniqueName, uniqueID, path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", mcsutils.DefaultKeySpaceGroupID)), - primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr()) + kgm.updateKeyspaceGroup(group) + } - defaultKsgGroupStorage := endpoint.NewStorageEndpoint(kv.NewEtcdKVBase(kgm.etcdClient, kgm.defaultKsgStorageTSRootPath), nil) - kgm.ksgAllocatorManagers[mcsutils.DefaultKeySpaceGroupID] = - NewAllocatorManager( - kgm.ctx, mcsutils.DefaultKeySpaceGroupID, participant, - kgm.defaultKsgStorageTSRootPath, defaultKsgGroupStorage, - kgm.cfg, true) + if !more { + break + } + } + + log.Info("loaded keyspace groups", zap.Uint32("keyspace-groups-loaded", keyspaceGroupsLoaded)) + return watchStartRevision, nil +} + +// loadKeyspaceGroups loads keyspace groups from the start ID with limit. +// If limit is 0, it will load all keyspace groups from the start ID. 
+func (kgm *KeyspaceGroupManager) loadKeyspaceGroups( + ctx context.Context, startID uint32, limit int64, +) (revison int64, ksgs []*endpoint.KeyspaceGroup, more bool, err error) { + rootPath := kgm.legacySvcRootPath + startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(startID)}, "/") + endKey := strings.Join( + []string{rootPath, clientv3.GetPrefixRangeEnd(endpoint.KeyspaceGroupIDPrefix())}, "/") + opOption := []clientv3.OpOption{clientv3.WithRange(endKey), clientv3.WithLimit(limit)} + + var resp *clientv3.GetResponse + for i := 0; i < loadFromEtcdMaxRetryTimes; i++ { + resp, err = etcdutil.EtcdKVGet(kgm.etcdClient, startKey, opOption...) + if err == nil && resp != nil { + break + } + select { + case <-ctx.Done(): + return 0, []*endpoint.KeyspaceGroup{}, false, errs.ErrLoadKeyspaceGroupsTerminated + case <-time.After(loadFromEtcdRetryInterval): + } + } + + kgs := make([]*endpoint.KeyspaceGroup, 0, len(resp.Kvs)) + for _, item := range resp.Kvs { + kg := &endpoint.KeyspaceGroup{} + if err = json.Unmarshal(item.Value, kg); err != nil { + return 0, nil, false, err + } + kgs = append(kgs, kg) + } + + if resp.Header != nil { + revison = resp.Header.Revision + } + + return revison, kgs, resp.More, nil +} + +// startKeyspaceGroupsMetaWatchLoop Repeatedly watches any change in keyspace group membership/distribution +// and apply the change dynamically. +func (kgm *KeyspaceGroupManager) startKeyspaceGroupsMetaWatchLoop(revision int64) { + defer logutil.LogPanic() + defer kgm.wg.Done() + + // Repeatedly watch/apply keyspace group membership/distribution changes until the context is canceled. + for { + select { + case <-kgm.ctx.Done(): + return + default: + } + + nextRevision, err := kgm.watchKeyspaceGroupsMetaChange(revision) + if err != nil { + log.Error("watcher canceled unexpectedly. 
Will start a new watcher after a while", + zap.Int64("next-revision", nextRevision), + zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)), + zap.Error(err)) + time.Sleep(watchKEtcdChangeRetryInterval) + } + } +} + +// watchKeyspaceGroupsMetaChange watches any change in keyspace group membership/distribution +// and apply the change dynamically. +func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (int64, error) { + watcher := clientv3.NewWatcher(kgm.etcdClient) + defer watcher.Close() + + ksgPrefix := strings.Join([]string{kgm.legacySvcRootPath, endpoint.KeyspaceGroupIDPrefix()}, "/") + + for { + watchChan := watcher.Watch(kgm.ctx, ksgPrefix, clientv3.WithPrefix(), clientv3.WithRev(revision)) + for wresp := range watchChan { + if wresp.CompactRevision != 0 { + log.Warn("Required revision has been compacted, the watcher will watch again with the compact revision", + zap.Int64("required-revision", revision), + zap.Int64("compact-revision", wresp.CompactRevision)) + revision = wresp.CompactRevision + break + } + if wresp.Err() != nil { + log.Error("watch is canceled or closed", + zap.Int64("required-revision", revision), + errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err())) + return revision, wresp.Err() + } + for _, event := range wresp.Events { + id, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key)) + if err != nil { + log.Warn("failed to extract keyspace group ID from the key path", + zap.String("key-path", string(event.Kv.Key)), zap.Error(err)) + continue + } + + switch event.Type { + case clientv3.EventTypePut: + group := &endpoint.KeyspaceGroup{} + if err := json.Unmarshal(event.Kv.Value, group); err != nil { + log.Warn("failed to unmarshal keyspace group", + zap.Uint32("keysapce-group-id", id), + zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause())) + } else { + kgm.updateKeyspaceGroup(group) + } + case clientv3.EventTypeDelete: + kgm.deleteKeyspaceGroup(id) + } + } + revision = 
wresp.Header.Revision + } + + select { + case <-kgm.ctx.Done(): + return revision, nil + default: + } + } +} + +func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool { + for _, member := range group.Members { + if member.Address == kgm.tsoServiceID.ServiceAddr { + return true + } + } + return false +} + +// updateKeyspaceGroup applies the given keyspace group. If the keyspace group is just assigned to +// this host/pod, it will join the primary election. +func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGroup) { + if group.ID >= uint32(len(kgm.ams)) { + log.Warn("keyspace group ID is out of range, ignore it", + zap.Uint32("keyspace-group-id", group.ID), zap.Int("max-keyspace-group-id", len(kgm.ams)-1)) + return + } + + assignedToMe := kgm.isAssignedToMe(group) + if assignedToMe { + if kgm.ams[group.ID].Load() != nil { + log.Info("keyspace group already initialized, so update meta only", + zap.Uint32("keyspace-group-id", group.ID)) + kgm.ksgs[group.ID].Store(group) + return + } + + uniqueName := fmt.Sprintf("%s-%05d", kgm.electionNamePrefix, group.ID) + uniqueID := memberutil.GenerateUniqueID(uniqueName) + log.Info("joining primary election", + zap.Uint32("keyspace-group-id", group.ID), + zap.String("participant-name", uniqueName), + zap.Uint64("participant-id", uniqueID)) + + participant := member.NewParticipant(kgm.etcdClient) + participant.InitInfo( + uniqueName, uniqueID, path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", group.ID)), + primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr()) + + // Only the default keyspace group uses the legacy service root path for LoadTimestamp/SyncTimestamp. 
+ var ( + tsRootPath string + storage *endpoint.StorageEndpoint + ) + if group.ID == mcsutils.DefaultKeySpaceGroupID { + tsRootPath = kgm.legacySvcRootPath + storage = kgm.legacySvcStorage + } else { + tsRootPath = kgm.tsoSvcRootPath + storage = kgm.tsoSvcStorage + } + + kgm.ams[group.ID].Store(NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true)) + kgm.ksgs[group.ID].Store(group) + } else { + // Not assigned to me. If this host/pod owns this keyspace group, it should resign. + kgm.deleteKeyspaceGroup(group.ID) + } +} + +// deleteKeyspaceGroup deletes the given keyspace group. +func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(id uint32) { + kgm.ksgs[id].Store(nil) + am := kgm.ams[id].Swap(nil) + if am == nil { + return + } + am.close() + log.Info("deleted keyspace group", zap.Uint32("keyspace-group-id", id)) } // GetAllocatorManager returns the AllocatorManager of the given keyspace group -func (kgm *KeyspaceGroupManager) GetAllocatorManager(keyspaceGroupID uint32) *AllocatorManager { - return kgm.ksgAllocatorManagers[keyspaceGroupID] +func (kgm *KeyspaceGroupManager) GetAllocatorManager(id uint32) (*AllocatorManager, error) { + if err := kgm.checkKeySpaceGroupID(id); err != nil { + return nil, err + } + if am := kgm.ams[id].Load(); am != nil { + return am, nil + } + return nil, errs.ErrGetAllocatorManager.FastGenByArgs( + fmt.Sprintf("requested keyspace group with id %d %s by this host/pod", id, errs.NotServedErr)) } // GetElectionMember returns the election member of the given keyspace group -func (kgm *KeyspaceGroupManager) GetElectionMember(keyspaceGroupID uint32) ElectionMember { - return *kgm.ksgAllocatorManagers[keyspaceGroupID].getMember() +func (kgm *KeyspaceGroupManager) GetElectionMember(id uint32) (ElectionMember, error) { + am, err := kgm.GetAllocatorManager(id) + if err != nil { + return nil, err + } + return am.getMember(), nil } // HandleTSORequest forwards TSO allocation requests to correct TSO Allocators of 
the given keyspace group. -func (kgm *KeyspaceGroupManager) HandleTSORequest(keyspaceGroupID uint32, dcLocation string, count uint32) (pdpb.Timestamp, error) { - return kgm.ksgAllocatorManagers[keyspaceGroupID].HandleRequest(dcLocation, count) +func (kgm *KeyspaceGroupManager) HandleTSORequest(id uint32, dcLocation string, count uint32) (pdpb.Timestamp, error) { + am, err := kgm.GetAllocatorManager(id) + if err != nil { + return pdpb.Timestamp{}, err + } + return am.HandleRequest(dcLocation, count) } -// Close this KeyspaceGroupManager -func (kgm *KeyspaceGroupManager) Close() { - kgm.cancel() - kgm.ksgAllocatorManagers[mcsutils.DefaultKeySpaceGroupID].close() +func (kgm *KeyspaceGroupManager) checkKeySpaceGroupID(id uint32) error { + if id < mcsutils.MaxKeyspaceGroupCountInUse { + return nil + } + return errs.ErrKeyspaceGroupIDInvalid.FastGenByArgs( + fmt.Sprintf("invalid keyspace group id %d which shouldn't >= %d", id, mcsutils.MaxKeyspaceGroupCountInUse)) } diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index c8f7f1a81c2..8b9a0b3d6c5 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -16,56 +16,414 @@ package tso import ( "context" + "encoding/json" + "fmt" + "math/rand" "path" + "reflect" + "sort" + "strconv" + "strings" + "sync" "testing" "time" + "github.com/google/uuid" "github.com/stretchr/testify/require" - "github.com/tikv/pd/pkg/mcs/utils" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/mcs/discovery" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/memberutil" + "github.com/tikv/pd/pkg/utils/testutil" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/mvcc/mvccpb" + "go.uber.org/goleak" ) -func TestNewKeyspaceGroupManager(t *testing.T) { - re := require.New(t) - backendpoints, etcdClient, clean := startEmbeddedEtcd(t) - defer clean() +func TestMain(m *testing.M) { + 
goleak.VerifyTestMain(m, testutil.LeakOptions...) +} + +type keyspaceGroupManagerTestSuite struct { + suite.Suite + ctx context.Context + cancel context.CancelFunc + backendEndpoints string + etcdClient *clientv3.Client + clean func() + cfg *TestServiceConfig +} + +func TestKeyspaceGroupManagerTestSuite(t *testing.T) { + suite.Run(t, new(keyspaceGroupManagerTestSuite)) +} - cfg := &TestServiceConfig{ +func (suite *keyspaceGroupManagerTestSuite) SetupSuite() { + t := suite.T() + suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.backendEndpoints, suite.etcdClient, suite.clean = startEmbeddedEtcd(t) + + suite.cfg = &TestServiceConfig{ Name: "tso-test-name", - BackendEndpoints: backendpoints, + BackendEndpoints: suite.backendEndpoints, ListenAddr: "http://127.0.0.1:3379", AdvertiseListenAddr: "http://127.0.0.1:3379", - LeaderLease: utils.DefaultLeaderLease, + LeaderLease: mcsutils.DefaultLeaderLease, LocalTSOEnabled: false, TSOUpdatePhysicalInterval: 50 * time.Millisecond, - TSOSaveInterval: time.Duration(utils.DefaultLeaderLease) * time.Second, + TSOSaveInterval: time.Duration(mcsutils.DefaultLeaderLease) * time.Second, MaxResetTSGap: time.Hour * 24, TLSConfig: nil, } +} - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - defaultKsgStorageTSRootPath := path.Join("/pd/1") - tsoSvcRootPath := "/ms/1/tso" - electionNamePrefix := "tso-server-1" +func (suite *keyspaceGroupManagerTestSuite) TearDownSuite() { + suite.clean() + suite.cancel() +} - keyspaceGroupManager := NewKeyspaceGroupManager( - ctx, etcdClient, electionNamePrefix, defaultKsgStorageTSRootPath, tsoSvcRootPath, cfg) - keyspaceGroupManager.Initialize() +// TestNewKeyspaceGroupManager tests the initialization of KeyspaceGroupManager. +// It should initialize the allocator manager with the desired configurations and parameters. 
+func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { + re := suite.Require() - re.Equal(etcdClient, keyspaceGroupManager.etcdClient) - re.Equal(electionNamePrefix, keyspaceGroupManager.electionNamePrefix) - re.Equal(defaultKsgStorageTSRootPath, keyspaceGroupManager.defaultKsgStorageTSRootPath) - re.Equal(tsoSvcRootPath, keyspaceGroupManager.tsoSvcRootPath) - re.Equal(cfg, keyspaceGroupManager.cfg) + tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr} + guid := uuid.New().String() + legacySvcRootPath := path.Join("/pd", guid) + tsoSvcRootPath := path.Join("/ms", guid, "tso") + electionNamePrefix := "tso-server-" + guid - am := keyspaceGroupManager.GetAllocatorManager(utils.DefaultKeySpaceGroupID) + ksgMgr := NewKeyspaceGroupManager( + suite.ctx, tsoServiceID, suite.etcdClient, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath, suite.cfg) + err := ksgMgr.Initialize(false) + re.NoError(err) + + re.Equal(tsoServiceID, ksgMgr.tsoServiceID) + re.Equal(suite.etcdClient, ksgMgr.etcdClient) + re.Equal(electionNamePrefix, ksgMgr.electionNamePrefix) + re.Equal(legacySvcRootPath, ksgMgr.legacySvcRootPath) + re.Equal(tsoSvcRootPath, ksgMgr.tsoSvcRootPath) + re.Equal(suite.cfg, ksgMgr.cfg) + re.Equal(defaultLoadKeyspaceGroupsBatchSize, ksgMgr.loadKeyspaceGroupsBatchSize) + re.Equal(defaultLoadKeyspaceGroupsTimeout, ksgMgr.loadKeyspaceGroupsTimeout) + + am, err := ksgMgr.GetAllocatorManager(mcsutils.DefaultKeySpaceGroupID) + re.NoError(err) re.False(am.enableLocalTSO) - re.Equal(utils.DefaultKeySpaceGroupID, am.ksgID) - re.Equal(utils.DefaultLeaderLease, am.leaderLease) + re.Equal(mcsutils.DefaultKeySpaceGroupID, am.ksgID) + re.Equal(mcsutils.DefaultLeaderLease, am.leaderLease) re.Equal(time.Hour*24, am.maxResetTSGap()) - re.Equal(defaultKsgStorageTSRootPath, am.rootPath) - re.Equal(time.Duration(utils.DefaultLeaderLease)*time.Second, am.saveInterval) + re.Equal(legacySvcRootPath, am.rootPath) + 
re.Equal(time.Duration(mcsutils.DefaultLeaderLease)*time.Second, am.saveInterval) re.Equal(time.Duration(50)*time.Millisecond, am.updatePhysicalInterval) - keyspaceGroupManager.Close() + ksgMgr.Close() +} + +// TestLoadKeyspaceGroupsAssignment tests the loading of the keyspace group assignment. +func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsAssignment() { + re := suite.Require() + maxCountInUse := int(mcsutils.MaxKeyspaceGroupCountInUse) + // Test loading of empty keyspace group assignment. + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 0, 0, 100) + // Test loading of single keyspace group assignment. + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 1, 0, 100) + // Test loading of multiple keyspace group assignment. + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 3, 0, 100) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, maxCountInUse-1, 0, 10) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, maxCountInUse, 0, 10) + // Test loading of the keyspace group assignment which exceeds the maximum + // keyspace group count. In this case, the manager should only load/serve the + // first MaxKeyspaceGroupCountInUse keyspace groups and ignore the rest + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, maxCountInUse+1, 0, 10) +} + +// TestLoadWithDifferentBatchSize tests the loading of the keyspace group assignment with the different batch size. 
+func (suite *keyspaceGroupManagerTestSuite) TestLoadWithDifferentBatchSize() { + re := suite.Require() + + batchSize := int64(17) + maxCount := uint32(1024) + params := []struct { + batchSize int64 + count int + probabilityAssignToMe int // percentage of assigning keyspace groups to this host/pod + }{ + {batchSize: 1, count: 1, probabilityAssignToMe: 100}, + {batchSize: 2, count: int(maxCount / 10), probabilityAssignToMe: 100}, + {batchSize: 7, count: int(maxCount / 10), probabilityAssignToMe: 100}, + {batchSize: batchSize, count: int(batchSize), probabilityAssignToMe: 50}, + {batchSize: int64(maxCount / 13), count: int(maxCount / 13), probabilityAssignToMe: 50}, + {batchSize: int64(maxCount), count: int(maxCount / 13), probabilityAssignToMe: 10}, + } + + for _, param := range params { + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, + param.count-1, param.batchSize, param.probabilityAssignToMe) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, + param.count, param.batchSize, param.probabilityAssignToMe) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, + param.count+1, param.batchSize, param.probabilityAssignToMe) + } +} + +// TestWatchAndDynamicallyApplyChanges tests the keyspace group manager watch and dynamically apply +// keyspace groups' membership/distribution meta changes. +func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges() { + re := suite.Require() + + // Start with the empty keyspace group assignment. + mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0, 0) + re.NotNil(mgr) + defer mgr.Close() + err := mgr.Initialize(true) + re.NoError(err) + + rootPath := mgr.legacySvcRootPath + svcAddr := mgr.tsoServiceID.ServiceAddr + + // Initialize PUT/DELETE events + events := []*etcdEvent{} + // Assign keyspace group 0 to this host/pod/keyspace-group-manager. 
+ // final result: [0] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 0, []string{svcAddr}) + // Assign keyspace group 1 to this host/pod/keyspace-group-manager. + // final result: [0,1] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 1, []string{"unknown", svcAddr}) + // Assign keyspace group 2 to other host/pod/keyspace-group-manager. + // final result: [0,1] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 2, []string{"unknown"}) + // Assign keyspace group 3 to this host/pod/keyspace-group-manager. + // final result: [0,1,3] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 3, []string{svcAddr}) + // Delete keyspace group 0 + // final result: [1,3] + events = generateKeyspaceGroupEvent(events, mvccpb.DELETE, 0, []string{}) + // Put keyspace group 4 which doesn't belong to anyone. + // final result: [1,3] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 4, []string{}) + // Put keyspace group 5 which doesn't belong to anyone. + // final result: [1,3] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 5, []string{}) + // Assign keyspace group 2 to this host/pod/keyspace-group-manager. + // final result: [1,2,3] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 2, []string{svcAddr}) + // Reassign keyspace group 3 to no one. + // final result: [1,2] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 3, []string{}) + // Reassign keyspace group 4 to this host/pod/keyspace-group-manager. + // final result: [1,2,4] + events = generateKeyspaceGroupEvent(events, mvccpb.PUT, 4, []string{svcAddr}) + + // Eventually, this keyspace groups manager is expected to serve the following keyspace groups. + idsExpected := []int{1, 2, 4} + + // Apply the keyspace group assignment change events to etcd. 
+ for _, event := range events { + switch event.eventType { + case mvccpb.PUT: + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + re.NoError(err) + case mvccpb.DELETE: + err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID) + re.NoError(err) + } + } + + // Verify the keyspace group assignment. + testutil.Eventually(re, func() bool { + idsAssigned := collectAssignedKeyspaceGroupIDs(re, mgr) + return reflect.DeepEqual(idsExpected, idsAssigned) + }) +} + +type etcdEvent struct { + eventType mvccpb.Event_EventType + ksg *endpoint.KeyspaceGroup +} + +func generateKeyspaceGroupEvent( + events []*etcdEvent, eventType mvccpb.Event_EventType, id uint32, addrs []string, +) []*etcdEvent { + members := []endpoint.KeyspaceGroupMember{} + for _, addr := range addrs { + members = append(members, endpoint.KeyspaceGroupMember{Address: addr}) + } + + return append(events, + &etcdEvent{ + eventType: eventType, + ksg: &endpoint.KeyspaceGroup{ + ID: id, + Members: members, + Keyspaces: []uint32{id}, + }, + }, + ) +} + +// runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment. 
+func runTestLoadKeyspaceGroupsAssignment( + ctx context.Context, + re *require.Assertions, + etcdClient *clientv3.Client, + cfg *TestServiceConfig, + numberOfKeypaceGroupsToAdd int, + loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value + probabilityAssignToMe int, // percentage of assigning keyspace groups to this host/pod +) { + idsExpected := []int{} + mgr := newUniqueKeyspaceGroupManager(ctx, etcdClient, cfg, 0, loadKeyspaceGroupsBatchSize) + re.NotNil(mgr) + defer mgr.Close() + + step := 30 + mux := sync.Mutex{} + wg := sync.WaitGroup{} + for i := 0; i < numberOfKeypaceGroupsToAdd; i += step { + wg.Add(1) + go func(startID int) { + defer wg.Done() + + endID := startID + step + if endID > numberOfKeypaceGroupsToAdd { + endID = numberOfKeypaceGroupsToAdd + } + + randomGen := rand.New(rand.NewSource(time.Now().UnixNano())) + for j := startID; j < endID; j++ { + assignToMe := false + // Assign the keyspace group to this host/pod with the given probability, + // and the keyspace group manager only loads the keyspace groups with id + // less than len(mgr.ams). + if j < len(mgr.ams) && randomGen.Intn(100) < probabilityAssignToMe { + assignToMe = true + mux.Lock() + idsExpected = append(idsExpected, j) + mux.Unlock() + } + addKeyspaceGroupAssignment( + ctx, etcdClient, assignToMe, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(j)) + } + }(i) + } + wg.Wait() + + err := mgr.Initialize(true) + re.NoError(err) + + // Verify the keyspace group assignment. 
+ sort.Ints(idsExpected) + idsAssigned := collectAssignedKeyspaceGroupIDs(re, mgr) + re.Equal(idsExpected, idsAssigned) +} + +func newUniqueKeyspaceGroupManager( + ctx context.Context, etcdClient *clientv3.Client, cfg *TestServiceConfig, + loadKeyspaceGroupsTimeout time.Duration, // set to 0 to use the default value + loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value +) *KeyspaceGroupManager { + tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.AdvertiseListenAddr} + uniqueID := memberutil.GenerateUniqueID(uuid.New().String()) + uniqueStr := strconv.FormatUint(uniqueID, 10) + legacySvcRootPath := path.Join("/pd", uniqueStr) + tsoSvcRootPath := path.Join("/ms", uniqueStr, "tso") + electionNamePrefix := "kgm-test-" + uniqueStr + + keyspaceGroupManager := NewKeyspaceGroupManager( + ctx, tsoServiceID, etcdClient, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath, cfg) + if loadKeyspaceGroupsTimeout != 0 { + keyspaceGroupManager.loadKeyspaceGroupsTimeout = loadKeyspaceGroupsTimeout + } + if loadKeyspaceGroupsBatchSize != 0 { + keyspaceGroupManager.loadKeyspaceGroupsBatchSize = loadKeyspaceGroupsBatchSize + } + return keyspaceGroupManager +} + +// putKeyspaceGroupToEtcd puts a keyspace group to etcd. +func putKeyspaceGroupToEtcd( + ctx context.Context, etcdClient *clientv3.Client, + rootPath string, group *endpoint.KeyspaceGroup, +) error { + key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(group.ID)}, "/") + value, err := json.Marshal(group) + if err != nil { + return err + } + + if _, err := etcdClient.Put(ctx, key, string(value)); err != nil { + return err + } + + return nil +} + +// deleteKeyspaceGroupInEtcd deletes a keyspace group in etcd. 
+func deleteKeyspaceGroupInEtcd( + ctx context.Context, etcdClient *clientv3.Client, + rootPath string, id uint32, +) error { + key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(id)}, "/") + + if _, err := etcdClient.Delete(ctx, key); err != nil { + return err + } + + return nil +} + +// addKeyspaceGroupAssignment adds a keyspace group assignment to etcd. +func addKeyspaceGroupAssignment( + ctx context.Context, etcdClient *clientv3.Client, + assignToMe bool, rootPath, svcAddr string, id uint32, +) error { + var location string + if assignToMe { + location = svcAddr + } else { + location = uuid.NewString() + } + group := &endpoint.KeyspaceGroup{ + ID: id, + Members: []endpoint.KeyspaceGroupMember{{Address: location}}, + Keyspaces: []uint32{id}, + } + + key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(id)}, "/") + value, err := json.Marshal(group) + if err != nil { + return err + } + + if _, err := etcdClient.Put(ctx, key, string(value)); err != nil { + return err + } + + return nil +} + +func collectAssignedKeyspaceGroupIDs(re *require.Assertions, ksgMgr *KeyspaceGroupManager) []int { + ids := []int{} + for i := 0; i < len(ksgMgr.ksgs); i++ { + ksg := ksgMgr.ksgs[i].Load() + if ksg == nil { + re.Nil(ksgMgr.ams[i].Load(), fmt.Sprintf("ksg is nil but am is not nil for id %d", i)) + } else { + am := ksgMgr.ams[i].Load() + re.NotNil(am, fmt.Sprintf("ksg is not nil but am is nil for id %d", i)) + re.Equal(i, int(am.ksgID)) + re.Equal(i, int(ksg.ID)) + for _, m := range ksg.Members { + if m.Address == ksgMgr.tsoServiceID.ServiceAddr { + ids = append(ids, i) + break + } + } + } + } + + return ids } diff --git a/server/server.go b/server/server.go index b2573032d7f..b85d8f026e4 100644 --- a/server/server.go +++ b/server/server.go @@ -1299,7 +1299,7 @@ func (s *Server) GetServiceRateLimiter() *ratelimit.Limiter { return s.serviceRateLimiter } -// IsInRateLimitAllowList returns whethis given service label is in allow lost +// 
IsInRateLimitAllowList returns whether the given service label is in the allow list func (s *Server) IsInRateLimitAllowList(serviceLabel string) bool { return s.serviceRateLimiter.IsInAllowList(serviceLabel) }