Skip to content

Commit

Permalink
Merge branch 'master' into resourcemanager-availableRUCounter
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Oct 17, 2024
2 parents f3777ef + f0c84e4 commit f3564f2
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 64 deletions.
39 changes: 8 additions & 31 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ func newClientWithKeyspaceName(
}
clientCtx, clientCancel := context.WithCancel(ctx)
c := &client{
keyspaceID: nullKeyspaceID,
updateTokenConnectionCh: make(chan struct{}, 1),
ctx: clientCtx,
cancel: clientCancel,
Expand All @@ -455,19 +456,21 @@ func newClientWithKeyspaceName(
opt(c)
}

updateKeyspaceIDCb := func() error {
if err := c.initRetry(c.loadKeyspaceMeta, keyspaceName); err != nil {
updateKeyspaceIDFunc := func() error {
keyspaceMeta, err := c.LoadKeyspace(clientCtx, keyspaceName)
if err != nil {
return err
}
c.keyspaceID = keyspaceMeta.GetId()
// c.keyspaceID is the source of truth for keyspace id.
c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)
return nil
}

// Create a PD service discovery with null keyspace id, then query the real id with the keyspace name,
// finally update the keyspace id to the PD service discovery for the following interactions.
c.pdSvcDiscovery = newPDServiceDiscovery(
clientCtx, clientCancel, &c.wg, c.setServiceMode, updateKeyspaceIDCb, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option)
c.pdSvcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg,
c.setServiceMode, updateKeyspaceIDFunc, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option)
if err := c.setup(); err != nil {
c.cancel()
if c.pdSvcDiscovery != nil {
Expand All @@ -482,32 +485,6 @@ func newClientWithKeyspaceName(
return c, nil
}

// initRetry calls f(str) up to c.option.maxRetryTimes times, waiting on a
// one-second ticker between consecutive attempts.
//
// It returns nil as soon as one attempt succeeds. If c.ctx is done while
// waiting for the next attempt, the last error from f is returned as is.
// Once all attempts have failed, the last error is returned wrapped with
// errors.WithStack. f is never called when maxRetryTimes is not positive.
func (c *client) initRetry(f func(s string) error, str string) error {
	var err error
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for i := 0; i < c.option.maxRetryTimes; i++ {
		if err = f(str); err == nil {
			return nil
		}
		// No attempt follows the last one, so waiting for another tick
		// would only delay reporting the failure.
		if i == c.option.maxRetryTimes-1 {
			break
		}
		select {
		case <-c.ctx.Done():
			return err
		case <-ticker.C:
		}
	}
	return errors.WithStack(err)
}

// loadKeyspaceMeta looks up the keyspace with the given name via LoadKeyspace
// and stores the returned ID in c.keyspaceID.
//
// The lookup is issued with the client's own context (c.ctx) instead of
// context.TODO(), so the request is bound to the client's lifetime.
// On failure the LoadKeyspace error is returned unchanged and c.keyspaceID
// is left untouched.
func (c *client) loadKeyspaceMeta(keyspace string) error {
	keyspaceMeta, err := c.LoadKeyspace(c.ctx, keyspace)
	if err != nil {
		return err
	}
	c.keyspaceID = keyspaceMeta.GetId()
	return nil
}

func (c *client) setup() error {
// Init the metrics.
if c.option.initMetrics {
Expand Down Expand Up @@ -579,7 +556,7 @@ func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) {
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = newTSOServiceDiscovery(
c.ctx, MetaStorageClient(c), c.pdSvcDiscovery,
c.GetClusterID(c.ctx), c.keyspaceID, c.tlsCfg, c.option)
c.keyspaceID, c.tlsCfg, c.option)
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
// and will be updated later.
newTSOCli = newTSOClient(c.ctx, c.option,
Expand Down
32 changes: 16 additions & 16 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,9 @@ type pdServiceDiscovery struct {
cancel context.CancelFunc
closeOnce sync.Once

updateKeyspaceIDCb updateKeyspaceIDFunc
keyspaceID uint32
tlsCfg *tls.Config
updateKeyspaceIDFunc updateKeyspaceIDFunc
keyspaceID uint32
tlsCfg *tls.Config
// Client option.
option *option
}
Expand All @@ -461,21 +461,21 @@ func newPDServiceDiscovery(
ctx context.Context, cancel context.CancelFunc,
wg *sync.WaitGroup,
serviceModeUpdateCb func(pdpb.ServiceMode),
updateKeyspaceIDCb updateKeyspaceIDFunc,
updateKeyspaceIDFunc updateKeyspaceIDFunc,
keyspaceID uint32,
urls []string, tlsCfg *tls.Config, option *option,
) *pdServiceDiscovery {
pdsd := &pdServiceDiscovery{
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
wg: wg,
apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn), newPDServiceBalancer(regionAPIErrorFn)},
serviceModeUpdateCb: serviceModeUpdateCb,
updateKeyspaceIDCb: updateKeyspaceIDCb,
keyspaceID: keyspaceID,
tlsCfg: tlsCfg,
option: option,
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
wg: wg,
apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn), newPDServiceBalancer(regionAPIErrorFn)},
serviceModeUpdateCb: serviceModeUpdateCb,
updateKeyspaceIDFunc: updateKeyspaceIDFunc,
keyspaceID: keyspaceID,
tlsCfg: tlsCfg,
option: option,
}
urls = addrsToURLs(urls, tlsCfg)
pdsd.urls.Store(urls)
Expand All @@ -500,8 +500,8 @@ func (c *pdServiceDiscovery) Init() error {

// We need to update the keyspace ID before we discover and update the service mode
// so that TSO in API mode can be initialized with the correct keyspace ID.
if c.updateKeyspaceIDCb != nil {
if err := c.updateKeyspaceIDCb(); err != nil {
if c.keyspaceID == nullKeyspaceID && c.updateKeyspaceIDFunc != nil {
if err := c.initRetry(c.updateKeyspaceIDFunc); err != nil {
return err
}
}
Expand Down
8 changes: 4 additions & 4 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,15 @@ type tsoServiceDiscovery struct {
// newTSOServiceDiscovery returns a new client-side service discovery for the independent TSO service.
func newTSOServiceDiscovery(
ctx context.Context, metacli MetaStorageClient, apiSvcDiscovery ServiceDiscovery,
clusterID uint64, keyspaceID uint32, tlsCfg *tls.Config, option *option,
keyspaceID uint32, tlsCfg *tls.Config, option *option,
) ServiceDiscovery {
ctx, cancel := context.WithCancel(ctx)
c := &tsoServiceDiscovery{
ctx: ctx,
cancel: cancel,
metacli: metacli,
apiSvcDiscovery: apiSvcDiscovery,
clusterID: clusterID,
clusterID: apiSvcDiscovery.GetClusterID(),
tlsCfg: tlsCfg,
option: option,
checkMembershipCh: make(chan struct{}, 1),
Expand All @@ -180,10 +180,10 @@ func newTSOServiceDiscovery(
c.tsoServerDiscovery = &tsoServerDiscovery{urls: make([]string, 0)}
// Start with the default keyspace group. The actual keyspace group, to which the keyspace belongs,
// will be discovered later.
c.defaultDiscoveryKey = fmt.Sprintf(tsoSvcDiscoveryFormat, clusterID, defaultKeySpaceGroupID)
c.defaultDiscoveryKey = fmt.Sprintf(tsoSvcDiscoveryFormat, c.clusterID, defaultKeySpaceGroupID)

log.Info("created tso service discovery",
zap.Uint64("cluster-id", clusterID),
zap.Uint64("cluster-id", c.clusterID),
zap.Uint32("keyspace-id", keyspaceID),
zap.String("default-discovery-key", c.defaultDiscoveryKey))

Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func HandleOverlaps(ctx context.Context, c Cluster, overlaps []*core.RegionInfo)
if c.GetRegionStats() != nil {
c.GetRegionStats().ClearDefunctRegion(item.GetID())
}
c.GetLabelStats().ClearDefunctRegion(item.GetID())
c.GetLabelStats().MarkDefunctRegion(item.GetID())
c.GetRuleManager().InvalidCache(item.GetID())
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,12 @@ func (c *Cluster) waitSchedulersInitialized() {
}
}

// TODO: implement the following methods

// UpdateRegionsLabelLevelStats observes each given region in the label
// statistics and afterwards calls ClearDefunctRegions on them.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
	for idx := range regions {
		target := regions[idx]
		// Stores chosen by getStoresWithoutLabelLocked for the engine/TiFlash label pair.
		candidates := c.getStoresWithoutLabelLocked(target, core.EngineKey, core.EngineTiFlash)
		c.labelStats.Observe(target, candidates, c.persistConfig.GetLocationLabels())
	}
	c.labelStats.ClearDefunctRegions()
}

func (c *Cluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo {
Expand Down
24 changes: 19 additions & 5 deletions pkg/statistics/region_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,13 +365,15 @@ type LabelStatistics struct {
syncutil.RWMutex
regionLabelStats map[uint64]string
labelCounter map[string]int
defunctRegions map[uint64]struct{}
}

// NewLabelStatistics returns a LabelStatistics whose regionLabelStats,
// labelCounter and defunctRegions maps are allocated and empty.
func NewLabelStatistics() *LabelStatistics {
	stats := new(LabelStatistics)
	stats.regionLabelStats = map[uint64]string{}
	stats.labelCounter = map[string]int{}
	stats.defunctRegions = map[uint64]struct{}{}
	return stats
}

Expand Down Expand Up @@ -405,14 +407,26 @@ func ResetLabelStatsMetrics() {
regionLabelLevelGauge.Reset()
}

// ClearDefunctRegion is used to handle the overlap region.
func (l *LabelStatistics) ClearDefunctRegion(regionID uint64) {
// MarkDefunctRegion is used to handle the overlap region.
// It is used to mark the region as defunct and remove it from the label statistics later.
func (l *LabelStatistics) MarkDefunctRegion(regionID uint64) {
l.Lock()
defer l.Unlock()
if label, ok := l.regionLabelStats[regionID]; ok {
l.labelCounter[label]--
delete(l.regionLabelStats, regionID)
l.defunctRegions[regionID] = struct{}{}
}

// ClearDefunctRegions removes every region recorded in defunctRegions from the
// label statistics. For each such region that still has an entry in
// regionLabelStats, the counter of its label is decremented and the entry is
// deleted. The defunct set is then replaced with a fresh empty map.
// The whole operation runs under the write lock.
func (l *LabelStatistics) ClearDefunctRegions() {
	l.Lock()
	defer l.Unlock()
	for id := range l.defunctRegions {
		level, tracked := l.regionLabelStats[id]
		if !tracked {
			// Nothing was counted for this region, so there is nothing to undo.
			continue
		}
		l.labelCounter[level]--
		delete(l.regionLabelStats, id)
	}
	l.defunctRegions = make(map[uint64]struct{})
}

// GetLabelCounter is only used for tests.
Expand Down
5 changes: 3 additions & 2 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Info("region syncer start load region")
start := time.Now()
err := storage.TryLoadRegionsOnce(ctx, regionStorage, bc.CheckAndPutRegion)
log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start)))
if err != nil {
log.Warn("failed to load regions", errs.ZapError(err))
log.Warn("region syncer failed to load regions", errs.ZapError(err), zap.Duration("time-cost", time.Since(start)))
} else {
log.Info("region syncer finished load regions", zap.Duration("time-cost", time.Since(start)))
}
// establish client.
conn := grpcutil.CreateClientConn(ctx, addr, s.tlsConfig,
Expand Down
36 changes: 33 additions & 3 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) {
opt.SetReplicationConfig(cfg)
re.NoError(err)
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend())
cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil)

for i := uint64(1); i <= 4; i++ {
var labels []*metapb.StoreLabel
Expand Down Expand Up @@ -1159,13 +1160,42 @@ func TestRegionLabelIsolationLevel(t *testing.T) {
StartKey: []byte{byte(1)},
EndKey: []byte{byte(2)},
}
r := core.NewRegionInfo(region, peers[0])
re.NoError(cluster.putRegion(r))
r1 := core.NewRegionInfo(region, peers[0])
re.NoError(cluster.putRegion(r1))

cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r})
cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r1})
counter := cluster.labelStats.GetLabelCounter()
re.Equal(0, counter["none"])
re.Equal(1, counter["zone"])

region = &metapb.Region{
Id: 10,
Peers: peers,
StartKey: []byte{byte(2)},
EndKey: []byte{byte(3)},
}
r2 := core.NewRegionInfo(region, peers[0])
re.NoError(cluster.putRegion(r2))

cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r2})
counter = cluster.labelStats.GetLabelCounter()
re.Equal(0, counter["none"])
re.Equal(2, counter["zone"])

// issue: https://github.com/tikv/pd/issues/8700
// step1: heartbeat an overlap region, which is used to simulate the case that the region is merged.
// step2: update region 9 and region 10, which is used to simulate the case that patrol is triggered.
// We should only count region 9.
overlapRegion := r1.Clone(
core.WithStartKey(r1.GetStartKey()),
core.WithEndKey(r2.GetEndKey()),
core.WithLeader(r2.GetPeer(8)),
)
re.NoError(cluster.HandleRegionHeartbeat(overlapRegion))
cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r1, r2})
counter = cluster.labelStats.GetLabelCounter()
re.Equal(0, counter["none"])
re.Equal(1, counter["zone"])
}

func heartbeatRegions(re *require.Assertions, cluster *RaftCluster, regions []*core.RegionInfo) {
Expand Down
1 change: 1 addition & 0 deletions server/cluster/scheduling_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (sc *schedulingController) UpdateRegionsLabelLevelStats(regions []*core.Reg
for _, region := range regions {
sc.labelStats.Observe(region, sc.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), sc.opt.GetLocationLabels())
}
sc.labelStats.ClearDefunctRegions()
}

func (sc *schedulingController) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo {
Expand Down

0 comments on commit f3564f2

Please sign in to comment.