From 948c6d349745da8efd86645480452c876b71c7e3 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 24 Oct 2024 16:21:40 -0400 Subject: [PATCH] refactor(nwaku)_: ping --- eth-node/bridge/geth/wakuv2.go | 4 +- go.mod | 2 +- go.sum | 4 +- .../go-waku/waku/v2/api/common/pinger.go | 37 ++++++++++++++++ ...odeRequestor.go => storenode_requestor.go} | 0 .../go-waku/waku/v2/api/history/cycle.go | 26 +++-------- vendor/modules.txt | 2 +- wakuv2/gowaku.go | 8 ++-- wakuv2/nwaku.go | 44 ++++++++++++++++--- 9 files changed, 93 insertions(+), 34 deletions(-) create mode 100644 vendor/github.com/waku-org/go-waku/waku/v2/api/common/pinger.go rename vendor/github.com/waku-org/go-waku/waku/v2/api/common/{storenodeRequestor.go => storenode_requestor.go} (100%) diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index c35a18e8a1..f9a0697ed6 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -331,7 +331,9 @@ func (w *gethWakuV2Wrapper) OnStorenodeAvailable() <-chan peer.ID { } func (w *gethWakuV2Wrapper) WaitForAvailableStoreNode(timeout time.Duration) bool { - return w.waku.StorenodeCycle.WaitForAvailableStoreNode(context.TODO(), timeout) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + return w.waku.StorenodeCycle.WaitForAvailableStoreNode(ctx) } func (w *gethWakuV2Wrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) { diff --git a/go.mod b/go.mod index feac1f059c..8934cda49e 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056 + github.com/waku-org/go-waku v0.8.1-0.20241024184757-fdb3c3d0b35a github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 9bfc78ac6c..3144ff6ec7 100644 --- a/go.sum +++ b/go.sum @@ -2140,8 +2140,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27 github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056 h1:R2LscQHxKdVVdRIz7zcZWOkjcZDz753fflW5TPunJN0= -github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74= +github.com/waku-org/go-waku v0.8.1-0.20241024184757-fdb3c3d0b35a h1:epN2bp1mPzdg3S7S2iR72GsUPix7irc3UgM4W9NZJpU= +github.com/waku-org/go-waku v0.8.1-0.20241024184757-fdb3c3d0b35a/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/common/pinger.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/pinger.go new file mode 100644 index 0000000000..ba8c26a21b --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/pinger.go @@ -0,0 +1,37 @@ +package common + +import ( + "context" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" +) + +type Pinger interface { + PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) +} + +type defaultPingImpl struct { + host host.Host +} + +func NewDefaultPinger(host host.Host) Pinger { + return &defaultPingImpl{ + host: host, + } +} + +func (d *defaultPingImpl) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { + pingResultCh := ping.Ping(ctx, d.host, peerID) + select { + case <-ctx.Done(): + return 0, ctx.Err() + case r := <-pingResultCh: + if r.Error != nil { + return 0, r.Error + } + return r.RTT, nil + } +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenodeRequestor.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenode_requestor.go similarity index 100% rename from vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenodeRequestor.go rename to vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenode_requestor.go diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go index d345314069..9404579bf5 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go @@ -14,9 +14,8 @@ import ( "sync" "time" - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/protocol/store" "go.uber.org/zap" ) @@ -55,9 +54,8 @@ type StorenodeCycle struct { logger *zap.Logger - host host.Host - storenodeConfigProvider StorenodeConfigProvider + pinger common.Pinger StorenodeAvailableOneshotEmitter *OneShotEmitter[struct{}] StorenodeChangedEmitter *Emitter[peer.ID] @@ -71,7 +69,7 @@ type StorenodeCycle struct { peers map[peer.ID]peerStatus } -func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle { +func NewStorenodeCycle(logger *zap.Logger, pinger common.Pinger) *StorenodeCycle { return &StorenodeCycle{ StorenodeAvailableOneshotEmitter: NewOneshotEmitter[struct{}](), StorenodeChangedEmitter: NewEmitter[peer.ID](), @@ -81,9 +79,8 @@ func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle { } } -func (m *StorenodeCycle) Start(ctx context.Context, h host.Host) { +func (m *StorenodeCycle) Start(ctx context.Context) { m.logger.Debug("starting storenode cycle") - m.host = h m.failedRequests = make(map[peer.ID]uint) m.peers = make(map[peer.ID]peerStatus) @@ -194,7 +191,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, ctx, cancel := context.WithTimeout(ctx, 4*time.Second) defer cancel() - rtt, err := m.pingPeer(ctx, peerID) + rtt, err := m.pinger.PingPeer(ctx, peerID) if err == nil { // pinging storenodes might fail, but we don't care availableStorenodesMutex.Lock() availableStorenodes[peerID] = rtt @@ -233,19 +230,6 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, return result } -func (m *StorenodeCycle) pingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { - pingResultCh := ping.Ping(ctx, m.host, peerID) - select { - case <-ctx.Done(): - return 0, ctx.Err() - case r := <-pingResultCh: - if r.Error != nil { - return 0, r.Error - } - return r.RTT, nil - } -} - func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error { // we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581 if overrideDNS { diff --git a/vendor/modules.txt b/vendor/modules.txt index 137423d741..4b8aaf5c7a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1031,7 +1031,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056 +# github.com/waku-org/go-waku v0.8.1-0.20241024184757-fdb3c3d0b35a ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/gowaku.go b/wakuv2/gowaku.go index 6d9012bd63..2eea195278 100644 --- a/wakuv2/gowaku.go +++ b/wakuv2/gowaku.go @@ -75,6 +75,8 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/utils" + commonapi "github.com/waku-org/go-waku/waku/v2/api/common" + gocommon "github.com/status-im/status-go/common" "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/types" @@ -1067,10 +1069,10 @@ func (w *Waku) Start() error { return fmt.Errorf("failed to start go-waku node: %v", err) } - w.StorenodeCycle = history.NewStorenodeCycle(w.logger) - w.HistoryRetriever = history.NewHistoryRetriever(w.node.Store(), NewHistoryProcessorWrapper(w), w.logger) + w.StorenodeCycle = history.NewStorenodeCycle(w.logger, commonapi.NewDefaultPinger(w.node.Host())) + w.HistoryRetriever = history.NewHistoryRetriever(missing.NewDefaultStorenodeRequestor(w.node.Store()), NewHistoryProcessorWrapper(w), w.logger) - w.StorenodeCycle.Start(w.ctx, w.node.Host()) + w.StorenodeCycle.Start(w.ctx) w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.node.Host().ID())) diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index f0b689f0c6..842c21d192 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -212,6 +212,10 @@ package wakuv2 WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) callback, resp) ); } + static void cGoWakuPingPeer(void* ctx, char* peerAddr, int timeoutMs, void* resp) { + WAKU_CALL (waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) callback, resp) ); + } + static void cGoWakuListPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } @@ -1374,10 +1378,10 @@ func (w *Waku) Start() error { return fmt.Errorf("failed to start go-waku node: %v", err) } */ - w.StorenodeCycle = history.NewStorenodeCycle(w.logger) + w.StorenodeCycle = history.NewStorenodeCycle(w.logger, newPinger(w.wakuCtx)) w.HistoryRetriever = history.NewHistoryRetriever(newStorenodeRequestor(w.wakuCtx, w.logger), NewHistoryProcessorWrapper(w), w.logger) - w.StorenodeCycle.Start(w.ctx, newPinger(w.wakuCtx)) + w.StorenodeCycle.Start(w.ctx) w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.PeerID())) @@ -3064,18 +3068,48 @@ func (d *storenodeMessageVerifier) MessageHashesExist(ctx context.Context, reque return result, nil } -func newStorenodeRequestor(wakuCtx unsafe.Pointer, logger *zap.Logger) commonapi.StorenodeRequestor { - return &storenodeRequestor{ +type pinger struct { + wakuCtx unsafe.Pointer +} + +func newPinger(wakuCtx unsafe.Pointer) commonapi.Pinger { + return &pinger{ wakuCtx: wakuCtx, - logger: logger.Named("storenodeRequestor"), } } +func (p *pinger) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { + var resp = C.allocResp() + var cPeerId = C.CString(peerID.String()) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPeerId)) + + C.cGoWakuPingPeer(p.wakuCtx, cPeerId, C.int(time.Minute.Milliseconds()), resp) + if C.getRet(resp) == C.RET_OK { + rttStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + rttInt, err := strconv.ParseInt(rttStr, 10, 64) + if err != nil { + return 0, err + } + return time.Duration(rttInt), nil + } + + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return 0, fmt.Errorf("PingPeer: %s", errMsg) +} + type storenodeRequestor struct { wakuCtx unsafe.Pointer logger *zap.Logger } +func newStorenodeRequestor(wakuCtx unsafe.Pointer, logger *zap.Logger) commonapi.StorenodeRequestor { + return &storenodeRequestor{ + wakuCtx: wakuCtx, + logger: logger.Named("storenodeRequestor"), + } +} + func (s *storenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (commonapi.StoreRequestResult, error) { requestIDStr := hex.EncodeToString(protocol.GenerateRequestID())