Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protocol methods refactoring. #182

Merged
merged 16 commits into from
Dec 10, 2019
56 changes: 33 additions & 23 deletions pkg/p2p/kadcast/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package kadcast

import (
"encoding/binary"
log "github.com/sirupsen/logrus"
"net"

log "github.com/sirupsen/logrus"

"github.com/dusk-network/dusk-blockchain/pkg/util/container/ring"
)

Expand Down Expand Up @@ -39,9 +40,9 @@ func getPacketFromStream(stream []byte) Packet {
// Deserializes the packet into an slice of bytes.
func (pac Packet) asBytes() []byte {
hl := len(pac.headers)
l := hl + len(pac.payload)
byteRepr := make([]byte, l)
copy(byteRepr, pac.headers[:])
l := hl + len(pac.payload)
byteRepr := make([]byte, l)
copy(byteRepr, pac.headers[:])
copy(byteRepr[hl:], pac.payload[:])
return byteRepr
}
Expand All @@ -63,21 +64,21 @@ func (pac Packet) getHeadersInfo() (byte, [16]byte, [4]byte, [2]byte) {
// Gets the Packet headers parts and puts them into the
// header attribute of the Packet.
func (pac *Packet) setHeadersInfo(tipus byte, router Router, destPeer Peer) {
var headers []byte
headers := make([]byte, 24)
// Add `Packet` type.
headers = append(headers[:], tipus)
headers[0] = tipus
// Add MyPeer ID
headers = append(headers[:], router.MyPeerInfo.id[:]...)
copy(headers[1:17], router.MyPeerInfo.id[0:16])
// Attach IdNonce
idNonce := getBytesFromUint32(router.myPeerNonce)
headers = append(headers[:], idNonce[:]...)
copy(headers[17:21], idNonce[0:4])
// Attach Port
port := getBytesFromUint16(destPeer.port)
headers = append(headers[:], port[:]...)
copy(headers[21:23], port[0:2])

// Build headers array from the slice.
var headersArr [24]byte
copy(headersArr[:], headers[0:24])
copy(headersArr[:], headers[0:23])

pac.headers = headersArr
}
Expand Down Expand Up @@ -137,7 +138,7 @@ func (pac Packet) getNodesPayloadInfo() []Peer {
}

// ProcessPacket recieves a Packet and processes it according to
// it's type. It gets the packets from the circularqueue that
// it's type. It gets the packets from the circularqueue that
// connects the listeners with the packet processor.
func ProcessPacket(queue *ring.Buffer, router *Router) {
// Instantiate now the variables to not pollute
Expand All @@ -150,12 +151,12 @@ func ProcessPacket(queue *ring.Buffer, router *Router) {
for {
// Get all of the packets that are now on the queue.
queuePackets, _ := queue.GetAll()
NextItem: for _, item := range queuePackets {
for _, item := range queuePackets {
// Get items from the queue packet taken.
byteNum, senderAddr, udpPayload, err = decodeRedPacket(item)
if err != nil {
log.WithError(err).Warn("Error decoding the packet taken from the ring.")
break NextItem
continue
}
// Build packet struct
packet = getPacketFromStream(udpPayload[:])
Expand All @@ -168,8 +169,8 @@ func ProcessPacket(queue *ring.Buffer, router *Router) {
// Peer was not validated.
if err := verifyIDNonce(senderID, nonce); err != nil {
log.WithError(err).Warn("Incorrect packet sender ID. Skipping its processing.")
break NextItem
}
continue
}

// Build Peer info and put the right port on it subsituting the one
// used to send the message by the one where the peer wants to receive
Expand All @@ -181,24 +182,33 @@ func ProcessPacket(queue *ring.Buffer, router *Router) {
// Check packet type and process it.
switch tipus {
case 0:
log.Info("Recieved PING message from %v", peerInf.ip[:])
log.WithField(
"Source-IP", peerInf.ip[:],
).Infoln("Recieved PING message")
handlePing(peerInf, router)
case 1:
log.Info("Recieved PONG message from %v", peerInf.ip[:])
log.WithField(
"Source-IP", peerInf.ip[:],
).Infoln("Recieved PONG message")
handlePong(peerInf, router)

case 2:
log.Info("Recieved FIND_NODES message from %v", peerInf.ip[:])
log.WithField(
"Source-IP", peerInf.ip[:],
).Infoln("Recieved FIND_NODES message")
handleFindNodes(peerInf, router)

case 3:
log.Info("Recieved NODES message from %v", peerInf.ip[:])
log.WithField(
"Source-IP", peerInf.ip[:],
).Infoln("Recieved NODES message")
handleNodes(peerInf, packet, router, byteNum)
}
}
}
}

// Processes the `PING` packet info sending back a
// Processes the `PING` packet info sending back a
// `PONG` message and adding the sender to the buckets.
func handlePing(peerInf Peer, router *Router) {
// Process peer addition to the tree.
Expand All @@ -214,7 +224,7 @@ func handlePong(peerInf Peer, router *Router) {
router.tree.addPeer(router.MyPeerInfo, peerInf)
}

// Processes the `FIND_NODES` packet info sending back a
// Processes the `FIND_NODES` packet info sending back a
// `NODES` message and adding the sender to the buckets.
func handleFindNodes(peerInf Peer, router *Router) {
// Process peer addition to the tree.
Expand All @@ -224,15 +234,15 @@ func handleFindNodes(peerInf Peer, router *Router) {
router.sendNodes(peerInf)
}

// Processes the `NODES` packet info sending back a
// Processes the `NODES` packet info sending back a
// `PING` message to all of the Peers announced on the packet
// and adding the sender to the buckets.
func handleNodes(peerInf Peer, packet Packet, router *Router, byteNum int) {
// See if the packet info is consistent:
// peerNum announced <=> bytesPerPeer * peerNum
if !packet.checkNodesPayloadConsistency(byteNum) {
// Since the packet is not consisten, we just discard it.
log.Info("NODES message recieved with corrupted payload. PeerNum mismatch!\nIgnoring the packet.")
log.Info("NODES message recieved with corrupted payload. Packet ignored.")
return
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/p2p/kadcast/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func serializePeer(peerBytes []byte) Peer {
func (peer Peer) computePeerNonce() uint32 {
var nonce uint32 = 0
var hash [32]byte
data := make([]byte, 18)
data := make([]byte, 20)
id := peer.id
for {
bytesUint := getBytesFromUint32(nonce)
copy(data[0:16], id[0:16])
copy(data[16:18], bytesUint[0:2])
copy(data[16:20], bytesUint[0:4])
hash = sha3.Sum256(data)
if (hash[31]) == 0 {
return nonce
Expand Down
65 changes: 29 additions & 36 deletions pkg/p2p/kadcast/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,31 @@ package kadcast

import (
"errors"
log "github.com/sirupsen/logrus"
"time"

log "github.com/sirupsen/logrus"
)

// InitBootstrap inits the Bootstrapping process by sending
// InitBootstrap inits the Bootstrapping process by sending
// a `PING` message to every bootstrapping node repeatedly.
// If it tried 3 or more times and no new `Peers` were added,
// it panics.
// Otherways, it returns `nil` and logs the Number of peers
// it panics.
// Otherways, it returns `nil` and logs the Number of peers
// the node is connected to at the end of the process.
func InitBootstrap(router *Router, bootNodes []Peer) error {
log.Info("Bootstrapping process started.")
// Get PeerList ordered by distance so we can compare it
// after the `PONG` arrivals.
initPeerNum := router.tree.getTotalPeers()
for i := 0; i <= 3; i++ {
// Send `PING` to the bootstrap nodes.
for _, peer := range bootNodes {
router.sendPing(peer)
}
// Wait for `PONG` responses.
time.Sleep(time.Second * 5)
// If new peers were added (the bootstrap nodes)
// we consider that the bootstrapping succeeded.
actualPeers := router.tree.getTotalPeers()

actualPeers := router.pollBootstrappingNodes(bootNodes, time.Second*5)
if actualPeers <= initPeerNum {
if i == 3 {
return errors.New("\nMaximum number of attempts achieved. Please review yor connection settings\n")
}
return errors.New("\nMaximum number of attempts achieved. Please review yor connection settings\n")
}
log.WithFields(log.Fields{
"Tries": i,
"Retries": i,
}).Warn("Bootstrapping nodes were not added.\nTrying again..")
} else {
break
Expand All @@ -46,31 +40,30 @@ func InitBootstrap(router *Router, bootNodes []Peer) error {

// StartNetworkDiscovery triggers the network discovery process.
// The node basically sends `FIND_NODES` messages to the nodes it
// is currently connected to and evaluates the `Peers` that were added
// on each iteration.
// is currently connected to and evaluates the `Peers` that were added
// on each iteration.
// If the closest peer to ours is the same during two iterations of the
// `FIND_NODES` message, we finish the process logging the ammout of peers
// we are currently connected to.
// Otherways, if the closest Peer on two consecutive iterations changes, we
// Otherways, if the closest Peer on two consecutive iterations changes, we
// keep queriyng the `alpha` closest nodes with `FIND_NODES` messages.
func StartNetworkDiscovery(router *Router) {
var actualClosest []Peer
previousClosest := router.getXClosestPeersTo(1, router.MyPeerInfo)
// Ask for nodes to `alpha` closest nodes to my peer.
router.sendFindNodes()
// Wait until response arrives and we query the nodes.
time.Sleep(time.Second * 5)
for {
actualClosest = router.getXClosestPeersTo(1, router.MyPeerInfo)
if actualClosest[0] == previousClosest[0] {
log.Info("Network Discovery process has finished!.\nYou're now connected to %v", router.tree.getTotalPeers())
return
}
// We get the closest actual Peer.
// Get closest actual Peer.
previousClosestArr := router.getXClosestPeersTo(1, router.MyPeerInfo)
previousClosest := previousClosestArr[0]

// Ask for new peers, wait for `PONG` arrivals and get the
// new closest `Peer`.
actualClosest := router.pollClosestPeer(5 * time.Second)

// Until we don't get a peer closer to our node on each poll,
// we look for more nodes.
for actualClosest != previousClosest {
previousClosest = actualClosest
// Send `FIND_NODES` again.
router.sendFindNodes()
// Wait until response arrives and we query the nodes.
time.Sleep(time.Second * 15)
actualClosest = router.pollClosestPeer(5 * time.Second)
}

log.WithFields(log.Fields{
"peers_connected": router.tree.getTotalPeers(),
}).Info("Network Discovery process finished.")
}
74 changes: 60 additions & 14 deletions pkg/p2p/kadcast/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package kadcast

import (
"net"
"sync"
"sort"
"time"
)

// K is the number of peers that a node will send on
// a `NODES` message.
const K int = 20

// Alpha is the number of nodes to which a node will
// ask for new nodes with `FIND_NODES` messages.
const Alpha int = 3
Expand Down Expand Up @@ -46,7 +49,6 @@ func MakeRouter(externIP [4]byte, port uint16) Router {
// //
// --------------------------------------------------//


// Returns the complete list of Peers in order to be sorted
// as they have the xor distance in respec to a Peer as a parameter.
func (router Router) getPeerSortDist(refPeer Peer) []PeerSort {
Expand Down Expand Up @@ -106,32 +108,76 @@ func (router Router) getXClosestPeersTo(peerNum int, refPeer Peer) []Peer {
return xPeers
}

// Sends a `FIND_NODES` messages to the `alpha` closest peers
// the node knows and waits for a certain time in order to wait
// for the `PONG` message arrivals.
// Then looks for the closest peer to the node itself into the
// buckets and returns it.
func (router Router) pollClosestPeer(t time.Duration) Peer {
var wg sync.WaitGroup
var ps []Peer
wg.Add(1)
router.sendFindNodes()

timer := time.AfterFunc(t, func() {
ps = router.getXClosestPeersTo(1, router.MyPeerInfo)
wg.Done()
})

wg.Wait()
timer.Stop()
return ps[0]
}

// Sends a `PING` messages to the bootstrap nodes that
// the node knows and waits for a certain time in order to wait
// for the `PONG` message arrivals.
// Returns back the new number of peers the node is connected to.
func (router Router) pollBootstrappingNodes(bootNodes []Peer, t time.Duration) uint64 {
var wg sync.WaitGroup
var peerNum uint64

wg.Add(1)
for _, peer := range bootNodes {
router.sendPing(peer)
}

timer := time.AfterFunc(t, func() {
peerNum = uint64(router.tree.getTotalPeers())
wg.Done()
})

wg.Wait()
timer.Stop()
return peerNum
}

// ------- Packet-sending utilities for the Router ------- //

// Builds and sends a `PING` packet
func (router Router) sendPing(reciever Peer) {
func (router Router) sendPing(receiver Peer) {
// Build empty packet.
var packet Packet
// Fill the headers with the type, ID, Nonce and destPort.
packet.setHeadersInfo(0, router, reciever)
packet.setHeadersInfo(0, router, receiver)

// Since return values from functions are not addressable, we need to
// allocate the reciever UDPAddr
destUDPAddr := reciever.getUDPAddr()
// allocate the receiver UDPAddr
destUDPAddr := receiver.getUDPAddr()
// Send the packet
sendUDPPacket("udp", destUDPAddr, packet.asBytes())
}

// Builds and sends a `PONG` packet
func (router Router) sendPong(reciever Peer) {
func (router Router) sendPong(receiver Peer) {
// Build empty packet.
var packet Packet
// Fill the headers with the type, ID, Nonce and destPort.
packet.setHeadersInfo(1, router, reciever)
packet.setHeadersInfo(1, router, receiver)

// Since return values from functions are not addressable, we need to
// allocate the reciever UDPAddr
destUDPAddr := reciever.getUDPAddr()
// allocate the receiver UDPAddr
destUDPAddr := receiver.getUDPAddr()
// Send the packet
sendUDPPacket("udp", destUDPAddr, packet.asBytes())
}
Expand All @@ -153,17 +199,17 @@ func (router Router) sendFindNodes() {
}

// Builds and sends a `NODES` packet.
func (router Router) sendNodes(reciever Peer) {
func (router Router) sendNodes(receiver Peer) {
// Build empty packet
var packet Packet
// Set headers
packet.setHeadersInfo(3, router, reciever)
// Set payload with the `k` peers closest to reciever.
peersToSend := packet.setNodesPayload(router, reciever)
packet.setHeadersInfo(3, router, receiver)
// Set payload with the `k` peers closest to receiver.
peersToSend := packet.setNodesPayload(router, receiver)
// If we don't have any peers to announce, we just skip sending
// the `NODES` messsage.
if peersToSend == 0 {
return
}
sendUDPPacket("udp", reciever.getUDPAddr(), packet.asBytes())
sendUDPPacket("udp", receiver.getUDPAddr(), packet.asBytes())
}
Loading