Skip to content

Commit

Permalink
les: handler separation
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Jun 3, 2019
1 parent 962af9f commit e5ff2cd
Show file tree
Hide file tree
Showing 31 changed files with 2,203 additions and 2,481 deletions.
18 changes: 9 additions & 9 deletions les/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,17 @@ type priorityClientInfo struct {
}

// newPriorityClientPool creates a new priority client pool
func newPriorityClientPool(freeClientCap uint64, ps *peerSet, child clientPool, metricsLogger, eventLogger *csvlogger.Logger) *priorityClientPool {
return &priorityClientPool{
func newPriorityClientPool(freeClientCap uint64, ps *peerSet, child clientPool, eventLogger *csvlogger.Logger, metricsLogger *csvlogger.Logger) *priorityClientPool {
pool := &priorityClientPool{
clients: make(map[enode.ID]priorityClientInfo),
freeClientCap: freeClientCap,
ps: ps,
child: child,
logger: eventLogger,
logTotalPriConn: metricsLogger.NewChannel("totalPriConn", 0),
}
ps.notify(pool)
return pool
}

// registerPeer is called when a new client is connected. If the client has no
Expand Down Expand Up @@ -456,7 +458,7 @@ func (api *PrivateLightServerAPI) Benchmark(setups []map[string]interface{}, pas
return nil, ErrUnknownBenchmarkType
}
}
rs := api.server.protocolManager.runBenchmark(benchmarks, passCount, time.Millisecond*time.Duration(length))
rs := api.server.handler.runBenchmark(benchmarks, passCount, time.Millisecond*time.Duration(length))
result := make([]map[string]interface{}, len(setups))
for i, r := range rs {
res := make(map[string]interface{})
Expand All @@ -476,14 +478,12 @@ func (api *PrivateLightServerAPI) Benchmark(setups []map[string]interface{}, pas
// PrivateLightAPI provides an API to access the LES light server or light client.
type PrivateLightAPI struct {
backend *lesCommons
reg *checkpointRegistrar
}

// NewPrivateLightAPI creates a new LES service API.
func NewPrivateLightAPI(backend *lesCommons, reg *checkpointRegistrar) *PrivateLightAPI {
func NewPrivateLightAPI(backend *lesCommons) *PrivateLightAPI {
return &PrivateLightAPI{
backend: backend,
reg: reg,
}
}

Expand Down Expand Up @@ -513,7 +513,7 @@ func (api *PrivateLightAPI) LatestCheckpoint() ([4]string, error) {
// result[2], 32 bytes hex encoded latest section bloom trie root hash
func (api *PrivateLightAPI) GetCheckpoint(index uint64) ([3]string, error) {
var res [3]string
cp := api.backend.getLocalCheckpoint(index)
cp := api.backend.localCheckpoint(index)
if cp.Empty() {
return res, ErrNoCheckpoint
}
Expand All @@ -523,8 +523,8 @@ func (api *PrivateLightAPI) GetCheckpoint(index uint64) ([3]string, error) {

// GetCheckpointContractAddress returns the contract contract address in hex format.
func (api *PrivateLightAPI) GetCheckpointContractAddress() (string, error) {
if api.reg == nil {
if api.backend.registrar == nil {
return "", ErrNotActivated
}
return api.reg.config.ContractAddr.Hex(), nil
return api.backend.registrar.config.ContractAddr.Hex(), nil
}
2 changes: 1 addition & 1 deletion les/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (b *LesApiBackend) CurrentBlock() *types.Block {
}

func (b *LesApiBackend) SetHead(number uint64) {
b.eth.protocolManager.downloader.Cancel()
b.eth.handler.downloader.Cancel()
b.eth.blockchain.SetHead(number)
}

Expand Down
19 changes: 6 additions & 13 deletions les/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,16 @@ func TestCapacityAPI10(t *testing.T) {
// while connected and going back and forth between free and priority mode with
// the supplied API calls is also thoroughly tested.
func testCapacityAPI(t *testing.T, clientCount int) {
// Skip test if no data dir specified
if testServerDataDir == "" {
// Skip test if no data dir specified
return
}

for !testSim(t, 1, clientCount, []string{testServerDataDir}, nil, func(ctx context.Context, net *simulations.Network, servers []*simulations.Node, clients []*simulations.Node) bool {
if len(servers) != 1 {
t.Fatalf("Invalid number of servers: %d", len(servers))
}
server := servers[0]

clientRpcClients := make([]*rpc.Client, len(clients))

serverRpcClient, err := server.Client()
if err != nil {
t.Fatalf("Failed to obtain rpc client: %v", err)
Expand All @@ -101,6 +98,7 @@ func testCapacityAPI(t *testing.T, clientCount int) {
minCap := getMinCap(ctx, t, serverRpcClient)
testCap := totalCap * 3 / 4
fmt.Printf("Server testCap: %d minCap: %d head number: %d head hash: %064x\n", testCap, minCap, headNum, headHash)

reqMinCap := uint64(float64(testCap) * minRelCap / (minRelCap + float64(len(clients)-1)))
if minCap > reqMinCap {
t.Fatalf("Minimum client capacity (%d) bigger than required minimum for this test (%d)", minCap, reqMinCap)
Expand All @@ -109,13 +107,13 @@ func testCapacityAPI(t *testing.T, clientCount int) {
freeIdx := rand.Intn(len(clients))
freeCap := getFreeCap(ctx, t, serverRpcClient)

clientRpcClients := make([]*rpc.Client, len(clients))
for i, client := range clients {
var err error
clientRpcClients[i], err = client.Client()
if err != nil {
t.Fatalf("Failed to obtain rpc client: %v", err)
}

fmt.Println("connecting client", i)
if i != freeIdx {
setCapacity(ctx, t, serverRpcClient, client.ID(), testCap/uint64(len(clients)))
Expand All @@ -142,21 +140,22 @@ func testCapacityAPI(t *testing.T, clientCount int) {

reqCount := make([]uint64, len(clientRpcClients))

// Send light request like crazy.
for i, c := range clientRpcClients {
wg.Add(1)
i, c := i, c
go func() {
defer wg.Done()

queue := make(chan struct{}, 100)
var count uint64
for {
select {
case queue <- struct{}{}:
select {
case <-stop:
wg.Done()
return
case <-ctx.Done():
wg.Done()
return
default:
wg.Add(1)
Expand All @@ -171,10 +170,8 @@ func testCapacityAPI(t *testing.T, clientCount int) {
}()
}
case <-stop:
wg.Done()
return
case <-ctx.Done():
wg.Done()
return
}
}
Expand Down Expand Up @@ -315,12 +312,10 @@ func getHead(ctx context.Context, t *testing.T, client *rpc.Client) (uint64, com
}

func testRequest(ctx context.Context, t *testing.T, client *rpc.Client) bool {
//res := make(map[string]interface{})
var res string
var addr common.Address
rand.Read(addr[:])
c, _ := context.WithTimeout(ctx, time.Second*12)
// if err := client.CallContext(ctx, &res, "eth_getProof", addr, nil, "latest"); err != nil {
err := client.CallContext(c, &res, "eth_getBalance", addr, "latest")
if err != nil {
fmt.Println("request error:", err)
Expand Down Expand Up @@ -417,7 +412,6 @@ func NewNetwork() (*simulations.Network, func(), error) {
adapterTeardown()
net.Shutdown()
}

return net, teardown, nil
}

Expand Down Expand Up @@ -515,7 +509,6 @@ func newLesServerService(ctx *adapters.ServiceContext) (node.Service, error) {
if err != nil {
return nil, err
}

server, err := NewLesServer(ethereum, &config)
if err != nil {
return nil, err
Expand Down
47 changes: 24 additions & 23 deletions les/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
// requestBenchmark is an interface for different randomized request generators
type requestBenchmark interface {
// init initializes the generator for generating the given number of randomized requests
init(pm *ProtocolManager, count int) error
init(h *serverHandler, count int) error
// request initiates sending a single request to the given peer
request(peer *peer, index int) error
}
Expand All @@ -52,10 +52,10 @@ type benchmarkBlockHeaders struct {
hashes []common.Hash
}

func (b *benchmarkBlockHeaders) init(pm *ProtocolManager, count int) error {
func (b *benchmarkBlockHeaders) init(h *serverHandler, count int) error {
d := int64(b.amount-1) * int64(b.skip+1)
b.offset = 0
b.randMax = pm.blockchain.CurrentHeader().Number.Int64() + 1 - d
b.randMax = h.blockchain.CurrentHeader().Number.Int64() + 1 - d
if b.randMax < 0 {
return fmt.Errorf("chain is too short")
}
Expand All @@ -65,7 +65,7 @@ func (b *benchmarkBlockHeaders) init(pm *ProtocolManager, count int) error {
if b.byHash {
b.hashes = make([]common.Hash, count)
for i := range b.hashes {
b.hashes[i] = rawdb.ReadCanonicalHash(pm.chainDb, uint64(b.offset+rand.Int63n(b.randMax)))
b.hashes[i] = rawdb.ReadCanonicalHash(h.chainDb, uint64(b.offset+rand.Int63n(b.randMax)))
}
}
return nil
Expand All @@ -85,11 +85,11 @@ type benchmarkBodiesOrReceipts struct {
hashes []common.Hash
}

func (b *benchmarkBodiesOrReceipts) init(pm *ProtocolManager, count int) error {
randMax := pm.blockchain.CurrentHeader().Number.Int64() + 1
func (b *benchmarkBodiesOrReceipts) init(h *serverHandler, count int) error {
randMax := h.blockchain.CurrentHeader().Number.Int64() + 1
b.hashes = make([]common.Hash, count)
for i := range b.hashes {
b.hashes[i] = rawdb.ReadCanonicalHash(pm.chainDb, uint64(rand.Int63n(randMax)))
b.hashes[i] = rawdb.ReadCanonicalHash(h.chainDb, uint64(rand.Int63n(randMax)))
}
return nil
}
Expand All @@ -108,8 +108,8 @@ type benchmarkProofsOrCode struct {
headHash common.Hash
}

func (b *benchmarkProofsOrCode) init(pm *ProtocolManager, count int) error {
b.headHash = pm.blockchain.CurrentHeader().Hash()
func (b *benchmarkProofsOrCode) init(h *serverHandler, count int) error {
b.headHash = h.blockchain.CurrentHeader().Hash()
return nil
}

Expand All @@ -130,11 +130,11 @@ type benchmarkHelperTrie struct {
sectionCount, headNum uint64
}

func (b *benchmarkHelperTrie) init(pm *ProtocolManager, count int) error {
func (b *benchmarkHelperTrie) init(h *serverHandler, count int) error {
if b.bloom {
b.sectionCount, b.headNum, _ = pm.server.bloomTrieIndexer.Sections()
b.sectionCount, b.headNum, _ = h.server.bloomTrieIndexer.Sections()
} else {
b.sectionCount, _, _ = pm.server.chtIndexer.Sections()
b.sectionCount, _, _ = h.server.chtIndexer.Sections()
b.headNum = b.sectionCount*params.CHTFrequency - 1
}
if b.sectionCount == 0 {
Expand Down Expand Up @@ -170,7 +170,7 @@ type benchmarkTxSend struct {
txs types.Transactions
}

func (b *benchmarkTxSend) init(pm *ProtocolManager, count int) error {
func (b *benchmarkTxSend) init(h *serverHandler, count int) error {
key, _ := crypto.GenerateKey()
addr := crypto.PubkeyToAddress(key.PublicKey)
signer := types.NewEIP155Signer(big.NewInt(18))
Expand All @@ -196,7 +196,7 @@ func (b *benchmarkTxSend) request(peer *peer, index int) error {
// benchmarkTxStatus implements requestBenchmark
type benchmarkTxStatus struct{}

func (b *benchmarkTxStatus) init(pm *ProtocolManager, count int) error {
func (b *benchmarkTxStatus) init(h *serverHandler, count int) error {
return nil
}

Expand All @@ -217,7 +217,7 @@ type benchmarkSetup struct {

// runBenchmark runs a benchmark cycle for all benchmark types in the specified
// number of passes
func (pm *ProtocolManager) runBenchmark(benchmarks []requestBenchmark, passCount int, targetTime time.Duration) []*benchmarkSetup {
func (h *serverHandler) runBenchmark(benchmarks []requestBenchmark, passCount int, targetTime time.Duration) []*benchmarkSetup {
setup := make([]*benchmarkSetup, len(benchmarks))
for i, b := range benchmarks {
setup[i] = &benchmarkSetup{req: b}
Expand All @@ -239,7 +239,7 @@ func (pm *ProtocolManager) runBenchmark(benchmarks []requestBenchmark, passCount
if next.totalTime > 0 {
count = int(uint64(next.totalCount) * uint64(targetTime) / uint64(next.totalTime))
}
if err := pm.measure(next, count); err != nil {
if err := h.measure(next, count); err != nil {
next.err = err
}
}
Expand Down Expand Up @@ -275,14 +275,15 @@ func (m *meteredPipe) WriteMsg(msg p2p.Msg) error {

// measure runs a benchmark for a single type in a single pass, with the given
// number of requests
func (pm *ProtocolManager) measure(setup *benchmarkSetup, count int) error {
func (h *serverHandler) measure(setup *benchmarkSetup, count int) error {
clientPipe, serverPipe := p2p.MsgPipe()
clientMeteredPipe := &meteredPipe{rw: clientPipe}
serverMeteredPipe := &meteredPipe{rw: serverPipe}
var id enode.ID
rand.Read(id[:])
clientPeer := pm.newPeer(lpv2, NetworkId, p2p.NewPeer(id, "client", nil), clientMeteredPipe)
serverPeer := pm.newPeer(lpv2, NetworkId, p2p.NewPeer(id, "server", nil), serverMeteredPipe)

clientPeer := newPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "client", nil), clientMeteredPipe)
serverPeer := newPeer(lpv2, NetworkId, false, p2p.NewPeer(id, "server", nil), serverMeteredPipe)
serverPeer.sendQueue = newExecQueue(count)
serverPeer.announceType = announceTypeNone
serverPeer.fcCosts = make(requestCostTable)
Expand All @@ -291,10 +292,10 @@ func (pm *ProtocolManager) measure(setup *benchmarkSetup, count int) error {
serverPeer.fcCosts[code] = c
}
serverPeer.fcParams = flowcontrol.ServerParams{BufLimit: 1, MinRecharge: 1}
serverPeer.fcClient = flowcontrol.NewClientNode(pm.server.fcManager, serverPeer.fcParams)
serverPeer.fcClient = flowcontrol.NewClientNode(h.server.fcManager, serverPeer.fcParams)
defer serverPeer.fcClient.Disconnect()

if err := setup.req.init(pm, count); err != nil {
if err := setup.req.init(h, count); err != nil {
return err
}

Expand All @@ -311,7 +312,7 @@ func (pm *ProtocolManager) measure(setup *benchmarkSetup, count int) error {
}()
go func() {
for i := 0; i < count; i++ {
if err := pm.handleMsg(serverPeer); err != nil {
if err := h.handleMsg(serverPeer); err != nil {
errCh <- err
return
}
Expand All @@ -336,7 +337,7 @@ func (pm *ProtocolManager) measure(setup *benchmarkSetup, count int) error {
if err != nil {
return err
}
case <-pm.quitSync:
case <-h.closeCh:
clientPipe.Close()
serverPipe.Close()
return fmt.Errorf("Benchmark cancelled")
Expand Down
3 changes: 2 additions & 1 deletion les/bloombits.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ const (
func (eth *LightEthereum) startBloomHandlers(sectionSize uint64) {
for i := 0; i < bloomServiceThreads; i++ {
go func() {
defer eth.wg.Done()
for {
select {
case <-eth.shutdownChan:
case <-eth.closeCh:
return

case request := <-eth.bloomRequests:
Expand Down
Loading

0 comments on commit e5ff2cd

Please sign in to comment.