Skip to content

Commit

Permalink
statistics: replace loads with []float64 in peer info (tikv#3729)
Browse files Browse the repository at this point in the history
* statistics: replace loads with []float64 in peer info

Signed-off-by: lhy1024 <admin@liudos.us>

* remove unused code

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* remove unused code

Signed-off-by: lhy1024 <admin@liudos.us>

* fix test

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* misc

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lhy1024 and ti-chi-bot authored Jun 2, 2021
1 parent 7792ece commit 037d673
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 96 deletions.
21 changes: 3 additions & 18 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,12 +755,7 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer,
region.GetBytesWritten(),
region.GetKeysWritten(),
region.GetBytesRead(),
region.GetKeysRead(),
interval)
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := mc.HotCache.CheckReadPeerSync(peerInfo, region)
if item != nil {
items = append(items, item)
Expand All @@ -777,12 +772,7 @@ func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPe
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer,
region.GetBytesWritten(),
region.GetKeysWritten(),
region.GetBytesRead(),
region.GetKeysRead(),
interval)
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := mc.HotCache.CheckWritePeerSync(peerInfo, region)
if item != nil {
items = append(items, item)
Expand All @@ -799,12 +789,7 @@ func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics.
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
peer := region.GetLeader()
peerInfo := core.NewPeerInfo(peer,
region.GetBytesWritten(),
region.GetKeysWritten(),
region.GetBytesRead(),
region.GetKeysRead(),
interval)
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := mc.HotCache.CheckReadPeerSync(peerInfo, region)
if item != nil {
items = append(items, item)
Expand Down
14 changes: 8 additions & 6 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,13 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
zap.Uint64("store-id", storeID))
continue
}
peerInfo := core.NewPeerInfo(peer, 0, 0,
peerStat.GetReadBytes(), peerStat.GetReadKeys(), interval)
loads := []float64{
statistics.RegionReadBytes: float64(peerStat.GetReadBytes()),
statistics.RegionReadKeys: float64(peerStat.GetReadKeys()),
statistics.RegionWriteBytes: 0,
statistics.RegionWriteKeys: 0,
}
peerInfo := core.NewPeerInfo(peer, loads, interval)
item := statistics.NewPeerInfoItem(peerInfo, region)
c.hotStat.CheckReadAsync(item)
}
Expand Down Expand Up @@ -588,10 +593,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer,
region.GetBytesWritten(), region.GetKeysWritten(),
0, 0,
interval)
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
item := statistics.NewPeerInfoItem(peerInfo, region)
c.hotStat.CheckWriteAsync(item)
}
Expand Down
8 changes: 0 additions & 8 deletions server/core/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,3 @@ func StringToKeyType(input string) KeyType {
panic("invalid key type: " + input)
}
}

// FlowStat indicates the stats of the flow
type FlowStat interface {
GetKeysWritten() uint64
GetBytesWritten() uint64
GetBytesRead() uint64
GetKeysRead() uint64
}
45 changes: 11 additions & 34 deletions server/core/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,52 +76,29 @@ func CountInJointState(peers ...*metapb.Peer) int {
// PeerInfo provides peer information
type PeerInfo struct {
*metapb.Peer
writtenBytes uint64
writtenKeys uint64
readBytes uint64
readKeys uint64
interval uint64
loads []float64
interval uint64
}

// NewPeerInfo creates PeerInfo
func NewPeerInfo(meta *metapb.Peer,
writtenBytes, writtenKeys, readBytes, readKeys uint64,
interval uint64) *PeerInfo {
func NewPeerInfo(meta *metapb.Peer, loads []float64, interval uint64) *PeerInfo {
return &PeerInfo{
Peer: meta,
writtenBytes: writtenBytes,
writtenKeys: writtenKeys,
readBytes: readBytes,
readKeys: readKeys,
interval: interval,
Peer: meta,
loads: loads,
interval: interval,
}
}

// GetKeysWritten provides peer written keys
func (p *PeerInfo) GetKeysWritten() uint64 {
return p.writtenKeys
}

// GetBytesWritten provides peer written bytes
func (p *PeerInfo) GetBytesWritten() uint64 {
return p.writtenBytes
}

// GetBytesRead provides peer read bytes
func (p *PeerInfo) GetBytesRead() uint64 {
return p.readBytes
}

// GetKeysRead provides read keys
func (p *PeerInfo) GetKeysRead() uint64 {
return p.readKeys
}

// GetStoreID provides located storeID
func (p *PeerInfo) GetStoreID() uint64 {
return p.GetStoreId()
}

// GetLoads provides loads
func (p *PeerInfo) GetLoads() []float64 {
return p.loads
}

// GetPeerID provides peer id
func (p *PeerInfo) GetPeerID() uint64 {
return p.GetId()
Expand Down
20 changes: 20 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,26 @@ func (r *RegionsInfo) GetFollower(storeID uint64, region *RegionInfo) *RegionInf
return nil
}

// GetLoads returns loads from region
func (r *RegionInfo) GetLoads() []float64 {
return []float64{
float64(r.GetBytesRead()),
float64(r.GetKeysRead()),
float64(r.GetBytesWritten()),
float64(r.GetKeysWritten()),
}
}

// GetWriteLoads returns write loads from region
func (r *RegionInfo) GetWriteLoads() []float64 {
return []float64{
0,
0,
float64(r.GetBytesWritten()),
float64(r.GetKeysWritten()),
}
}

// ScanRange scans regions intersecting [start key, end key), returns at most
// `limit` regions. limit <= 0 means no limit.
func (r *RegionsInfo) ScanRange(startKey, endKey []byte, limit int) []*RegionInfo {
Expand Down
19 changes: 1 addition & 18 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf
return nil
}
storeID := peer.GetStoreID()
deltaLoads := getFlowDeltaLoads(peer)
deltaLoads := peer.GetLoads()
f.collectPeerMetrics(deltaLoads, interval)
loads := make([]float64, len(deltaLoads))
for i := range deltaLoads {
Expand Down Expand Up @@ -462,23 +462,6 @@ func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []f
return newItem
}

func getFlowDeltaLoads(stat core.FlowStat) []float64 {
ret := make([]float64, RegionStatCount)
for k := RegionStatKind(0); k < RegionStatCount; k++ {
switch k {
case RegionReadBytes:
ret[k] = float64(stat.GetBytesRead())
case RegionReadKeys:
ret[k] = float64(stat.GetKeysRead())
case RegionWriteBytes:
ret[k] = float64(stat.GetBytesWritten())
case RegionWriteKeys:
ret[k] = float64(stat.GetKeysWritten())
}
}
return ret
}

func (f *hotPeerCache) putInheritItem(item *HotPeerStat) {
f.inheritItem[item.RegionID] = item
}
Expand Down
14 changes: 2 additions & 12 deletions server/statistics/hot_peer_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,7 @@ func checkAndUpdate(c *C, cache *hotPeerCache, region *core.RegionInfo, expect i
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
res = append(res, cache.CollectExpiredItems(region)...)
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer,
region.GetBytesWritten(),
region.GetKeysWritten(),
region.GetBytesRead(),
region.GetKeysRead(),
interval)
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := cache.CheckPeerFlow(peerInfo, region)
if item != nil {
res = append(res, item)
Expand Down Expand Up @@ -345,12 +340,7 @@ func BenchmarkCheckRegionFlow(b *testing.B) {
core.SetReadKeys(300000*10))
peerInfos := make([]*core.PeerInfo, 0)
for _, peer := range newRegion.GetPeers() {
peerInfo := core.NewPeerInfo(peer,
region.GetBytesWritten(),
region.GetKeysWritten(),
region.GetBytesRead(),
region.GetKeysRead(),
10)
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10)
peerInfos = append(peerInfos, peerInfo)
}
b.ResetTimer()
Expand Down
46 changes: 46 additions & 0 deletions server/statistics/kind_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2021 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package statistics

import (
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/server/core"
)

var _ = Suite(&testRegionInfoSuite{})

type testRegionInfoSuite struct{}

func (s *testRegionInfoSuite) TestGetLoads(c *C) {
regionA := core.NewRegionInfo(&metapb.Region{Id: 100, Peers: []*metapb.Peer{}}, nil,
core.SetReadBytes(1),
core.SetReadKeys(2),
core.SetWrittenBytes(3),
core.SetWrittenKeys(4))
loads := regionA.GetLoads()
c.Assert(loads, HasLen, int(RegionStatCount))
c.Assert(float64(regionA.GetBytesRead()), Equals, loads[RegionReadBytes])
c.Assert(float64(regionA.GetKeysRead()), Equals, loads[RegionReadKeys])
c.Assert(float64(regionA.GetBytesWritten()), Equals, loads[RegionWriteBytes])
c.Assert(float64(regionA.GetKeysWritten()), Equals, loads[RegionWriteKeys])

loads = regionA.GetWriteLoads()
c.Assert(loads, HasLen, int(RegionStatCount))
c.Assert(0.0, Equals, loads[RegionReadBytes])
c.Assert(0.0, Equals, loads[RegionReadKeys])
c.Assert(float64(regionA.GetBytesWritten()), Equals, loads[RegionWriteBytes])
c.Assert(float64(regionA.GetKeysWritten()), Equals, loads[RegionWriteKeys])

}

0 comments on commit 037d673

Please sign in to comment.