Skip to content

Commit

Permalink
Make sure the TCP connection is closed when a peer is deleted
Browse files Browse the repository at this point in the history
It could be that a peer gets deleted and added back during the
transition from active state to open confirm. In that case, the TCP
connection from the old version of the peer could still be up. This
is a problem if the server is a listener only as the remote peer
would consider the old TCP connection as being valid and it won't
be able to connect until the TCP connection is eventually cleaned
by the Golang GC.
  • Loading branch information
hardeker committed Dec 4, 2023
1 parent 95cf2c1 commit 08529d6
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 3 deletions.
23 changes: 23 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,29 @@ func (s *BgpServer) Serve() {
"Key": addr,
"State": state})

if state == bgp.BGP_FSM_ACTIVE {
var conn net.Conn
select {
case conn = <-fsm.connCh:
default:
if fsm.conn != nil {
conn = fsm.conn
fsm.conn = nil
}
}
if conn != nil {
err := conn.Close()
if err != nil {
s.logger.Error("failed to close existing tcp connection",
log.Fields{
"Topic": "Peer",
"Key": addr,
"State": state})
}
}
}
close(fsm.connCh)

if fsm.state == bgp.BGP_FSM_ESTABLISHED {
s.notifyWatcher(watchEventTypePeerState, &watchEventPeer{
PeerAS: fsm.peerInfo.AS,
Expand Down
121 changes: 118 additions & 3 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,26 @@ func TestListPolicyAssignment(t *testing.T) {
assert.Equal(4, len(ps))
}

func waitEstablished(s *BgpServer, ch chan struct{}) {
s.WatchEvent(context.Background(), &api.WatchEventRequest{Peer: &api.WatchEventRequest_Peer{}}, func(r *api.WatchEventResponse) {
func waitState(s *BgpServer, ch chan struct{}, state api.PeerState_SessionState) {
watchCtx, watchCancel := context.WithCancel(context.Background())
s.WatchEvent(watchCtx, &api.WatchEventRequest{Peer: &api.WatchEventRequest_Peer{}}, func(r *api.WatchEventResponse) {
if peer := r.GetPeer(); peer != nil {
if peer.Type == api.WatchEventResponse_PeerEvent_STATE && peer.Peer.State.SessionState == api.PeerState_ESTABLISHED {
if peer.Type == api.WatchEventResponse_PeerEvent_STATE && peer.Peer.State.SessionState == state {
close(ch)
watchCancel()
}
}
})
}

func waitActive(s *BgpServer, ch chan struct{}) {
waitState(s, ch, api.PeerState_ACTIVE)
}

func waitEstablished(s *BgpServer, ch chan struct{}) {
waitState(s, ch, api.PeerState_ESTABLISHED)
}

func TestListPathEnableFiltered(test *testing.T) {
assert := assert.New(test)
s := NewBgpServer()
Expand Down Expand Up @@ -1141,6 +1151,111 @@ func TestGracefulRestartTimerExpired(t *testing.T) {
}
}

func TestTcpConnectionClosedAfterPeerDel(t *testing.T) {
assert := assert.New(t)
s1 := NewBgpServer()
go s1.Serve()
err := s1.StartBgp(context.Background(), &api.StartBgpRequest{
Global: &api.Global{
Asn: 1,
RouterId: "1.1.1.1",
ListenPort: 10179,
},
})
assert.Nil(err)
defer s1.StopBgp(context.Background(), &api.StopBgpRequest{})

p1 := &api.Peer{
Conf: &api.PeerConf{
NeighborAddress: "127.0.0.1",
PeerAsn: 2,
},
Transport: &api.Transport{
PassiveMode: true,
},
}

activeCh := make(chan struct{})
go waitActive(s1, activeCh)
err = s1.AddPeer(context.Background(), &api.AddPeerRequest{Peer: p1})
assert.Nil(err)
<-activeCh

// We delete the peer incoming channel from the server list so that we can
// intercept the transition from ACTIVE state to OPENSENT state.
neighbor1 := s1.neighborMap[p1.Conf.NeighborAddress]
incoming := neighbor1.fsm.incomingCh
err = s1.mgmtOperation(func() error {
s1.delIncoming(incoming)
return nil
}, true)
assert.Nil(err)

s2 := NewBgpServer()
go s2.Serve()
err = s2.StartBgp(context.Background(), &api.StartBgpRequest{
Global: &api.Global{
Asn: 2,
RouterId: "2.2.2.2",
ListenPort: -1,
},
})
require.NoError(t, err)
defer s2.StopBgp(context.Background(), &api.StopBgpRequest{})

p2 := &api.Peer{
Conf: &api.PeerConf{
NeighborAddress: "127.0.0.1",
PeerAsn: 1,
},
Transport: &api.Transport{
RemotePort: 10179,
},
Timers: &api.Timers{
Config: &api.TimersConfig{
ConnectRetry: 1,
IdleHoldTimeAfterReset: 1,
},
},
}

err = s2.AddPeer(context.Background(), &api.AddPeerRequest{Peer: p2})
assert.Nil(err)

// Wait for the s1 to receive the tcp connection from s2.
ev := <-incoming.Out()
msg := ev.(*fsmMsg)
nextState := msg.MsgData.(bgp.FSMState)
assert.Equal(nextState, bgp.BGP_FSM_OPENSENT)
assert.NotEmpty(msg.fsm.conn)

// Add the peer incoming channel back to the server
err = s1.mgmtOperation(func() error {
s1.addIncoming(incoming)
return nil
}, true)
assert.Nil(err)

// Delete the peer from s1.
err = s1.DeletePeer(context.Background(), &api.DeletePeerRequest{Address: p1.Conf.NeighborAddress})
assert.Nil(err)

// Send the message OPENSENT transition message again to the server.
incoming.In() <- msg

// Wait for peer connection channel to be closed and check that the open
// tcp connection has also been closed.
<-neighbor1.fsm.connCh
assert.Empty(neighbor1.fsm.conn)

// Check that we can establish the peering when re-adding the peer.
establishedCh := make(chan struct{})
go waitEstablished(s2, establishedCh)
err = s1.AddPeer(context.Background(), &api.AddPeerRequest{Peer: p1})
assert.Nil(err)
<-establishedCh
}

func TestFamiliesForSoftreset(t *testing.T) {
f := func(f bgp.RouteFamily) oc.AfiSafi {
return oc.AfiSafi{
Expand Down

0 comments on commit 08529d6

Please sign in to comment.