diff --git a/.pinned b/.pinned index 110bc93fb6..bdb288614b 100644 --- a/.pinned +++ b/.pinned @@ -1,6 +1,6 @@ bearssl;https://github.com/status-im/nim-bearssl@#acf9645e328bdcab481cfda1c158e07ecd46bd7b chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a -chronos;https://github.com/status-im/nim-chronos@#f7835a192b45c37e97614d865141f21eea8c156e +chronos;https://github.com/status-im/nim-chronos@#0240dd8b8a7fc54676b8d494a9126b5ffbbb4adf dnsclient;https://github.com/ba0f3/dnsclient.nim@#fcd7443634b950eaea574e5eaa00a628ae029823 faststreams;https://github.com/status-im/nim-faststreams@#814f8927e1f356f39219f37f069b83066bcc893a httputils;https://github.com/status-im/nim-http-utils@#a85bd52ae0a956983ca6b3267c72961d2ec0245f diff --git a/libp2p/peerstore.nim b/libp2p/peerstore.nim index 5e45bd98b8..07055b0338 100644 --- a/libp2p/peerstore.nim +++ b/libp2p/peerstore.nim @@ -225,6 +225,11 @@ proc getMostObservedIP*(self: PeerStore, ipVersion: IPVersion): Opt[MultiAddress ## Returns the most observed IP address or none if the number of observations are less than minCount. return self.identify.getMostObservedIP(ipVersion) +proc getMostObservedIPsAndPorts*(self: PeerStore): seq[MultiAddress] = + ## Returns the most observed IP4/Port and IP6/Port address or an empty seq if the number of observations + ## are less than minCount. + return self.identify.getMostObservedIPsAndPorts() + proc replaceMAIpByMostObserved*( self: PeerStore, ma: MultiAddress): Opt[MultiAddress] = diff --git a/libp2p/protocols/connectivity/autonat/core.nim b/libp2p/protocols/connectivity/autonat/core.nim index ce76f7a28b..db49d4c324 100644 --- a/libp2p/protocols/connectivity/autonat/core.nim +++ b/libp2p/protocols/connectivity/autonat/core.nim @@ -58,6 +58,9 @@ type dial*: Option[AutonatDial] response*: Option[AutonatDialResponse] + NetworkReachability* {.pure.} = enum + NotReachable, Reachable, Unknown + proc encode(p: AutonatPeerInfo): ProtoBuffer = result = initProtoBuffer() if p.id.isSome(): diff --git a/libp2p/protocols/connectivity/autonat/service.nim b/libp2p/protocols/connectivity/autonat/service.nim index 3bf96b68ef..a690521ad3 100644 --- a/libp2p/protocols/connectivity/autonat/service.nim +++ b/libp2p/protocols/connectivity/autonat/service.nim @@ -42,9 +42,6 @@ type minConfidence: float dialTimeout: Duration - NetworkReachability* {.pure.} = enum - NotReachable, Reachable, Unknown - StatusAndConfidenceHandler* = proc (networkReachability: NetworkReachability, confidence: Option[float]): Future[void] {.gcsafe, raises: [Defect].} proc new*( @@ -135,6 +132,7 @@ proc askPeer(self: AutonatService, switch: Switch, peerId: PeerId): Future[Netwo if not isNil(self.statusAndConfidenceHandler): await self.statusAndConfidenceHandler(self.networkReachability, self.confidence) await switch.peerInfo.update() + echo switch.peerStore.getMostObservedIPsAndPorts() return ans proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} = diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 1141ace271..41ea59d6a1 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -264,3 +264,9 @@ proc push*(p: IdentifyPush, peerInfo: PeerInfo, conn: Connection) {.async, publi proc getMostObservedIP*(self: Identify, ipVersion: IPVersion): Opt[MultiAddress] = ## Returns the most observed IP address or none if the number of observations are less than minCount. return self.observedAddrManager.getMostObservedIP(ipVersion) + +proc getMostObservedIPsAndPorts*(self: Identify): seq[MultiAddress] = + ## Returns the most observed IP4/Port and IP6/Port address or an empty seq if the number of observations + ## are less than minCount. + echo self.observedAddrManager + return self.observedAddrManager.getMostObservedIPsAndPorts() diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 46aa0a259f..371a34e92c 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -42,7 +42,7 @@ type servers*: seq[StreamServer] clients: array[Direction, seq[StreamTransport]] flags: set[ServerFlags] - clientFlags: set[TransportFlags] + clientFlags: set[ClientFlags] acceptFuts: seq[Future[StreamTransport]] TcpTransportTracker* = ref object of TrackerBase @@ -136,13 +136,14 @@ proc new*( clientFlags: if ServerFlags.TcpNoDelay in flags: compilesOr: - {TransportFlags.TcpNoDelay} + {ClientFlags.TcpNoDelay} do: doAssert(false) default(set[TransportFlags]) else: - default(set[TransportFlags]), - upgrader: upgrade) + default(set[ClientFlags]), + upgrader: upgrade, + networkReachability: NetworkReachability.Unknown) return transport @@ -165,6 +166,8 @@ method start*( trace "Invalid address detected, skipping!", address = ma continue + if self.networkReachability == NetworkReachability.NotReachable: + self.flags.incl(ServerFlags.ReusePort) let server = createStreamServer( ma = ma, flags = self.flags, @@ -263,8 +266,13 @@ method dial*( ## trace "Dialing remote peer", address = $address + let transp = + if self.networkReachability == NetworkReachability.NotReachable and self.addrs.len > 0: + self.clientFlags.incl(ClientFlags.ReusePort) + await connect(address, flags = self.clientFlags, localAddress = Opt.some(self.addrs[0])) + else: + await connect(address, flags = self.clientFlags) - let transp = await connect(address, flags = self.clientFlags) try: let observedAddr = await getObservedAddr(transp) return await self.connHandler(transp, Opt.some(observedAddr), Direction.Out) diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index a5a651d7e6..9a06a66f56 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -19,7 +19,10 @@ import ../stream/connection, ../multiaddress, ../multicodec, ../muxers/muxer, - ../upgrademngrs/upgrade + ../upgrademngrs/upgrade, + ../protocols/connectivity/autonat/core + +export core.NetworkReachability logScope: topics = "libp2p transport" @@ -33,6 +36,7 @@ type addrs*: seq[MultiAddress] running*: bool upgrader*: Upgrade + networkReachability*: NetworkReachability proc newTransportClosedError*(parent: ref Exception = nil): ref LPError = newException(TransportClosedError, diff --git a/libp2p/wire.nim b/libp2p/wire.nim index 6c3671f2d2..17cd0dd789 100644 --- a/libp2p/wire.nim +++ b/libp2p/wire.nim @@ -77,7 +77,8 @@ proc connect*( ma: MultiAddress, bufferSize = DefaultStreamBufferSize, child: StreamTransport = nil, - flags = default(set[TransportFlags])): Future[StreamTransport] + flags = default(set[ClientFlags]), + localAddress: Opt[MultiAddress] = Opt.none(MultiAddress)): Future[StreamTransport] {.raises: [Defect, LPError, MaInvalidAddress].} = ## Open new connection to remote peer with address ``ma`` and create ## new transport object ``StreamTransport`` for established connection. @@ -90,7 +91,10 @@ proc connect*( let transportAddress = initTAddress(ma).tryGet() compilesOr: - return connect(transportAddress, bufferSize, child, flags) + if localAddress.isSome(): + return connect(transportAddress, flags, bufferSize, child, initTAddress(localAddress.get()).tryGet()) + else: + return connect(transportAddress, flags, bufferSize, child) do: # support for older chronos versions return connect(transportAddress, bufferSize, child)