Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create an ObservedAddrManager and add an AddressMapper in AutonatService and AutoRelayService #871

Merged
merged 49 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
2033712
[skip ci] Start of upgrade refacto
Menduist Nov 22, 2022
df97314
kinda working
Menduist Nov 23, 2022
1ab9cc9
remove upgraded system
Menduist Nov 23, 2022
8b76889
re-add custom matcher
Menduist Nov 23, 2022
b954dad
Simplify connManager
Menduist Nov 23, 2022
c0316e3
fix
Menduist Nov 23, 2022
12ac83f
fix tests
Menduist Nov 24, 2022
7ad1ea8
try fix test
Menduist Nov 25, 2022
263b9e8
Fix GossipSub race condition
Menduist Nov 25, 2022
7b98af6
fix more race conditions
Menduist Nov 25, 2022
99bc8ee
Fix GossipSub race condition
Menduist Nov 25, 2022
6992d81
Merge remote-tracking branch 'origin/unstable' into upgraderefacto
Menduist Jan 2, 2023
a6cec9f
Better fix
Menduist Jan 2, 2023
696a5b4
Merge branch 'fixgossiprace' into upgraderefacto
Menduist Jan 2, 2023
92a425c
Fix typo
Menduist Jan 3, 2023
95fb447
Merge branch 'fixgossiprace' into upgraderefacto
Menduist Jan 3, 2023
7a74dd3
fix ci
Menduist Jan 3, 2023
5510ef7
Merge remote-tracking branch 'origin/unstable' into upgraderefacto
Menduist Jan 10, 2023
281a15a
Merge remote-tracking branch 'origin/unstable' into upgraderefacto
Menduist Jan 24, 2023
6acf4b4
Merge remote-tracking branch 'origin/unstable' into upgraderefacto
Menduist Jan 25, 2023
c279e5a
Fix pubsub
Menduist Jan 25, 2023
5c6fe92
fix short agent
Menduist Jan 30, 2023
5dc290b
Update libp2p/multistream.nim
Menduist Jan 30, 2023
c5f2dbd
Merge remote-tracking branch 'origin/unstable' into upgraderefacto
Menduist Feb 21, 2023
de06b46
Add getWrapped to YamuxChannel
Menduist Feb 21, 2023
2ab82c0
Merge remote-tracking branch 'origin/upgraderefacto' into upgraderefacto
Menduist Feb 21, 2023
685966c
fix autonat
Menduist Feb 21, 2023
8618c85
Merge branch 'unstable' into upgraderefacto
diegomrsantos Feb 22, 2023
b07e66b
Merge remote-tracking branch 'origin/unstable' into upgraderefacto
Menduist Mar 2, 2023
c9cc2bc
Trigger events before identify
Menduist Mar 2, 2023
1fee13c
ObservedMAManager
diegomrsantos Mar 2, 2023
19de3d7
improvements
diegomrsantos Mar 3, 2023
7627325
replace the heap by a seq
diegomrsantos Mar 3, 2023
92bfd83
Move manager to identify
diegomrsantos Mar 6, 2023
4fc7b6d
Simplify api
diegomrsantos Mar 6, 2023
22d6c05
Improvements after code review
diegomrsantos Mar 8, 2023
aee8dcf
Merge branch 'unstable' into observed-addr-manager
diegomrsantos Mar 8, 2023
7cb27cc
More fixes
diegomrsantos Mar 8, 2023
c462503
move procs here for reuse
diegomrsantos Mar 10, 2023
e6908d0
improve naming
diegomrsantos Mar 10, 2023
a378b2d
remove proc
diegomrsantos Mar 10, 2023
f4b34f2
Add an AddressMapper to the AutoRelayService
diegomrsantos Mar 13, 2023
e1c4c9e
Remove AddressMapper when AutonatService stops
diegomrsantos Mar 14, 2023
13bfc97
changes after review
diegomrsantos Mar 22, 2023
f83148c
move procs to oam
diegomrsantos Mar 22, 2023
2ead028
make oam more generic
diegomrsantos Mar 23, 2023
410ae77
improve replace
diegomrsantos Mar 24, 2023
8ddf277
add enableAddressMapper flag
diegomrsantos Mar 24, 2023
1eb7731
simplify procs in oam
diegomrsantos Mar 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pinned
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
bearssl;https://github.com/status-im/nim-bearssl@#acf9645e328bdcab481cfda1c158e07ecd46bd7b
chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a
chronos;https://github.com/status-im/nim-chronos@#5d3da66e563d21277b57a9b601744273c083a01b
chronos;https://github.com/status-im/nim-chronos@#f7835a192b45c37e97614d865141f21eea8c156e
dnsclient;https://github.com/ba0f3/dnsclient.nim@#fcd7443634b950eaea574e5eaa00a628ae029823
faststreams;https://github.com/status-im/nim-faststreams@#814f8927e1f356f39219f37f069b83066bcc893a
httputils;https://github.com/status-im/nim-http-utils@#a85bd52ae0a956983ca6b3267c72961d2ec0245f
Expand Down
80 changes: 80 additions & 0 deletions libp2p/observedaddrmanager.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
std/[sequtils, tables],
chronos,
multiaddress

type
## Manages observed MultiAddresses by reomte peers. It keeps track of the most observed IP and IP/Port.
ObservedAddrManager* = ref object of RootObj
observedIPsAndPorts: seq[MultiAddress]
Menduist marked this conversation as resolved.
Show resolved Hide resolved
maxSize: int
minCount: int

IPVersion* = enum
Menduist marked this conversation as resolved.
Show resolved Hide resolved
IPv4, IPv6

proc addObservation*(self:ObservedAddrManager, observedAddr: MultiAddress) =
## Adds a new observed MultiAddress. If the number of observations exceeds maxSize, the oldest one is removed.
## Both IP and IP/Port are tracked.
if self.observedIPsAndPorts.len >= self.maxSize:
self.observedIPsAndPorts.del(0)
self.observedIPsAndPorts.add(observedAddr)

proc getIP(self: ObservedAddrManager, observations: seq[MultiAddress], ipVersion: MaPattern): Opt[MultiAddress] =
var countTable = toCountTable(observations)
countTable.sort()
var orderedPairs = toSeq(countTable.pairs)
for (ma, count) in orderedPairs:
let ip = ma[0].get()
if ipVersion.match(ip) and count >= self.minCount:
return Opt.some(ma)
return Opt.none(MultiAddress)

proc getMostObservedIP*(self: ObservedAddrManager, ipVersion: IPVersion): Opt[MultiAddress] =
## Returns the most observed IP address or none if the number of observations are less than minCount.
let observedIPs = self.observedIPsAndPorts.mapIt(it[0].get())
return self.getIP(observedIPs, if ipVersion == IPv4: IP4 else: IP6)

proc getMostObservedIPAndPort*(self: ObservedAddrManager, ipVersion: IPVersion): Opt[MultiAddress] =
## Returns the most observed IP/Port address or none if the number of observations are less than minCount.
return self.getIP(self.observedIPsAndPorts, if ipVersion == IPv4: IP4 else: IP6)

proc getMostObservedIPsAndPorts*(self: ObservedAddrManager): seq[MultiAddress] =
## Returns the most observed IP4/Port and IP6/Port address or an empty seq if the number of observations
## are less than minCount.
var res: seq[MultiAddress]
let ip4 = self.getMostObservedIPAndPort(IPv4)
if ip4.isSome():
res.add(ip4.get())
let ip6 = self.getMostObservedIPAndPort(IPv6)
if ip6.isSome():
res.add(ip6.get())
return res

proc `$`*(self: ObservedAddrManager): string =
## Returns a string representation of the ObservedAddrManager.
return "IPs and Ports: " & $self.observedIPsAndPorts

proc new*(
T: typedesc[ObservedAddrManager],
maxSize = 10,
minCount = 3): T =
## Creates a new ObservedAddrManager.
return T(
observedIPsAndPorts: newSeq[MultiAddress](),
maxSize: maxSize,
minCount: minCount)
39 changes: 38 additions & 1 deletion libp2p/peerstore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ else:

import
std/[tables, sets, options, macros],
chronos,
chronos, chronicles,
./crypto/crypto,
./protocols/identify,
./protocols/protocol,
Expand Down Expand Up @@ -220,3 +220,40 @@ proc identify*(
peerStore.updatePeerInfo(info)
finally:
await stream.closeWithEOF()

proc getMostObservedIP*(self: PeerStore, ipVersion: IPVersion): Opt[MultiAddress] =
## Returns the most observed IP address or none if the number of observations are less than minCount.
return self.identify.getMostObservedIP(ipVersion)

proc replaceMAIpByMostObserved*(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used by guessDialableAddrs which is used in hpservice and the dcutr server.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but this is unrelated to the peer store

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where would you add it? I think it is related to the observed addresses.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ObservedAddressManager?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

self: PeerStore,
ma: MultiAddress): Opt[MultiAddress] =
try:
let maIP = ma[0]
let maWithoutIP = ma[1..^1]

if maWithoutIP.isErr():
return Opt.none(MultiAddress)

let observedIP =
if IP4.match(maIP.get()):
self.getMostObservedIP(IPv4)
else:
self.getMostObservedIP(IPv6)

let newMA =
if observedIP.isNone() or maIP.get() == observedIP.get():
ma
else:
observedIP.get() & maWithoutIP.get()

return Opt.some(newMA)
except CatchableError as error:
debug "Error while handling manual port forwarding", msg = error.msg
return Opt.none(MultiAddress)

proc guessDialableAddrs*(self: PeerStore, listenAddrs: seq[MultiAddress]): seq[MultiAddress] =
for l in listenAddrs:
let guess = self.replaceMAIpByMostObserved(l)
if guess.isSome():
result.add(guess.get())
30 changes: 29 additions & 1 deletion libp2p/protocols/connectivity/autonat/service.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ else:
import std/[options, deques, sequtils]
import chronos, metrics
import ../../../switch
import ../../../wire
import client
import ../../../utils/heartbeat
import ../../../crypto/crypto
Expand All @@ -27,6 +28,7 @@ declarePublicGauge(libp2p_autonat_reachability_confidence, "autonat reachability
type
AutonatService* = ref object of Service
newConnectedPeerHandler: PeerEventHandler
addressMapper: AddressMapper
scheduleHandle: Future[void]
networkReachability: NetworkReachability
confidence: Option[float]
Expand Down Expand Up @@ -133,6 +135,7 @@ proc askPeer(self: AutonatService, switch: Switch, peerId: PeerId): Future[Netwo
await self.handleAnswer(ans)
if not isNil(self.statusAndConfidenceHandler):
await self.statusAndConfidenceHandler(self.networkReachability, self.confidence)
await switch.peerInfo.update()
Menduist marked this conversation as resolved.
Show resolved Hide resolved
return ans

proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} =
Expand All @@ -153,7 +156,30 @@ proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.asy
heartbeat "Scheduling AutonatService run", interval:
await service.run(switch)

proc addressMapper(
self: AutonatService,
peerStore: PeerStore,
listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.gcsafe, async.} =

var addrs = newSeq[MultiAddress]()
for listenAddr in listenAddrs:
var processedMA = listenAddr
try:
let hostIP = initTAddress(listenAddr).get()
if not hostIP.isGlobal():
if self.networkReachability == NetworkReachability.Reachable:
Menduist marked this conversation as resolved.
Show resolved Hide resolved
let maOpt = peerStore.replaceMAIpByMostObserved(listenAddr) # handle manual port forwarding
if maOpt.isSome():
processedMA = maOpt.get()
except CatchableError as exc:
debug "Error while handling address mapper", msg = exc.msg
addrs.add(processedMA)
return addrs

method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} =
self.addressMapper = proc (listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.gcsafe, async.} =
return await addressMapper(self, switch.peerStore, listenAddrs)

info "Setting up AutonatService"
let hasBeenSetup = await procCall Service(self).setup(switch)
if hasBeenSetup:
Expand All @@ -163,14 +189,14 @@ method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} =
switch.connManager.addPeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined)
if self.scheduleInterval.isSome():
self.scheduleHandle = schedule(self, switch, self.scheduleInterval.get())
switch.peerInfo.addressMappers.add(self.addressMapper)
return hasBeenSetup

method run*(self: AutonatService, switch: Switch) {.async, public.} =
trace "Running AutonatService"
await askConnectedPeers(self, switch)
await self.callHandler()


method stop*(self: AutonatService, switch: Switch): Future[bool] {.async, public.} =
info "Stopping AutonatService"
let hasBeenStopped = await procCall Service(self).stop(switch)
Expand All @@ -180,6 +206,8 @@ method stop*(self: AutonatService, switch: Switch): Future[bool] {.async, public
self.scheduleHandle = nil
if not isNil(self.newConnectedPeerHandler):
switch.connManager.removePeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined)
switch.peerInfo.addressMappers.keepItIf(it != self.addressMapper)
await switch.peerInfo.update()
return hasBeenStopped

proc statusAndConfidenceHandler*(self: AutonatService, statusAndConfidenceHandler: StatusAndConfidenceHandler) =
Expand Down
46 changes: 28 additions & 18 deletions libp2p/protocols/identify.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ import ../protobuf/minprotobuf,
../multiaddress,
../protocols/protocol,
../utility,
../errors
../errors,
../observedaddrmanager

export observedaddrmanager

logScope:
topics = "libp2p identify"
Expand Down Expand Up @@ -56,6 +59,7 @@ type
Identify* = ref object of LPProtocol
peerInfo*: PeerInfo
sendSignedPeerRecord*: bool
observedAddrManager*: ObservedAddrManager

IdentifyPushHandler* = proc (
peer: PeerId,
Expand Down Expand Up @@ -160,7 +164,8 @@ proc new*(
): T =
let identify = T(
peerInfo: peerInfo,
sendSignedPeerRecord: sendSignedPeerRecord
sendSignedPeerRecord: sendSignedPeerRecord,
observedAddrManager: ObservedAddrManager.new(),
)
identify.init()
identify
Expand All @@ -182,7 +187,7 @@ method init*(p: Identify) =
p.handler = handle
p.codec = IdentifyCodec

proc identify*(p: Identify,
proc identify*(self: Identify,
conn: Connection,
remotePeerId: PeerId): Future[IdentifyInfo] {.async, gcsafe.} =
trace "initiating identify", conn
Expand All @@ -194,23 +199,24 @@ proc identify*(p: Identify,
let infoOpt = decodeMsg(message)
if infoOpt.isNone():
raise newException(IdentityInvalidMsgError, "Incorrect message received!")
result = infoOpt.get()

if result.pubkey.isSome:
let peer = PeerId.init(result.pubkey.get())
if peer.isErr:
raise newException(IdentityInvalidMsgError, $peer.error)
else:
result.peerId = peer.get()
if peer.get() != remotePeerId:
trace "Peer ids don't match",
remote = peer,
local = remotePeerId

raise newException(IdentityNoMatchError, "Peer ids don't match")
else:

var info = infoOpt.get()
if info.pubkey.isNone():
raise newException(IdentityInvalidMsgError, "No pubkey in identify")

let peer = PeerId.init(info.pubkey.get())
if peer.isErr:
raise newException(IdentityInvalidMsgError, $peer.error)

if peer.get() != remotePeerId:
trace "Peer ids don't match", remote = peer, local = remotePeerId
raise newException(IdentityNoMatchError, "Peer ids don't match")
info.peerId = peer.get()

if info.observedAddr.isSome:
self.observedAddrManager.addObservation(info.observedAddr.get())
return info

proc new*(T: typedesc[IdentifyPush], handler: IdentifyPushHandler = nil): T {.public.} =
## Create a IdentifyPush protocol. `handler` will be called every time
## a peer sends us new `PeerInfo`
Expand Down Expand Up @@ -254,3 +260,7 @@ proc push*(p: IdentifyPush, peerInfo: PeerInfo, conn: Connection) {.async, publi
## Send new `peerInfo`s to a connection
var pb = encodeMsg(peerInfo, conn.observedAddr, true)
await conn.writeLp(pb.buffer)

proc getMostObservedIP*(self: Identify, ipVersion: IPVersion): Opt[MultiAddress] =
## Returns the most observed IP address or none if the number of observations are less than minCount.
return self.observedAddrManager.getMostObservedIP(ipVersion)
17 changes: 15 additions & 2 deletions libp2p/services/autorelayservice.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,15 @@ type
backingOff: seq[PeerId]
peerAvailable: AsyncEvent
onReservation: OnReservationHandler
addressMapper: AddressMapper
rng: ref HmacDrbgContext

proc reserveAndUpdate(self: AutoRelayService, relayPid: PeerId, selfPid: PeerId) {.async.} =
proc addressMapper(
self: AutoRelayService,
listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.gcsafe, async.} =
return concat(toSeq(self.relayAddresses.values))

proc reserveAndUpdate(self: AutoRelayService, relayPid: PeerId, switch: Switch) {.async.} =
while self.running:
let
rsvp = await self.client.reserve(relayPid).wait(chronos.seconds(5))
Expand All @@ -46,11 +52,15 @@ proc reserveAndUpdate(self: AutoRelayService, relayPid: PeerId, selfPid: PeerId)
break
if relayPid notin self.relayAddresses or self.relayAddresses[relayPid] != relayedAddr:
self.relayAddresses[relayPid] = relayedAddr
await switch.peerInfo.update()
if not self.onReservation.isNil():
self.onReservation(concat(toSeq(self.relayAddresses.values)))
await sleepAsync chronos.seconds(ttl - 30)

method setup*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcsafe.} =
self.addressMapper = proc (listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.gcsafe, async.} =
return await addressMapper(self, listenAddrs)

let hasBeenSetUp = await procCall Service(self).setup(switch)
if hasBeenSetUp:
proc handlePeerJoined(peerId: PeerId, event: PeerEvent) {.async.} =
Expand All @@ -63,6 +73,7 @@ method setup*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcs
future[].cancel()
switch.addPeerEventHandler(handlePeerJoined, Joined)
switch.addPeerEventHandler(handlePeerLeft, Left)
switch.peerInfo.addressMappers.add(self.addressMapper)
await self.run(switch)
return hasBeenSetUp

Expand Down Expand Up @@ -96,7 +107,7 @@ proc innerRun(self: AutoRelayService, switch: Switch) {.async, gcsafe.} =
for relayPid in connectedPeers:
if self.relayPeers.len() >= self.numRelays:
break
self.relayPeers[relayPid] = self.reserveAndUpdate(relayPid, switch.peerInfo.peerId)
self.relayPeers[relayPid] = self.reserveAndUpdate(relayPid, switch)

if self.relayPeers.len() > 0:
await one(toSeq(self.relayPeers.values())) or self.peerAvailable.wait()
Expand All @@ -116,6 +127,8 @@ method stop*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcsa
if hasBeenStopped:
self.running = false
self.runner.cancel()
switch.peerInfo.addressMappers.keepItIf(it != self.addressMapper)
await switch.peerInfo.update()
return hasBeenStopped

proc getAddresses*(self: AutoRelayService): seq[MultiAddress] =
Expand Down
4 changes: 2 additions & 2 deletions tests/asyncunit.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import unittest2
import unittest2, chronos

export unittest2
export unittest2, chronos

template asyncTeardown*(body: untyped): untyped =
teardown:
Expand Down
4 changes: 4 additions & 0 deletions tests/testautonatservice.nim
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,13 @@ suite "Autonat Service":
check autonatService.networkReachability() == NetworkReachability.Reachable
check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 0.3

check switch1.peerInfo.addrs == switch1.peerStore.guessDialableAddrs(switch1.peerInfo.listenAddrs)

await allFuturesThrowing(
switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop())

check switch1.peerInfo.addrs == switch1.peerInfo.listenAddrs

asyncTest "Peer must be not reachable and then reachable":

let autonatClientStub = AutonatClientStub.new(expectedDials = 6)
Expand Down
Loading