Skip to content

Commit

Permalink
Exchange proxy mode via signal (netbirdio#727)
Browse files Browse the repository at this point in the history
Before defining if we will use direct or proxy connection we will exchange a 
message with the other peer if the modes match we keep the decision 
from the shouldUseProxy function otherwise we skip using direct connection.

Added a feature support message to the signal protocol
  • Loading branch information
mlsmaycon authored Mar 16, 2023
1 parent 6143b81 commit 731d3ae
Show file tree
Hide file tree
Showing 9 changed files with 463 additions and 54 deletions.
30 changes: 30 additions & 0 deletions client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtyp
return nil
}

func sendSignal(message *sProto.Message, s signal.Client) error {
return s.Send(message)
}

// SignalOfferAnswer signals either an offer or an answer to remote peer
func SignalOfferAnswer(offerAnswer peer.OfferAnswer, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client, isAnswer bool) error {
var t sProto.Body_Type
Expand All @@ -369,6 +373,10 @@ func SignalOfferAnswer(offerAnswer peer.OfferAnswer, myKey wgtypes.Key, remoteKe
if err != nil {
return err
}

// indicates message support in gRPC
msg.Body.FeaturesSupported = []uint32{signal.DirectCheck}

err = s.Send(msg)
if err != nil {
return err
Expand Down Expand Up @@ -827,6 +835,9 @@ func (e Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, er
peerConn.SetSignalCandidate(signalCandidate)
peerConn.SetSignalOffer(signalOffer)
peerConn.SetSignalAnswer(signalAnswer)
peerConn.SetSendSignalMessage(func(message *sProto.Message) error {
return sendSignal(message, e.signal)
})

return peerConn, nil
}
Expand All @@ -850,6 +861,9 @@ func (e *Engine) receiveSignalEvents() {
if err != nil {
return err
}

conn.RegisterProtoSupportMeta(msg.Body.GetFeaturesSupported())

conn.OnRemoteOffer(peer.OfferAnswer{
IceCredentials: peer.IceCredentials{
UFrag: remoteCred.UFrag,
Expand All @@ -863,6 +877,9 @@ func (e *Engine) receiveSignalEvents() {
if err != nil {
return err
}

conn.RegisterProtoSupportMeta(msg.Body.GetFeaturesSupported())

conn.OnRemoteAnswer(peer.OfferAnswer{
IceCredentials: peer.IceCredentials{
UFrag: remoteCred.UFrag,
Expand All @@ -878,6 +895,19 @@ func (e *Engine) receiveSignalEvents() {
return err
}
conn.OnRemoteCandidate(candidate)
case sProto.Body_MODE:
protoMode := msg.GetBody().GetMode()
if protoMode == nil {
return fmt.Errorf("received an empty mode message")
}

err := conn.OnModeMessage(peer.ModeMessage{
Direct: protoMode.GetDirect(),
})
if err != nil {
log.Errorf("failed processing a mode message -> %s", err)
return err
}
}

return nil
Expand Down
113 changes: 101 additions & 12 deletions client/internal/peer/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package peer

import (
"context"
"fmt"
"net"
"strings"
"sync"
Expand All @@ -14,6 +15,8 @@ import (
"github.com/netbirdio/netbird/client/internal/proxy"
"github.com/netbirdio/netbird/iface"
"github.com/netbirdio/netbird/version"
signal "github.com/netbirdio/netbird/signal/client"
sProto "github.com/netbirdio/netbird/signal/proto"
)

// ConnConfig is a peer Connection configuration
Expand Down Expand Up @@ -69,8 +72,9 @@ type Conn struct {
// signalCandidate is a handler function to signal remote peer about local connection candidate
signalCandidate func(candidate ice.Candidate) error
// signalOffer is a handler function to signal remote peer our connection offer (credentials)
signalOffer func(OfferAnswer) error
signalAnswer func(OfferAnswer) error
signalOffer func(OfferAnswer) error
signalAnswer func(OfferAnswer) error
sendSignalMessage func(message *sProto.Message) error

// remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection
remoteOffersCh chan OfferAnswer
Expand All @@ -85,7 +89,20 @@ type Conn struct {

statusRecorder *Status

proxy proxy.Proxy
proxy proxy.Proxy
remoteModeCh chan ModeMessage
meta meta
}

// meta holds meta information about a connection
type meta struct {
protoSupport signal.FeaturesSupport
}

// ModeMessage represents a connection mode chosen by the peer
type ModeMessage struct {
// Direct indicates that it decided to use a direct connection
Direct bool
}

// GetConf returns the connection config
Expand All @@ -109,6 +126,7 @@ func NewConn(config ConnConfig, statusRecorder *Status) (*Conn, error) {
remoteOffersCh: make(chan OfferAnswer),
remoteAnswerCh: make(chan OfferAnswer),
statusRecorder: statusRecorder,
remoteModeCh: make(chan ModeMessage, 1),
}, nil
}

Expand Down Expand Up @@ -366,15 +384,7 @@ func (conn *Conn) startProxy(remoteConn net.Conn, remoteWgPort int) error {
}

peerState := State{PubKey: conn.config.Key}
useProxy := shouldUseProxy(pair)
var p proxy.Proxy
if useProxy {
p = proxy.NewWireguardProxy(conn.config.ProxyConfig)
peerState.Direct = false
} else {
p = proxy.NewNoProxy(conn.config.ProxyConfig, remoteWgPort)
peerState.Direct = true
}
p := conn.getProxyWithMessageExchange(pair, remoteWgPort)
conn.proxy = p
err = p.Start(remoteConn)
if err != nil {
Expand All @@ -390,6 +400,7 @@ func (conn *Conn) startProxy(remoteConn net.Conn, remoteWgPort int) error {
if pair.Local.Type() == ice.CandidateTypeRelay || pair.Remote.Type() == ice.CandidateTypeRelay {
peerState.Relayed = true
}
peerState.Direct = p.Type() == proxy.TypeNoProxy

err = conn.statusRecorder.UpdatePeerState(peerState)
if err != nil {
Expand All @@ -399,6 +410,63 @@ func (conn *Conn) startProxy(remoteConn net.Conn, remoteWgPort int) error {
return nil
}

func (conn *Conn) getProxyWithMessageExchange(pair *ice.CandidatePair, remoteWgPort int) proxy.Proxy {

useProxy := shouldUseProxy(pair)
localDirectMode := !useProxy
remoteDirectMode := localDirectMode

if conn.meta.protoSupport.DirectCheck {
go conn.sendLocalDirectMode(localDirectMode)
// will block until message received or timeout
remoteDirectMode = conn.receiveRemoteDirectMode()
}

if localDirectMode && remoteDirectMode {
log.Debugf("using WireGuard direct mode with peer %s", conn.config.Key)
return proxy.NewNoProxy(conn.config.ProxyConfig, remoteWgPort)
}

log.Debugf("falling back to local proxy mode with peer %s", conn.config.Key)
return proxy.NewWireguardProxy(conn.config.ProxyConfig)
}

func (conn *Conn) sendLocalDirectMode(localMode bool) {
// todo what happens when we couldn't deliver this message?
// we could retry, etc but there is no guarantee

err := conn.sendSignalMessage(&sProto.Message{
Key: conn.config.LocalKey,
RemoteKey: conn.config.Key,
Body: &sProto.Body{
Type: sProto.Body_MODE,
Mode: &sProto.Mode{
Direct: &localMode,
},
NetBirdVersion: version.NetbirdVersion(),
},
})
if err != nil {
log.Errorf("failed to send local proxy mode to remote peer %s, error: %s", conn.config.Key, err)
}
}

func (conn *Conn) receiveRemoteDirectMode() bool {
timeout := time.Second
timer := time.NewTimer(timeout)
defer timer.Stop()

select {
case receivedMSG := <-conn.remoteModeCh:
return receivedMSG.Direct
case <-timer.C:
// we didn't receive a message from remote so we assume that it supports the direct mode to keep the old behaviour
log.Debugf("timeout after %s while waiting for remote direct mode message from remote peer %s",
timeout, conn.config.Key)
return true
}
}

// cleanup closes all open resources and sets status to StatusDisconnected
func (conn *Conn) cleanup() error {
log.Debugf("trying to cleanup %s", conn.config.Key)
Expand Down Expand Up @@ -459,6 +527,11 @@ func (conn *Conn) SetSignalCandidate(handler func(candidate ice.Candidate) error
conn.signalCandidate = handler
}

// SetSendSignalMessage sets a handler function to be triggered by Conn when there is new message to send via signal
func (conn *Conn) SetSendSignalMessage(handler func(message *sProto.Message) error) {
conn.sendSignalMessage = handler
}

// onICECandidate is a callback attached to an ICE Agent to receive new local connection candidates
// and then signals them to the remote peer
func (conn *Conn) onICECandidate(candidate ice.Candidate) {
Expand Down Expand Up @@ -613,3 +686,19 @@ func (conn *Conn) OnRemoteCandidate(candidate ice.Candidate) {
func (conn *Conn) GetKey() string {
return conn.config.Key
}

// OnModeMessage unmarshall the payload message and send it to the mode message channel
func (conn *Conn) OnModeMessage(message ModeMessage) error {
select {
case conn.remoteModeCh <- message:
return nil
default:
return fmt.Errorf("unable to process mode message: channel busy")
}
}

// RegisterProtoSupportMeta register supported proto message in the connection metadata
func (conn *Conn) RegisterProtoSupportMeta(support []uint32) {
protoSupport := signal.ParseFeaturesSupported(support)
conn.meta.protoSupport = protoSupport
}
109 changes: 109 additions & 0 deletions client/internal/peer/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (

"github.com/magiconair/properties/assert"
"github.com/pion/ice/v2"
"golang.org/x/sync/errgroup"

"github.com/netbirdio/netbird/client/internal/proxy"
"github.com/netbirdio/netbird/iface"
sproto "github.com/netbirdio/netbird/signal/proto"
)

var connConf = ConnConfig{
Expand Down Expand Up @@ -329,3 +331,110 @@ func TestConn_ShouldUseProxy(t *testing.T) {
})
}
}

func TestGetProxyWithMessageExchange(t *testing.T) {
publicHostCandidate := &mockICECandidate{
AddressFunc: func() string {
return "8.8.8.8"
},
TypeFunc: func() ice.CandidateType {
return ice.CandidateTypeHost
},
}
relayCandidate := &mockICECandidate{
AddressFunc: func() string {
return "1.1.1.1"
},
TypeFunc: func() ice.CandidateType {
return ice.CandidateTypeRelay
},
}

testCases := []struct {
name string
candatePair *ice.CandidatePair
inputDirectModeSupport bool
inputRemoteModeMessage bool
expected proxy.Type
}{
{
name: "Should Result In Using Wireguard Proxy When Local Eval Is Use Proxy",
candatePair: &ice.CandidatePair{
Local: relayCandidate,
Remote: publicHostCandidate,
},
inputDirectModeSupport: true,
inputRemoteModeMessage: true,
expected: proxy.TypeWireguard,
},
{
name: "Should Result In Using Wireguard Proxy When Remote Eval Is Use Proxy",
candatePair: &ice.CandidatePair{
Local: publicHostCandidate,
Remote: publicHostCandidate,
},
inputDirectModeSupport: true,
inputRemoteModeMessage: false,
expected: proxy.TypeWireguard,
},
{
name: "Should Result In Using Wireguard Proxy When Remote Direct Mode Support Is False And Local Eval Is Use Proxy",
candatePair: &ice.CandidatePair{
Local: relayCandidate,
Remote: publicHostCandidate,
},
inputDirectModeSupport: false,
inputRemoteModeMessage: false,
expected: proxy.TypeWireguard,
},
{
name: "Should Result In Using Direct When Remote Direct Mode Support Is False And Local Eval Is No Use Proxy",
candatePair: &ice.CandidatePair{
Local: publicHostCandidate,
Remote: publicHostCandidate,
},
inputDirectModeSupport: false,
inputRemoteModeMessage: false,
expected: proxy.TypeNoProxy,
},
{
name: "Should Result In Using Direct When Local And Remote Eval Is No Proxy",
candatePair: &ice.CandidatePair{
Local: publicHostCandidate,
Remote: publicHostCandidate,
},
inputDirectModeSupport: true,
inputRemoteModeMessage: true,
expected: proxy.TypeNoProxy,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
g := errgroup.Group{}
conn, err := NewConn(connConf, nil)
if err != nil {
t.Fatal(err)
}
conn.meta.protoSupport.DirectCheck = testCase.inputDirectModeSupport
conn.SetSendSignalMessage(func(message *sproto.Message) error {
return nil
})

g.Go(func() error {
return conn.OnModeMessage(ModeMessage{
Direct: testCase.inputRemoteModeMessage,
})
})

resultProxy := conn.getProxyWithMessageExchange(testCase.candatePair, 1000)

err = g.Wait()
if err != nil {
t.Error(err)
}
if resultProxy.Type() != testCase.expected {
t.Errorf("result didn't match expected value: Expected: %s, Got: %s", testCase.expected, resultProxy.Type())
}
})
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
go.opentelemetry.io/otel/metric v0.33.0
go.opentelemetry.io/otel/sdk/metric v0.33.0
golang.org/x/net v0.8.0
golang.org/x/sync v0.1.0
golang.org/x/term v0.6.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down Expand Up @@ -128,7 +129,6 @@ require (
golang.org/x/exp v0.0.0-20220518171630-0b5c67f07fdf // indirect
golang.org/x/image v0.0.0-20200430140353-33d19683fad8 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.zx2c4.com/go118/netip v0.0.0-20211111135330-a4a02eeacf9d // indirect
Expand Down
Loading

0 comments on commit 731d3ae

Please sign in to comment.