Skip to content

Commit 099aa49

Browse files
zsfelfoldirjl493456442
authored andcommitted
les, les/lespay/client: add service value statistics and API (ethereum#20837)
This PR adds service value measurement statistics to the light client. It also adds a private API that makes these statistics accessible. A follow-up PR will add the new server pool which uses these statistics to select servers with good performance. This document describes the function of the new components: https://gist.github.com/zsfelfoldi/3c7ace895234b7b345ab4f71dab102d4 Co-authored-by: rjl493456442 <garyrong0905@gmail.com> Co-authored-by: rjl493456442 <garyrong0905@gmail.com>
1 parent a89f609 commit 099aa49

17 files changed

+2144
-40
lines changed

internal/web3ext/web3ext.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ var Modules = map[string]string{
3333
"swarmfs": SwarmfsJs,
3434
"txpool": TxpoolJs,
3535
"les": LESJs,
36+
"lespay": LESPayJs,
3637
}
3738

3839
const ChequebookJs = `
@@ -856,3 +857,34 @@ web3._extend({
856857
]
857858
});
858859
`
860+
861+
const LESPayJs = `
862+
web3._extend({
863+
property: 'lespay',
864+
methods:
865+
[
866+
new web3._extend.Method({
867+
name: 'distribution',
868+
call: 'lespay_distribution',
869+
params: 2
870+
}),
871+
new web3._extend.Method({
872+
name: 'timeout',
873+
call: 'lespay_timeout',
874+
params: 2
875+
}),
876+
new web3._extend.Method({
877+
name: 'value',
878+
call: 'lespay_value',
879+
params: 2
880+
}),
881+
],
882+
properties:
883+
[
884+
new web3._extend.Property({
885+
name: 'requestStats',
886+
getter: 'lespay_requestStats'
887+
}),
888+
]
889+
});
890+
`

les/benchmark.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (b *benchmarkTxSend) init(h *serverHandler, count int) error {
191191

192192
func (b *benchmarkTxSend) request(peer *serverPeer, index int) error {
193193
enc, _ := rlp.EncodeToBytes(types.Transactions{b.txs[index]})
194-
return peer.sendTxs(0, enc)
194+
return peer.sendTxs(0, 1, enc)
195195
}
196196

197197
// benchmarkTxStatus implements requestBenchmark

les/client.go

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package les
1919

2020
import (
2121
"fmt"
22+
"time"
2223

2324
"github.com/ethereum/go-ethereum/accounts"
2425
"github.com/ethereum/go-ethereum/accounts/abi/bind"
@@ -37,6 +38,7 @@ import (
3738
"github.com/ethereum/go-ethereum/event"
3839
"github.com/ethereum/go-ethereum/internal/ethapi"
3940
"github.com/ethereum/go-ethereum/les/checkpointoracle"
41+
lpc "github.com/ethereum/go-ethereum/les/lespay/client"
4042
"github.com/ethereum/go-ethereum/light"
4143
"github.com/ethereum/go-ethereum/log"
4244
"github.com/ethereum/go-ethereum/node"
@@ -49,15 +51,16 @@ import (
4951
type LightEthereum struct {
5052
lesCommons
5153

52-
peers *serverPeerSet
53-
reqDist *requestDistributor
54-
retriever *retrieveManager
55-
odr *LesOdr
56-
relay *lesTxRelay
57-
handler *clientHandler
58-
txPool *light.TxPool
59-
blockchain *light.LightChain
60-
serverPool *serverPool
54+
peers *serverPeerSet
55+
reqDist *requestDistributor
56+
retriever *retrieveManager
57+
odr *LesOdr
58+
relay *lesTxRelay
59+
handler *clientHandler
60+
txPool *light.TxPool
61+
blockchain *light.LightChain
62+
serverPool *serverPool
63+
valueTracker *lpc.ValueTracker
6164

6265
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
6366
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
@@ -74,6 +77,10 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
7477
if err != nil {
7578
return nil, err
7679
}
80+
lespayDb, err := ctx.OpenDatabase("lespay", 0, 0, "eth/db/lespay")
81+
if err != nil {
82+
return nil, err
83+
}
7784
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlockWithOverride(chainDb, config.Genesis,
7885
config.OverrideIstanbul, config.OverrideMuirGlacier)
7986
if _, isCompat := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !isCompat {
@@ -99,7 +106,9 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
99106
bloomRequests: make(chan chan *bloombits.Retrieval),
100107
bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
101108
serverPool: newServerPool(chainDb, config.UltraLightServers),
109+
valueTracker: lpc.NewValueTracker(lespayDb, &mclock.System{}, requestList, time.Minute, 1/float64(time.Hour), 1/float64(time.Hour*100), 1/float64(time.Hour*1000)),
102110
}
111+
peers.subscribe((*vtSubscription)(leth.valueTracker))
103112
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)
104113
leth.relay = newLesTxRelay(peers, leth.retriever)
105114

@@ -154,6 +163,23 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
154163
return leth, nil
155164
}
156165

166+
// vtSubscription implements serverPeerSubscriber
167+
type vtSubscription lpc.ValueTracker
168+
169+
// registerPeer implements serverPeerSubscriber
170+
func (v *vtSubscription) registerPeer(p *serverPeer) {
171+
vt := (*lpc.ValueTracker)(v)
172+
p.setValueTracker(vt, vt.Register(p.ID()))
173+
p.updateVtParams()
174+
}
175+
176+
// unregisterPeer implements serverPeerSubscriber
177+
func (v *vtSubscription) unregisterPeer(p *serverPeer) {
178+
vt := (*lpc.ValueTracker)(v)
179+
vt.Unregister(p.ID())
180+
p.setValueTracker(nil, nil)
181+
}
182+
157183
type LightDummyAPI struct{}
158184

159185
// Etherbase is the address that mining rewards will be send to
@@ -207,6 +233,11 @@ func (s *LightEthereum) APIs() []rpc.API {
207233
Version: "1.0",
208234
Service: NewPrivateLightAPI(&s.lesCommons),
209235
Public: false,
236+
}, {
237+
Namespace: "lespay",
238+
Version: "1.0",
239+
Service: lpc.NewPrivateClientAPI(s.valueTracker),
240+
Public: false,
210241
},
211242
}...)
212243
}
@@ -266,6 +297,7 @@ func (s *LightEthereum) Stop() error {
266297
s.engine.Close()
267298
s.eventMux.Stop()
268299
s.serverPool.stop()
300+
s.valueTracker.Stop()
269301
s.chainDb.Close()
270302
s.wg.Wait()
271303
log.Info("Light ethereum stopped")

les/client_handler.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
180180
return errResp(ErrRequestRejected, "")
181181
}
182182
p.updateFlowControl(update)
183+
p.updateVtParams()
183184

184185
if req.Hash != (common.Hash{}) {
185186
if p.announceType == announceTypeNone {
@@ -205,6 +206,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
205206
return errResp(ErrDecode, "msg %v: %v", msg, err)
206207
}
207208
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
209+
p.answeredRequest(resp.ReqID)
208210
if h.fetcher.requestedID(resp.ReqID) {
209211
h.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
210212
} else {
@@ -222,6 +224,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
222224
return errResp(ErrDecode, "msg %v: %v", msg, err)
223225
}
224226
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
227+
p.answeredRequest(resp.ReqID)
225228
deliverMsg = &Msg{
226229
MsgType: MsgBlockBodies,
227230
ReqID: resp.ReqID,
@@ -237,6 +240,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
237240
return errResp(ErrDecode, "msg %v: %v", msg, err)
238241
}
239242
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
243+
p.answeredRequest(resp.ReqID)
240244
deliverMsg = &Msg{
241245
MsgType: MsgCode,
242246
ReqID: resp.ReqID,
@@ -252,6 +256,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
252256
return errResp(ErrDecode, "msg %v: %v", msg, err)
253257
}
254258
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
259+
p.answeredRequest(resp.ReqID)
255260
deliverMsg = &Msg{
256261
MsgType: MsgReceipts,
257262
ReqID: resp.ReqID,
@@ -267,6 +272,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
267272
return errResp(ErrDecode, "msg %v: %v", msg, err)
268273
}
269274
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
275+
p.answeredRequest(resp.ReqID)
270276
deliverMsg = &Msg{
271277
MsgType: MsgProofsV2,
272278
ReqID: resp.ReqID,
@@ -282,6 +288,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
282288
return errResp(ErrDecode, "msg %v: %v", msg, err)
283289
}
284290
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
291+
p.answeredRequest(resp.ReqID)
285292
deliverMsg = &Msg{
286293
MsgType: MsgHelperTrieProofs,
287294
ReqID: resp.ReqID,
@@ -297,6 +304,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
297304
return errResp(ErrDecode, "msg %v: %v", msg, err)
298305
}
299306
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
307+
p.answeredRequest(resp.ReqID)
300308
deliverMsg = &Msg{
301309
MsgType: MsgTxStatus,
302310
ReqID: resp.ReqID,

les/lespay/client/api.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright 2020 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package client
18+
19+
import (
20+
"time"
21+
22+
"github.com/ethereum/go-ethereum/common/mclock"
23+
"github.com/ethereum/go-ethereum/les/utils"
24+
"github.com/ethereum/go-ethereum/p2p/enode"
25+
)
26+
27+
// PrivateClientAPI implements the lespay client side API
28+
type PrivateClientAPI struct {
29+
vt *ValueTracker
30+
}
31+
32+
// NewPrivateClientAPI creates a PrivateClientAPI
33+
func NewPrivateClientAPI(vt *ValueTracker) *PrivateClientAPI {
34+
return &PrivateClientAPI{vt}
35+
}
36+
37+
// parseNodeStr converts either an enode address or a plain hex node id to enode.ID
38+
func parseNodeStr(nodeStr string) (enode.ID, error) {
39+
if id, err := enode.ParseID(nodeStr); err == nil {
40+
return id, nil
41+
}
42+
if node, err := enode.Parse(enode.ValidSchemes, nodeStr); err == nil {
43+
return node.ID(), nil
44+
} else {
45+
return enode.ID{}, err
46+
}
47+
}
48+
49+
// RequestStats returns the current contents of the reference request basket, with
50+
// request values meaning average per request rather than total.
51+
func (api *PrivateClientAPI) RequestStats() []RequestStatsItem {
52+
return api.vt.RequestStats()
53+
}
54+
55+
// Distribution returns a distribution as a series of (X, Y) chart coordinates,
56+
// where the X axis is the response time in seconds while the Y axis is the amount of
57+
// service value received with a response time close to the X coordinate.
58+
// The distribution is optionally normalized to a sum of 1.
59+
// If nodeStr == "" then the global distribution is returned, otherwise the individual
60+
// distribution of the specified server node.
61+
func (api *PrivateClientAPI) Distribution(nodeStr string, normalized bool) (RtDistribution, error) {
62+
var expFactor utils.ExpirationFactor
63+
if !normalized {
64+
expFactor = utils.ExpFactor(api.vt.StatsExpirer().LogOffset(mclock.Now()))
65+
}
66+
if nodeStr == "" {
67+
return api.vt.RtStats().Distribution(normalized, expFactor), nil
68+
}
69+
if id, err := parseNodeStr(nodeStr); err == nil {
70+
return api.vt.GetNode(id).RtStats().Distribution(normalized, expFactor), nil
71+
} else {
72+
return RtDistribution{}, err
73+
}
74+
}
75+
76+
// Timeout suggests a timeout value based on either the global distribution or the
77+
// distribution of the specified node. The parameter is the desired rate of timeouts
78+
// assuming a similar distribution in the future.
79+
// Note that the actual timeout should have a sensible minimum bound so that operating
80+
// under ideal working conditions for a long time (for example, using a local server
81+
// with very low response times) will not make it very hard for the system to accommodate
82+
// longer response times in the future.
83+
func (api *PrivateClientAPI) Timeout(nodeStr string, failRate float64) (float64, error) {
84+
if nodeStr == "" {
85+
return float64(api.vt.RtStats().Timeout(failRate)) / float64(time.Second), nil
86+
}
87+
if id, err := parseNodeStr(nodeStr); err == nil {
88+
return float64(api.vt.GetNode(id).RtStats().Timeout(failRate)) / float64(time.Second), nil
89+
} else {
90+
return 0, err
91+
}
92+
}
93+
94+
// Value calculates the total service value provided either globally or by the specified
95+
// server node, using a weight function based on the given timeout.
96+
func (api *PrivateClientAPI) Value(nodeStr string, timeout float64) (float64, error) {
97+
wt := TimeoutWeights(time.Duration(timeout * float64(time.Second)))
98+
expFactor := utils.ExpFactor(api.vt.StatsExpirer().LogOffset(mclock.Now()))
99+
if nodeStr == "" {
100+
return api.vt.RtStats().Value(wt, expFactor), nil
101+
}
102+
if id, err := parseNodeStr(nodeStr); err == nil {
103+
return api.vt.GetNode(id).RtStats().Value(wt, expFactor), nil
104+
} else {
105+
return 0, err
106+
}
107+
}

0 commit comments

Comments
 (0)