91 changes: 17 additions & 74 deletions pkg/hive/hive.go
@@ -12,7 +12,6 @@ package hive

import (
"context"
"encoding/hex"
"errors"
"fmt"
"sync"
@@ -41,8 +40,7 @@ const (
peersStreamName = "peers"
messageTimeout = 1 * time.Minute // maximum allowed time for a message to be read or written.
maxBatchSize = 30
pingTimeout = time.Second * 15 // time to wait for ping to succeed
batchValidationTimeout = 5 * time.Minute // prevent lock contention on peer validation
batchValidationTimeout = 5 * time.Minute // prevent lock contention on peer validation
)

var (
@@ -53,7 +51,7 @@ var (
)

type Service struct {
streamer p2p.StreamerPinger
streamer p2p.Streamer
addressBook addressbook.GetPutter
addPeersHandler func(...swarm.Address)
networkID uint64
@@ -69,7 +67,7 @@ type Service struct {
allowPrivateCIDRs bool
}

func New(streamer p2p.StreamerPinger, addressbook addressbook.GetPutter, networkID uint64, bootnode bool, allowPrivateCIDRs bool, logger log.Logger) *Service {
func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uint64, bootnode bool, allowPrivateCIDRs bool, logger log.Logger) *Service {
svc := &Service{
streamer: streamer,
logger: logger.WithName(loggerName).Register(),
@@ -274,72 +272,6 @@ func (s *Service) startCheckPeersHandler() {

func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {
var peersToAdd []swarm.Address
mtx := sync.Mutex{}
wg := sync.WaitGroup{}

addPeer := func(newPeer *pb.BzzAddress, multiUnderlay []ma.Multiaddr) {
err := s.sem.Acquire(ctx, 1)
if err != nil {
return
}

wg.Add(1)
go func() {
s.metrics.PeerConnectAttempts.Inc()

defer func() {
s.sem.Release(1)
wg.Done()
}()

ctx, cancel := context.WithTimeout(ctx, pingTimeout)
defer cancel()

var (
pingSuccessful bool
start time.Time
)
for _, underlay := range multiUnderlay {
// ping each underlay address, pick first available
start = time.Now()
if _, err := s.streamer.Ping(ctx, underlay); err != nil {
s.logger.Debug("unreachable peer underlay", "peer_address", hex.EncodeToString(newPeer.Overlay), "underlay", underlay, "error", err)
continue
}
pingSuccessful = true
s.logger.Debug("reachable peer underlay", "peer_address", hex.EncodeToString(newPeer.Overlay), "underlay", underlay)
break
}

if !pingSuccessful {
// none of underlay addresses is available
s.metrics.PingFailureTime.Observe(time.Since(start).Seconds())
s.metrics.UnreachablePeers.Inc()
return
}

s.metrics.PingTime.Observe(time.Since(start).Seconds())
s.metrics.ReachablePeers.Inc()

bzzAddress := bzz.Address{
Overlay: swarm.NewAddress(newPeer.Overlay),
Underlays: multiUnderlay,
Signature: newPeer.Signature,
Nonce: newPeer.Nonce,
}

err := s.addressBook.Put(bzzAddress.Overlay, bzzAddress)
if err != nil {
s.metrics.StorePeerErr.Inc()
s.logger.Warning("skipping peer in response", "peer_address", newPeer.String(), "error", err)
return
}

mtx.Lock()
peersToAdd = append(peersToAdd, bzzAddress.Overlay)
mtx.Unlock()
}()
}

for _, p := range peers.Peers {
multiUnderlays, err := bzz.DeserializeUnderlays(p.Underlay)
@@ -361,10 +293,21 @@ func (s *Service) checkAndAddPeers(ctx context.Context, peers pb.Peers) {
continue
}

// add peer does not exist in the addressbook
addPeer(p, multiUnderlays)
bzzAddress := bzz.Address{
Overlay: swarm.NewAddress(p.Overlay),
Underlays: multiUnderlays,
Signature: p.Signature,
Nonce: p.Nonce,
}

if err := s.addressBook.Put(bzzAddress.Overlay, bzzAddress); err != nil {
s.metrics.StorePeerErr.Inc()
s.logger.Warning("skipping peer in response", "peer_address", p.String(), "error", err)
return
}

peersToAdd = append(peersToAdd, bzzAddress.Overlay)
}
wg.Wait()

if s.addPeersHandler != nil && len(peersToAdd) > 0 {
s.addPeersHandler(peersToAdd...)
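Since hive.New now takes a plain p2p.Streamer instead of a p2p.StreamerPinger, callers no longer need a ping-capable streamer and the service stores advertised peers without a reachability check. A minimal wiring sketch, assuming the streamtest recorder and statestore mock already used by the tests below; the test name and networkID value are illustrative:

package hive_test

import (
	"testing"

	ab "github.com/ethersphere/bee/v2/pkg/addressbook"
	"github.com/ethersphere/bee/v2/pkg/hive"
	"github.com/ethersphere/bee/v2/pkg/log"
	"github.com/ethersphere/bee/v2/pkg/p2p/streamtest"
	"github.com/ethersphere/bee/v2/pkg/statestore/mock"
	"github.com/ethersphere/bee/v2/pkg/util/testutil"
)

func TestHiveWiring(t *testing.T) {
	// A plain stream recorder is sufficient: the service no longer pings
	// advertised underlays before storing peers in the address book.
	streamer := streamtest.New()
	addressBook := ab.New(mock.NewStateStore())

	server := hive.New(streamer, addressBook, 1, false, true, log.Noop)
	testutil.CleanupCloser(t, server)
}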
30 changes: 2 additions & 28 deletions pkg/hive/hive_test.go
@@ -8,7 +8,6 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"strconv"
"testing"
@@ -48,7 +47,7 @@ func TestHandlerRateLimit(t *testing.T) {

addressbookclean := ab.New(mock.NewStateStore())

// new recorder for handling Ping
// new recorder
streamer := streamtest.New()
// create a hive server that handles the incoming stream
server := hive.New(streamer, addressbookclean, networkID, false, true, logger)
@@ -192,7 +191,6 @@ func TestBroadcastPeers(t *testing.T) {
wantOverlays []swarm.Address
wantBzzAddresses []bzz.Address
allowPrivateCIDRs bool
pingErr func(addr ma.Multiaddr) (time.Duration, error)
}{
"OK - single record": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
@@ -234,24 +232,6 @@ func TestBroadcastPeers(t *testing.T) {
wantBzzAddresses: bzzAddresses[:2*hive.MaxBatchSize],
allowPrivateCIDRs: true,
},
"OK - single batch - skip ping failures": {
addresee: swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c"),
peers: overlays[:15],
wantMsgs: []pb.Peers{{Peers: wantMsgs[0].Peers[:15]}},
wantOverlays: overlays[:10],
wantBzzAddresses: bzzAddresses[:10],
allowPrivateCIDRs: true,
pingErr: func(addr ma.Multiaddr) (rtt time.Duration, err error) {
for _, v := range bzzAddresses[10:15] {
for _, underlay := range v.Underlays {
if underlay.Equal(addr) {
return rtt, errors.New("ping failure")
}
}
}
return rtt, nil
},
},
"Ok - don't advertise private CIDRs only": {
addresee: overlays[len(overlays)-1],
peers: overlays[:15],
@@ -291,13 +271,7 @@ func TestBroadcastPeers(t *testing.T) {

addressbookclean := ab.New(mock.NewStateStore())

// new recorder for handling Ping
var streamer *streamtest.Recorder
if tc.pingErr != nil {
streamer = streamtest.New(streamtest.WithPingErr(tc.pingErr))
} else {
streamer = streamtest.New()
}
streamer := streamtest.New()
// create a hive server that handles the incoming stream
server := hive.New(streamer, addressbookclean, networkID, false, true, logger)
testutil.CleanupCloser(t, server)
28 changes: 28 additions & 0 deletions pkg/p2p/libp2p/libp2ptest/buffer.go
@@ -0,0 +1,28 @@
// Copyright 2025 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package libp2ptest

import (
"bytes"
"sync"
)

// SafeBuffer is a thread-safe bytes.Buffer.
type SafeBuffer struct {
b bytes.Buffer
m sync.Mutex
}

func (s *SafeBuffer) Write(p []byte) (n int, err error) {
s.m.Lock()
defer s.m.Unlock()
return s.b.Write(p)
}

func (s *SafeBuffer) String() string {
s.m.Lock()
defer s.m.Unlock()
return s.b.String()
}
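A short usage sketch for SafeBuffer (the example function and package name are assumed): several goroutines write to one shared buffer, and the internal mutex makes both the interleaved writes and the final String call safe.

package libp2ptest_test

import (
	"fmt"
	"sync"

	"github.com/ethersphere/bee/v2/pkg/p2p/libp2p/libp2ptest"
)

func ExampleSafeBuffer() {
	var buf libp2ptest.SafeBuffer
	var wg sync.WaitGroup

	// Four goroutines write concurrently; SafeBuffer serializes the writes.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = buf.Write([]byte("x"))
		}()
	}
	wg.Wait()

	fmt.Println(len(buf.String()))
	// Output: 4
}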
69 changes: 69 additions & 0 deletions pkg/p2p/libp2p/libp2ptest/service.go
@@ -0,0 +1,69 @@
// Copyright 2025 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package libp2ptest

import (
"context"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethersphere/bee/v2/pkg/addressbook"
"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/p2p/libp2p"
"github.com/ethersphere/bee/v2/pkg/statestore/mock"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/bee/v2/pkg/topology/lightnode"
"github.com/ethersphere/bee/v2/pkg/util/testutil"
)

// NewLibp2pService creates a new libp2p service for testing purposes.
func NewLibp2pService(t *testing.T, networkID uint64, logger log.Logger) (*libp2p.Service, swarm.Address) {
t.Helper()

swarmKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}

nonce := common.HexToHash("0x1").Bytes()

overlay, err := crypto.NewOverlayAddress(swarmKey.PublicKey, networkID, nonce)
if err != nil {
t.Fatal(err)
}

addr := ":0"

statestore := mock.NewStateStore()
ab := addressbook.New(statestore)

libp2pKey, err := crypto.GenerateSecp256r1Key()
if err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

lightNodes := lightnode.NewContainer(overlay)

opts := libp2p.Options{
PrivateKey: libp2pKey,
Nonce: nonce,
FullNode: true,
NATAddr: "127.0.0.1:0", // Disable default NAT manager
}

s, err := libp2p.New(ctx, crypto.NewDefaultSigner(swarmKey), networkID, overlay, addr, ab, statestore, lightNodes, logger, nil, opts)
if err != nil {
t.Fatal(err)
}
testutil.CleanupCloser(t, s)

_ = s.Ready()

return s, overlay
}
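A hypothetical caller sketch (test name and assertions are illustrative): the helper returns a running libp2p service plus its overlay address and registers cleanup itself, so a test only needs to call it once per node it wants.

package libp2ptest_test

import (
	"testing"

	"github.com/ethersphere/bee/v2/pkg/log"
	"github.com/ethersphere/bee/v2/pkg/p2p/libp2p/libp2ptest"
)

func TestNewLibp2pService(t *testing.T) {
	const networkID uint64 = 1

	// Each call spins up an independent node; cleanup is handled by the helper.
	s1, overlay1 := libp2ptest.NewLibp2pService(t, networkID, log.Noop)
	s2, overlay2 := libp2ptest.NewLibp2pService(t, networkID, log.Noop)

	if s1 == nil || s2 == nil {
		t.Fatal("expected running services")
	}
	if overlay1.Equal(overlay2) {
		t.Fatal("expected distinct overlay addresses")
	}
}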
12 changes: 4 additions & 8 deletions pkg/puller/puller.go
@@ -191,9 +191,7 @@ func (p *Puller) manage(ctx context.Context) {
// disconnectPeer cancels all existing syncing and removes the peer entry from the syncing map.
// Must be called under lock.
func (p *Puller) disconnectPeer(addr swarm.Address) {
loggerV2 := p.logger.V(2).Register()

loggerV2.Debug("disconnecting peer", "peer_address", addr)
p.logger.Debug("disconnecting peer", "peer_address", addr)
if peer, ok := p.syncPeers[addr.ByteString()]; ok {
peer.mtx.Lock()
peer.stop()
@@ -300,8 +298,6 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin
// syncPeerBin will start historical and live syncing for the peer for a particular bin.
// Must be called under syncPeer lock.
func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint8, cursor uint64) {
loggerV2 := p.logger.V(2).Register()

ctx, cancel := context.WithCancel(parentCtx)
peer.setBinCancel(cancel, bin)

@@ -331,7 +327,7 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint

select {
case <-ctx.Done():
loggerV2.Debug("syncWorker context cancelled", "peer_address", address, "bin", bin)
p.logger.Debug("syncWorker context cancelled", "peer_address", address, "bin", bin)
return
default:
}
@@ -353,7 +349,7 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
return
}
loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
p.logger.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
}

_ = p.limiter.WaitN(ctx, count)
@@ -372,7 +368,7 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
p.logger.Error(err, "syncWorker could not persist interval for peer, quitting", "peer_address", address)
return
}
loggerV2.Debug("syncWorker pulled", "bin", bin, "start", start, "topmost", top, "isHistorical", isHistorical, "duration", time.Since(syncStart), "peer_address", address)
p.logger.Debug("syncWorker pulled", "bin", bin, "start", start, "topmost", top, "isHistorical", isHistorical, "duration", time.Since(syncStart), "peer_address", address)
start = top + 1
}
}