Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 29 additions & 19 deletions cmd/p2p/sensor/sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,11 @@ var SensorCmd = &cobra.Command{
defer sub.Unsubscribe()

ticker := time.NewTicker(2 * time.Second) // Ticker for recurring tasks every 2 seconds.
hourlyTicker := time.NewTicker(time.Hour) // Ticker for running DNS discovery every hour.
ticker1h := time.NewTicker(time.Hour) // Ticker for running DNS discovery every hour.
defer ticker.Stop()
defer hourlyTicker.Stop()
defer ticker1h.Stop()

dnsLock := make(chan struct{}, 1)
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

Expand All @@ -263,7 +264,7 @@ var SensorCmd = &cobra.Command{
go handleRPC(conns, inputSensorParams.NetworkID)

// Run DNS discovery immediately at startup.
go handleDNSDiscovery(&server)
go handleDNSDiscovery(&server, dnsLock)

for {
select {
Expand All @@ -283,8 +284,8 @@ var SensorCmd = &cobra.Command{
if err := p2p.WritePeers(inputSensorParams.NodesFile, urls); err != nil {
log.Error().Err(err).Msg("Failed to write nodes to file")
}
case <-hourlyTicker.C:
go handleDNSDiscovery(&server)
case <-ticker1h.C:
go handleDNSDiscovery(&server, dnsLock)
case <-signals:
// This gracefully stops the sensor so that the peers can be written to
// the nodes file.
Expand Down Expand Up @@ -324,31 +325,37 @@ func handlePrometheus() {
}

// handleDNSDiscovery performs DNS-based peer discovery and adds new peers to
// the p2p server. It syncs the DNS discovery tree and adds any newly discovered
// peers not already in the peers map.
func handleDNSDiscovery(server *ethp2p.Server) {
// the p2p server. It uses an iterator to discover peers incrementally rather
// than loading all nodes at once. The lock channel prevents concurrent runs.
func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) {
if len(inputSensorParams.DiscoveryDNS) == 0 {
return
}

select {
case lock <- struct{}{}:
defer func() { <-lock }()
default:
log.Warn().Msg("DNS discovery already running, skipping")
return
}

log.Info().
Str("discovery-dns", inputSensorParams.DiscoveryDNS).
Msg("Starting DNS discovery sync")
Msg("Starting DNS discovery")

client := dnsdisc.NewClient(dnsdisc.Config{})
tree, err := client.SyncTree(inputSensorParams.DiscoveryDNS)
iter, err := client.NewIterator(inputSensorParams.DiscoveryDNS)
if err != nil {
log.Error().Err(err).Msg("Failed to sync DNS discovery tree")
log.Error().Err(err).Msg("Failed to create DNS discovery iterator")
return
}
defer iter.Close()

// Log the number of nodes in the tree.
log.Info().
Int("unique_nodes", len(tree.Nodes())).
Msg("Successfully synced DNS discovery tree")

// Add DNS-discovered peers.
for _, node := range tree.Nodes() {
// Add DNS-discovered peers using the iterator.
count := 0
for iter.Next() {
node := iter.Node()
log.Debug().
Str("enode", node.URLv4()).
Msg("Discovered peer through DNS")
Expand All @@ -357,9 +364,12 @@ func handleDNSDiscovery(server *ethp2p.Server) {
// connect to the peer if it's already connected. If a node is part of the
// static peer set, the server will handle reconnecting after disconnects.
server.AddPeer(node)
count++
}

log.Info().Msg("Finished adding DNS discovery peers")
log.Info().
Int("discovered_peers", count).
Msg("Finished DNS discovery")
}

// getLatestBlock will get the latest block from an RPC provider.
Expand Down
Loading