tools: refine pd-heartbeat-bench #3688

Merged: 8 commits, Aug 1, 2022
Changes shown from 4 commits
214 changes: 168 additions & 46 deletions tools/pd-heartbeat-bench/main.go
@@ -18,6 +18,7 @@ import (
"flag"
"fmt"
"log"
"math/rand"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
@@ -28,13 +29,22 @@ import (

var (
pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address")
storeCount = flag.Int("store", 20, "store count")
regionCount = flag.Uint64("region", 1000000, "region count")
keyLen = flag.Int("keylen", 56, "key length")
storeCount = flag.Int("store", 40, "store count")
regionCount = flag.Int("region", 1000000, "region count")
keyLen = flag.Int("key-len", 56, "key length")
replica = flag.Int("replica", 3, "replica count")
regionUpdateRatio = flag.Float64("region-update-ratio", 0.05, "ratio of the region need to update")
leaderUpdateRatio = flag.Float64("leader", 0.06, "ratio of the region leader need to update, they need save-tree")
epochUpdateRatio = flag.Float64("epoch", 0.04, "ratio of the region epoch need to update, they need save-kv")
spaceUpdateRatio = flag.Float64("space", 0.15, "ratio of the region space need to update")
flowUpdateRatio = flag.Float64("flow", 0.35, "ratio of the region flow need to update")
sample = flag.Bool("sample", false, "sample per second")
heartbeatRounds = flag.Int("heartbeat-rounds", 5, "total rounds of heartbeat")
heartbeatRounds = flag.Int("heartbeat-rounds", 4, "total rounds of heartbeat")
)

const (
bytesUnit = 1 << 23 // 8MB
keysUint = 1 << 13 // 8K
intervalUint = 60 // 60s
)

var clusterID uint64
@@ -126,61 +136,168 @@ func newEndKey(id uint64) []byte {
return k
}

// Store simulates a TiKV to heartbeat.
type Store struct {
id uint64
// Regions simulates all regions to heartbeat.
type Regions struct {
regions []*pdpb.RegionHeartbeatRequest

updateRound int

updateLeader []int
updateEpoch []int
updateSpace []int
updateFlow []int
}

func (rs *Regions) init() {
rs.regions = make([]*pdpb.RegionHeartbeatRequest, 0, *regionCount)
rs.updateRound = 0

// Generate regions
id := uint64(1)
now := uint64(time.Now().Unix())

for i := 0; i < *regionCount; i++ {
region := &pdpb.RegionHeartbeatRequest{
Header: header(),
Region: &metapb.Region{
Id: id,
StartKey: newStartKey(id),
EndKey: newEndKey(id),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1},
},
ApproximateSize: bytesUnit,
Interval: &pdpb.TimeInterval{
StartTimestamp: now,
EndTimestamp: now + intervalUint,
},
ApproximateKeys: keysUint,
Term: 1,
}
id += 1

peers := make([]*metapb.Peer, 0, *replica)
for j := 0; j < *replica; j++ {
peers = append(peers, &metapb.Peer{Id: id, StoreId: uint64((i+j)%*storeCount + 1)})
id += 1
}

region.Region.Peers = peers
region.Leader = peers[0]
rs.regions = append(rs.regions, region)
}

// Generate sample index
slice := make([]int, *regionCount)
for i := range slice {
slice[i] = i
}

rand.Seed(0) // Ensure consistent behavior multiple times
pick := func(ratio float64) []int {
rand.Shuffle(*regionCount, func(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
})
return append(slice[:0:0], slice[0:int(float64(*regionCount)*ratio)]...)
}

rs.updateLeader = pick(*leaderUpdateRatio)
rs.updateEpoch = pick(*epochUpdateRatio)
rs.updateSpace = pick(*spaceUpdateRatio)
rs.updateFlow = pick(*flowUpdateRatio)
}

func (rs *Regions) update() {
rs.updateRound += 1

// update leader
for _, i := range rs.updateLeader {
region := rs.regions[i]
region.Leader = region.Region.Peers[rs.updateRound%*replica]
}
// update epoch
for _, i := range rs.updateEpoch {
region := rs.regions[i]
region.Region.RegionEpoch.Version += 1
}
// update space
for _, i := range rs.updateSpace {
region := rs.regions[i]
region.ApproximateSize += bytesUnit
region.ApproximateKeys += keysUint
}
// update flow
for _, i := range rs.updateFlow {
region := rs.regions[i]
region.BytesWritten += bytesUnit
Review comment (Contributor): maybe we need a param to control the flow change range. (A hypothetical sketch of such a knob follows the update() function below.)

region.BytesRead += bytesUnit
region.KeysWritten += keysUint
region.KeysRead += keysUint
}
// update interval
for _, region := range rs.regions {
region.Interval.StartTimestamp = region.Interval.EndTimestamp
region.Interval.EndTimestamp = region.Interval.StartTimestamp + intervalUint
}
}
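The reviewer's suggestion above (a parameter to control the flow change range) is not part of this diff. A minimal sketch of one way it could look, assuming a hypothetical -flow-delta flag added next to the existing ones in this file:

// Hypothetical flag, not in this PR: scales the per-round flow increment.
var flowDelta = flag.Float64("flow-delta", 1.0, "multiplier applied to the flow increment each round")

// The flow-update loop in (*Regions).update could then become:
for _, i := range rs.updateFlow {
	region := rs.regions[i]
	byteDelta := uint64(float64(bytesUnit) * (*flowDelta))
	keyDelta := uint64(float64(keysUint) * (*flowDelta))
	region.BytesWritten += byteDelta
	region.BytesRead += byteDelta
	region.KeysWritten += keyDelta
	region.KeysRead += keyDelta
}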

// Run runs the store.
func (s *Store) Run(startNotifier chan report.Report, endNotifier chan struct{}) {
func (rs *Regions) send(storeID uint64, startNotifier chan report.Report, endNotifier chan struct{}) {
cli := newClient()
stream, err := cli.RegionHeartbeat(context.TODO())
if err != nil {
log.Fatal(err)
}
var peers []*metapb.Peer
for i := 0; i < *replica; i++ {
storeID := s.id + uint64(i)
if storeID > uint64(*storeCount) {
storeID -= uint64(*storeCount)
}
peers = append(peers, &metapb.Peer{Id: uint64(i + 1), StoreId: storeID})
}

count := 1
for r := range startNotifier {
startTime := time.Now()
for regionID := s.id; regionID <= *regionCount+uint64(*storeCount); regionID += uint64(*storeCount) {
updateRegionCount := uint64(float64(*regionCount) * (*regionUpdateRatio) / float64(*storeCount))
storeUpdateRegionMaxID := s.id + updateRegionCount*uint64(*storeCount)
meta := &metapb.Region{
Id: regionID,
Peers: peers,
RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1},
StartKey: newStartKey(regionID),
EndKey: newEndKey(regionID),
}
if regionID < storeUpdateRegionMaxID {
meta.RegionEpoch.Version = uint64(count)
count := 0
for _, region := range rs.regions {
if region.Leader.StoreId != storeID {
continue
}
count += 1
reqStart := time.Now()
err = stream.Send(&pdpb.RegionHeartbeatRequest{
Header: header(),
Region: meta,
Leader: peers[0],
})

err = stream.Send(region)
r.Results() <- report.Result{Start: reqStart, End: time.Now(), Err: err}
if err != nil {
log.Fatal(err)
}
}
log.Printf("store %v finish heartbeat, cost time: %v", s.id, time.Since(startTime))
count++
log.Printf("store %v finish heartbeat, count: %v, cost time: %v", storeID, count, time.Since(startTime))
endNotifier <- struct{}{}
}
}

func (rs *Regions) result(sec float64) string {
if rs.updateRound == 0 {
// There was no difference in the first round
return ""
}

updated := make(map[int]struct{})
for _, i := range rs.updateLeader {
updated[i] = struct{}{}
}
for _, i := range rs.updateEpoch {
updated[i] = struct{}{}
}
for _, i := range rs.updateSpace {
updated[i] = struct{}{}
}
for _, i := range rs.updateFlow {
updated[i] = struct{}{}
}
inactiveCount := *regionCount - len(updated)

ret := "Update speed of each category:\n"
ret += fmt.Sprintf(" Requests/sec: %12.4f\n", float64(*regionCount)/sec)
ret += fmt.Sprintf(" Save-Tree/sec: %12.4f\n", float64(len(rs.updateLeader))/sec)
ret += fmt.Sprintf(" Save-KV/sec: %12.4f\n", float64(len(rs.updateEpoch))/sec)
ret += fmt.Sprintf(" Save-Space/sec: %12.4f\n", float64(len(rs.updateSpace))/sec)
ret += fmt.Sprintf(" Save-Flow/sec: %12.4f\n", float64(len(rs.updateFlow))/sec)
ret += fmt.Sprintf(" Skip/sec: %12.4f\n", float64(inactiveCount)/sec)
return ret
}

func main() {
log.SetFlags(0)
flag.Parse()
@@ -193,30 +310,35 @@ func main() {
log.Println("finish put stores")
groupStartNotify := make([]chan report.Report, *storeCount+1)
groupEndNotify := make([]chan struct{}, *storeCount+1)
regions := new(Regions)
regions.init()

for i := 1; i <= *storeCount; i++ {
s := Store{id: uint64(i)}
startNotifier := make(chan report.Report)
endNotifier := make(chan struct{})
groupStartNotify[i] = startNotifier
groupEndNotify[i] = endNotifier
go s.Run(startNotifier, endNotifier)
go regions.send(uint64(i), startNotifier, endNotifier)
}

for i := 0; i < *heartbeatRounds; i++ {
log.Printf("\n--------- Bench heartbeat (Round %d) ----------\n", i+1)
report := newReport()
rs := report.Run()
repo := newReport()
rs := repo.Run()
// All stores start heartbeat.
startTime := time.Now()
for storeID := 1; storeID <= *storeCount; storeID++ {
startNotifier := groupStartNotify[storeID]
startNotifier <- report
startNotifier <- repo
}
// All stores finished heartbeat once.
for storeID := 1; storeID <= *storeCount; storeID++ {
<-groupEndNotify[storeID]
}

close(report.Results())
since := time.Since(startTime).Seconds()
close(repo.Results())
log.Println(<-rs)
log.Println(regions.result(since))
regions.update()
}
}
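For reference, the refined benchmark might be invoked from the repository root roughly as follows (the PD address and values mirror the flag defaults and are illustrative; -sample additionally enables per-second sampling):

go run ./tools/pd-heartbeat-bench -pd 127.0.0.1:2379 -store 40 -region 1000000 -replica 3 -leader 0.06 -epoch 0.04 -space 0.15 -flow 0.35 -heartbeat-rounds 4 -sample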