Skip to content

Commit

Permalink
feat(swarm): delay /webrtc-direct dials by 1 second (libp2p#3078)
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt authored Dec 5, 2024
1 parent 084ebf3 commit 31b9d3a
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 8 deletions.
35 changes: 28 additions & 7 deletions p2p/net/swarm/dial_ranker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ const (

// RelayDelay is the duration by which relay dials are delayed relative to direct addresses
RelayDelay = 500 * time.Millisecond

// delay for other transport addresses. This will apply to /webrtc-direct.
PublicOtherDelay = 1 * time.Second
PrivateOtherDelay = 100 * time.Millisecond
)

// NoDelayDialRanker ranks addresses with no delay. This is useful for simultaneous connect requests.
func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
return getAddrDelay(addrs, 0, 0, 0)
return getAddrDelay(addrs, 0, 0, 0, 0)
}

// DefaultDialRanker determines the ranking of outgoing connection attempts.
Expand Down Expand Up @@ -67,8 +71,11 @@ func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
// 5. If only one of TCP IPv6 or TCP IPv4 addresses are present, dial the TCP address with the lowest port
// first. After this we dial the rest of the TCP addresses delayed by 250ms (PublicTCPDelay) for public
// addresses, and 30ms (PrivateTCPDelay) for local addresses.
// 6. When a TCP socket is connected and awaiting security and muxer upgrade, we stop new dials for 2*PrivateTCPDelay
// 6. When a TCP socket is connected and awaiting security and muxer upgrade, we stop new dials for 2*PublicTCPDelay
// to allow for the upgrade to complete.
// 7. WebRTC Direct, and other IP transport addresses are dialed 1 second after the last QUIC or TCP dial.
// We only ever need to dial these if the peer doesn't have any other transport available, in which
// case these are dialed immediately.
//
// We dial lowest ports first as they are more likely to be the listen port.
func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
Expand All @@ -83,13 +90,18 @@ func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
}

res := make([]network.AddrDelay, 0, len(addrs))
res = append(res, getAddrDelay(pvt, PrivateTCPDelay, PrivateQUICDelay, PrivateOtherDelay, 0)...)
res = append(res, getAddrDelay(public, PublicTCPDelay, PublicQUICDelay, PublicOtherDelay, 0)...)
res = append(res, getAddrDelay(relay, PublicTCPDelay, PublicQUICDelay, PublicOtherDelay, relayOffset)...)
var maxDelay time.Duration
if len(res) > 0 {
maxDelay = res[len(res)-1].Delay
}

for i := 0; i < len(addrs); i++ {
res = append(res, network.AddrDelay{Addr: addrs[i], Delay: 0})
res = append(res, network.AddrDelay{Addr: addrs[i], Delay: maxDelay + PublicOtherDelay})
}

res = append(res, getAddrDelay(pvt, PrivateTCPDelay, PrivateQUICDelay, 0)...)
res = append(res, getAddrDelay(public, PublicTCPDelay, PublicQUICDelay, 0)...)
res = append(res, getAddrDelay(relay, PublicTCPDelay, PublicQUICDelay, relayOffset)...)
return res
}

Expand All @@ -98,7 +110,7 @@ func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
// offset is used to delay all addresses by a fixed duration. This is useful for delaying all relay
// addresses relative to direct addresses.
func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.Duration,
offset time.Duration) []network.AddrDelay {
otherDelay time.Duration, offset time.Duration) []network.AddrDelay {
if len(addrs) == 0 {
return nil
}
Expand Down Expand Up @@ -158,6 +170,7 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D

res := make([]network.AddrDelay, 0, len(addrs))
var tcpFirstDialDelay time.Duration
var lastQUICOrTCPDelay time.Duration
for i, addr := range addrs {
var delay time.Duration
switch {
Expand All @@ -176,6 +189,7 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D
delay = quicDelay
}
}
lastQUICOrTCPDelay = delay
tcpFirstDialDelay = delay + tcpDelay
case isProtocolAddr(addr, ma.P_TCP):
// We dial an IPv6 address, then after tcpDelay an IPv4
Expand All @@ -193,6 +207,10 @@ func getAddrDelay(addrs []ma.Multiaddr, tcpDelay time.Duration, quicDelay time.D
}
}
delay += tcpFirstDialDelay
lastQUICOrTCPDelay = delay
// if it's neither quic, webtransport, tcp, or websocket address
default:
delay = lastQUICOrTCPDelay + otherDelay
}
res = append(res, network.AddrDelay{Addr: addr, Delay: offset + delay})
}
Expand Down Expand Up @@ -230,6 +248,9 @@ func score(a ma.Multiaddr) int {
pi, _ := strconv.Atoi(p)
return ip4Weight + pi + (1 << 20)
}
if _, err := a.ValueForProtocol(ma.P_WEBRTC_DIRECT); err == nil {
return 1 << 21
}
return (1 << 30)
}

Expand Down
70 changes: 69 additions & 1 deletion p2p/net/swarm/dial_ranker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestNoDelayDialRanker(t *testing.T) {
q3v1 := ma.StringCast("/ip4/1.2.3.4/udp/3/quic-v1")
q4 := ma.StringCast("/ip4/1.2.3.4/udp/4/quic-v1")
t1 := ma.StringCast("/ip4/1.2.3.5/tcp/1/")
wrtc1 := ma.StringCast("/ip4/1.1.1.1/udp/1/webrtc-direct")

testCase := []struct {
name string
Expand All @@ -37,7 +38,7 @@ func TestNoDelayDialRanker(t *testing.T) {
}{
{
name: "quic+webtransport filtered when quicv1",
addrs: []ma.Multiaddr{q1, q2, q3, q4, q1v1, q2v1, q3v1, wt1, t1},
addrs: []ma.Multiaddr{q1, q2, q3, q4, q1v1, q2v1, q3v1, wt1, t1, wrtc1},
output: []network.AddrDelay{
{Addr: q1, Delay: 0},
{Addr: q2, Delay: 0},
Expand All @@ -48,6 +49,7 @@ func TestNoDelayDialRanker(t *testing.T) {
{Addr: q3v1, Delay: 0},
{Addr: wt1, Delay: 0},
{Addr: t1, Delay: 0},
{Addr: wrtc1, Delay: 0},
},
},
}
Expand Down Expand Up @@ -287,3 +289,69 @@ func TestDelayRankerRelay(t *testing.T) {
})
}
}

func TestDelayRankerOtherTransportDelay(t *testing.T) {
q1v1 := ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1")
q1v16 := ma.StringCast("/ip6/1::2/udp/1/quic-v1")
t1 := ma.StringCast("/ip4/1.2.3.5/tcp/1/")
t1v6 := ma.StringCast("/ip6/1::2/tcp/1")
wrtc1 := ma.StringCast("/ip4/1.2.3.4/udp/1/webrtc-direct")
wrtc1v6 := ma.StringCast("/ip6/1::2/udp/1/webrtc-direct")
onion1 := ma.StringCast("/onion3/vww6ybal4bd7szmgncyruucpgfkqahzddi37ktceo3ah7ngmcopnpyyd:1234")
onlyIP := ma.StringCast("/ip4/1.2.3.4/")
testCase := []struct {
name string
addrs []ma.Multiaddr
output []network.AddrDelay
}{
{
name: "quic-with-other",
addrs: []ma.Multiaddr{q1v1, q1v16, wrtc1, wrtc1v6, onion1, onlyIP},
output: []network.AddrDelay{
{Addr: q1v16, Delay: 0},
{Addr: q1v1, Delay: PublicQUICDelay},
{Addr: wrtc1, Delay: PublicQUICDelay + PublicOtherDelay},
{Addr: wrtc1v6, Delay: PublicQUICDelay + PublicOtherDelay},
{Addr: onlyIP, Delay: PublicQUICDelay + PublicOtherDelay},
{Addr: onion1, Delay: PublicQUICDelay + 2*PublicOtherDelay},
},
},
{
name: "quic-and-tcp-with-other",
addrs: []ma.Multiaddr{q1v1, t1, t1v6, wrtc1, wrtc1v6, onion1, onlyIP},
output: []network.AddrDelay{
{Addr: q1v1, Delay: 0},
{Addr: t1v6, Delay: PublicQUICDelay},
{Addr: t1, Delay: 2 * PublicQUICDelay},
{Addr: wrtc1, Delay: 2*PublicQUICDelay + PublicOtherDelay},
{Addr: wrtc1v6, Delay: 2*PublicQUICDelay + PublicOtherDelay},
{Addr: onlyIP, Delay: 2*PublicQUICDelay + PublicOtherDelay},
{Addr: onion1, Delay: 2*PublicQUICDelay + 2*PublicOtherDelay},
},
},
{
name: "only-non-ip-addr",
addrs: []ma.Multiaddr{onion1},
output: []network.AddrDelay{
{Addr: onion1, Delay: PublicOtherDelay},
},
},
}
for _, tc := range testCase {
t.Run(tc.name, func(t *testing.T) {
res := DefaultDialRanker(tc.addrs)
if len(res) != len(tc.output) {
log.Errorf("expected %s got %s", tc.output, res)
t.Errorf("expected elems: %d got: %d", len(tc.output), len(res))
return
}
sortAddrDelays(res)
sortAddrDelays(tc.output)
for i := 0; i < len(tc.output); i++ {
if !tc.output[i].Addr.Equal(res[i].Addr) || tc.output[i].Delay != res[i].Delay {
t.Fatalf("expected %+v got %+v", tc.output, res)
}
}
})
}
}
4 changes: 4 additions & 0 deletions p2p/net/swarm/dial_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package swarm

import (
"context"
"fmt"
"math"
"sync"
"time"
Expand Down Expand Up @@ -295,6 +296,9 @@ loop:
}
ad.dialed = true
ad.dialRankingDelay = now.Sub(ad.createdAt)
if _, err := ad.addr.ValueForProtocol(ma.P_WEBRTC_DIRECT); err == nil {
fmt.Println("dial ranking delay", ad.addr, ad.dialRankingDelay)
}
err := w.s.dialNextAddr(ad.ctx, w.peer, ad.addr, w.resch)
if err != nil {
// Errored without attempting a dial. This happens in case of
Expand Down
69 changes: 69 additions & 0 deletions p2p/test/basichost/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import (
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -199,3 +202,69 @@ func TestAddrFactorCertHashAppend(t *testing.T) {
return hasWebRTC && hasWebTransport
}, 5*time.Second, 100*time.Millisecond)
}

func TestOnlyWebRTCDirectDialNoDelay(t *testing.T) {
// This tests that only webrtc-direct dials are dialled immediately
// and not delayed by dial ranker.
h1, err := libp2p.New(
libp2p.Transport(libp2pwebrtc.New),
libp2p.ListenAddrStrings(
"/ip4/0.0.0.0/udp/0/webrtc-direct",
),
)
require.NoError(t, err)
h2, err := libp2p.New(
libp2p.Transport(libp2pwebrtc.New),
libp2p.NoListenAddrs,
)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), swarm.PrivateOtherDelay-10*time.Millisecond)
defer cancel()
err = h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()})
require.NoError(t, err)
}

func TestWebRTCWithQUICManyConnections(t *testing.T) {
// Correctly fixes: https://github.com/libp2p/js-libp2p/issues/2805

// The server has both /quic-v1 and /webrtc-direct listen addresses
h, err := libp2p.New(
libp2p.Transport(libp2pquic.NewTransport),
libp2p.Transport(libp2pwebrtc.New),
libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/quic-v1"),
libp2p.ListenAddrStrings("/ip4/0.0.0.0/udp/0/webrtc-direct"),
libp2p.ResourceManager(&network.NullResourceManager{}),
)
require.NoError(t, err)
defer h.Close()

const N = 200
// These N dialers have both /quic-v1 and /webrtc-direct transports
var dialers [N]host.Host
for i := 0; i < N; i++ {
dialers[i], err = libp2p.New(libp2p.NoListenAddrs)
require.NoError(t, err)
defer dialers[i].Close()
}
// This dialer has only /webrtc-direct transport
d, err := libp2p.New(libp2p.Transport(libp2pwebrtc.New), libp2p.NoListenAddrs)
require.NoError(t, err)
defer d.Close()

for i := 0; i < N; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// With happy eyeballs these dialers will connect over only /quic-v1
// and not stall the /webrtc-direct handshake goroutines.
// it is fine if the dial fails, we just want to ensure that there's space
// in the /webrtc-direct listen queue
_ = dialers[i].Connect(ctx, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// The webrtc only dialer should be able to connect to the peer
err = d.Connect(ctx, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
require.NoError(t, err)
}

0 comments on commit 31b9d3a

Please sign in to comment.