Skip to content

Commit

Permalink
swarm/network: hive bug: needed shallow peers are not sent to nodes b…
Browse files Browse the repository at this point in the history
…eyond connection's proximity order (#19326)

* swarm/network: fix hive bug not sending shallow peers

-  hive bug: needed shallow peers were not sent to nodes beyond connection's proximity order
- add extensive protocol exchange tests for initial subPeersMsg-peersMsg exchange
- modify bzzProtocolTester to allow pregenerated overlay addresses

* swarm/network: attempt to fix hive persistance test

* swarm/network: fix TestHiveStatePersistance (#1320)

* swarm/network: remove trace lines from the hive persistance test

* address PR review comments

* swarm/network: address PR comments on TestInitialPeersMsg

 * eliminate *testing.T argument from bzz/hive protocoltesters
 * add sorting (only runs in test code) on peersMsg payload
 * add random (0 to MaxPeersPerPO) peers for each po
 * add extra peers closer to pivot than control
  • Loading branch information
zelig authored Apr 2, 2019
1 parent 92faf1b commit 0529015
Show file tree
Hide file tree
Showing 5 changed files with 314 additions and 64 deletions.
27 changes: 22 additions & 5 deletions swarm/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

// discovery bzz extension for requesting and relaying node address records

var sortPeers = noSortPeers

// Peer wraps BzzPeer and embeds Kademlia overlay connectivity driver
type Peer struct {
*BzzPeer
Expand Down Expand Up @@ -156,28 +158,39 @@ func (msg subPeersMsg) String() string {
return fmt.Sprintf("%T: request peers > PO%02d. ", msg, msg.Depth)
}

// handleSubPeersMsg handles incoming subPeersMsg
// this message represents the saturation depth of the remote peer
// saturation depth is the radius within which the peer subscribes to peers
// the first time this is received we send peer info on all
// our connected peers that fall within peers saturation depth
// otherwise this depth is just recorded on the peer, so that
// subsequent new connections are sent iff they fall within the radius
func (d *Peer) handleSubPeersMsg(msg *subPeersMsg) error {
d.setDepth(msg.Depth)
// only send peers after the initial subPeersMsg
if !d.sentPeers {
d.setDepth(msg.Depth)
var peers []*BzzAddr
// iterate connection in ascending order of disctance from the remote address
d.kad.EachConn(d.Over(), 255, func(p *Peer, po int) bool {
if pob, _ := Pof(d, d.kad.BaseAddr(), 0); pob > po {
// terminate if we are beyond the radius
if uint8(po) < msg.Depth {
return false
}
if !d.seen(p.BzzAddr) {
if !d.seen(p.BzzAddr) { // here just records the peer sent
peers = append(peers, p.BzzAddr)
}
return true
})
// if useful peers are found, send them over
if len(peers) > 0 {
go d.Send(context.TODO(), &peersMsg{Peers: peers})
go d.Send(context.TODO(), &peersMsg{Peers: sortPeers(peers)})
}
}
d.sentPeers = true
return nil
}

// seen takes an peer address and checks if it was sent to a peer already
// seen takes a peer address and checks if it was sent to a peer already
// if not, marks the peer as sent
func (d *Peer) seen(p *BzzAddr) bool {
d.mtx.Lock()
Expand All @@ -201,3 +214,7 @@ func (d *Peer) setDepth(depth uint8) {
defer d.mtx.Unlock()
d.depth = depth
}

func noSortPeers(peers []*BzzAddr) []*BzzAddr {
return peers
}
206 changes: 204 additions & 2 deletions swarm/network/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,32 @@
package network

import (
"crypto/ecdsa"
crand "crypto/rand"
"encoding/binary"
"fmt"
"math/rand"
"net"
"sort"
"testing"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/pot"
)

/***
*
* - after connect, that outgoing subpeersmsg is sent
*
*/
func TestDiscovery(t *testing.T) {
func TestSubPeersMsg(t *testing.T) {
params := NewHiveParams()
s, pp, err := newHiveTester(t, params, 1, nil)
s, pp, err := newHiveTester(params, 1, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -58,3 +71,192 @@ func TestDiscovery(t *testing.T) {
t.Fatal(err)
}
}

const (
maxPO = 8 // PO of pivot and control; chosen to test enough cases but not run too long
maxPeerPO = 6 // pivot has no peers closer than this to the control peer
maxPeersPerPO = 3
)

// TestInitialPeersMsg tests if peersMsg response to incoming subPeersMsg is correct
func TestInitialPeersMsg(t *testing.T) {
for po := 0; po < maxPO; po++ {
for depth := 0; depth < maxPO; depth++ {
t.Run(fmt.Sprintf("PO=%d,advertised depth=%d", po, depth), func(t *testing.T) {
testInitialPeersMsg(t, po, depth)
})
}
}
}

// testInitialPeersMsg tests that the correct set of peer info is sent
// to another peer after receiving their subPeersMsg request
func testInitialPeersMsg(t *testing.T, peerPO, peerDepth int) {
// generate random pivot address
prvkey, err := crypto.GenerateKey()
if err != nil {
t.Fatal(err)
}

defer func(orig func([]*BzzAddr) []*BzzAddr) {
sortPeers = orig
}(sortPeers)
sortPeers = testSortPeers
pivotAddr := pot.NewAddressFromBytes(PrivateKeyToBzzKey(prvkey))
// generate control peers address at peerPO wrt pivot
peerAddr := pot.RandomAddressAt(pivotAddr, peerPO)
// construct kademlia and hive
to := NewKademlia(pivotAddr[:], NewKadParams())
hive := NewHive(NewHiveParams(), to, nil)

// expected addrs in peersMsg response
var expBzzAddrs []*BzzAddr
connect := func(a pot.Address, po int) (addrs []*BzzAddr) {
n := rand.Intn(maxPeersPerPO)
for i := 0; i < n; i++ {
peer, err := newDiscPeer(pot.RandomAddressAt(a, po))
if err != nil {
t.Fatal(err)
}
hive.On(peer)
addrs = append(addrs, peer.BzzAddr)
}
return addrs
}
register := func(a pot.Address, po int) {
addr := pot.RandomAddressAt(a, po)
hive.Register(&BzzAddr{OAddr: addr[:]})
}

// generate connected and just registered peers
for po := maxPeerPO; po >= 0; po-- {
// create a fake connected peer at po from peerAddr
ons := connect(peerAddr, po)
// create a fake registered address at po from peerAddr
register(peerAddr, po)
// we collect expected peer addresses only up till peerPO
if po < peerDepth {
continue
}
expBzzAddrs = append(expBzzAddrs, ons...)
}

// add extra connections closer to pivot than control
for po := peerPO + 1; po < maxPO; po++ {
ons := connect(pivotAddr, po)
if peerDepth <= peerPO {
expBzzAddrs = append(expBzzAddrs, ons...)
}
}

// create a special bzzBaseTester in which we can associate `enode.ID` to the `bzzAddr` we created above
s, _, err := newBzzBaseTesterWithAddrs(prvkey, [][]byte{peerAddr[:]}, DiscoverySpec, hive.Run)
if err != nil {
t.Fatal(err)
}

// peerID to use in the protocol tester testExchange expect/trigger
peerID := s.Nodes[0].ID()
// block until control peer is found among hive peers
found := false
for attempts := 0; attempts < 20; attempts++ {
if _, found = hive.peers[peerID]; found {
break
}
time.Sleep(1 * time.Millisecond)
}

if !found {
t.Fatal("timeout waiting for peer connection to start")
}

// pivotDepth is the advertised depth of the pivot node we expect in the outgoing subPeersMsg
pivotDepth := hive.saturation()
// the test exchange is as follows:
// 1. pivot sends to the control peer a `subPeersMsg` advertising its depth (ignored)
// 2. peer sends to pivot a `subPeersMsg` advertising its own depth (arbitrarily chosen)
// 3. pivot responds with `peersMsg` with the set of expected peers
err = s.TestExchanges(
p2ptest.Exchange{
Label: "outgoing subPeersMsg",
Expects: []p2ptest.Expect{
{
Code: 1,
Msg: &subPeersMsg{Depth: uint8(pivotDepth)},
Peer: peerID,
},
},
},
p2ptest.Exchange{
Label: "trigger subPeersMsg and expect peersMsg",
Triggers: []p2ptest.Trigger{
{
Code: 1,
Msg: &subPeersMsg{Depth: uint8(peerDepth)},
Peer: peerID,
},
},
Expects: []p2ptest.Expect{
{
Code: 0,
Msg: &peersMsg{Peers: testSortPeers(expBzzAddrs)},
Peer: peerID,
Timeout: 100 * time.Millisecond,
},
},
})

// for values MaxPeerPO < peerPO < MaxPO the pivot has no peers to offer to the control peer
// in this case, no peersMsg will be sent out, and we would run into a time out
if len(expBzzAddrs) == 0 {
if err != nil {
if err.Error() != "exchange #1 \"trigger subPeersMsg and expect peersMsg\": timed out" {
t.Fatalf("expected timeout, got %v", err)
}
return
}
t.Fatalf("expected timeout, got no error")
}

if err != nil {
t.Fatal(err)
}
}

func testSortPeers(peers []*BzzAddr) []*BzzAddr {
comp := func(i, j int) bool {
vi := binary.BigEndian.Uint64(peers[i].OAddr)
vj := binary.BigEndian.Uint64(peers[j].OAddr)
return vi < vj
}
sort.Slice(peers, comp)
return peers
}

// as we are not creating a real node via the protocol,
// we need to create the discovery peer objects for the additional kademlia
// nodes manually
func newDiscPeer(addr pot.Address) (*Peer, error) {
pKey, err := ecdsa.GenerateKey(crypto.S256(), crand.Reader)
if err != nil {
return nil, err
}
pubKey := pKey.PublicKey
nod := enode.NewV4(&pubKey, net.IPv4(127, 0, 0, 1), 0, 0)
bzzAddr := &BzzAddr{OAddr: addr[:], UAddr: []byte(nod.String())}
id := nod.ID()
p2pPeer := p2p.NewPeer(id, id.String(), nil)
return NewPeer(&BzzPeer{
Peer: protocols.NewPeer(p2pPeer, &dummyMsgRW{}, DiscoverySpec),
BzzAddr: bzzAddr,
}, nil), nil
}

type dummyMsgRW struct{}

func (d *dummyMsgRW) ReadMsg() (p2p.Msg, error) {
return p2p.Msg{}, nil
}
func (d *dummyMsgRW) WriteMsg(msg p2p.Msg) error {
return nil
}
Loading

0 comments on commit 0529015

Please sign in to comment.