Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: dial using node iterator #20132

Merged
merged 17 commits into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
p2p/discover: add RandomNodes iterator
  • Loading branch information
fjl committed Oct 5, 2019
commit c1f6136cb435e079b413483b1c730afb73ef15d6
5 changes: 3 additions & 2 deletions p2p/discover/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ import (
"github.com/ethereum/go-ethereum/p2p/netutil"
)

// UDPConn is a network connection on which discovery can operate.
type UDPConn interface {
ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
Close() error
LocalAddr() net.Addr
}

// Config holds Table-related settings.
// Config holds settings for the discovery listener.
type Config struct {
// These settings are required and configure the UDP listener:
PrivateKey *ecdsa.PrivateKey
Expand All @@ -50,7 +51,7 @@ func ListenUDP(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
}

// ReadPacket is a packet that couldn't be handled. Those packets are sent to the unhandled
// channel if configured.
// channel if configured. This is exported for internal use, do not use this type.
type ReadPacket struct {
Data []byte
Addr *net.UDPAddr
Expand Down
209 changes: 209 additions & 0 deletions p2p/discover/lookup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package discover

import (
"context"

"github.com/ethereum/go-ethereum/p2p/enode"
)

// lookup performs a network search for nodes close to the given target. It approaches the
// target by querying nodes that are closer to it on each iteration. The given target does
// not need to be an actual node identifier.
type lookup struct {
tab *Table
queryfunc func(*node) ([]*node, error)
replyCh chan []*node
cancelCh <-chan struct{}
asked, seen map[enode.ID]bool
result nodesByDistance
replyBuffer []*node
queries int
}

type queryFunc func(*node) ([]*node, error)

func newLookup(ctx context.Context, tab *Table, target enode.ID, q queryFunc) *lookup {
it := &lookup{
tab: tab,
queryfunc: q,
asked: make(map[enode.ID]bool),
seen: make(map[enode.ID]bool),
result: nodesByDistance{target: target},
replyCh: make(chan []*node, alpha),
cancelCh: ctx.Done(),
queries: -1,
}
// Don't query further if we hit ourself.
// Unlikely to happen often in practice.
it.asked[tab.self().ID()] = true
return it
}

// run runs the lookup to completion and returns the closest nodes found.
func (it *lookup) run() []*enode.Node {
for it.next() {
}
return unwrapNodes(it.result.entries)
}

// next advances the lookup until any new nodes have been found.
// It returns false when the lookup has ended.
fjl marked this conversation as resolved.
Show resolved Hide resolved
func (it *lookup) next() bool {
for it.startQueries() {
select {
case nodes := <-it.replyCh:
it.replyBuffer = it.replyBuffer[:0]
for _, n := range nodes {
if n != nil && !it.seen[n.ID()] {
it.seen[n.ID()] = true
it.result.push(n, bucketSize)
it.replyBuffer = append(it.replyBuffer, n)
}
}
it.queries--
if len(it.replyBuffer) > 0 {
return true
}
case <-it.cancelCh:
it.shutdown()
}
}
return false
}

func (it *lookup) shutdown() {
for it.queries > 0 {
<-it.replyCh
it.queries--
}
it.queryfunc = nil
it.replyBuffer = nil
}

func (it *lookup) startQueries() bool {
if it.queryfunc == nil {
return false
}

// The first query returns nodes from the local table.
if it.queries == -1 {
it.tab.mutex.Lock()
closest := it.tab.closest(it.result.target, bucketSize, false)
karalabe marked this conversation as resolved.
Show resolved Hide resolved
it.tab.mutex.Unlock()
it.queries = 1
it.replyCh <- closest.entries
return true
}

// Ask the closest nodes that we haven't asked yet.
for i := 0; i < len(it.result.entries) && it.queries < alpha; i++ {
n := it.result.entries[i]
if !it.asked[n.ID()] {
it.asked[n.ID()] = true
it.queries++
go it.query(n, it.replyCh)
}
}
// The lookup ends when no more nodes can be asked.
return it.queries > 0
}

func (it *lookup) query(n *node, reply chan<- []*node) {
fails := it.tab.db.FindFails(n.ID(), n.IP())
r, err := it.queryfunc(n)
if err == errClosed {
// Avoid recording failures on shutdown.
reply <- nil
return
} else if len(r) == 0 {
fails++
it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
it.tab.log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "err", err)
if fails >= maxFindnodeFailures {
it.tab.log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
it.tab.delete(n)
}
} else if fails > 0 {
// Reset failure counter because it counts _consecutive_ failures.
it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0)
}

// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
// just remove those again during revalidation.
for _, n := range r {
it.tab.addSeenNode(n)
}
reply <- r
}

// lookupIterator performs lookup operations and iterates over all seen nodes.
// When a lookup finishes, a new one is created through nextLookup.
type lookupIterator struct {
buffer []*node
nextLookup lookupFunc
ctx context.Context
cancel func()
lookup *lookup
}

type lookupFunc func(ctx context.Context) *lookup

func newLookupIterator(ctx context.Context, next lookupFunc) *lookupIterator {
ctx, cancel := context.WithCancel(ctx)
return &lookupIterator{ctx: ctx, cancel: cancel, nextLookup: next}
}

// Node returns the current node.
func (it *lookupIterator) Node() *enode.Node {
if len(it.buffer) == 0 {
return nil
}
return unwrapNode(it.buffer[0])
}

// Next moves to the next node.
func (it *lookupIterator) Next() bool {
// Consume next node in lastReply.
fjl marked this conversation as resolved.
Show resolved Hide resolved
if len(it.buffer) > 0 {
it.buffer = it.buffer[1:]
}
// Advance the lookup to refill lastReply.
for len(it.buffer) == 0 {
if it.ctx.Err() != nil {
it.lookup = nil
it.buffer = nil
return false
}
if it.lookup == nil {
it.lookup = it.nextLookup(it.ctx)
continue
}
if !it.lookup.next() {
it.lookup = nil
continue
}
it.buffer = it.lookup.replyBuffer
}
return true
}

// Close ends the iterator.
func (it *lookupIterator) Close() {
it.cancel()
}
25 changes: 25 additions & 0 deletions p2p/discover/table_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package discover

import (
"bytes"
"crypto/ecdsa"
"encoding/hex"
"errors"
"fmt"
"math/rand"
"net"
"reflect"
"sort"
"sync"

Expand Down Expand Up @@ -169,6 +172,28 @@ func hasDuplicates(slice []*node) bool {
return false
}

func checkNodesEqual(got, want []*enode.Node) error {
if reflect.DeepEqual(got, want) {
return nil
}
output := new(bytes.Buffer)
fmt.Fprintf(output, "got %d nodes:\n", len(got))
for _, n := range got {
fmt.Fprintf(output, " %v %v\n", n.ID(), n)
}
fmt.Fprintf(output, "want %d:\n", len(want))
for _, n := range want {
fmt.Fprintf(output, " %v %v\n", n.ID(), n)
}
return errors.New(output.String())
}

func sortByID(nodes []*enode.Node) {
sort.Slice(nodes, func(i, j int) bool {
return string(nodes[i].ID().Bytes()) < string(nodes[j].ID().Bytes())
})
}

func sortedByDistanceTo(distbase enode.ID, slice []*node) bool {
return sort.SliceIsSorted(slice, func(i, j int) bool {
return enode.DistCmp(distbase, slice[i].ID(), slice[j].ID()) < 0
Expand Down
Loading