Skip to content

Commit

Permalink
p2p/discover: improved node revalidation (ethereum#29572)
Browse files Browse the repository at this point in the history
Node discovery periodically revalidates the nodes in its table by sending PING, checking
if they are still alive. I recently noticed some issues with the implementation of this
process, which can cause strange results such as nodes dropping unexpectedly, certain
nodes not getting revalidated often enough, and bad results being returned to incoming
FINDNODE queries.

In this change, the revalidation process is improved with the following logic:

- We maintain two 'revalidation lists' containing the table nodes, named 'fast' and 'slow'.
- The process chooses random nodes from each list on a randomized interval, the interval being
  faster for the 'fast' list, and performs revalidation for the chosen node.
- Whenever a node is newly inserted into the table, it goes into the 'fast' list.
  Once validation passes, it transfers to the 'slow' list. If a request fails, or the
  node changes endpoint, it transfers back into 'fast'.
- livenessChecks is incremented by one for successful checks. Unlike the old implementation,
  we will not drop the node on the first failing check. We instead quickly decay the
  livenessChecks give it another chance.
- Order of nodes in bucket doesn't matter anymore.

I am also adding a debug API endpoint to dump the node table content.

Co-authored-by: Martin HS <martin@swende.se>
  • Loading branch information
2 people authored and pratikspatil024 committed Jun 11, 2024
1 parent e28c203 commit 1351a54
Show file tree
Hide file tree
Showing 15 changed files with 921 additions and 570 deletions.
57 changes: 57 additions & 0 deletions cmd/devp2p/discv4cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"net"
"net/http"
"strconv"
"strings"
"time"
Expand All @@ -30,9 +31,11 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/internal/flags"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
)

var (
Expand All @@ -46,6 +49,7 @@ var (
discv4ResolveJSONCommand,
discv4CrawlCommand,
discv4TestCommand,
discv4ListenCommand,
},
}
discv4PingCommand = &cli.Command{
Expand Down Expand Up @@ -76,6 +80,14 @@ var (
Flags: discoveryNodeFlags,
ArgsUsage: "<nodes.json file>",
}
discv4ListenCommand = &cli.Command{
Name: "listen",
Usage: "Runs a discovery node",
Action: discv4Listen,
Flags: flags.Merge(discoveryNodeFlags, []cli.Flag{
httpAddrFlag,
}),
}
discv4CrawlCommand = &cli.Command{
Name: "crawl",
Usage: "Updates a nodes.json file with random nodes found in the DHT",
Expand Down Expand Up @@ -132,6 +144,10 @@ var (
Usage: "Enode of the remote node under test",
EnvVars: []string{"REMOTE_ENODE"},
}
httpAddrFlag = &cli.StringFlag{
Name: "rpc",
Usage: "HTTP server listening address",
}
)

var discoveryNodeFlags = []cli.Flag{
Expand All @@ -158,6 +174,27 @@ func discv4Ping(ctx *cli.Context) error {
return nil
}

func discv4Listen(ctx *cli.Context) error {
disc, _ := startV4(ctx)
defer disc.Close()

fmt.Println(disc.Self())

httpAddr := ctx.String(httpAddrFlag.Name)
if httpAddr == "" {
// Non-HTTP mode.
select {}
}

api := &discv4API{disc}
log.Info("Starting RPC API server", "addr", httpAddr)
srv := rpc.NewServer("", 0, 0)
srv.RegisterName("discv4", api)
http.DefaultServeMux.Handle("/", srv)
httpsrv := http.Server{Addr: httpAddr, Handler: http.DefaultServeMux}
return httpsrv.ListenAndServe()
}

func discv4RequestRecord(ctx *cli.Context) error {
n := getNodeArg(ctx)
disc, _ := startV4(ctx)
Expand Down Expand Up @@ -401,3 +438,23 @@ func parseBootnodes(ctx *cli.Context) ([]*enode.Node, error) {

return nodes, nil
}

type discv4API struct {
host *discover.UDPv4
}

func (api *discv4API) LookupRandom(n int) (ns []*enode.Node) {
it := api.host.RandomNodes()
for len(ns) < n && it.Next() {
ns = append(ns, it.Node())
}
return ns
}

func (api *discv4API) Buckets() [][]discover.BucketNode {
return api.host.TableBuckets()
}

func (api *discv4API) Self() *enode.Node {
return api.host.Self()
}
2 changes: 1 addition & 1 deletion internal/testlog/testlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (h *bufHandler) Handle(_ context.Context, r slog.Record) error {
}

func (h *bufHandler) Enabled(_ context.Context, lvl slog.Level) bool {
return lvl <= h.level
return lvl >= h.level
}

func (h *bufHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
Expand Down
17 changes: 17 additions & 0 deletions node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/internal/debug"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc"
)
Expand All @@ -40,6 +41,9 @@ func (n *Node) apis() []rpc.API {
}, {
Namespace: "debug",
Service: debug.Handler,
}, {
Namespace: "debug",
Service: &p2pDebugAPI{n},
}, {
Namespace: "web3",
Service: &web3API{n},
Expand Down Expand Up @@ -477,3 +481,16 @@ func (api *adminAPI) SetHttpExecutionPoolSize(n int) *ExecutionPoolSize {

return api.GetExecutionPoolSize()
}

// p2pDebugAPI provides access to p2p internals for debugging.
type p2pDebugAPI struct {
stack *Node
}

func (s *p2pDebugAPI) DiscoveryV4Table() [][]discover.BucketNode {
disc := s.stack.server.DiscoveryV4()
if disc != nil {
return disc.TableBuckets()
}
return nil
}
49 changes: 43 additions & 6 deletions p2p/discover/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ package discover

import (
"crypto/ecdsa"
crand "crypto/rand"
"encoding/binary"
"math/rand"
"net"
"sync"
"time"

"github.com/ethereum/go-ethereum/common/mclock"
Expand Down Expand Up @@ -62,7 +66,7 @@ type Config struct {
func (cfg Config) withDefaults() Config {
// Node table configuration:
if cfg.PingInterval == 0 {
cfg.PingInterval = 10 * time.Second
cfg.PingInterval = 3 * time.Second
}
if cfg.RefreshInterval == 0 {
cfg.RefreshInterval = 30 * time.Minute
Expand Down Expand Up @@ -96,10 +100,43 @@ type ReadPacket struct {
Addr *net.UDPAddr
}

func min(x, y int) int {
if x > y {
return y
}
type randomSource interface {
Intn(int) int
Int63n(int64) int64
Shuffle(int, func(int, int))
}

// reseedingRandom is a random number generator that tracks when it was last re-seeded.
type reseedingRandom struct {
mu sync.Mutex
cur *rand.Rand
}

func (r *reseedingRandom) seed() {
var b [8]byte
crand.Read(b[:])
seed := binary.BigEndian.Uint64(b[:])
new := rand.New(rand.NewSource(int64(seed)))

r.mu.Lock()
r.cur = new
r.mu.Unlock()
}

func (r *reseedingRandom) Intn(n int) int {
r.mu.Lock()
defer r.mu.Unlock()
return r.cur.Intn(n)
}

func (r *reseedingRandom) Int63n(n int64) int64 {
r.mu.Lock()
defer r.mu.Unlock()
return r.cur.Int63n(n)
}

return x
func (r *reseedingRandom) Shuffle(n int, swap func(i, j int)) {
r.mu.Lock()
defer r.mu.Unlock()
r.cur.Shuffle(n, swap)
}
32 changes: 5 additions & 27 deletions p2p/discover/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,35 +148,13 @@ func (it *lookup) slowdown() {
}

func (it *lookup) query(n *node, reply chan<- []*node) {
fails := it.tab.db.FindFails(n.ID(), n.IP())
r, err := it.queryfunc(n)

if errors.Is(err, errClosed) {
// Avoid recording failures on shutdown.
reply <- nil
return
} else if len(r) == 0 {
fails++
it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
// Remove the node from the local table if it fails to return anything useful too
// many times, but only if there are enough other nodes in the bucket.
dropped := false
if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 {
dropped = true

it.tab.delete(n)
if !errors.Is(err, errClosed) { // avoid recording failures on shutdown.
success := len(r) > 0
it.tab.trackRequest(n, success, r)
if err != nil {
it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "err", err)
}

it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err)
} else if fails > 0 {
// Reset failure counter because it counts _consecutive_ failures.
it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0)
}

// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
// just remove those again during revalidation.
for _, n := range r {
it.tab.addSeenNode(n)
}
reply <- r
}
Expand Down
20 changes: 15 additions & 5 deletions p2p/discover/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,22 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
)

type BucketNode struct {
Node *enode.Node `json:"node"`
AddedToTable time.Time `json:"addedToTable"`
AddedToBucket time.Time `json:"addedToBucket"`
Checks int `json:"checks"`
Live bool `json:"live"`
}

// node represents a host on the network.
// The fields of Node may not be modified.
type node struct {
enode.Node
addedAt time.Time // time when the node was added to the table
livenessChecks uint // how often liveness was checked
*enode.Node
addedToTable time.Time // first time node was added to bucket or replacement list
addedToBucket time.Time // time it was added in the actual bucket
livenessChecks uint // how often liveness was checked
isValidatedLive bool // true if existence of node is considered validated right now
}

type encPubkey [64]byte
Expand Down Expand Up @@ -70,7 +80,7 @@ func (e encPubkey) id() enode.ID {
}

func wrapNode(n *enode.Node) *node {
return &node{Node: *n}
return &node{Node: n}
}

func wrapNodes(ns []*enode.Node) []*node {
Expand All @@ -83,7 +93,7 @@ func wrapNodes(ns []*enode.Node) []*node {
}

func unwrapNode(n *node) *enode.Node {
return &n.Node
return n.Node
}

func unwrapNodes(ns []*node) []*enode.Node {
Expand Down
Loading

0 comments on commit 1351a54

Please sign in to comment.