Skip to content

les: fix UDP connection query #22451

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions les/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ type LightEthereum struct {
accountManager *accounts.Manager
netRPCService *ethapi.PublicNetAPI

p2pServer *p2p.Server
p2pConfig *p2p.Config
p2pServer *p2p.Server
p2pConfig *p2p.Config
udpEnabled bool
}

// New creates an instance of the light client.
Expand Down Expand Up @@ -113,10 +114,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
p2pServer: stack.Server(),
p2pConfig: &stack.Config().P2P,
udpEnabled: stack.Config().P2P.DiscoveryV5,
}

var prenegQuery vfc.QueryFunc
if leth.p2pServer.DiscV5 != nil {
if leth.udpEnabled {
prenegQuery = leth.prenegQuery
}
leth.serverPool, leth.serverPoolIterator = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, prenegQuery, &mclock.System{}, config.UltraLightServers, requestList)
Expand Down Expand Up @@ -198,7 +200,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {

// VfluxRequest sends a batch of requests to the given node through discv5 UDP TalkRequest and returns the responses
func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.Replies {
if s.p2pServer.DiscV5 == nil {
if !s.udpEnabled {
return nil
}
reqsEnc, _ := rlp.EncodeToBytes(&reqs)
Expand All @@ -215,7 +217,7 @@ func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.R
func (s *LightEthereum) vfxVersion(n *enode.Node) uint {
if n.Seq() == 0 {
var err error
if s.p2pServer.DiscV5 == nil {
if !s.udpEnabled {
return 0
}
if n, err = s.p2pServer.DiscV5.RequestENR(n); n != nil && err == nil && n.Seq() != 0 {
Expand Down Expand Up @@ -346,7 +348,11 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
func (s *LightEthereum) Start() error {
log.Warn("Light client mode is an experimental feature")

discovery, err := s.setupDiscovery(s.p2pConfig)
if s.udpEnabled && s.p2pServer.DiscV5 == nil {
s.udpEnabled = false
log.Error("Discovery v5 is not initialized")
}
discovery, err := s.setupDiscovery()
if err != nil {
return err
}
Expand Down
18 changes: 16 additions & 2 deletions les/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type clientPool struct {
clock mclock.Clock
closed bool
removePeer func(enode.ID)
synced func() bool
ns *nodestate.NodeStateMachine
pp *vfs.PriorityPool
bt *vfs.BalanceTracker
Expand Down Expand Up @@ -107,7 +108,7 @@ type clientInfo struct {
}

// newClientPool creates a new client pool
func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID)) *clientPool {
func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap uint64, connectedBias time.Duration, clock mclock.Clock, removePeer func(enode.ID), synced func() bool) *clientPool {
pool := &clientPool{
ns: ns,
BalanceTrackerSetup: balanceTrackerSetup,
Expand All @@ -116,6 +117,7 @@ func newClientPool(ns *nodestate.NodeStateMachine, lesDb ethdb.Database, minCap
minCap: minCap,
connectedBias: connectedBias,
removePeer: removePeer,
synced: synced,
}
pool.bt = vfs.NewBalanceTracker(ns, balanceTrackerSetup, lesDb, clock, &utils.Expirer{}, &utils.Expirer{})
pool.pp = vfs.NewPriorityPool(ns, priorityPoolSetup, clock, minCap, connectedBias, 4)
Expand Down Expand Up @@ -396,6 +398,13 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by
if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen {
return nil
}
result := make(vflux.CapacityQueryReply, len(req.AddTokens))
if !f.synced() {
capacityQueryZeroMeter.Mark(1)
reply, _ := rlp.EncodeToBytes(&result)
return reply
}

node := f.ns.GetNode(id)
if node == nil {
node = enode.SignNull(&enr.Record{}, id)
Expand All @@ -416,7 +425,6 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by
}
// use vfs.CapacityCurve to answer request for multiple newly bought token amounts
curve := f.pp.GetCapacityCurve().Exclude(id)
result := make(vflux.CapacityQueryReply, len(req.AddTokens))
bias := time.Second * time.Duration(req.Bias)
if f.connectedBias > bias {
bias = f.connectedBias
Expand All @@ -434,6 +442,12 @@ func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []by
result[i] = 0
}
}
// add first result to metrics (don't care about priority client multi-queries yet)
if result[0] == 0 {
capacityQueryZeroMeter.Mark(1)
} else {
capacityQueryNonZeroMeter.Mark(1)
}
reply, _ := rlp.EncodeToBytes(&result)
return reply
}
24 changes: 12 additions & 12 deletions les/clientpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func testClientPool(t *testing.T, activeLimit, clientCount, paidCount int, rando
disconnFn = func(id enode.ID) {
disconnCh <- int(id[0]) + int(id[1])<<8
}
pool = newClientPool(testStateMachine(), db, 1, 0, &clock, disconnFn)
pool = newClientPool(testStateMachine(), db, 1, 0, &clock, disconnFn, alwaysTrueFn)
)
pool.ns.Start()

Expand Down Expand Up @@ -239,7 +239,7 @@ func TestConnectPaidClient(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10))
Expand All @@ -255,7 +255,7 @@ func TestConnectPaidClientToSmallPool(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
Expand All @@ -274,7 +274,7 @@ func TestConnectPaidClientToFullPool(t *testing.T) {
db = rawdb.NewMemoryDatabase()
)
removeFn := func(enode.ID) {} // Noop
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
Expand Down Expand Up @@ -304,7 +304,7 @@ func TestPaidClientKickedOut(t *testing.T) {
removeFn := func(id enode.ID) {
kickedCh <- int(id[0])
}
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
pool.ns.Start()
pool.bt.SetExpirationTCs(0, 0)
defer pool.stop()
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestConnectFreeClient(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10))
Expand All @@ -352,7 +352,7 @@ func TestConnectFreeClientToFullPool(t *testing.T) {
db = rawdb.NewMemoryDatabase()
)
removeFn := func(enode.ID) {} // Noop
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestFreeClientKickedOut(t *testing.T) {
kicked = make(chan int, 100)
)
removeFn := func(id enode.ID) { kicked <- int(id[0]) }
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestPositiveBalanceCalculation(t *testing.T) {
kicked = make(chan int, 10)
)
removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
Expand All @@ -448,7 +448,7 @@ func TestDowngradePriorityClient(t *testing.T) {
kicked = make(chan int, 10)
)
removeFn := func(id enode.ID) { kicked <- int(id[0]) } // Noop
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, removeFn, alwaysTrueFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
Expand Down Expand Up @@ -483,7 +483,7 @@ func TestNegativeBalanceCalculation(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(10, uint64(10)) // Total capacity limit is 10
Expand Down Expand Up @@ -521,7 +521,7 @@ func TestInactiveClient(t *testing.T) {
clock mclock.Simulated
db = rawdb.NewMemoryDatabase()
)
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {})
pool := newClientPool(testStateMachine(), db, 1, defaultConnectedBias, &clock, func(id enode.ID) {}, alwaysTrueFn)
pool.ns.Start()
defer pool.stop()
pool.setLimits(2, uint64(2))
Expand Down
5 changes: 2 additions & 3 deletions les/enr_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package les

import (
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
Expand All @@ -42,7 +41,7 @@ type ethEntry struct {
func (ethEntry) ENRKey() string { return "eth" }

// setupDiscovery creates the node discovery source for the eth protocol.
func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error) {
func (eth *LightEthereum) setupDiscovery() (enode.Iterator, error) {
it := enode.NewFairMix(0)

// Enable DNS discovery.
Expand All @@ -56,7 +55,7 @@ func (eth *LightEthereum) setupDiscovery(cfg *p2p.Config) (enode.Iterator, error
}

// Enable DHT.
if cfg.DiscoveryV5 && eth.p2pServer.DiscV5 != nil {
if eth.udpEnabled {
it.AddSource(eth.p2pServer.DiscV5.RandomNodes())
}

Expand Down
10 changes: 6 additions & 4 deletions les/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@ var (
serverConnectionGauge = metrics.NewRegisteredGauge("les/connection/server", nil)
clientConnectionGauge = metrics.NewRegisteredGauge("les/connection/client", nil)

totalCapacityGauge = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
totalRechargeGauge = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
totalCapacityGauge = metrics.NewRegisteredGauge("les/server/totalCapacity", nil)
totalRechargeGauge = metrics.NewRegisteredGauge("les/server/totalRecharge", nil)
totalConnectedGauge = metrics.NewRegisteredGauge("les/server/totalConnected", nil)
blockProcessingTimer = metrics.NewRegisteredTimer("les/server/blockProcessingTime", nil)
capacityQueryZeroMeter = metrics.NewRegisteredMeter("les/server/capQueryZero", nil)
capacityQueryNonZeroMeter = metrics.NewRegisteredMeter("les/server/capQueryNonZero", nil)

requestServedMeter = metrics.NewRegisteredMeter("les/server/req/avgServedTime", nil)
requestServedTimer = metrics.NewRegisteredTimer("les/server/req/servedTime", nil)
Expand Down
2 changes: 1 addition & 1 deletion les/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
srv.maxCapacity = totalRecharge
}
srv.fcManager.SetCapacityLimits(srv.minCapacity, srv.maxCapacity, srv.minCapacity*2)
srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient)
srv.clientPool = newClientPool(ns, lesDb, srv.minCapacity, defaultConnectedBias, mclock.System{}, srv.dropClient, issync)
srv.clientPool.setDefaultFactors(vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1}, vfs.PriceFactors{TimeFactor: 0, CapacityFactor: 1, RequestFactor: 1})

checkpoint := srv.latestLocalCheckpoint()
Expand Down
6 changes: 5 additions & 1 deletion les/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
}
server.costTracker, server.minCapacity = newCostTracker(db, server.config)
server.costTracker.testCostList = testCostList(0) // Disable flow control mechanism.
server.clientPool = newClientPool(ns, db, testBufRecharge, defaultConnectedBias, clock, func(id enode.ID) {})
server.clientPool = newClientPool(ns, db, testBufRecharge, defaultConnectedBias, clock, func(id enode.ID) {}, alwaysTrueFn)
server.clientPool.setLimits(10000, 10000) // Assign enough capacity for clientpool
server.handler = newServerHandler(server, simulation.Blockchain(), db, txpool, func() bool { return true })
if server.oracle != nil {
Expand All @@ -319,6 +319,10 @@ func newTestServerHandler(blocks int, indexers []*core.ChainIndexer, db ethdb.Da
return server.handler, simulation
}

func alwaysTrueFn() bool {
return true
}

// testPeer is a simulated peer to allow testing direct network calls.
type testPeer struct {
cpeer *clientPeer
Expand Down
Loading