Skip to content

Commit 415d82c

Browse files
authored
Fix random failure in TestWebsocketNetworkPrioLimit (#2509)
The peers array is modified when adding/removing entries from it. When that does happen, we increase the peersChangeCounter, so that the broadcast method would know that it's peers list need to be refreshed. The said update was missing from prioTracker.setPriority, which was causing the issue.
1 parent 66fbd60 commit 415d82c

File tree

2 files changed

+21
-2
lines changed

2 files changed

+21
-2
lines changed

network/netprio.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package network
1818

1919
import (
2020
"container/heap"
21+
"sync/atomic"
2122

2223
"github.com/algorand/go-algorand/data/basics"
2324
"github.com/algorand/go-algorand/protocol"
@@ -125,6 +126,7 @@ func (pt *prioTracker) setPriority(peer *wsPeer, addr basics.Address, weight uin
125126
peer.prioAddress = addr
126127
peer.prioWeight = weight
127128
heap.Fix(peersHeap{wn}, peer.peerIndex)
129+
atomic.AddInt32(&wn.peersChangeCounter, 1)
128130
}
129131

130132
func (pt *prioTracker) removePeer(peer *wsPeer) {

network/wsNetwork_test.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,6 +1128,7 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) {
11281128
netB := makeTestWebsocketNode(t)
11291129
netB.SetPrioScheme(&prioB)
11301130
netB.config.GossipFanout = 1
1131+
netB.config.NetAddress = ""
11311132
netB.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
11321133
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counterB}})
11331134
netB.Start()
@@ -1141,37 +1142,53 @@ func TestWebsocketNetworkPrioLimit(t *testing.T) {
11411142
netC := makeTestWebsocketNode(t)
11421143
netC.SetPrioScheme(&prioC)
11431144
netC.config.GossipFanout = 1
1145+
netC.config.NetAddress = ""
11441146
netC.phonebook.ReplacePeerList([]string{addrA}, "default", PhoneBookEntryRelayRole)
11451147
netC.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.TxnTag, MessageHandler: counterC}})
11461148
netC.Start()
11471149
defer func() { t.Log("stopping C"); netC.Stop(); t.Log("C done") }()
11481150

11491151
// Wait for response messages to propagate from B+C to A
11501152
select {
1151-
case <-netA.prioResponseChan:
1153+
case peer := <-netA.prioResponseChan:
1154+
netA.peersLock.RLock()
1155+
require.Subset(t, []uint64{prioB.prio, prioC.prio}, []uint64{peer.prioWeight})
1156+
netA.peersLock.RUnlock()
11521157
case <-time.After(time.Second):
11531158
t.Errorf("timeout on netA.prioResponseChan 1")
11541159
}
11551160
select {
1156-
case <-netA.prioResponseChan:
1161+
case peer := <-netA.prioResponseChan:
1162+
netA.peersLock.RLock()
1163+
require.Subset(t, []uint64{prioB.prio, prioC.prio}, []uint64{peer.prioWeight})
1164+
netA.peersLock.RUnlock()
11571165
case <-time.After(time.Second):
11581166
t.Errorf("timeout on netA.prioResponseChan 2")
11591167
}
11601168
waitReady(t, netA, time.After(time.Second))
11611169

1170+
firstPeer := netA.peers[0]
11621171
netA.Broadcast(context.Background(), protocol.TxnTag, nil, true, nil)
11631172

1173+
failed := false
11641174
select {
11651175
case <-counterBdone:
11661176
case <-time.After(time.Second):
11671177
t.Errorf("timeout, B did not receive message")
1178+
failed = true
11681179
}
11691180

11701181
select {
11711182
case <-counterCdone:
11721183
t.Errorf("C received message")
1184+
failed = true
11731185
case <-time.After(time.Second):
11741186
}
1187+
1188+
if failed {
1189+
t.Errorf("NetA had the following two peers priorities : [0]:%s=%d [1]:%s=%d", netA.peers[0].rootURL, netA.peers[0].prioWeight, netA.peers[1].rootURL, netA.peers[1].prioWeight)
1190+
t.Errorf("first peer before broadcasting was %s", firstPeer.rootURL)
1191+
}
11751192
}
11761193

11771194
// Create many idle connections, to see if we have excessive CPU utilization.

0 commit comments

Comments
 (0)