Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: supporting peer exchange with nwaku #5983

Merged
Merged
2 changes: 1 addition & 1 deletion third_party/nwaku
87 changes: 72 additions & 15 deletions wakuv2/nwaku.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ package wakuv2
WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) callback, resp) );
}

static void cGoWakuGetMyPeerId(void* ctx, void* resp) {
WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) callback, resp) );
}

static void cGoWakuListPeersInMesh(void* ctx, char* pubSubTopic, void* resp) {
WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) );
}
Expand All @@ -216,6 +220,10 @@ package wakuv2
WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) );
}

static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) {
WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) callback, resp) );
}

static void cGoWakuLightpushPublish(void* wakuCtx,
const char* pubSubTopic,
const char* jsonWakuMessage,
Expand Down Expand Up @@ -380,8 +388,12 @@ type WakuConfig struct {
Staticnodes []string `json:"staticnodes,omitempty"`
Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"`
Discv5Discovery bool `json:"discv5Discovery,omitempty"`
Discv5UdpPort uint16 `json:"discv5UdpPort,omitempty"`
ClusterID uint16 `json:"clusterId,omitempty"`
Shards []uint16 `json:"shards,omitempty"`
PeerExchange bool `json:"peerExchange,omitempty"`
PeerExchangeNode string `json:"peerExchangeNode,omitempty"`
TcpPort uint16 `json:"tcpPort,omitempty"`
}

// Waku represents a dark communication interface through the Ethereum
Expand Down Expand Up @@ -497,9 +509,11 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuCon
return nil, err
}

err = node.WakuRelaySubscribe(defaultPubsubTopic)
if err != nil {
return nil, err
if nwakuCfg.EnableRelay {
err = node.WakuRelaySubscribe(defaultPubsubTopic)
if err != nil {
return nil, err
}
}

node.WakuSetEventCallback()
Expand Down Expand Up @@ -2191,10 +2205,23 @@ func (w *Waku) Clean() error {
return nil
}

// TODO-nwaku
func (w *Waku) PeerID() peer.ID {
// return w.node.Host().ID()
return ""
func (w *Waku) PeerID() (peer.ID, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuGetMyPeerId(w.wakuCtx, resp)

if C.getRet(resp) == C.RET_OK {

peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
id, err := peer.Decode(peerIdStr)
if err != nil {
errMsg := "WakuGetMyPeerId - decoding peerId: %w"
return "", fmt.Errorf(errMsg, err)
}
return id, nil
}
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return "", fmt.Errorf("WakuGetMyPeerId: %s", errMsg)
}

// validatePrivateKey checks the format of the given private key.
Expand Down Expand Up @@ -2609,18 +2636,21 @@ func wakuStoreQuery(
return "", errors.New(errMsg)
}

func (self *Waku) WakuPeerExchangeRequest(numPeers uint64) (string, error) {
func (self *Waku) WakuPeerExchangeRequest(numPeers uint64) (uint64, error) {
var resp = C.allocResp()
defer C.freeResp(resp)

C.cGoWakuPeerExchangeQuery(self.wakuCtx, C.uint64_t(numPeers), resp)
if C.getRet(resp) == C.RET_OK {
msg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return msg, nil
numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64)
if err != nil {
return 0, err
}
return numRecvPeers, nil
}
errMsg := "error WakuPeerExchangeRequest: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return "", errors.New(errMsg)
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return 0, fmt.Errorf("WakuPeerExchangeRequest: %s", errMsg)
}

func (self *Waku) WakuConnect(peerMultiAddr string, timeoutMs int) error {
Expand Down Expand Up @@ -2753,6 +2783,34 @@ func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error)
return 0, errors.New(errMsg)
}

func (self *Waku) GetPeerIdsFromPeerStore() (peer.IDSlice, error) {
var resp = C.allocResp()
defer C.freeResp(resp)
C.cGoWakuGetPeerIdsFromPeerStore(self.wakuCtx, resp)

if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
return peer.IDSlice{}, nil
}
// peersStr contains a comma-separated list of peer ids
itemsPeerIds := strings.Split(peersStr, ",")

var peers peer.IDSlice
for _, peerId := range itemsPeerIds {
id, err := peer.Decode(peerId)
if err != nil {
return nil, fmt.Errorf("GetPeerIdsFromPeerStore - decoding peerId: %w", err)
}
peers = append(peers, id)
}

return peers, nil
}
errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg)
}

func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) {
var resp = C.allocResp()
var cProtocol = C.CString(protocol)
Expand All @@ -2773,8 +2831,7 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) {
for _, p := range itemsPeerIds {
id, err := peer.Decode(p)
if err != nil {
errMsg := "GetPeerIdsByProtocol - error converting string to int: " + err.Error()
return nil, errors.New(errMsg)
return nil, fmt.Errorf("GetPeerIdsByProtocol - decoding peerId: %w", err)
}
peers = append(peers, id)
}
Expand Down
151 changes: 147 additions & 4 deletions wakuv2/nwaku_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cenkalti/backoff/v3"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"go.uber.org/zap"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -155,7 +156,7 @@ func parseNodes(rec []string) []*enode.Node {
// IP_ADDRESS=$(hostname -I | awk '{print $1}');
// docker run \
// -p 61000:61000/tcp -p 8000:8000/udp -p 8646:8646/tcp harbor.status.im/wakuorg/nwaku:v0.33.0 \
// --discv5-discovery=true --cluster-id=16 --log-level=DEBUG \
// --discv5-discovery=true --cluster-id=16 --log-level=DEBUG --shard=64 --tcp-port=61000 \
// --nat=extip:${IP_ADDRESS} --discv5-udp-port=8000 --rest-address=0.0.0.0 --store --rest-port=8646 \

func TestBasicWakuV2(t *testing.T) {
Expand Down Expand Up @@ -195,7 +196,6 @@ func TestBasicWakuV2(t *testing.T) {

// Sanity check, not great, but it's probably helpful
err = tt.RetryWithBackOff(func() error {

numConnected, err := w.GetNumConnectedPeers()
if err != nil {
return err
Expand Down Expand Up @@ -323,10 +323,151 @@ func makeTestTree(domain string, nodes []*enode.Node, links []string) (*ethdnsdi
return tree, url
}

/*
func TestPeerExchange(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)

discV5NodeConfig := Config{
UseThrottledPublish: true,
ClusterID: 16,
}

// start node that will be discovered by PeerExchange
discV5NodeWakuConfig := WakuConfig{
EnableRelay: true,
LogLevel: "DEBUG",
Discv5Discovery: true,
ClusterID: 16,
Shards: []uint16{64},
PeerExchange: false,
Discv5UdpPort: 9001,
TcpPort: 60010,
}

discV5Node, err := New(nil, "", &discV5NodeConfig, &discV5NodeWakuConfig, logger.Named("discV5Node"), nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, discV5Node.Start())

time.Sleep(1 * time.Second)

discV5NodePeerId, err := discV5Node.PeerID()
require.NoError(t, err)

discv5NodeEnr, err := discV5Node.ENR()
require.NoError(t, err)

pxServerConfig := Config{
UseThrottledPublish: true,
ClusterID: 16,
}

// start node which serves as PeerExchange server
pxServerWakuConfig := WakuConfig{
EnableRelay: true,
LogLevel: "DEBUG",
Discv5Discovery: true,
ClusterID: 16,
Shards: []uint16{64},
PeerExchange: true,
Discv5UdpPort: 9000,
Discv5BootstrapNodes: []string{discv5NodeEnr.String()},
TcpPort: 60011,
}

pxServerNode, err := New(nil, "", &pxServerConfig, &pxServerWakuConfig, logger.Named("pxServerNode"), nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, pxServerNode.Start())

// Adding an extra second to make sure PX cache is not empty
time.Sleep(2 * time.Second)

serverNodeMa, err := pxServerNode.ListenAddresses()
require.NoError(t, err)
require.NotNil(t, serverNodeMa)

// Sanity check, not great, but it's probably helpful
options := func(b *backoff.ExponentialBackOff) {
b.MaxElapsedTime = 30 * time.Second
}

// Check that pxServerNode has discV5Node in its Peer Store
err = tt.RetryWithBackOff(func() error {
peers, err := pxServerNode.GetPeerIdsFromPeerStore()

if err != nil {
return err
}

if slices.Contains(peers, discV5NodePeerId) {
return nil
}

return errors.New("pxServer is missing the discv5 node in its peer store")
}, options)
require.NoError(t, err)

pxClientConfig := Config{
UseThrottledPublish: true,
ClusterID: 16,
}

// start light node which uses PeerExchange to discover peers
pxClientWakuConfig := WakuConfig{
EnableRelay: false,
LogLevel: "DEBUG",
Discv5Discovery: false,
ClusterID: 16,
Shards: []uint16{64},
PeerExchange: true,
Discv5UdpPort: 9002,
TcpPort: 60012,
PeerExchangeNode: serverNodeMa[0].String(),
}

lightNode, err := New(nil, "", &pxClientConfig, &pxClientWakuConfig, logger.Named("lightNode"), nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, lightNode.Start())

time.Sleep(1 * time.Second)

pxServerPeerId, err := pxServerNode.PeerID()
require.NoError(t, err)

// Check that the light node discovered the discV5Node and has both nodes in its peer store
err = tt.RetryWithBackOff(func() error {
peers, err := lightNode.GetPeerIdsFromPeerStore()
if err != nil {
return err
}

if slices.Contains(peers, discV5NodePeerId) && slices.Contains(peers, pxServerPeerId) {
return nil
}
return errors.New("lightnode is missing peers")
}, options)
require.NoError(t, err)

// Now perform the PX request manually to see if it also works
err = tt.RetryWithBackOff(func() error {
numPeersReceived, err := lightNode.WakuPeerExchangeRequest(1)
if err != nil {
return err
}

if numPeersReceived == 1 {
return nil
}
return errors.New("Peer Exchange is not returning peers")
}, options)
require.NoError(t, err)

// Stop nodes
require.NoError(t, lightNode.Stop())
require.NoError(t, pxServerNode.Stop())
require.NoError(t, discV5Node.Stop())

/* logger, err := zap.NewDevelopment()
require.NoError(t, err)
// start node which serve as PeerExchange server
config := &Config{}
config.ClusterID = 16
Expand Down Expand Up @@ -401,9 +542,11 @@ func TestPeerExchange(t *testing.T) {

require.NoError(t, lightNode.Stop())
require.NoError(t, pxServerNode.Stop())
require.NoError(t, discV5Node.Stop())
require.NoError(t, discV5Node.Stop()) */
}

/*

func TestWakuV2Filter(t *testing.T) {
t.Skip("flaky test")

Expand Down
Loading