Skip to content

Commit

Permalink
tools/simulator: reduce range times (#8318)
Browse files Browse the repository at this point in the history
ref #8135

Signed-off-by: husharp <jinhao.hu@pingcap.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
HuSharp and ti-chi-bot[bot] authored Jun 25, 2024
1 parent 49c3b5a commit 6ae7b47
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 25 deletions.
10 changes: 9 additions & 1 deletion pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1477,7 +1477,7 @@ const (
PendingPeerInSubTree SubTreeRegionType = "pending"
)

// GetStoreRegions gets all RegionInfo with a given storeID
// GetStoreRegionsByTypeInSubTree gets all RegionInfo with a given storeID
func (r *RegionsInfo) GetStoreRegionsByTypeInSubTree(storeID uint64, typ SubTreeRegionType) ([]*RegionInfo, error) {
r.st.RLock()
var regions []*RegionInfo
Expand Down Expand Up @@ -2210,3 +2210,11 @@ func NewTestRegionInfo(regionID, storeID uint64, start, end []byte, opts ...Regi
}
return NewRegionInfo(metaRegion, leader, opts...)
}

// TraverseRegions executes a function on all regions.
// ONLY for simulator now and function need to be self-locked.
func (r *RegionsInfo) TraverseRegions(lockedFunc func(*RegionInfo)) {
for _, item := range r.regions {
lockedFunc(item.RegionInfo)
}
}
1 change: 1 addition & 0 deletions tools/pd-simulator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ EXIT:

if simulator.PDHTTPClient != nil {
simulator.PDHTTPClient.Close()
simulator.SD.Close()
}
if simResult != "OK" {
os.Exit(1)
Expand Down
10 changes: 5 additions & 5 deletions tools/pd-simulator/simulator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
// errFailInitClusterID is returned when failed to load clusterID from all supplied PD addresses.
errFailInitClusterID = errors.New("[pd] failed to get cluster id")
PDHTTPClient pdHttp.Client
sd pd.ServiceDiscovery
SD pd.ServiceDiscovery
ClusterID uint64
)

Expand Down Expand Up @@ -167,9 +167,9 @@ func (c *client) HeartbeatStreamLoop() {

// update connection to recreate heartbeat stream
for i := 0; i < retryTimes; i++ {
sd.ScheduleCheckMemberChanged()
SD.ScheduleCheckMemberChanged()
time.Sleep(leaderChangedWaitTime)
if client := sd.GetServiceClient(); client != nil {
if client := SD.GetServiceClient(); client != nil {
_, conn, err := getLeaderURL(ctx, client.GetClientConn())
if err != nil {
simutil.Logger.Error("[HeartbeatStreamLoop] failed to get leader URL", zap.Error(err))
Expand Down Expand Up @@ -351,9 +351,9 @@ func (rc *RetryClient) requestWithRetry(f func() (any, error)) (any, error) {
}
// retry to get leader URL
for i := 0; i < rc.retryCount; i++ {
sd.ScheduleCheckMemberChanged()
SD.ScheduleCheckMemberChanged()
time.Sleep(100 * time.Millisecond)
if client := sd.GetServiceClient(); client != nil {
if client := SD.GetServiceClient(); client != nil {
_, conn, err := getLeaderURL(context.Background(), client.GetClientConn())
if err != nil {
simutil.Logger.Error("[retry] failed to get leader URL", zap.Error(err))
Expand Down
6 changes: 3 additions & 3 deletions tools/pd-simulator/simulator/drive.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ func (d *Driver) allocID() error {
func (d *Driver) updateNodesClient() error {
urls := strings.Split(d.pdAddr, ",")
ctx, cancel := context.WithCancel(context.Background())
sd = pd.NewDefaultPDServiceDiscovery(ctx, cancel, urls, nil)
if err := sd.Init(); err != nil {
SD = pd.NewDefaultPDServiceDiscovery(ctx, cancel, urls, nil)
if err := SD.Init(); err != nil {
return err
}
// Init PD HTTP client.
PDHTTPClient = pdHttp.NewClientWithServiceDiscovery("pd-simulator", sd)
PDHTTPClient = pdHttp.NewClientWithServiceDiscovery("pd-simulator", SD)

for _, node := range d.conn.Nodes {
node.client = NewRetryClient(node)
Expand Down
9 changes: 4 additions & 5 deletions tools/pd-simulator/simulator/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ func (e *DownNode) Run(raft *RaftEngine, _ int64) bool {
node = nodes[uint64(e.ID)]
}
if node == nil {
simutil.Logger.Error("node is not existed", zap.Uint64("node-id", node.Id))
return false
simutil.Logger.Error("node is not existed")
return true
}
delete(raft.conn.Nodes, node.Id)
// delete store
Expand All @@ -240,8 +240,7 @@ func (e *DownNode) Run(raft *RaftEngine, _ int64) bool {
}
node.Stop()

regions := raft.GetRegions()
for _, region := range regions {
raft.TraverseRegions(func(region *core.RegionInfo) {
storeIDs := region.GetStoreIDs()
if _, ok := storeIDs[node.Id]; ok {
downPeer := &pdpb.PeerStats{
Expand All @@ -251,6 +250,6 @@ func (e *DownNode) Run(raft *RaftEngine, _ int64) bool {
region = region.Clone(core.WithDownPeers(append(region.GetDownPeers(), downPeer)))
raft.SetRegion(region)
}
}
})
return true
}
6 changes: 3 additions & 3 deletions tools/pd-simulator/simulator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/tools/pd-simulator/simulator/cases"
Expand Down Expand Up @@ -204,8 +205,7 @@ func (n *Node) regionHeartBeat() {
if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving {
return
}
regions := n.raftEngine.GetRegions()
for _, region := range regions {
n.raftEngine.TraverseRegions(func(region *core.RegionInfo) {
if region.GetLeader() != nil && region.GetLeader().GetStoreId() == n.Id {
ctx, cancel := context.WithTimeout(n.ctx, pdTimeout)
err := n.client.RegionHeartbeat(ctx, region)
Expand All @@ -217,7 +217,7 @@ func (n *Node) regionHeartBeat() {
}
cancel()
}
}
})
}

func (n *Node) reportRegionChange() {
Expand Down
13 changes: 5 additions & 8 deletions tools/pd-simulator/simulator/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,10 @@ func NewRaftEngine(conf *cases.Case, conn *Connection, storeConfig *config.SimCo
}

func (r *RaftEngine) stepRegions() {
regions := r.GetRegions()
for _, region := range regions {
r.TraverseRegions(func(region *core.RegionInfo) {
r.stepLeader(region)
r.stepSplit(region)
}
})
}

func (r *RaftEngine) stepLeader(region *core.RegionInfo) {
Expand Down Expand Up @@ -265,11 +264,9 @@ func (r *RaftEngine) ResetRegionChange(storeID uint64, regionID uint64) {
}
}

// GetRegions gets all RegionInfo from regionMap
func (r *RaftEngine) GetRegions() []*core.RegionInfo {
r.RLock()
defer r.RUnlock()
return r.regionsInfo.GetRegions()
// TraverseRegions executes a function on all regions, and function need to be self-locked.
func (r *RaftEngine) TraverseRegions(lockedFunc func(*core.RegionInfo)) {
r.regionsInfo.TraverseRegions(lockedFunc)
}

// SetRegion sets the RegionInfo with regionID
Expand Down

0 comments on commit 6ae7b47

Please sign in to comment.