p2p/msgrate: return capacity as integer #22943

Merged · 6 commits · May 27, 2021
eth/downloader/peer.go (25 additions, 23 deletions)

@@ -21,7 +21,6 @@ package downloader
 
 import (
 	"errors"
-	"math"
 	"math/big"
 	"sort"
 	"sync"
@@ -232,7 +231,7 @@ func (p *peerConnection) SetNodeDataIdle(delivered int, deliveryTime time.Time)
 // HeaderCapacity retrieves the peers header download allowance based on its
 // previously discovered throughput.
 func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
-	cap := int(math.Ceil(p.rates.Capacity(eth.BlockHeadersMsg, targetRTT)))
+	cap := p.rates.Capacity(eth.BlockHeadersMsg, targetRTT)
 	if cap > MaxHeaderFetch {
 		cap = MaxHeaderFetch
 	}
@@ -242,7 +241,7 @@ func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
 // BlockCapacity retrieves the peers block download allowance based on its
 // previously discovered throughput.
 func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int {
-	cap := int(math.Ceil(p.rates.Capacity(eth.BlockBodiesMsg, targetRTT)))
+	cap := p.rates.Capacity(eth.BlockBodiesMsg, targetRTT)
 	if cap > MaxBlockFetch {
 		cap = MaxBlockFetch
 	}
@@ -252,7 +251,7 @@ func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int {
 // ReceiptCapacity retrieves the peers receipt download allowance based on its
 // previously discovered throughput.
 func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int {
-	cap := int(math.Ceil(p.rates.Capacity(eth.ReceiptsMsg, targetRTT)))
+	cap := p.rates.Capacity(eth.ReceiptsMsg, targetRTT)
 	if cap > MaxReceiptFetch {
 		cap = MaxReceiptFetch
 	}
@@ -262,7 +261,7 @@ func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int {
 // NodeDataCapacity retrieves the peers state download allowance based on its
 // previously discovered throughput.
 func (p *peerConnection) NodeDataCapacity(targetRTT time.Duration) int {
-	cap := int(math.Ceil(p.rates.Capacity(eth.NodeDataMsg, targetRTT)))
+	cap := p.rates.Capacity(eth.NodeDataMsg, targetRTT)
 	if cap > MaxStateFetch {
 		cap = MaxStateFetch
 	}
@@ -411,7 +410,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
 	idle := func(p *peerConnection) bool {
 		return atomic.LoadInt32(&p.headerIdle) == 0
 	}
-	throughput := func(p *peerConnection) float64 {
+	throughput := func(p *peerConnection) int {
 		return p.rates.Capacity(eth.BlockHeadersMsg, time.Second)
 	}
 	return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
@@ -423,7 +422,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
 	idle := func(p *peerConnection) bool {
 		return atomic.LoadInt32(&p.blockIdle) == 0
 	}
-	throughput := func(p *peerConnection) float64 {
+	throughput := func(p *peerConnection) int {
 		return p.rates.Capacity(eth.BlockBodiesMsg, time.Second)
 	}
 	return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
@@ -435,7 +434,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
 	idle := func(p *peerConnection) bool {
 		return atomic.LoadInt32(&p.receiptIdle) == 0
 	}
-	throughput := func(p *peerConnection) float64 {
+	throughput := func(p *peerConnection) int {
 		return p.rates.Capacity(eth.ReceiptsMsg, time.Second)
 	}
 	return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
@@ -447,53 +446,56 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
 	idle := func(p *peerConnection) bool {
 		return atomic.LoadInt32(&p.stateIdle) == 0
 	}
-	throughput := func(p *peerConnection) float64 {
+	throughput := func(p *peerConnection) int {
 		return p.rates.Capacity(eth.NodeDataMsg, time.Second)
 	}
 	return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput)
 }
 
 // idlePeers retrieves a flat list of all currently idle peers satisfying the
 // protocol version constraints, using the provided function to check idleness.
-// The resulting set of peers are sorted by their measure throughput.
-func (ps *peerSet) idlePeers(minProtocol, maxProtocol uint, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) {
+// The resulting set of peers are sorted by their capacity.
+func (ps *peerSet) idlePeers(minProtocol, maxProtocol uint, idleCheck func(*peerConnection) bool, capacity func(*peerConnection) int) ([]*peerConnection, int) {
 	ps.lock.RLock()
 	defer ps.lock.RUnlock()
 
-	idle, total := make([]*peerConnection, 0, len(ps.peers)), 0
-	tps := make([]float64, 0, len(ps.peers))
+	var (
+		total = 0
+		idle  = make([]*peerConnection, 0, len(ps.peers))
+		tps   = make([]int, 0, len(ps.peers))
+	)
 	for _, p := range ps.peers {
 		if p.version >= minProtocol && p.version <= maxProtocol {
 			if idleCheck(p) {
 				idle = append(idle, p)
-				tps = append(tps, throughput(p))
+				tps = append(tps, capacity(p))
 			}
 			total++
 		}
 	}
 
 	// And sort them
-	sortPeers := &peerThroughputSort{idle, tps}
+	sortPeers := &peerCapacitySort{idle, tps}
 	sort.Sort(sortPeers)
 	return sortPeers.p, total
 }
 
-// peerThroughputSort implements the Sort interface, and allows for
-// sorting a set of peers by their throughput
-// The sorted data is with the _highest_ throughput first
-type peerThroughputSort struct {
+// peerCapacitySort implements sort.Interface.
+// It sorts peer connections by capacity (descending).
+type peerCapacitySort struct {
 	p  []*peerConnection
-	tp []float64
+	tp []int
 }
 
-func (ps *peerThroughputSort) Len() int {
+func (ps *peerCapacitySort) Len() int {
 	return len(ps.p)
 }
 
-func (ps *peerThroughputSort) Less(i, j int) bool {
+func (ps *peerCapacitySort) Less(i, j int) bool {
 	return ps.tp[i] > ps.tp[j]
 }
 
-func (ps *peerThroughputSort) Swap(i, j int) {
+func (ps *peerCapacitySort) Swap(i, j int) {
 	ps.p[i], ps.p[j] = ps.p[j], ps.p[i]
 	ps.tp[i], ps.tp[j] = ps.tp[j], ps.tp[i]
 }
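
With Capacity now returning an int, the sort helper compares plain integers. Below is a minimal, self-contained sketch of the same descending-sort pattern; the peer type is a stand-in for the unexported peerConnection, not the real thing:

	package main

	import (
		"fmt"
		"sort"
	)

	// peer is a stand-in for the unexported peerConnection type.
	type peer struct{ id string }

	// peerCapacitySort pairs peers with their integer capacities and
	// sorts them highest-capacity first, mirroring the helper above.
	type peerCapacitySort struct {
		p  []*peer
		tp []int
	}

	func (ps *peerCapacitySort) Len() int           { return len(ps.p) }
	func (ps *peerCapacitySort) Less(i, j int) bool { return ps.tp[i] > ps.tp[j] }
	func (ps *peerCapacitySort) Swap(i, j int) {
		ps.p[i], ps.p[j] = ps.p[j], ps.p[i]
		ps.tp[i], ps.tp[j] = ps.tp[j], ps.tp[i]
	}

	func main() {
		s := &peerCapacitySort{
			p:  []*peer{{"slow"}, {"fast"}, {"mid"}},
			tp: []int{10, 512, 128},
		}
		sort.Sort(s)
		for i, p := range s.p {
			fmt.Println(p.id, s.tp[i]) // fast 512, mid 128, slow 10
		}
	}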
eth/protocols/snap/sync.go (15 additions, 15 deletions)

@@ -861,7 +861,7 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac
 	// Sort the peers by download capacity to use faster ones if many available
 	idlers := &capacitySort{
 		ids:  make([]string, 0, len(s.accountIdlers)),
-		caps: make([]float64, 0, len(s.accountIdlers)),
+		caps: make([]int, 0, len(s.accountIdlers)),
 	}
 	targetTTL := s.rates.TargetTimeout()
 	for id := range s.accountIdlers {
@@ -958,7 +958,7 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *
 	// Sort the peers by download capacity to use faster ones if many available
 	idlers := &capacitySort{
 		ids:  make([]string, 0, len(s.bytecodeIdlers)),
-		caps: make([]float64, 0, len(s.bytecodeIdlers)),
+		caps: make([]int, 0, len(s.bytecodeIdlers)),
 	}
 	targetTTL := s.rates.TargetTimeout()
 	for id := range s.bytecodeIdlers {
@@ -1012,11 +1012,11 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *
 	if cap > maxCodeRequestCount {
 		cap = maxCodeRequestCount
 	}
-	hashes := make([]common.Hash, 0, int(cap))
+	hashes := make([]common.Hash, 0, cap)
 	for hash := range task.codeTasks {
 		delete(task.codeTasks, hash)
 		hashes = append(hashes, hash)
-		if len(hashes) >= int(cap) {
+		if len(hashes) >= cap {
 			break
 		}
 	}
@@ -1061,7 +1061,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
 	// Sort the peers by download capacity to use faster ones if many available
 	idlers := &capacitySort{
 		ids:  make([]string, 0, len(s.storageIdlers)),
-		caps: make([]float64, 0, len(s.storageIdlers)),
+		caps: make([]int, 0, len(s.storageIdlers)),
 	}
 	targetTTL := s.rates.TargetTimeout()
 	for id := range s.storageIdlers {
@@ -1120,7 +1120,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
 	if cap < minRequestSize { // Don't bother with peers below a bare minimum performance
 		cap = minRequestSize
 	}
-	storageSets := int(cap / 1024)
+	storageSets := cap / 1024
 
 	var (
 		accounts = make([]common.Hash, 0, storageSets)
@@ -1217,7 +1217,7 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
 	// Sort the peers by download capacity to use faster ones if many available
 	idlers := &capacitySort{
 		ids:  make([]string, 0, len(s.trienodeHealIdlers)),
-		caps: make([]float64, 0, len(s.trienodeHealIdlers)),
+		caps: make([]int, 0, len(s.trienodeHealIdlers)),
 	}
 	targetTTL := s.rates.TargetTimeout()
 	for id := range s.trienodeHealIdlers {
@@ -1284,9 +1284,9 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
 		cap = maxTrieRequestCount
 	}
 	var (
-		hashes   = make([]common.Hash, 0, int(cap))
-		paths    = make([]trie.SyncPath, 0, int(cap))
-		pathsets = make([]TrieNodePathSet, 0, int(cap))
+		hashes   = make([]common.Hash, 0, cap)
+		paths    = make([]trie.SyncPath, 0, cap)
+		pathsets = make([]TrieNodePathSet, 0, cap)
 	)
 	for hash, pathset := range s.healer.trieTasks {
 		delete(s.healer.trieTasks, hash)
@@ -1295,7 +1295,7 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
 		paths = append(paths, pathset)
 		pathsets = append(pathsets, [][]byte(pathset)) // TODO(karalabe): group requests by account hash
 
-		if len(hashes) >= int(cap) {
+		if len(hashes) >= cap {
 			break
 		}
 	}
@@ -1341,7 +1341,7 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
 	// Sort the peers by download capacity to use faster ones if many available
 	idlers := &capacitySort{
 		ids:  make([]string, 0, len(s.bytecodeHealIdlers)),
-		caps: make([]float64, 0, len(s.bytecodeHealIdlers)),
+		caps: make([]int, 0, len(s.bytecodeHealIdlers)),
 	}
 	targetTTL := s.rates.TargetTimeout()
 	for id := range s.bytecodeHealIdlers {
@@ -1407,12 +1407,12 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
 	if cap > maxCodeRequestCount {
 		cap = maxCodeRequestCount
 	}
-	hashes := make([]common.Hash, 0, int(cap))
+	hashes := make([]common.Hash, 0, cap)
 	for hash := range s.healer.codeTasks {
 		delete(s.healer.codeTasks, hash)
 
 		hashes = append(hashes, hash)
-		if len(hashes) >= int(cap) {
+		if len(hashes) >= cap {
 			break
 		}
 	}
@@ -2852,7 +2852,7 @@ func estimateRemainingSlots(hashes int, last common.Hash) (uint64, error) {
 // of highest capacity being at the front.
 type capacitySort struct {
 	ids  []string
-	caps []float64
+	caps []int
 }
 
 func (s *capacitySort) Len() int {
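
Since cap is now an integer byte count, carving it into roughly 1KB storage sets is plain integer division with no float conversion. A small sketch of that clamp-then-divide step; the minRequestSize value here is assumed for illustration and is not the real constant from sync.go:

	package main

	import "fmt"

	// minRequestSize is assumed here for illustration; the real constant
	// is defined elsewhere in sync.go.
	const minRequestSize = 64 * 1024

	// storageSetsFor mirrors the clamp-then-divide step above: capacities
	// are integer byte counts, so the 1KB split is integer division.
	func storageSetsFor(capacity int) int {
		if capacity < minRequestSize {
			capacity = minRequestSize
		}
		return capacity / 1024
	}

	func main() {
		fmt.Println(storageSetsFor(10))     // clamped up to the minimum: 64
		fmt.Println(storageSetsFor(300000)) // 292 (remainder discarded)
	}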
p2p/msgrate/msgrate.go (11 additions, 3 deletions)

@@ -19,6 +19,7 @@ package msgrate
 
 import (
 	"errors"
+	"math"
 	"sort"
 	"sync"
 	"time"
@@ -162,7 +163,7 @@ func NewTracker(caps map[uint64]float64, rtt time.Duration) *Tracker {
 // the load proportionally to the requested items, so fetching a bit more might
 // still take the same RTT. By forcefully overshooting by a small amount, we can
 // avoid locking into a lower-that-real capacity.
-func (t *Tracker) Capacity(kind uint64, targetRTT time.Duration) float64 {
+func (t *Tracker) Capacity(kind uint64, targetRTT time.Duration) int {
 	t.lock.RLock()
 	defer t.lock.RUnlock()
 
@@ -171,7 +172,7 @@ func (t *Tracker) Capacity(kind uint64, targetRTT time.Duration) float64 {
 
 	// Return an overestimation to force the peer out of a stuck minima, adding
 	// +1 in case the item count is too low for the overestimator to dent
-	return 1 + capacityOverestimation*throughput
+	return roundCapacity(1 + capacityOverestimation*throughput)
+}
+
+// roundCapacity gives the integer value of a capacity.
+// The result fits int32, and is guaranteed to be positive.
+func roundCapacity(cap float64) int {
+	const maxInt32 = float64(1<<31 - 1)
+	return int(math.Min(maxInt32, math.Max(1, math.Ceil(cap))))
 }
 
 // Update modifies the peer's capacity values for a specific data type with a new
@@ -435,7 +443,7 @@ func (t *Trackers) detune() {
 
 // Capacity is a helper function to access a specific tracker without having to
 // track it explicitly outside.
-func (t *Trackers) Capacity(id string, kind uint64, targetRTT time.Duration) float64 {
+func (t *Trackers) Capacity(id string, kind uint64, targetRTT time.Duration) int {
 	t.lock.RLock()
 	defer t.lock.RUnlock()
 
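
The behavior of the new roundCapacity helper is easy to check in isolation: ceil the float estimate first, then clamp it into [1, MaxInt32]. A standalone copy of the function from the diff, for illustration only:

	package main

	import (
		"fmt"
		"math"
	)

	// roundCapacity, copied from the diff above for standalone use:
	// ceil the float estimate, then clamp it into [1, MaxInt32].
	func roundCapacity(cap float64) int {
		const maxInt32 = float64(1<<31 - 1)
		return int(math.Min(maxInt32, math.Max(1, math.Ceil(cap))))
	}

	func main() {
		fmt.Println(roundCapacity(0.01)) // 1: never zero or negative
		fmt.Println(roundCapacity(3.2))  // 4: rounded up to keep overshooting
		fmt.Println(roundCapacity(1e18)) // 2147483647: saturates at MaxInt32
	}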
p2p/msgrate/msgrate_test.go (28 additions, 0 deletions)

@@ -0,0 +1,28 @@
+// Copyright 2021 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package msgrate
+
+import "testing"
+
+func TestCapacityOverflow(t *testing.T) {
+	tracker := NewTracker(nil, 1)
+	tracker.Update(1, 1, 100000)
+	cap := tracker.Capacity(1, 10000000)
+	if int32(cap) < 0 {
+		t.Fatalf("Negative: %v", int32(cap))
+	}
+}
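
For context, a sketch of the overflow this test guards against: a runaway float capacity, once converted to int and later truncated to int32, could wrap around to a negative value, while the clamped version saturates at MaxInt32. The specific numbers below are illustrative and assume a 64-bit platform:

	package main

	import (
		"fmt"
		"math"
	)

	func main() {
		// A runaway throughput estimate, far beyond int32 range.
		huge := math.Ceil(1e12)

		// Old behavior: convert straight to int; truncating that to an
		// int32 downstream wraps around to a negative value on 64-bit.
		fmt.Println(int32(int(huge))) // -727379968

		// New behavior: clamp before converting, so int32 stays positive.
		const maxInt32 = float64(1<<31 - 1)
		clamped := int(math.Min(maxInt32, math.Max(1, huge)))
		fmt.Println(int32(clamped)) // 2147483647
	}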