Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix relay #827

Merged
merged 10 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix broken relays
  • Loading branch information
brad-defined committed Mar 22, 2023
commit 5d4d94e418b1be40fafee970aac9dc89e9c5b053
8 changes: 4 additions & 4 deletions control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func TestControl_GetHostInfoByVpnIp(t *testing.T) {
vpnIp: iputil.Ip2VpnIp(ipNet.IP),
relayState: RelayState{
relays: map[iputil.VpnIp]struct{}{},
relayForByIp: map[iputil.VpnIp]*Relay{},
relayForByIdx: map[uint32]*Relay{},
relayForByIp: map[iputil.VpnIp]*RelayWrapper{},
relayForByIdx: map[uint32]*RelayWrapper{},
},
})

Expand All @@ -77,8 +77,8 @@ func TestControl_GetHostInfoByVpnIp(t *testing.T) {
vpnIp: iputil.Ip2VpnIp(ipNet2.IP),
relayState: RelayState{
relays: map[iputil.VpnIp]struct{}{},
relayForByIp: map[iputil.VpnIp]*Relay{},
relayForByIdx: map[uint32]*Relay{},
relayForByIp: map[iputil.VpnIp]*RelayWrapper{},
relayForByIdx: map[uint32]*RelayWrapper{},
},
})

Expand Down
4 changes: 2 additions & 2 deletions handshake_ix.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ func ixHandshakeStage1(f *Interface, addr *udp.Addr, via interface{}, packet []b
lastHandshakeTime: hs.Details.Time,
relayState: RelayState{
relays: map[iputil.VpnIp]struct{}{},
relayForByIp: map[iputil.VpnIp]*Relay{},
relayForByIdx: map[uint32]*Relay{},
relayForByIp: map[iputil.VpnIp]*RelayWrapper{},
relayForByIdx: map[uint32]*RelayWrapper{},
},
}

Expand Down
70 changes: 58 additions & 12 deletions hostmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ type Relay struct {
PeerIp iputil.VpnIp
}

// For synchronization, treat the Relay struct as immutable. To edit the Relay
// struct, make a copy of an existing value, edit the fileds in the copy, and
// then store a pointer to the copy in RelayWrapper.
type RelayWrapper struct {
brad-defined marked this conversation as resolved.
Show resolved Hide resolved
Relay atomic.Pointer[Relay]
}

type HostMap struct {
sync.RWMutex //Because we concurrently read and write to our maps
name string
Expand All @@ -66,9 +73,9 @@ type HostMap struct {
type RelayState struct {
sync.RWMutex

relays map[iputil.VpnIp]struct{} // Set of VpnIp's of Hosts to use as relays to access this peer
relayForByIp map[iputil.VpnIp]*Relay // Maps VpnIps of peers for which this HostInfo is a relay to some Relay info
relayForByIdx map[uint32]*Relay // Maps a local index to some Relay info
relays map[iputil.VpnIp]struct{} // Set of VpnIp's of Hosts to use as relays to access this peer
relayForByIp map[iputil.VpnIp]*RelayWrapper // Maps VpnIps of peers for which this HostInfo is a relay to some Relay info
relayForByIdx map[uint32]*RelayWrapper // Maps a local index to some Relay info
}

func (rs *RelayState) DeleteRelay(ip iputil.VpnIp) {
Expand All @@ -81,7 +88,7 @@ func (rs *RelayState) GetRelayForByIp(ip iputil.VpnIp) (*Relay, bool) {
rs.RLock()
defer rs.RUnlock()
r, ok := rs.relayForByIp[ip]
return r, ok
return r.Relay.Load(), ok
}

func (rs *RelayState) InsertRelayTo(ip iputil.VpnIp) {
Expand Down Expand Up @@ -127,29 +134,68 @@ func (rs *RelayState) RemoveRelay(localIdx uint32) (iputil.VpnIp, bool) {
if !ok {
return iputil.VpnIp(0), false
}
r := relay.Relay.Load()
delete(rs.relayForByIdx, localIdx)
delete(rs.relayForByIp, relay.PeerIp)
return relay.PeerIp, true
delete(rs.relayForByIp, r.PeerIp)
return r.PeerIp, true
}

func (rs *RelayState) CompleteRelayByIP(vpnIp iputil.VpnIp, remoteIdx uint32) bool {
rs.Lock()
defer rs.Unlock()
r, ok := rs.relayForByIp[vpnIp]
if !ok {
return false
}
newRelay := *r.Relay.Load()
newRelay.State = Established
newRelay.RemoteIndex = remoteIdx
r.Relay.Store(&newRelay)
return true
}

func (rs *RelayState) CompleteRelayByIdx(localIdx uint32, remoteIdx uint32) (*Relay, bool) {
rs.Lock()
defer rs.Unlock()
r, ok := rs.relayForByIdx[localIdx]
if !ok {
return nil, false
}
newRelay := *r.Relay.Load()
newRelay.State = Established
newRelay.RemoteIndex = remoteIdx
r.Relay.Store(&newRelay)
return &newRelay, true
}

func (rs *RelayState) QueryRelayForByIp(vpnIp iputil.VpnIp) (*Relay, bool) {
rs.RLock()
defer rs.RUnlock()
r, ok := rs.relayForByIp[vpnIp]
return r, ok
var ret *Relay
if ok {
ret = r.Relay.Load()
}
return ret, ok
}

func (rs *RelayState) QueryRelayForByIdx(idx uint32) (*Relay, bool) {
rs.RLock()
defer rs.RUnlock()
r, ok := rs.relayForByIdx[idx]
return r, ok
var ret *Relay
if ok {
ret = r.Relay.Load()
}
return ret, ok
}
func (rs *RelayState) InsertRelay(ip iputil.VpnIp, idx uint32, r *Relay) {
rs.Lock()
defer rs.Unlock()
rs.relayForByIp[ip] = r
rs.relayForByIdx[idx] = r
rw := &RelayWrapper{}
rw.Relay.Store(r)
rs.relayForByIp[ip] = rw
rs.relayForByIdx[idx] = rw
}

type HostInfo struct {
Expand Down Expand Up @@ -297,8 +343,8 @@ func (hm *HostMap) AddVpnIp(vpnIp iputil.VpnIp, init func(hostinfo *HostInfo)) (
HandshakePacket: make(map[uint8][]byte, 0),
relayState: RelayState{
relays: map[iputil.VpnIp]struct{}{},
relayForByIp: map[iputil.VpnIp]*Relay{},
relayForByIdx: map[uint32]*Relay{},
relayForByIp: map[iputil.VpnIp]*RelayWrapper{},
relayForByIdx: map[uint32]*RelayWrapper{},
},
}
if init != nil {
Expand Down
12 changes: 4 additions & 8 deletions outside.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,8 @@ func (f *Interface) readOutsidePackets(addr *udp.Addr, via interface{}, out []by
relay, ok := hostinfo.relayState.QueryRelayForByIdx(h.RemoteIndex)
if !ok {
// The only way this happens is if hostmap has an index to the correct HostInfo, but the HostInfo is missing
// its internal mapping. This shouldn't happen!
hostinfo.logger(f.l).WithField("hostinfo", hostinfo.vpnIp).WithField("remoteIndex", h.RemoteIndex).Errorf("HostInfo missing remote index")
// Delete my local index from the hostmap
f.hostMap.DeleteRelayIdx(h.RemoteIndex)
// When the peer doesn't receive any return traffic, its connection_manager will eventually clean up
// the broken relay when it cleans up the associated HostInfo object.
// its internal mapping. This should never happen.
hostinfo.logger(f.l).WithFields(logrus.Fields{"hostinfo": hostinfo.vpnIp, "remoteIndex": h.RemoteIndex}).Error("HostInfo missing remote relay index")
brad-defined marked this conversation as resolved.
Show resolved Hide resolved
return
}

Expand All @@ -114,7 +110,7 @@ func (f *Interface) readOutsidePackets(addr *udp.Addr, via interface{}, out []by
// find the target Relay info object
targetRelay, ok := targetHI.relayState.QueryRelayForByIp(hostinfo.vpnIp)
if !ok {
hostinfo.logger(f.l).WithField("peerIp", relay.PeerIp).Info("Failed to find relay in hostinfo")
hostinfo.logger(f.l).WithFields(logrus.Fields{"peerIp": relay.PeerIp, "hostInfo": hostinfo.vpnIp}).Info("Failed to find relay in hostinfo")
return
}

Expand All @@ -130,7 +126,7 @@ func (f *Interface) readOutsidePackets(addr *udp.Addr, via interface{}, out []by
hostinfo.logger(f.l).Error("Unexpected Relay Type of Terminal")
}
} else {
hostinfo.logger(f.l).WithField("targetRelayState", targetRelay.State).Info("Unexpected target relay state")
hostinfo.logger(f.l).WithFields(logrus.Fields{"peerIp": relay.PeerIp, "hostInfo": hostinfo.vpnIp, "targetRelayState": targetRelay.State}).Info("Unexpected target relay state")
return
}
}
Expand Down
107 changes: 70 additions & 37 deletions relay_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,14 @@ func AddRelay(l *logrus.Logger, relayHostInfo *HostInfo, hm *HostMap, vpnIp iput

// EstablishRelay updates a Requested Relay to become an Established Relay, which can pass traffic.
func (rm *relayManager) EstablishRelay(relayHostInfo *HostInfo, m *NebulaControl) (*Relay, error) {
relay, ok := relayHostInfo.relayState.QueryRelayForByIdx(m.InitiatorRelayIndex)
relay, ok := relayHostInfo.relayState.CompleteRelayByIdx(m.InitiatorRelayIndex, m.ResponderRelayIndex)
if !ok {
rm.l.WithFields(logrus.Fields{"relayHostInfo": relayHostInfo.vpnIp,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think relayHostInfo -> relayVpnIp.

Probably would be good to do a separate pass to make sure all logs are using the same field names.

"initiatorRelayIndex": m.InitiatorRelayIndex,
"relayFrom": m.RelayFromIp,
"relayTo": m.RelayToIp}).Info("relayManager EstablishRelay relayForByIdx not found")
"relayTo": m.RelayToIp}).Info("relayManager failed to update relay")
return nil, fmt.Errorf("unknown relay")
}
// relay deserves some synchronization
relay.RemoteIndex = m.ResponderRelayIndex
relay.State = Established

return relay, nil
}
Expand Down Expand Up @@ -158,41 +155,60 @@ func (rm *relayManager) handleCreateRelayResponse(h *HostInfo, f *Interface, m *
WithError(err).Error("relayManager Failed to marhsal Control CreateRelayResponse message to create relay")
} else {
f.SendMessageToVpnIp(header.Control, 0, peerHostInfo.vpnIp, msg, make([]byte, 12), make([]byte, mtu))
rm.l.WithFields(logrus.Fields{
"relayFrom": iputil.VpnIp(resp.RelayFromIp),
"relayTarget": iputil.VpnIp(resp.RelayToIp),
"initiatorIdx": resp.InitiatorRelayIndex,
"responderIdx": resp.ResponderRelayIndex,
"hostInfo": peerHostInfo.vpnIp}).
brad-defined marked this conversation as resolved.
Show resolved Hide resolved
Info("send CreateRelayResponse")
}
}

func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *NebulaControl) {
rm.l.WithFields(logrus.Fields{
"relayFrom": iputil.VpnIp(m.RelayFromIp),
"relayTarget": iputil.VpnIp(m.RelayToIp),
"initiatorIdx": m.InitiatorRelayIndex,
"hostInfo": h.vpnIp}).
Info("handleCreateRelayRequest")

from := iputil.VpnIp(m.RelayFromIp)
target := iputil.VpnIp(m.RelayToIp)

logMsg := rm.l.WithFields(logrus.Fields{
"relayFrom": from,
"relayTarget": target,
"initiatorIdx": m.InitiatorRelayIndex,
"hostInfo": h.vpnIp})

logMsg.Info("handleCreateRelayRequest")
// Is the target of the relay me?
if target == f.myVpnIp {
existingRelay, ok := h.relayState.QueryRelayForByIp(from)
addRelay := !ok
if ok {
// Clean up existing relay, if this is a new request.
if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
// We got a brand new Relay request, because its index is different than what we saw before.
// Clean up the existing Relay state, and get ready to record new Relay state.
rm.hostmap.RemoveRelay(existingRelay.LocalIndex)
addRelay = true
switch existingRelay.State {
case Requested:
ok = h.relayState.CompleteRelayByIP(from, m.InitiatorRelayIndex)
if !ok {
logMsg.Error("Relay State not found")
return
}
case Established:
if existingRelay.RemoteIndex != m.InitiatorRelayIndex {
// We got a brand new Relay request, because its index is different than what we saw before.
// This should never happen. The peer should never change an index, once created.
logMsg.WithFields(logrus.Fields{
"existingRemoteIdx": existingRelay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
return
}
}
}
if addRelay {
} else {
_, err := AddRelay(rm.l, h, f.hostMap, from, &m.InitiatorRelayIndex, TerminalType, Established)
if err != nil {
logMsg.WithError(err).Error("Failed to add relay")
return
}
}

relay, ok := h.relayState.QueryRelayForByIp(from)
if ok && m.InitiatorRelayIndex != relay.RemoteIndex {
// Do something, Something happened.
if !ok {
logMsg.Error("Relay State not found")
return
}

resp := NebulaControl{
Expand All @@ -204,15 +220,22 @@ func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *N
}
msg, err := resp.Marshal()
if err != nil {
rm.l.
logMsg.
WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
} else {
f.SendMessageToVpnIp(header.Control, 0, h.vpnIp, msg, make([]byte, 12), make([]byte, mtu))
rm.l.WithFields(logrus.Fields{
"relayFrom": iputil.VpnIp(resp.RelayFromIp),
"relayTarget": iputil.VpnIp(resp.RelayToIp),
"initiatorIdx": resp.InitiatorRelayIndex,
"responderIdx": resp.ResponderRelayIndex,
"hostInfo": h.vpnIp}).
Info("send CreateRelayResponse")
}
return
} else {
// the target is not me. Create a relay to the target, from me.
if rm.GetAmRelay() == false {
if !rm.GetAmRelay() {
return
}
peer, err := rm.hostmap.QueryVpnIp(target)
Expand Down Expand Up @@ -252,10 +275,17 @@ func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *N
}
msg, err := req.Marshal()
if err != nil {
rm.l.
logMsg.
WithError(err).Error("relayManager Failed to marshal Control message to create relay")
} else {
f.SendMessageToVpnIp(header.Control, 0, target, msg, make([]byte, 12), make([]byte, mtu))
rm.l.WithFields(logrus.Fields{
"relayFrom": iputil.VpnIp(req.RelayFromIp),
"relayTarget": iputil.VpnIp(req.RelayToIp),
"initiatorIdx": req.InitiatorRelayIndex,
"responderIdx": req.ResponderRelayIndex,
"hostInfo": target}).
Info("send CreateRelayRequest")
}
}
// Also track the half-created Relay state just received
Expand All @@ -268,24 +298,20 @@ func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *N
}
_, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, state)
if err != nil {
rm.l.
logMsg.
WithError(err).Error("relayManager Failed to allocate a local index for relay")
return
}
} else {
if relay.RemoteIndex != m.InitiatorRelayIndex {
// This is a stale Relay entry for the same tunnel targets.
// Clean up the existing stuff.
rm.RemoveRelay(relay.LocalIndex)
// Add the new relay
_, err := AddRelay(rm.l, h, f.hostMap, target, &m.InitiatorRelayIndex, ForwardingType, Requested)
if err != nil {
return
}
relay, _ = h.relayState.QueryRelayForByIp(target)
}
switch relay.State {
case Established:
if relay.RemoteIndex != m.InitiatorRelayIndex {
// We got a brand new Relay request, because its index is different than what we saw before.
// This should never happen. The peer should never change an index, once created.
logMsg.WithFields(logrus.Fields{
"existingRemoteIdx": relay.RemoteIndex}).Error("Existing relay mismatch with CreateRelayRequest")
return
}
resp := NebulaControl{
Type: NebulaControl_CreateRelayResponse,
ResponderRelayIndex: relay.LocalIndex,
Expand All @@ -299,6 +325,13 @@ func (rm *relayManager) handleCreateRelayRequest(h *HostInfo, f *Interface, m *N
WithError(err).Error("relayManager Failed to marshal Control CreateRelayResponse message to create relay")
} else {
f.SendMessageToVpnIp(header.Control, 0, h.vpnIp, msg, make([]byte, 12), make([]byte, mtu))
rm.l.WithFields(logrus.Fields{
"relayFrom": iputil.VpnIp(resp.RelayFromIp),
"relayTarget": iputil.VpnIp(resp.RelayToIp),
"initiatorIdx": resp.InitiatorRelayIndex,
"responderIdx": resp.ResponderRelayIndex,
"hostInfo": h.vpnIp}).
Info("send CreateRelayResponse")
}

case Requested:
Expand Down