From 3ccdd830f70f0dac376645572fbfeb3047aa6342 Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Wed, 29 Nov 2023 17:52:19 +0800 Subject: [PATCH] This is an automated cherry-pick of #7443 close tikv/pd#7416 Signed-off-by: ti-chi-bot --- client/client.go | 4 +- client/grpcutil/grpcutil.go | 44 ++ client/pd_service_discovery.go | 672 ++++++++++++++++++++++++++ client/tso_client.go | 279 +++++++++++ client/tso_dispatcher.go | 838 +++++++++++++++++++++++++++++++++ tests/client/client_test.go | 101 +++- 6 files changed, 1934 insertions(+), 4 deletions(-) create mode 100644 client/pd_service_discovery.go create mode 100644 client/tso_client.go create mode 100644 client/tso_dispatcher.go diff --git a/client/client.go b/client/client.go index 9d95d0edbee..bcc1b4f7ce2 100644 --- a/client/client.go +++ b/client/client.go @@ -501,16 +501,18 @@ func (c *client) checkLeaderHealth(ctx context.Context) { if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok { healthCli := healthpb.NewHealthClient(cc.(*grpc.ClientConn)) resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) - rpcErr, ok := status.FromError(err) failpoint.Inject("unreachableNetwork1", func() { resp = nil err = status.New(codes.Unavailable, "unavailable").Err() }) + rpcErr, ok := status.FromError(err) if (ok && isNetworkError(rpcErr.Code())) || resp.GetStatus() != healthpb.HealthCheckResponse_SERVING { atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1)) } else { atomic.StoreInt32(&(c.leaderNetworkFailure), int32(0)) } + } else { + atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1)) } } diff --git a/client/grpcutil/grpcutil.go b/client/grpcutil/grpcutil.go index 464fc3c19fa..47e6b3afffc 100644 --- a/client/grpcutil/grpcutil.go +++ b/client/grpcutil/grpcutil.go @@ -19,6 +19,12 @@ import ( "crypto/tls" "net/url" +<<<<<<< HEAD +======= + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" +>>>>>>> 54bf70e45 (client: update the leader even if the connection creation fails (#7443)) "github.com/tikv/pd/client/errs" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -64,3 +70,41 @@ func BuildForwardContext(ctx context.Context, addr string) context.Context { md := metadata.Pairs(ForwardMetadataKey, addr) return metadata.NewOutgoingContext(ctx, md) } +<<<<<<< HEAD +======= + +// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr. +// Returns the old one if's already existed in the clientConns; otherwise creates a new one and returns it. +func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string, tlsCfg *tlsutil.TLSConfig, opt ...grpc.DialOption) (*grpc.ClientConn, error) { + conn, ok := clientConns.Load(addr) + if ok { + // TODO: check the connection state. + return conn.(*grpc.ClientConn), nil + } + tlsConfig, err := tlsCfg.ToTLSConfig() + if err != nil { + return nil, err + } + dCtx, cancel := context.WithTimeout(ctx, dialTimeout) + defer cancel() + cc, err := GetClientConn(dCtx, addr, tlsConfig, opt...) + failpoint.Inject("unreachableNetwork2", func(val failpoint.Value) { + if val, ok := val.(string); ok && val == addr { + cc = nil + err = errors.Errorf("unreachable network") + } + }) + if err != nil { + return nil, err + } + conn, loaded := clientConns.LoadOrStore(addr, cc) + if !loaded { + // Successfully stored the connection. 
+ return cc, nil + } + cc.Close() + cc = conn.(*grpc.ClientConn) + log.Debug("use existing connection", zap.String("target", cc.Target()), zap.String("state", cc.GetState().String())) + return cc, nil +} +>>>>>>> 54bf70e45 (client: update the leader even if the connection creation fails (#7443)) diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go new file mode 100644 index 00000000000..b75276adbe9 --- /dev/null +++ b/client/pd_service_discovery.go @@ -0,0 +1,672 @@ +// Copyright 2019 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pd + +import ( + "context" + "reflect" + "sort" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/grpcutil" + "github.com/tikv/pd/client/retry" + "github.com/tikv/pd/client/tlsutil" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +const ( + globalDCLocation = "global" + memberUpdateInterval = time.Minute + serviceModeUpdateInterval = 3 * time.Second + updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation. + updateMemberBackOffBaseTime = 100 * time.Millisecond +) + +type serviceType int + +const ( + apiService serviceType = iota + tsoService +) + +// ServiceDiscovery defines the general interface for service discovery on a quorum-based cluster +// or a primary/secondary configured cluster. +type ServiceDiscovery interface { + // Init initialize the concrete client underlying + Init() error + // Close releases all resources + Close() + // GetClusterID returns the ID of the cluster + GetClusterID() uint64 + // GetKeyspaceID returns the ID of the keyspace + GetKeyspaceID() uint32 + // GetKeyspaceGroupID returns the ID of the keyspace group + GetKeyspaceGroupID() uint32 + // GetServiceURLs returns the URLs of the servers providing the service + GetServiceURLs() []string + // GetServingEndpointClientConn returns the grpc client connection of the serving endpoint + // which is the leader in a quorum-based cluster or the primary in a primary/secondary + // configured cluster. + GetServingEndpointClientConn() *grpc.ClientConn + // GetClientConns returns the mapping {addr -> a gRPC connection} + GetClientConns() *sync.Map + // GetServingAddr returns the serving endpoint which is the leader in a quorum-based cluster + // or the primary in a primary/secondary configured cluster. + GetServingAddr() string + // GetBackupAddrs gets the addresses of the current reachable and healthy backup service + // endpoints randomly. Backup service endpoints are followers in a quorum-based cluster or + // secondaries in a primary/secondary configured cluster. 
+ GetBackupAddrs() []string + // GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr + GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) + // ScheduleCheckMemberChanged is used to trigger a check to see if there is any membership change + // among the leader/followers in a quorum-based cluster or among the primary/secondaries in a + // primary/secondary configured cluster. + ScheduleCheckMemberChanged() + // CheckMemberChanged immediately check if there is any membership change among the leader/followers + // in a quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster. + CheckMemberChanged() error + // AddServingAddrSwitchedCallback adds callbacks which will be called when the leader + // in a quorum-based cluster or the primary in a primary/secondary configured cluster + // is switched. + AddServingAddrSwitchedCallback(callbacks ...func()) + // AddServiceAddrsSwitchedCallback adds callbacks which will be called when any leader/follower + // in a quorum-based cluster or any primary/secondary in a primary/secondary configured cluster + // is changed. + AddServiceAddrsSwitchedCallback(callbacks ...func()) +} + +type updateKeyspaceIDFunc func() error +type tsoLocalServAddrsUpdatedFunc func(map[string]string) error +type tsoGlobalServAddrUpdatedFunc func(string) error + +type tsoAllocatorEventSource interface { + // SetTSOLocalServAddrsUpdatedCallback adds a callback which will be called when the local tso + // allocator leader list is updated. + SetTSOLocalServAddrsUpdatedCallback(callback tsoLocalServAddrsUpdatedFunc) + // SetTSOGlobalServAddrUpdatedCallback adds a callback which will be called when the global tso + // allocator leader is updated. + SetTSOGlobalServAddrUpdatedCallback(callback tsoGlobalServAddrUpdatedFunc) +} + +var _ ServiceDiscovery = (*pdServiceDiscovery)(nil) +var _ tsoAllocatorEventSource = (*pdServiceDiscovery)(nil) + +// pdServiceDiscovery is the service discovery client of PD/API service which is quorum based +type pdServiceDiscovery struct { + isInitialized bool + + urls atomic.Value // Store as []string + // PD leader URL + leader atomic.Value // Store as string + // PD follower URLs + followers atomic.Value // Store as []string + + clusterID uint64 + // addr -> a gRPC connection + clientConns sync.Map // Store as map[string]*grpc.ClientConn + + // serviceModeUpdateCb will be called when the service mode gets updated + serviceModeUpdateCb func(pdpb.ServiceMode) + // leaderSwitchedCbs will be called after the leader switched + leaderSwitchedCbs []func() + // membersChangedCbs will be called after there is any membership change in the + // leader and followers + membersChangedCbs []func() + // tsoLocalAllocLeadersUpdatedCb will be called when the local tso allocator + // leader list is updated. The input is a map {DC Location -> Leader Addr} + tsoLocalAllocLeadersUpdatedCb tsoLocalServAddrsUpdatedFunc + // tsoGlobalAllocLeaderUpdatedCb will be called when the global tso allocator + // leader is updated. + tsoGlobalAllocLeaderUpdatedCb tsoGlobalServAddrUpdatedFunc + + checkMembershipCh chan struct{} + + wg *sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + closeOnce sync.Once + + updateKeyspaceIDCb updateKeyspaceIDFunc + keyspaceID uint32 + tlsCfg *tlsutil.TLSConfig + // Client option. + option *option +} + +// newPDServiceDiscovery returns a new PD service discovery-based client. 
+func newPDServiceDiscovery( + ctx context.Context, cancel context.CancelFunc, + wg *sync.WaitGroup, + serviceModeUpdateCb func(pdpb.ServiceMode), + updateKeyspaceIDCb updateKeyspaceIDFunc, + keyspaceID uint32, + urls []string, tlsCfg *tlsutil.TLSConfig, option *option, +) *pdServiceDiscovery { + pdsd := &pdServiceDiscovery{ + checkMembershipCh: make(chan struct{}, 1), + ctx: ctx, + cancel: cancel, + wg: wg, + serviceModeUpdateCb: serviceModeUpdateCb, + updateKeyspaceIDCb: updateKeyspaceIDCb, + keyspaceID: keyspaceID, + tlsCfg: tlsCfg, + option: option, + } + pdsd.urls.Store(urls) + return pdsd +} + +func (c *pdServiceDiscovery) Init() error { + if c.isInitialized { + return nil + } + + if err := c.initRetry(c.initClusterID); err != nil { + c.cancel() + return err + } + if err := c.initRetry(c.updateMember); err != nil { + c.cancel() + return err + } + log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID)) + + // We need to update the keyspace ID before we discover and update the service mode + // so that TSO in API mode can be initialized with the correct keyspace ID. + if c.updateKeyspaceIDCb != nil { + if err := c.updateKeyspaceIDCb(); err != nil { + return err + } + } + + if err := c.checkServiceModeChanged(); err != nil { + log.Warn("[pd] failed to check service mode and will check later", zap.Error(err)) + } + + c.wg.Add(2) + go c.updateMemberLoop() + go c.updateServiceModeLoop() + + c.isInitialized = true + return nil +} + +func (c *pdServiceDiscovery) initRetry(f func() error) error { + var err error + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for i := 0; i < c.option.maxRetryTimes; i++ { + if err = f(); err == nil { + return nil + } + select { + case <-c.ctx.Done(): + return err + case <-ticker.C: + } + } + return errors.WithStack(err) +} + +func (c *pdServiceDiscovery) updateMemberLoop() { + defer c.wg.Done() + + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + ticker := time.NewTicker(memberUpdateInterval) + defer ticker.Stop() + + bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout) + for { + select { + case <-ctx.Done(): + log.Info("[pd] exit member loop due to context canceled") + return + case <-ticker.C: + case <-c.checkMembershipCh: + } + failpoint.Inject("skipUpdateMember", func() { + failpoint.Continue() + }) + if err := bo.Exec(ctx, c.updateMember); err != nil { + log.Error("[pd] failed to update member", zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err)) + } + } +} + +func (c *pdServiceDiscovery) updateServiceModeLoop() { + defer c.wg.Done() + failpoint.Inject("skipUpdateServiceMode", func() { + failpoint.Return() + }) + failpoint.Inject("usePDServiceMode", func() { + c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE) + failpoint.Return() + }) + + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + ticker := time.NewTicker(serviceModeUpdateInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + if err := c.checkServiceModeChanged(); err != nil { + log.Error("[pd] failed to update service mode", + zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err)) + c.ScheduleCheckMemberChanged() // check if the leader changed + } + } +} + +// Close releases all resources. 
+func (c *pdServiceDiscovery) Close() { + c.closeOnce.Do(func() { + log.Info("[pd] close pd service discovery client") + c.clientConns.Range(func(key, cc interface{}) bool { + if err := cc.(*grpc.ClientConn).Close(); err != nil { + log.Error("[pd] failed to close grpc clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err)) + } + c.clientConns.Delete(key) + return true + }) + }) +} + +// GetClusterID returns the ClusterID. +func (c *pdServiceDiscovery) GetClusterID() uint64 { + return c.clusterID +} + +// GetKeyspaceID returns the ID of the keyspace +func (c *pdServiceDiscovery) GetKeyspaceID() uint32 { + return c.keyspaceID +} + +// SetKeyspaceID sets the ID of the keyspace +func (c *pdServiceDiscovery) SetKeyspaceID(keyspaceID uint32) { + c.keyspaceID = keyspaceID +} + +// GetKeyspaceGroupID returns the ID of the keyspace group +func (c *pdServiceDiscovery) GetKeyspaceGroupID() uint32 { + // PD/API service only supports the default keyspace group + return defaultKeySpaceGroupID +} + +// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls. +func (c *pdServiceDiscovery) discoverMicroservice(svcType serviceType) (urls []string, err error) { + switch svcType { + case apiService: + urls = c.GetServiceURLs() + case tsoService: + leaderAddr := c.getLeaderAddr() + if len(leaderAddr) > 0 { + clusterInfo, err := c.getClusterInfo(c.ctx, leaderAddr, c.option.timeout) + if err != nil { + log.Error("[pd] failed to get cluster info", + zap.String("leader-addr", leaderAddr), errs.ZapError(err)) + return nil, err + } + urls = clusterInfo.TsoUrls + } else { + err = errors.New("failed to get leader addr") + return nil, err + } + default: + panic("invalid service type") + } + + return urls, nil +} + +// GetServiceURLs returns the URLs of the servers. +// For testing use. It should only be called when the client is closed. +func (c *pdServiceDiscovery) GetServiceURLs() []string { + return c.urls.Load().([]string) +} + +// GetServingEndpointClientConn returns the grpc client connection of the serving endpoint +// which is the leader in a quorum-based cluster or the primary in a primary/secondary +// configured cluster. +func (c *pdServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { + if cc, ok := c.clientConns.Load(c.getLeaderAddr()); ok { + return cc.(*grpc.ClientConn) + } + return nil +} + +// GetClientConns returns the mapping {addr -> a gRPC connection} +func (c *pdServiceDiscovery) GetClientConns() *sync.Map { + return &c.clientConns +} + +// GetServingAddr returns the leader address +func (c *pdServiceDiscovery) GetServingAddr() string { + return c.getLeaderAddr() +} + +// GetBackupAddrs gets the addresses of the current reachable and healthy followers +// in a quorum-based cluster. +func (c *pdServiceDiscovery) GetBackupAddrs() []string { + return c.getFollowerAddrs() +} + +// ScheduleCheckMemberChanged is used to check if there is any membership +// change among the leader and the followers. +func (c *pdServiceDiscovery) ScheduleCheckMemberChanged() { + select { + case c.checkMembershipCh <- struct{}{}: + default: + } +} + +// CheckMemberChanged Immediately check if there is any membership change among the leader/followers in a +// quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster. +func (c *pdServiceDiscovery) CheckMemberChanged() error { + return c.updateMember() +} + +// AddServingAddrSwitchedCallback adds callbacks which will be called +// when the leader is switched. 
+func (c *pdServiceDiscovery) AddServingAddrSwitchedCallback(callbacks ...func()) { + c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, callbacks...) +} + +// AddServiceAddrsSwitchedCallback adds callbacks which will be called when +// any leader/follower is changed. +func (c *pdServiceDiscovery) AddServiceAddrsSwitchedCallback(callbacks ...func()) { + c.membersChangedCbs = append(c.membersChangedCbs, callbacks...) +} + +// SetTSOLocalServAddrsUpdatedCallback adds a callback which will be called when the local tso +// allocator leader list is updated. +func (c *pdServiceDiscovery) SetTSOLocalServAddrsUpdatedCallback(callback tsoLocalServAddrsUpdatedFunc) { + c.tsoLocalAllocLeadersUpdatedCb = callback +} + +// SetTSOGlobalServAddrUpdatedCallback adds a callback which will be called when the global tso +// allocator leader is updated. +func (c *pdServiceDiscovery) SetTSOGlobalServAddrUpdatedCallback(callback tsoGlobalServAddrUpdatedFunc) { + addr := c.getLeaderAddr() + if len(addr) > 0 { + callback(addr) + } + c.tsoGlobalAllocLeaderUpdatedCb = callback +} + +// getLeaderAddr returns the leader address. +func (c *pdServiceDiscovery) getLeaderAddr() string { + leaderAddr := c.leader.Load() + if leaderAddr == nil { + return "" + } + return leaderAddr.(string) +} + +// getFollowerAddrs returns the follower address. +func (c *pdServiceDiscovery) getFollowerAddrs() []string { + followerAddrs := c.followers.Load() + if followerAddrs == nil { + return []string{} + } + return followerAddrs.([]string) +} + +func (c *pdServiceDiscovery) initClusterID() error { + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + clusterID := uint64(0) + for _, url := range c.GetServiceURLs() { + members, err := c.getMembers(ctx, url, c.option.timeout) + if err != nil || members.GetHeader() == nil { + log.Warn("[pd] failed to get cluster id", zap.String("url", url), errs.ZapError(err)) + continue + } + if clusterID == 0 { + clusterID = members.GetHeader().GetClusterId() + continue + } + failpoint.Inject("skipClusterIDCheck", func() { + failpoint.Continue() + }) + // All URLs passed in should have the same cluster ID. + if members.GetHeader().GetClusterId() != clusterID { + return errors.WithStack(errUnmatchedClusterID) + } + } + // Failed to init the cluster ID. + if clusterID == 0 { + return errors.WithStack(errFailInitClusterID) + } + c.clusterID = clusterID + return nil +} + +func (c *pdServiceDiscovery) checkServiceModeChanged() error { + leaderAddr := c.getLeaderAddr() + if len(leaderAddr) == 0 { + return errors.New("no leader found") + } + + clusterInfo, err := c.getClusterInfo(c.ctx, leaderAddr, c.option.timeout) + if err != nil { + if strings.Contains(err.Error(), "Unimplemented") { + // If the method is not supported, we set it to pd mode. + // TODO: it's a hack way to solve the compatibility issue. + // we need to remove this after all maintained version supports the method. + c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE) + return nil + } + return err + } + if clusterInfo == nil || len(clusterInfo.ServiceModes) == 0 { + return errors.WithStack(errNoServiceModeReturned) + } + c.serviceModeUpdateCb(clusterInfo.ServiceModes[0]) + return nil +} + +func (c *pdServiceDiscovery) updateMember() error { + for i, url := range c.GetServiceURLs() { + failpoint.Inject("skipFirstUpdateMember", func() { + if i == 0 { + failpoint.Continue() + } + }) + + members, err := c.getMembers(c.ctx, url, updateMemberTimeout) + // Check the cluster ID. 
+ if err == nil && members.GetHeader().GetClusterId() != c.clusterID { + err = errs.ErrClientUpdateMember.FastGenByArgs("cluster id does not match") + } + // Check the TSO Allocator Leader. + var errTSO error + if err == nil { + if members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 { + err = errs.ErrClientGetLeader.FastGenByArgs("leader address doesn't exist") + } + // Still need to update TsoAllocatorLeaders, even if there is no PD leader + errTSO = c.switchTSOAllocatorLeaders(members.GetTsoAllocatorLeaders()) + } + + // Failed to get members + if err != nil { + log.Info("[pd] cannot update member from this address", + zap.String("address", url), + errs.ZapError(err)) + select { + case <-c.ctx.Done(): + return errors.WithStack(err) + default: + continue + } + } + + c.updateURLs(members.GetMembers()) + c.updateFollowers(members.GetMembers(), members.GetLeader()) + if err := c.switchLeader(members.GetLeader().GetClientUrls()); err != nil { + return err + } + + // If `switchLeader` succeeds but `switchTSOAllocatorLeader` has an error, + // the error of `switchTSOAllocatorLeader` will be returned. + return errTSO + } + return errs.ErrClientGetMember.FastGenByArgs() +} + +func (c *pdServiceDiscovery) getClusterInfo(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetClusterInfoResponse, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + cc, err := c.GetOrCreateGRPCConn(url) + if err != nil { + return nil, err + } + clusterInfo, err := pdpb.NewPDClient(cc).GetClusterInfo(ctx, &pdpb.GetClusterInfoRequest{}) + if err != nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetClusterInfo.Wrap(attachErr).GenWithStackByCause() + } + if clusterInfo.GetHeader().GetError() != nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", clusterInfo.GetHeader().GetError().String(), cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetClusterInfo.Wrap(attachErr).GenWithStackByCause() + } + return clusterInfo, nil +} + +func (c *pdServiceDiscovery) getMembers(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetMembersResponse, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + cc, err := c.GetOrCreateGRPCConn(url) + if err != nil { + return nil, err + } + members, err := pdpb.NewPDClient(cc).GetMembers(ctx, &pdpb.GetMembersRequest{}) + if err != nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause() + } + if members.GetHeader().GetError() != nil { + attachErr := errors.Errorf("error:%s target:%s status:%s", members.GetHeader().GetError().String(), cc.Target(), cc.GetState().String()) + return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause() + } + return members, nil +} + +func (c *pdServiceDiscovery) updateURLs(members []*pdpb.Member) { + urls := make([]string, 0, len(members)) + for _, m := range members { + urls = append(urls, m.GetClientUrls()...) + } + + sort.Strings(urls) + oldURLs := c.GetServiceURLs() + // the url list is same. + if reflect.DeepEqual(oldURLs, urls) { + return + } + c.urls.Store(urls) + // Update the connection contexts when member changes if TSO Follower Proxy is enabled. + if c.option.getEnableTSOFollowerProxy() { + // Run callbacks to reflect the membership changes in the leader and followers. 
+ for _, cb := range c.membersChangedCbs { + cb() + } + } + log.Info("[pd] update member urls", zap.Strings("old-urls", oldURLs), zap.Strings("new-urls", urls)) +} + +func (c *pdServiceDiscovery) switchLeader(addrs []string) error { + // FIXME: How to safely compare leader urls? For now, only allows one client url. + addr := addrs[0] + oldLeader := c.getLeaderAddr() + if addr == oldLeader { + return nil + } + + if _, err := c.GetOrCreateGRPCConn(addr); err != nil { + log.Warn("[pd] failed to connect leader", zap.String("leader", addr), errs.ZapError(err)) + } + // Set PD leader and Global TSO Allocator (which is also the PD leader) + c.leader.Store(addr) + // Run callbacks + if c.tsoGlobalAllocLeaderUpdatedCb != nil { + if err := c.tsoGlobalAllocLeaderUpdatedCb(addr); err != nil { + return err + } + } + for _, cb := range c.leaderSwitchedCbs { + cb() + } + log.Info("[pd] switch leader", zap.String("new-leader", addr), zap.String("old-leader", oldLeader)) + return nil +} + +func (c *pdServiceDiscovery) updateFollowers(members []*pdpb.Member, leader *pdpb.Member) { + var addrs []string + for _, member := range members { + if member.GetMemberId() != leader.GetMemberId() { + if len(member.GetClientUrls()) > 0 { + addrs = append(addrs, member.GetClientUrls()...) + } + } + } + c.followers.Store(addrs) +} + +func (c *pdServiceDiscovery) switchTSOAllocatorLeaders(allocatorMap map[string]*pdpb.Member) error { + if len(allocatorMap) == 0 { + return nil + } + + allocMap := make(map[string]string) + // Switch to the new one + for dcLocation, member := range allocatorMap { + if len(member.GetClientUrls()) == 0 { + continue + } + allocMap[dcLocation] = member.GetClientUrls()[0] + } + + // Run the callback to reflect any possible change in the local tso allocators. + if c.tsoLocalAllocLeadersUpdatedCb != nil { + if err := c.tsoLocalAllocLeadersUpdatedCb(allocMap); err != nil { + return err + } + } + + return nil +} + +// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr +func (c *pdServiceDiscovery) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) { + return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, addr, c.tlsCfg, c.option.gRPCDialOptions...) +} diff --git a/client/tso_client.go b/client/tso_client.go new file mode 100644 index 00000000000..fc38ee8e5ba --- /dev/null +++ b/client/tso_client.go @@ -0,0 +1,279 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pd + +import ( + "context" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/tikv/pd/client/errs" + "go.uber.org/zap" + "google.golang.org/grpc" + healthpb "google.golang.org/grpc/health/grpc_health_v1" +) + +// TSOClient is the client used to get timestamps. +type TSOClient interface { + // GetTS gets a timestamp from PD or TSO microservice. 
+ GetTS(ctx context.Context) (int64, int64, error) + // GetTSAsync gets a timestamp from PD or TSO microservice, without block the caller. + GetTSAsync(ctx context.Context) TSFuture + // GetLocalTS gets a local timestamp from PD or TSO microservice. + GetLocalTS(ctx context.Context, dcLocation string) (int64, int64, error) + // GetLocalTSAsync gets a local timestamp from PD or TSO microservice, without block the caller. + GetLocalTSAsync(ctx context.Context, dcLocation string) TSFuture + // GetMinTS gets a timestamp from PD or the minimal timestamp across all keyspace groups from + // the TSO microservice. + GetMinTS(ctx context.Context) (int64, int64, error) +} + +type tsoRequest struct { + start time.Time + clientCtx context.Context + requestCtx context.Context + done chan error + physical int64 + logical int64 + dcLocation string +} + +var tsoReqPool = sync.Pool{ + New: func() interface{} { + return &tsoRequest{ + done: make(chan error, 1), + physical: 0, + logical: 0, + } + }, +} + +type tsoClient struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + option *option + + svcDiscovery ServiceDiscovery + tsoStreamBuilderFactory + // tsoAllocators defines the mapping {dc-location -> TSO allocator leader URL} + tsoAllocators sync.Map // Store as map[string]string + // tsoAllocServingAddrSwitchedCallback will be called when any global/local + // tso allocator leader is switched. + tsoAllocServingAddrSwitchedCallback []func() + + // tsoDispatcher is used to dispatch different TSO requests to + // the corresponding dc-location TSO channel. + tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest + // dc-location -> deadline + tsDeadline sync.Map // Same as map[string]chan deadline + // dc-location -> *tsoInfo while the tsoInfo is the last TSO info + lastTSOInfoMap sync.Map // Same as map[string]*tsoInfo + + checkTSDeadlineCh chan struct{} + checkTSODispatcherCh chan struct{} + updateTSOConnectionCtxsCh chan struct{} +} + +// newTSOClient returns a new TSO client. +func newTSOClient( + ctx context.Context, option *option, + svcDiscovery ServiceDiscovery, factory tsoStreamBuilderFactory, +) *tsoClient { + ctx, cancel := context.WithCancel(ctx) + c := &tsoClient{ + ctx: ctx, + cancel: cancel, + option: option, + svcDiscovery: svcDiscovery, + tsoStreamBuilderFactory: factory, + checkTSDeadlineCh: make(chan struct{}), + checkTSODispatcherCh: make(chan struct{}, 1), + updateTSOConnectionCtxsCh: make(chan struct{}, 1), + } + + eventSrc := svcDiscovery.(tsoAllocatorEventSource) + eventSrc.SetTSOLocalServAddrsUpdatedCallback(c.updateTSOLocalServAddrs) + eventSrc.SetTSOGlobalServAddrUpdatedCallback(c.updateTSOGlobalServAddr) + c.svcDiscovery.AddServiceAddrsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs) + + return c +} + +func (c *tsoClient) Setup() { + c.svcDiscovery.CheckMemberChanged() + c.updateTSODispatcher() + + // Start the daemons. 
+ c.wg.Add(2) + go c.tsoDispatcherCheckLoop() + go c.tsCancelLoop() +} + +// Close closes the TSO client +func (c *tsoClient) Close() { + if c == nil { + return + } + log.Info("closing tso client") + + c.cancel() + c.wg.Wait() + + log.Info("close tso client") + c.tsoDispatcher.Range(func(_, dispatcherInterface interface{}) bool { + if dispatcherInterface != nil { + dispatcher := dispatcherInterface.(*tsoDispatcher) + tsoErr := errors.WithStack(errClosing) + dispatcher.tsoBatchController.revokePendingRequest(tsoErr) + dispatcher.dispatcherCancel() + } + return true + }) + + log.Info("tso client is closed") +} + +// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map +func (c *tsoClient) GetTSOAllocators() *sync.Map { + return &c.tsoAllocators +} + +// GetTSOAllocatorServingAddrByDCLocation returns the tso allocator of the given dcLocation +func (c *tsoClient) GetTSOAllocatorServingAddrByDCLocation(dcLocation string) (string, bool) { + url, exist := c.tsoAllocators.Load(dcLocation) + if !exist { + return "", false + } + return url.(string), true +} + +// GetTSOAllocatorClientConnByDCLocation returns the tso allocator grpc client connection +// of the given dcLocation +func (c *tsoClient) GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*grpc.ClientConn, string) { + url, ok := c.tsoAllocators.Load(dcLocation) + if !ok { + panic(fmt.Sprintf("the allocator leader in %s should exist", dcLocation)) + } + // todo: if we support local tso forward, we should get or create client conns. + cc, ok := c.svcDiscovery.GetClientConns().Load(url) + if !ok { + return nil, url.(string) + } + return cc.(*grpc.ClientConn), url.(string) +} + +// AddTSOAllocatorServingAddrSwitchedCallback adds callbacks which will be called +// when any global/local tso allocator service endpoint is switched. +func (c *tsoClient) AddTSOAllocatorServingAddrSwitchedCallback(callbacks ...func()) { + c.tsoAllocServingAddrSwitchedCallback = append(c.tsoAllocServingAddrSwitchedCallback, callbacks...) 
+} + +func (c *tsoClient) updateTSOLocalServAddrs(allocatorMap map[string]string) error { + if len(allocatorMap) == 0 { + return nil + } + + updated := false + + // Switch to the new one + for dcLocation, addr := range allocatorMap { + if len(addr) == 0 { + continue + } + oldAddr, exist := c.GetTSOAllocatorServingAddrByDCLocation(dcLocation) + if exist && addr == oldAddr { + continue + } + updated = true + if _, err := c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { + log.Warn("[tso] failed to connect dc tso allocator serving address", + zap.String("dc-location", dcLocation), + zap.String("serving-address", addr), + errs.ZapError(err)) + return err + } + c.tsoAllocators.Store(dcLocation, addr) + log.Info("[tso] switch dc tso local allocator serving address", + zap.String("dc-location", dcLocation), + zap.String("new-address", addr), + zap.String("old-address", oldAddr)) + } + + // Garbage collection of the old TSO allocator primaries + c.gcAllocatorServingAddr(allocatorMap) + + if updated { + c.scheduleCheckTSODispatcher() + } + + return nil +} + +func (c *tsoClient) updateTSOGlobalServAddr(addr string) error { + c.tsoAllocators.Store(globalDCLocation, addr) + log.Info("[tso] switch dc tso global allocator serving address", + zap.String("dc-location", globalDCLocation), + zap.String("new-address", addr)) + c.scheduleCheckTSODispatcher() + return nil +} + +func (c *tsoClient) gcAllocatorServingAddr(curAllocatorMap map[string]string) { + // Clean up the old TSO allocators + c.tsoAllocators.Range(func(dcLocationKey, _ interface{}) bool { + dcLocation := dcLocationKey.(string) + // Skip the Global TSO Allocator + if dcLocation == globalDCLocation { + return true + } + if _, exist := curAllocatorMap[dcLocation]; !exist { + log.Info("[tso] delete unused tso allocator", zap.String("dc-location", dcLocation)) + c.tsoAllocators.Delete(dcLocation) + } + return true + }) +} + +// backupClientConn gets a grpc client connection of the current reachable and healthy +// backup service endpoints randomly. Backup service endpoints are followers in a +// quorum-based cluster or secondaries in a primary/secondary configured cluster. +func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) { + addrs := c.svcDiscovery.GetBackupAddrs() + if len(addrs) < 1 { + return nil, "" + } + var ( + cc *grpc.ClientConn + err error + ) + for i := 0; i < len(addrs); i++ { + addr := addrs[rand.Intn(len(addrs))] + if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { + continue + } + healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout) + resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) + healthCancel() + if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { + return cc, addr + } + } + return nil, "" +} diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go new file mode 100644 index 00000000000..0de4dc3a49e --- /dev/null +++ b/client/tso_dispatcher.go @@ -0,0 +1,838 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package pd + +import ( + "context" + "fmt" + "math/rand" + "runtime/trace" + "sync" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/grpcutil" + "github.com/tikv/pd/client/retry" + "github.com/tikv/pd/client/timerpool" + "github.com/tikv/pd/client/tsoutil" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" +) + +type tsoDispatcher struct { + dispatcherCancel context.CancelFunc + tsoBatchController *tsoBatchController +} + +type tsoInfo struct { + tsoServer string + reqKeyspaceGroupID uint32 + respKeyspaceGroupID uint32 + respReceivedAt time.Time + physical int64 + logical int64 +} + +const ( + tsLoopDCCheckInterval = time.Minute + defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst + retryInterval = 500 * time.Millisecond + maxRetryTimes = 6 +) + +func (c *tsoClient) scheduleCheckTSODispatcher() { + select { + case c.checkTSODispatcherCh <- struct{}{}: + default: + } +} + +func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() { + select { + case c.updateTSOConnectionCtxsCh <- struct{}{}: + default: + } +} + +func (c *tsoClient) dispatchRequest(dcLocation string, request *tsoRequest) error { + dispatcher, ok := c.tsoDispatcher.Load(dcLocation) + if !ok { + err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", dcLocation)) + log.Error("[tso] dispatch tso request error", zap.String("dc-location", dcLocation), errs.ZapError(err)) + c.svcDiscovery.ScheduleCheckMemberChanged() + return err + } + + defer trace.StartRegion(request.requestCtx, "tsoReqEnqueue").End() + dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request + return nil +} + +// TSFuture is a future which promises to return a TSO. +type TSFuture interface { + // Wait gets the physical and logical time, it would block caller if data is not available yet. + Wait() (int64, int64, error) +} + +func (req *tsoRequest) Wait() (physical int64, logical int64, err error) { + // If tso command duration is observed very high, the reason could be it + // takes too long for Wait() be called. + start := time.Now() + cmdDurationTSOAsyncWait.Observe(start.Sub(req.start).Seconds()) + select { + case err = <-req.done: + defer trace.StartRegion(req.requestCtx, "tsoReqDone").End() + err = errors.WithStack(err) + defer tsoReqPool.Put(req) + if err != nil { + cmdFailDurationTSO.Observe(time.Since(req.start).Seconds()) + return 0, 0, err + } + physical, logical = req.physical, req.logical + now := time.Now() + cmdDurationWait.Observe(now.Sub(start).Seconds()) + cmdDurationTSO.Observe(now.Sub(req.start).Seconds()) + return + case <-req.requestCtx.Done(): + return 0, 0, errors.WithStack(req.requestCtx.Err()) + case <-req.clientCtx.Done(): + return 0, 0, errors.WithStack(req.clientCtx.Err()) + } +} + +func (c *tsoClient) updateTSODispatcher() { + // Set up the new TSO dispatcher and batch controller. 
+ c.GetTSOAllocators().Range(func(dcLocationKey, _ interface{}) bool { + dcLocation := dcLocationKey.(string) + if !c.checkTSODispatcher(dcLocation) { + c.createTSODispatcher(dcLocation) + } + return true + }) + // Clean up the unused TSO dispatcher + c.tsoDispatcher.Range(func(dcLocationKey, dispatcher interface{}) bool { + dcLocation := dcLocationKey.(string) + // Skip the Global TSO Allocator + if dcLocation == globalDCLocation { + return true + } + if _, exist := c.GetTSOAllocators().Load(dcLocation); !exist { + log.Info("[tso] delete unused tso dispatcher", zap.String("dc-location", dcLocation)) + dispatcher.(*tsoDispatcher).dispatcherCancel() + c.tsoDispatcher.Delete(dcLocation) + } + return true + }) +} + +type deadline struct { + timer *time.Timer + done chan struct{} + cancel context.CancelFunc +} + +func newTSDeadline( + timeout time.Duration, + done chan struct{}, + cancel context.CancelFunc, +) *deadline { + timer := timerpool.GlobalTimerPool.Get(timeout) + return &deadline{ + timer: timer, + done: done, + cancel: cancel, + } +} + +func (c *tsoClient) tsCancelLoop() { + defer c.wg.Done() + + tsCancelLoopCtx, tsCancelLoopCancel := context.WithCancel(c.ctx) + defer tsCancelLoopCancel() + + ticker := time.NewTicker(tsLoopDCCheckInterval) + defer ticker.Stop() + for { + // Watch every dc-location's tsDeadlineCh + c.GetTSOAllocators().Range(func(dcLocation, _ interface{}) bool { + c.watchTSDeadline(tsCancelLoopCtx, dcLocation.(string)) + return true + }) + select { + case <-c.checkTSDeadlineCh: + continue + case <-ticker.C: + continue + case <-tsCancelLoopCtx.Done(): + log.Info("exit tso requests cancel loop") + return + } + } +} + +func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) { + if _, exist := c.tsDeadline.Load(dcLocation); !exist { + tsDeadlineCh := make(chan *deadline, 1) + c.tsDeadline.Store(dcLocation, tsDeadlineCh) + go func(dc string, tsDeadlineCh <-chan *deadline) { + for { + select { + case d := <-tsDeadlineCh: + select { + case <-d.timer.C: + log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout)) + d.cancel() + timerpool.GlobalTimerPool.Put(d.timer) + case <-d.done: + timerpool.GlobalTimerPool.Put(d.timer) + case <-ctx.Done(): + timerpool.GlobalTimerPool.Put(d.timer) + return + } + case <-ctx.Done(): + return + } + } + }(dcLocation, tsDeadlineCh) + } +} + +func (c *tsoClient) scheduleCheckTSDeadline() { + select { + case c.checkTSDeadlineCh <- struct{}{}: + default: + } +} + +func (c *tsoClient) tsoDispatcherCheckLoop() { + defer c.wg.Done() + + loopCtx, loopCancel := context.WithCancel(c.ctx) + defer loopCancel() + + ticker := time.NewTicker(tsLoopDCCheckInterval) + defer ticker.Stop() + for { + c.updateTSODispatcher() + select { + case <-ticker.C: + case <-c.checkTSODispatcherCh: + case <-loopCtx.Done(): + log.Info("exit tso dispatcher loop") + return + } + } +} + +func (c *tsoClient) checkAllocator( + dispatcherCtx context.Context, + forwardCancel context.CancelFunc, + dc, forwardedHostTrim, addrTrim, url string, + updateAndClear func(newAddr string, connectionCtx *tsoConnectionContext)) { + defer func() { + // cancel the forward stream + forwardCancel() + requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(0) + }() + cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc) + var healthCli healthpb.HealthClient + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + // the pd/allocator leader change, we need to re-establish the stream + 
if u != url { + log.Info("[tso] the leader of the allocator leader is changed", zap.String("dc", dc), zap.String("origin", url), zap.String("new", u)) + return + } + if healthCli == nil && cc != nil { + healthCli = healthpb.NewHealthClient(cc) + } + if healthCli != nil { + healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, c.option.timeout) + resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) + failpoint.Inject("unreachableNetwork", func() { + resp.Status = healthpb.HealthCheckResponse_UNKNOWN + }) + healthCancel() + if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { + // create a stream of the original allocator + cctx, cancel := context.WithCancel(dispatcherCtx) + stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) + if err == nil && stream != nil { + log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url)) + updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel}) + return + } + } + } + select { + case <-dispatcherCtx.Done(): + return + case <-ticker.C: + // To ensure we can get the latest allocator leader + // and once the leader is changed, we can exit this function. + cc, u = c.GetTSOAllocatorClientConnByDCLocation(dc) + } + } +} + +func (c *tsoClient) checkTSODispatcher(dcLocation string) bool { + dispatcher, ok := c.tsoDispatcher.Load(dcLocation) + if !ok || dispatcher == nil { + return false + } + return true +} + +func (c *tsoClient) createTSODispatcher(dcLocation string) { + dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx) + dispatcher := &tsoDispatcher{ + dispatcherCancel: dispatcherCancel, + tsoBatchController: newTSOBatchController( + make(chan *tsoRequest, defaultMaxTSOBatchSize*2), + defaultMaxTSOBatchSize), + } + + if _, ok := c.tsoDispatcher.LoadOrStore(dcLocation, dispatcher); !ok { + // Successfully stored the value. Start the following goroutine. + // Each goroutine is responsible for handling the tso stream request for its dc-location. + // The only case that will make the dispatcher goroutine exit + // is that the loopCtx is done, otherwise there is no circumstance + // this goroutine should exit. + c.wg.Add(1) + go c.handleDispatcher(dispatcherCtx, dcLocation, dispatcher.tsoBatchController) + log.Info("[tso] tso dispatcher created", zap.String("dc-location", dcLocation)) + } else { + dispatcherCancel() + } +} + +func (c *tsoClient) handleDispatcher( + dispatcherCtx context.Context, + dc string, + tbc *tsoBatchController) { + var ( + err error + streamAddr string + stream tsoStream + streamCtx context.Context + cancel context.CancelFunc + // addr -> connectionContext + connectionCtxs sync.Map + opts []opentracing.StartSpanOption + ) + defer func() { + log.Info("[tso] exit tso dispatcher", zap.String("dc-location", dc)) + // Cancel all connections. + connectionCtxs.Range(func(_, cc interface{}) bool { + cc.(*tsoConnectionContext).cancel() + return true + }) + c.wg.Done() + }() + // Call updateTSOConnectionCtxs once to init the connectionCtxs first. + c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) + // Only the Global TSO needs to watch the updateTSOConnectionCtxsCh to sense the + // change of the cluster when TSO Follower Proxy is enabled. + // TODO: support TSO Follower Proxy for the Local TSO. 
+ if dc == globalDCLocation { + go func() { + var updateTicker = &time.Ticker{} + setNewUpdateTicker := func(ticker *time.Ticker) { + if updateTicker.C != nil { + updateTicker.Stop() + } + updateTicker = ticker + } + // Set to nil before returning to ensure that the existing ticker can be GC. + defer setNewUpdateTicker(nil) + + for { + select { + case <-dispatcherCtx.Done(): + return + case <-c.option.enableTSOFollowerProxyCh: + enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() + if enableTSOFollowerProxy && updateTicker.C == nil { + // Because the TSO Follower Proxy is enabled, + // the periodic check needs to be performed. + setNewUpdateTicker(time.NewTicker(memberUpdateInterval)) + } else if !enableTSOFollowerProxy && updateTicker.C != nil { + // Because the TSO Follower Proxy is disabled, + // the periodic check needs to be turned off. + setNewUpdateTicker(&time.Ticker{}) + } else { + // The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered + continue + } + case <-updateTicker.C: + case <-c.updateTSOConnectionCtxsCh: + } + c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) + } + }() + } + + // Loop through each batch of TSO requests and send them for processing. + streamLoopTimer := time.NewTimer(c.option.timeout) + defer streamLoopTimer.Stop() + bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout) +tsoBatchLoop: + for { + select { + case <-dispatcherCtx.Done(): + return + default: + } + // Start to collect the TSO requests. + maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval() + if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil { + if err == context.Canceled { + log.Info("[tso] stop fetching the pending tso requests due to context canceled", + zap.String("dc-location", dc)) + } else { + log.Error("[tso] fetch pending tso requests error", + zap.String("dc-location", dc), + errs.ZapError(errs.ErrClientGetTSO.FastGenByArgs("when fetch pending tso requests"), err)) + } + return + } + if maxBatchWaitInterval >= 0 { + tbc.adjustBestBatchSize() + } + // Stop the timer if it's not stopped. + if !streamLoopTimer.Stop() { + select { + case <-streamLoopTimer.C: // try to drain from the channel + default: + } + } + // We need be careful here, see more details in the comments of Timer.Reset. + // https://pkg.go.dev/time@master#Timer.Reset + streamLoopTimer.Reset(c.option.timeout) + // Choose a stream to send the TSO gRPC request. + streamChoosingLoop: + for { + connectionCtx := c.chooseStream(&connectionCtxs) + if connectionCtx != nil { + streamAddr, stream, streamCtx, cancel = connectionCtx.streamAddr, connectionCtx.stream, connectionCtx.ctx, connectionCtx.cancel + } + // Check stream and retry if necessary. 
+ if stream == nil { + log.Info("[tso] tso stream is not ready", zap.String("dc", dc)) + if c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) { + continue streamChoosingLoop + } + timer := time.NewTimer(retryInterval) + select { + case <-dispatcherCtx.Done(): + timer.Stop() + return + case <-streamLoopTimer.C: + err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr) + log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err)) + c.svcDiscovery.ScheduleCheckMemberChanged() + c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err)) + timer.Stop() + continue tsoBatchLoop + case <-timer.C: + timer.Stop() + continue streamChoosingLoop + } + } + select { + case <-streamCtx.Done(): + log.Info("[tso] tso stream is canceled", zap.String("dc", dc), zap.String("stream-addr", streamAddr)) + // Set `stream` to nil and remove this stream from the `connectionCtxs` due to being canceled. + connectionCtxs.Delete(streamAddr) + cancel() + stream = nil + continue + default: + break streamChoosingLoop + } + } + done := make(chan struct{}) + dl := newTSDeadline(c.option.timeout, done, cancel) + tsDeadlineCh, ok := c.tsDeadline.Load(dc) + for !ok || tsDeadlineCh == nil { + c.scheduleCheckTSDeadline() + time.Sleep(time.Millisecond * 100) + tsDeadlineCh, ok = c.tsDeadline.Load(dc) + } + select { + case <-dispatcherCtx.Done(): + return + case tsDeadlineCh.(chan *deadline) <- dl: + } + opts = extractSpanReference(tbc, opts[:0]) + err = c.processRequests(stream, dc, tbc, opts) + close(done) + // If error happens during tso stream handling, reset stream and run the next trial. + if err != nil { + select { + case <-dispatcherCtx.Done(): + return + default: + } + c.svcDiscovery.ScheduleCheckMemberChanged() + log.Error("[tso] getTS error", + zap.String("dc-location", dc), + zap.String("stream-addr", streamAddr), + errs.ZapError(errs.ErrClientGetTSO.FastGenByArgs("after processing requests"), err)) + // Set `stream` to nil and remove this stream from the `connectionCtxs` due to error. + connectionCtxs.Delete(streamAddr) + cancel() + stream = nil + // Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP. + if IsLeaderChange(err) { + if err := bo.Exec(dispatcherCtx, c.svcDiscovery.CheckMemberChanged); err != nil { + select { + case <-dispatcherCtx.Done(): + return + default: + } + } + // Because the TSO Follower Proxy could be configured online, + // If we change it from on -> off, background updateTSOConnectionCtxs + // will cancel the current stream, then the EOF error caused by cancel() + // should not trigger the updateTSOConnectionCtxs here. + // So we should only call it when the leader changes. + c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) + } + } + } +} + +// TSO Follower Proxy only supports the Global TSO proxy now. +func (c *tsoClient) allowTSOFollowerProxy(dc string) bool { + return dc == globalDCLocation && c.option.getEnableTSOFollowerProxy() +} + +// chooseStream uses the reservoir sampling algorithm to randomly choose a connection. +// connectionCtxs will only have only one stream to choose when the TSO Follower Proxy is off. 
+func (c *tsoClient) chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext) { + idx := 0 + connectionCtxs.Range(func(_, cc interface{}) bool { + j := rand.Intn(idx + 1) + if j < 1 { + connectionCtx = cc.(*tsoConnectionContext) + } + idx++ + return true + }) + return connectionCtx +} + +type tsoConnectionContext struct { + streamAddr string + // Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluster, + // or tsopb.TSO_TsoClient for a primary/secondary in the TSO cluster + stream tsoStream + ctx context.Context + cancel context.CancelFunc +} + +func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool { + // Normal connection creating, it will be affected by the `enableForwarding`. + createTSOConnection := c.tryConnectToTSO + if c.allowTSOFollowerProxy(dc) { + createTSOConnection = c.tryConnectToTSOWithProxy + } + if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil { + log.Error("[tso] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err)) + return false + } + return true +} + +// tryConnectToTSO will try to connect to the TSO allocator leader. If the connection becomes unreachable +// and enableForwarding is true, it will create a new connection to a follower to do the forwarding, +// while a new daemon will be created also to switch back to a normal leader connection ASAP the +// connection comes back to normal. +func (c *tsoClient) tryConnectToTSO( + dispatcherCtx context.Context, + dc string, + connectionCtxs *sync.Map, +) error { + var ( + networkErrNum uint64 + err error + stream tsoStream + url string + cc *grpc.ClientConn + ) + updateAndClear := func(newAddr string, connectionCtx *tsoConnectionContext) { + if cc, loaded := connectionCtxs.LoadOrStore(newAddr, connectionCtx); loaded { + // If the previous connection still exists, we should close it first. + cc.(*tsoConnectionContext).cancel() + connectionCtxs.Store(newAddr, connectionCtx) + } + connectionCtxs.Range(func(addr, cc interface{}) bool { + if addr.(string) != newAddr { + cc.(*tsoConnectionContext).cancel() + connectionCtxs.Delete(addr) + } + return true + }) + } + // retry several times before falling back to the follower when the network problem happens + + ticker := time.NewTicker(retryInterval) + defer ticker.Stop() + for i := 0; i < maxRetryTimes; i++ { + c.svcDiscovery.ScheduleCheckMemberChanged() + cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) + if cc != nil { + cctx, cancel := context.WithCancel(dispatcherCtx) + stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) + failpoint.Inject("unreachableNetwork", func() { + stream = nil + err = status.New(codes.Unavailable, "unavailable").Err() + }) + if stream != nil && err == nil { + updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel}) + return nil + } + + if err != nil && c.option.enableForwarding { + // The reason we need to judge if the error code is equal to "Canceled" here is that + // when we create a stream we use a goroutine to manually control the timeout of the connection. + // There is no need to wait for the transport layer timeout which can reduce the time of unavailability. + // But it conflicts with the retry mechanism since we use the error code to decide if it is caused by network error. + // And actually the `Canceled` error can be regarded as a kind of network error in some way. 
+ if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) { + networkErrNum++ + } + } + cancel() + } else { + networkErrNum++ + } + select { + case <-dispatcherCtx.Done(): + return err + case <-ticker.C: + } + } + + if networkErrNum == maxRetryTimes { + // encounter the network error + backupClientConn, addr := c.backupClientConn() + if backupClientConn != nil { + log.Info("[tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("addr", addr)) + forwardedHost, ok := c.GetTSOAllocatorServingAddrByDCLocation(dc) + if !ok { + return errors.Errorf("cannot find the allocator leader in %s", dc) + } + + // create the follower stream + cctx, cancel := context.WithCancel(dispatcherCtx) + cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) + stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout) + if err == nil { + forwardedHostTrim := trimHTTPPrefix(forwardedHost) + addrTrim := trimHTTPPrefix(addr) + // the goroutine is used to check the network and change back to the original stream + go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addrTrim, url, updateAndClear) + requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1) + updateAndClear(addr, &tsoConnectionContext{addr, stream, cctx, cancel}) + return nil + } + cancel() + } + } + return err +} + +// getAllTSOStreamBuilders returns a TSO stream builder for every service endpoint of TSO leader/followers +// or of keyspace group primary/secondaries. +func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder { + var ( + addrs = c.svcDiscovery.GetServiceURLs() + streamBuilders = make(map[string]tsoStreamBuilder, len(addrs)) + cc *grpc.ClientConn + err error + ) + for _, addr := range addrs { + if len(addrs) == 0 { + continue + } + if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { + continue + } + healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout) + resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) + healthCancel() + if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { + streamBuilders[addr] = c.tsoStreamBuilderFactory.makeBuilder(cc) + } + } + return streamBuilders +} + +// tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as +// a TSO proxy to reduce the pressure of the main serving service endpoint. +func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error { + tsoStreamBuilders := c.getAllTSOStreamBuilders() + leaderAddr := c.svcDiscovery.GetServingAddr() + forwardedHost, ok := c.GetTSOAllocatorServingAddrByDCLocation(dc) + if !ok { + return errors.Errorf("cannot find the allocator leader in %s", dc) + } + // GC the stale one. + connectionCtxs.Range(func(addr, cc interface{}) bool { + if _, ok := tsoStreamBuilders[addr.(string)]; !ok { + cc.(*tsoConnectionContext).cancel() + connectionCtxs.Delete(addr) + } + return true + }) + // Update the missing one. + for addr, tsoStreamBuilder := range tsoStreamBuilders { + if _, ok = connectionCtxs.Load(addr); ok { + continue + } + cctx, cancel := context.WithCancel(dispatcherCtx) + // Do not proxy the leader client. 
+ if addr != leaderAddr { + log.Info("[tso] use follower to forward tso stream to do the proxy", + zap.String("dc", dc), zap.String("addr", addr)) + cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) + } + // Create the TSO stream. + stream, err := tsoStreamBuilder.build(cctx, cancel, c.option.timeout) + if err == nil { + if addr != leaderAddr { + forwardedHostTrim := trimHTTPPrefix(forwardedHost) + addrTrim := trimHTTPPrefix(addr) + requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1) + } + connectionCtxs.Store(addr, &tsoConnectionContext{addr, stream, cctx, cancel}) + continue + } + log.Error("[tso] create the tso stream failed", + zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err)) + cancel() + } + return nil +} + +func extractSpanReference(tbc *tsoBatchController, opts []opentracing.StartSpanOption) []opentracing.StartSpanOption { + for _, req := range tbc.getCollectedRequests() { + if span := opentracing.SpanFromContext(req.requestCtx); span != nil { + opts = append(opts, opentracing.ChildOf(span.Context())) + } + } + return opts +} + +func (c *tsoClient) processRequests( + stream tsoStream, dcLocation string, tbc *tsoBatchController, opts []opentracing.StartSpanOption, +) error { + if len(opts) > 0 { + span := opentracing.StartSpan("pdclient.processRequests", opts...) + defer span.Finish() + } + + requests := tbc.getCollectedRequests() + for _, req := range requests { + defer trace.StartRegion(req.requestCtx, "tsoReqSend").End() + } + count := int64(len(requests)) + reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID() + respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests( + c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID, + dcLocation, requests, tbc.batchStartTime) + if err != nil { + c.finishRequest(requests, 0, 0, 0, err) + return err + } + // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. + firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits) + curTSOInfo := &tsoInfo{ + tsoServer: stream.getServerAddr(), + reqKeyspaceGroupID: reqKeyspaceGroupID, + respKeyspaceGroupID: respKeyspaceGroupID, + respReceivedAt: time.Now(), + physical: physical, + logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits), + } + c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical) + c.finishRequest(requests, physical, firstLogical, suffixBits, nil) + return nil +} + +func (c *tsoClient) compareAndSwapTS( + dcLocation string, + curTSOInfo *tsoInfo, + physical, firstLogical int64, +) { + val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo) + if !loaded { + return + } + lastTSOInfo := val.(*tsoInfo) + if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { + log.Info("[tso] keyspace group changed", + zap.String("dc-location", dcLocation), + zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID), + zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID)) + } + + // The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical + // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then + // all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned + // last time. 
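+ // Continuing that example, the next batch must start strictly after the last TSO handed out: if its first
+ // timestamp is not greater than the previous largest one, timestamps would repeat, so the check below panics.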
+ if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { + log.Panic("[tso] timestamp fallback", + zap.String("dc-location", dcLocation), + zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()), + zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), + zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)), + zap.String("last-tso-server", lastTSOInfo.tsoServer), + zap.String("cur-tso-server", curTSOInfo.tsoServer), + zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID), + zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), + zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), + zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) + } + lastTSOInfo.tsoServer = curTSOInfo.tsoServer + lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID + lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID + lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt + lastTSOInfo.physical = curTSOInfo.physical + lastTSOInfo.logical = curTSOInfo.logical +} + +func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) { + for i := 0; i < len(requests); i++ { + if span := opentracing.SpanFromContext(requests[i].requestCtx); span != nil { + span.Finish() + } + requests[i].physical, requests[i].logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits) + defer trace.StartRegion(requests[i].requestCtx, "tsoReqDequeue").End() + requests[i].done <- err + } +} diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 5cc068a8c20..276c07f9673 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -532,7 +532,7 @@ func TestCustomTimeout(t *testing.T) { re.Less(time.Since(start), 2*time.Second) } -func TestGetRegionFromFollowerClient(t *testing.T) { +func TestGetRegionByFollowerForwarding(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -558,7 +558,7 @@ func TestGetRegionFromFollowerClient(t *testing.T) { } // case 1: unreachable -> normal -func TestGetTsoFromFollowerClient1(t *testing.T) { +func TestGetTsoByFollowerForwarding1(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -589,7 +589,7 @@ func TestGetTsoFromFollowerClient1(t *testing.T) { } // case 2: unreachable -> leader transfer -> normal -func TestGetTsoFromFollowerClient2(t *testing.T) { +func TestGetTsoByFollowerForwarding2(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -623,6 +623,101 @@ func TestGetTsoFromFollowerClient2(t *testing.T) { checkTS(re, cli, lastTS) } +// case 3: network partition between client and follower A -> transfer leader to follower A -> normal +func TestGetTsoAndRegionByFollowerForwarding(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + pd.LeaderHealthCheckInterval = 100 * time.Millisecond + cluster, err := tests.NewTestCluster(ctx, 3) + re.NoError(err) + defer cluster.Destroy() + + endpoints := runServer(re, cluster) + re.NotEmpty(cluster.WaitLeader()) + leader := cluster.GetLeaderServer() + grpcPDClient := 
testutil.MustNewGrpcClient(re, leader.GetAddr()) + testutil.Eventually(re, func() bool { + regionHeartbeat, err := grpcPDClient.RegionHeartbeat(ctx) + re.NoError(err) + regionID := regionIDAllocator.alloc() + region := &metapb.Region{ + Id: regionID, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + Peers: peers, + } + req := &pdpb.RegionHeartbeatRequest{ + Header: newHeader(leader.GetServer()), + Region: region, + Leader: peers[0], + } + err = regionHeartbeat.Send(req) + re.NoError(err) + _, err = regionHeartbeat.Recv() + return err == nil + }) + follower := cluster.GetServer(cluster.GetFollower()) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/grpcutil/unreachableNetwork2", fmt.Sprintf("return(\"%s\")", follower.GetAddr()))) + + cli := setupCli(re, ctx, endpoints, pd.WithForwardingOption(true)) + var lastTS uint64 + testutil.Eventually(re, func() bool { + physical, logical, err := cli.GetTS(context.TODO()) + if err == nil { + lastTS = tsoutil.ComposeTS(physical, logical) + return true + } + t.Log(err) + return false + }) + lastTS = checkTS(re, cli, lastTS) + r, err := cli.GetRegion(context.Background(), []byte("a")) + re.NoError(err) + re.NotNil(r) + leader.GetServer().GetMember().ResignEtcdLeader(leader.GetServer().Context(), + leader.GetServer().Name(), follower.GetServer().Name()) + re.NotEmpty(cluster.WaitLeader()) + testutil.Eventually(re, func() bool { + physical, logical, err := cli.GetTS(context.TODO()) + if err == nil { + lastTS = tsoutil.ComposeTS(physical, logical) + return true + } + t.Log(err) + return false + }) + lastTS = checkTS(re, cli, lastTS) + testutil.Eventually(re, func() bool { + r, err = cli.GetRegion(context.Background(), []byte("a")) + if err == nil && r != nil { + return true + } + return false + }) + + re.NoError(failpoint.Disable("github.com/tikv/pd/client/grpcutil/unreachableNetwork2")) + testutil.Eventually(re, func() bool { + physical, logical, err := cli.GetTS(context.TODO()) + if err == nil { + lastTS = tsoutil.ComposeTS(physical, logical) + return true + } + t.Log(err) + return false + }) + lastTS = checkTS(re, cli, lastTS) + testutil.Eventually(re, func() bool { + r, err = cli.GetRegion(context.Background(), []byte("a")) + if err == nil && r != nil { + return true + } + return false + }) +} + func checkTS(re *require.Assertions, cli pd.Client, lastTS uint64) uint64 { for i := 0; i < tsoRequestRound; i++ { physical, logical, err := cli.GetTS(context.TODO())