Skip to content

Commit

Permalink
Merge d869cd3 into c5a825e
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status authored Oct 19, 2024
2 parents c5a825e + d869cd3 commit 2472c3e
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 53 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ jobs:
run: |
postgres_enabled=0
if [ ${{ runner.os }} == "Linux" ]; then
sudo apt-get update
sudo apt-get install -y libpcre3 libpcre3-dev
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:15.4-alpine3.18
postgres_enabled=1
fi
Expand Down
4 changes: 2 additions & 2 deletions tests/test_waku_switch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ suite "Waku Switch":
## Given
let
sourceSwitch = newTestSwitch()
wakuSwitch = newWakuSwitch(rng = rng())
wakuSwitch = newWakuSwitch(rng = rng(), relay = Relay.new())
await sourceSwitch.start()
await wakuSwitch.start()

Expand All @@ -46,7 +46,7 @@ suite "Waku Switch":
asyncTest "Waku Switch acts as circuit relayer":
## Setup
let
wakuSwitch = newWakuSwitch(rng = rng())
wakuSwitch = newWakuSwitch(rng = rng(), relay = Relay.new())
sourceClient = RelayClient.new()
destClient = RelayClient.new()
sourceSwitch = newCircuitRelayClientSwitch(sourceClient)
Expand Down
7 changes: 6 additions & 1 deletion waku/common/utils/nat.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ proc setupNat*(
warn "NAT already initialized, skipping as cannot be done multiple times"
else:
singletonNat = true
let extIp = getExternalIP(strategy)
var extIp = none(IpAddress)
try:
extIp = getExternalIP(strategy)
except Exception:
warn "exception in setupNat", error = getCurrentExceptionMsg()

if extIP.isSome():
endpoint.ip = some(extIp.get())
# RedirectPorts in considered a gcsafety violation
Expand Down
36 changes: 36 additions & 0 deletions waku/discovery/autonat_service.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import
chronos,
chronicles,
bearssl/rand,
libp2p/protocols/connectivity/autonat/client,
libp2p/protocols/connectivity/autonat/service,
libp2p/protocols/connectivity/autonat/core

const AutonatCheckInterval = Opt.some(chronos.seconds(30))

proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService =
## AutonatService request other peers to dial us back
## flagging us as Reachable or NotReachable.
## minConfidence is used as threshold to determine the state.
## If maxQueueSize > numPeersToAsk past samples are considered
## in the calculation.
let autonatService = AutonatService.new(
autonatClient = AutonatClient.new(),
rng = rng,
scheduleInterval = AutonatCheckInterval,
askNewConnectedPeers = false,
numPeersToAsk = 3,
maxQueueSize = 3,
minConfidence = 0.7,
)

proc statusAndConfidenceHandler(
networkReachability: NetworkReachability, confidence: Opt[float]
): Future[void] {.async.} =
if confidence.isSome():
info "Peer reachability status",
networkReachability = networkReachability, confidence = confidence.get()

autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler)

return autonatService
31 changes: 28 additions & 3 deletions waku/factory/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ import
libp2p/crypto/crypto,
libp2p/builders,
libp2p/nameresolving/nameresolver,
libp2p/transports/wstransport
libp2p/transports/wstransport,
libp2p/protocols/connectivity/relay/client,
libp2p/protocols/connectivity/relay/relay,
libp2p/services/autorelayservice,
libp2p/services/hpservice
import
../waku_enr,
../discovery/waku_discv5,
../waku_node,
../node/peer_manager,
../common/rate_limit/setting
../common/rate_limit/setting,
../discovery/autonat_service

type
WakuNodeBuilder* = object # General
Expand All @@ -38,6 +43,7 @@ type
switchSslSecureKey: Option[string]
switchSslSecureCert: Option[string]
switchSendSignedPeerRecord: Option[bool]
switchIsRelayClient: bool ## circuit-relay related

#Rate limit configs for non-relay req-resp protocols
rateLimitSettings: Option[seq[string]]
Expand Down Expand Up @@ -126,12 +132,14 @@ proc withSwitchConfiguration*(
secureKey = none(string),
secureCert = none(string),
agentString = none(string),
isRelayClient = false,
) =
builder.switchMaxConnections = maxConnections
builder.switchSendSignedPeerRecord = some(sendSignedPeerRecord)
builder.switchSslSecureKey = secureKey
builder.switchSslSecureCert = secureCert
builder.switchAgentString = agentString
builder.switchIsRelayClient = isRelayClient

if not nameResolver.isNil():
builder.switchNameResolver = some(nameResolver)
Expand All @@ -154,6 +162,22 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
if builder.record.isNone():
return err("node record is required")

let autonatService = autonat_service.getAutonatService(rng)
var services: seq[Service]

var relay = Relay.new()
## by default, the node is configured as a server relay-circuit node

if builder.switchIsRelayClient:
## The node is considered to be behind a NAT or firewall and then it
## should struggle to be reachable and establish connections to other nodes
relay = RelayClient.new()
let autoRelayService = AutoRelayService.new(1, RelayClient(relay), nil, rng)
let holePunchService = HPService.new(autonatService, autoRelayService)
services = @[Service(holePunchService)]
else:
services = @[Service(autonatService)]

var switch: Switch
try:
switch = newWakuSwitch(
Expand All @@ -170,7 +194,8 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
sendSignedPeerRecord = builder.switchSendSignedPeerRecord.get(false),
agentString = builder.switchAgentString,
peerStoreCapacity = builder.peerStorageCapacity,
services = @[Service(getAutonatService(rng))],
services = services,
relay = relay,
)
except CatchableError:
return err("failed to create switch: " & getCurrentExceptionMsg())
Expand Down
10 changes: 10 additions & 0 deletions waku/factory/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,16 @@ type WakuNodeConf* = object
name: "dns4-domain-name"
.}: string

## Circuit-relay config
isRelayClient* {.
desc:
"""Set the node as a relay-client.
Set it to true for nodes that run behind a NAT or firewall and
hence would have reachability issues.""",
defaultValue: false,
name: "relay-client"
.}: bool

## Relay config
relay* {.
desc: "Enable relay protocol: true|false", defaultValue: true, name: "relay"
Expand Down
1 change: 1 addition & 0 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ proc initNode(
sendSignedPeerRecord = conf.relayPeerExchange,
# We send our own signed peer record when peer exchange enabled
agentString = some(conf.agentString),
isRelayClient = conf.isRelayClient,
)
builder.withColocationLimit(conf.colocationLimit)
builder.withPeerManagerConfig(
Expand Down
17 changes: 16 additions & 1 deletion waku/factory/waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import
libp2p/crypto/crypto,
libp2p/protocols/pubsub/gossipsub,
libp2p/peerid,
libp2p/discovery/discoverymngr,
libp2p/discovery/rendezvousinterface,
eth/keys,
presto,
metrics,
Expand All @@ -24,6 +26,7 @@ import
../waku_api/message_cache,
../waku_api/rest/server,
../waku_archive,
../waku_relay/protocol,
../discovery/waku_dnsdisc,
../discovery/waku_discv5,
../waku_enr/sharding,
Expand All @@ -49,6 +52,7 @@ type Waku* = object

wakuDiscv5*: WakuDiscoveryV5
dynamicBootstrapNodes: seq[RemotePeerInfo]
discoveryMngr: DiscoveryManager

node*: WakuNode

Expand Down Expand Up @@ -189,6 +193,7 @@ proc init*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =

let node = nodeRes.get()

## Delivery Monitor
var deliveryMonitor: DeliveryMonitor
if confCopy.reliabilityEnabled:
if confCopy.storenode == "":
Expand Down Expand Up @@ -270,7 +275,7 @@ proc updateWaku(waku: ptr Waku): Result[void, string] =

waku[].node.announcedAddresses = netConf.announcedAddresses

printNodeNetworkInfo(waku[].node)
?updateAnnouncedAddrWithPrimaryIpAddr(waku[].node)

return ok()

Expand All @@ -297,6 +302,16 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
if not waku[].deliveryMonitor.isNil():
waku[].deliveryMonitor.startDeliveryMonitor()

## libp2p DiscoveryManager
waku[].discoveryMngr = DiscoveryManager()
waku[].discoveryMngr.add(
RendezVousInterface.new(rdv = waku[].node.rendezvous, tta = 1.minutes)
)
if not isNil(waku[].node.wakuRelay):
for topic in waku[].node.wakuRelay.getSubscribedTopics():
debug "advertise rendezvous namespace", topic
waku[].discoveryMngr.advertise(RdvNamespace(topic))

return ok()

# Waku shutdown
Expand Down
57 changes: 20 additions & 37 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import
libp2p/protocols/pubsub/rpc/messages,
libp2p/protocols/connectivity/autonat/client,
libp2p/protocols/connectivity/autonat/service,
libp2p/protocols/connectivity/relay/relay,
libp2p/protocols/connectivity/relay/client,
libp2p/protocols/rendezvous,
libp2p/builders,
libp2p/transports/transport,
Expand Down Expand Up @@ -50,7 +52,8 @@ import
../waku_rln_relay,
./config,
./peer_manager,
../common/rate_limit/setting
../common/rate_limit/setting,
waku/discovery/autonat_service

declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicHistogram waku_histogram_message_size,
Expand Down Expand Up @@ -116,33 +119,6 @@ type
contentTopicHandlers: Table[ContentTopic, TopicHandler]
rateLimitSettings*: ProtocolRateLimitSettings

proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService =
## AutonatService request other peers to dial us back
## flagging us as Reachable or NotReachable.
## minConfidence is used as threshold to determine the state.
## If maxQueueSize > numPeersToAsk past samples are considered
## in the calculation.
let autonatService = AutonatService.new(
autonatClient = AutonatClient.new(),
rng = rng,
scheduleInterval = Opt.some(chronos.seconds(120)),
askNewConnectedPeers = false,
numPeersToAsk = 3,
maxQueueSize = 3,
minConfidence = 0.7,
)

proc statusAndConfidenceHandler(
networkReachability: NetworkReachability, confidence: Opt[float]
): Future[void] {.gcsafe, async.} =
if confidence.isSome():
info "Peer reachability status",
networkReachability = networkReachability, confidence = confidence.get()

autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler)

return autonatService

proc new*(
T: type WakuNode,
netConfig: NetConfig,
Expand Down Expand Up @@ -433,9 +409,6 @@ proc startRelay*(node: WakuNode) {.async.} =

await node.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod)

# Start the WakuRelay protocol
await node.wakuRelay.start()

info "relay started successfully"

proc mountRelay*(
Expand Down Expand Up @@ -1289,11 +1262,11 @@ proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =

return false

proc printNodeNetworkInfo*(node: WakuNode): void =
proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] =
let peerInfo = node.switch.peerInfo
var announcedStr = ""
var listenStr = ""
var localIp = ""
var localIp = "0.0.0.0"

try:
localIp = $getPrimaryIPAddr()
Expand All @@ -1302,20 +1275,29 @@ proc printNodeNetworkInfo*(node: WakuNode): void =

info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs

var newAnnouncedAddresses = newSeq[MultiAddress](0)
for address in node.announcedAddresses:
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
## Replace "0.0.0.0" or "127.0.0.1" with the localIp
let newAddr = ($address).replace("0.0.0.0", localIp).replace("127.0.0.1", localIp)
let fulladdr = "[" & $newAddr & "/p2p/" & $peerInfo.peerId & "]"
announcedStr &= fulladdr
let newMultiAddr = MultiAddress.init(newAddr).valueOr:
return err("error in updateAnnouncedAddrWithPrimaryIpAddr: " & $error)
newAnnouncedAddresses.add(newMultiAddr)

node.announcedAddresses = newAnnouncedAddresses

for transport in node.switch.transports:
for address in transport.addrs:
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
let fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
listenStr &= fulladdr

## XXX: this should be /ip4..., / stripped?
info "Listening on", full = listenStr, localIp = localIp
info "Announcing addresses", full = announcedStr
info "DNS: discoverable ENR ", enr = node.enr.toUri()

return ok()

proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node and
## all its mounted protocols.
Expand Down Expand Up @@ -1355,7 +1337,8 @@ proc start*(node: WakuNode) {.async.} =
node.started = true

if not zeroPortPresent:
printNodeNetworkInfo(node)
updateAnnouncedAddrWithPrimaryIpAddr(node).isOkOr:
error "failed update announced addr", error = $error
else:
info "Listening port is dynamically allocated, address and ENR generation postponed"

Expand Down
5 changes: 4 additions & 1 deletion waku/node/waku_switch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import
libp2p/crypto/crypto,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/rendezvous,
libp2p/protocols/connectivity/relay/relay,
libp2p/nameresolving/nameresolver,
libp2p/builders,
libp2p/switch,
Expand Down Expand Up @@ -78,6 +79,8 @@ proc newWakuSwitch*(
peerStoreCapacity = none(int), # defaults to 1.25 maxConnections
services: seq[switch.Service] = @[],
rendezvous: RendezVous = nil,
isRelayClient: bool = false,
relay: Relay,
): Switch {.raises: [Defect, IOError, LPError].} =
var b = SwitchBuilder
.new()
Expand All @@ -92,7 +95,7 @@ proc newWakuSwitch*(
.withTcpTransport(transportFlags)
.withNameResolver(nameResolver)
.withSignedPeerRecord(sendSignedPeerRecord)
.withCircuitRelay()
.withCircuitRelay(relay)
.withAutonat()

if peerStoreCapacity.isSome():
Expand Down
Loading

0 comments on commit 2472c3e

Please sign in to comment.