Skip to content

x/sync: add locks for peerTracker #1756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 4 additions & 14 deletions x/sync/network_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,14 @@ func (c *networkClient) RequestAny(
}
defer c.activeRequests.Release(1)

c.lock.Lock()
nodeID, ok := c.peers.GetAnyPeer(minVersion)
if !ok {
c.lock.Unlock()
return ids.EmptyNodeID, nil, fmt.Errorf(
"no peers found matching version %s out of %d peers",
minVersion, c.peers.Size(),
)
}

// Note [c.request] releases [c.lock].
response, err := c.request(ctx, nodeID, request)
return nodeID, response, err
}
Expand All @@ -208,8 +205,6 @@ func (c *networkClient) Request(
}
defer c.activeRequests.Release(1)

c.lock.Lock()
// Note [c.request] releases [c.lock].
return c.request(ctx, nodeID, request)
}

Expand All @@ -219,12 +214,13 @@ func (c *networkClient) Request(
// Releases active requests semaphore if there was an error in sending the request.
// Assumes [nodeID] is never [c.myNodeID] since we guarantee
// [c.myNodeID] will not be added to [c.peers].
// Assumes [c.lock] is not held on entry; acquires [c.lock] itself and unlocks it before returning.
func (c *networkClient) request(
ctx context.Context,
nodeID ids.NodeID,
request []byte,
) ([]byte, error) {
c.lock.Lock()
c.log.Debug("sending request to peer",
zap.Stringer("nodeID", nodeID),
zap.Int("requestLen", len(request)),
Expand All @@ -246,13 +242,13 @@ func (c *networkClient) request(
handler := newResponseHandler()
c.outstandingRequestHandlers[requestID] = handler

c.lock.Unlock() // unlock so response can be received

var (
response []byte
startTime = time.Now()
)

c.lock.Unlock() // unlock so response can be received

select {
case <-ctx.Done():
c.peers.TrackBandwidth(nodeID, 0)
Expand Down Expand Up @@ -283,9 +279,6 @@ func (c *networkClient) Connected(
nodeID ids.NodeID,
nodeVersion *version.Application,
) error {
c.lock.Lock()
defer c.lock.Unlock()

if nodeID == c.myNodeID {
c.log.Debug("skipping registering self as peer")
return nil
Expand All @@ -298,9 +291,6 @@ func (c *networkClient) Connected(

// Disconnected removes given [nodeID] from the peer list.
func (c *networkClient) Disconnected(_ context.Context, nodeID ids.NodeID) error {
c.lock.Lock()
defer c.lock.Unlock()

if nodeID == c.myNodeID {
c.log.Debug("skipping deregistering self as peer")
return nil
Expand Down
23 changes: 22 additions & 1 deletion x/sync/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package sync

import (
"math/rand"
"sync"
"time"

stdmath "math"
Expand Down Expand Up @@ -40,8 +41,9 @@ type peerInfo struct {
// Tracks the bandwidth of responses coming from peers,
// preferring to contact peers with known good bandwidth, connecting
// to new peers with an exponentially decaying probability.
// Note: thread safe. Exported methods acquire [lock] internally; unexported helpers assume it is already held.
type peerTracker struct {
// Lock to protect concurrent access to the peer tracker
lock sync.Mutex
// All peers we are connected to
peers map[ids.NodeID]*peerInfo
// Peers that we're connected to that we've sent a request to
Expand Down Expand Up @@ -75,6 +77,7 @@ func newPeerTracker(log logging.Logger) *peerTracker {

// Returns true if we're not connected to enough peers.
// Otherwise returns true probabilistically based on the number of tracked peers.
// Assumes p.lock is held.
func (p *peerTracker) shouldTrackNewPeer() bool {
numResponsivePeers := p.responsivePeers.Len()
if numResponsivePeers < desiredMinResponsivePeers {
Expand Down Expand Up @@ -105,6 +108,9 @@ func (p *peerTracker) shouldTrackNewPeer() bool {
// Otherwise, with probability [randomPeerProbability] returns a random peer from [p.responsivePeers].
// With probability [1-randomPeerProbability] returns the peer in [p.bandwidthHeap] with the highest bandwidth.
func (p *peerTracker) GetAnyPeer(minVersion *version.Application) (ids.NodeID, bool) {
p.lock.Lock()
defer p.lock.Unlock()

if p.shouldTrackNewPeer() {
for nodeID := range p.peers {
// if minVersion is specified and peer's version is less, skip
Expand Down Expand Up @@ -148,13 +154,19 @@ func (p *peerTracker) GetAnyPeer(minVersion *version.Application) (ids.NodeID, b

// TrackPeer records that we sent a request to [nodeID] by adding it to
// [p.trackedPeers].
// Acquires [p.lock] for the duration of the call, so the caller must not
// already hold it.
func (p *peerTracker) TrackPeer(nodeID ids.NodeID) {
p.lock.Lock()
defer p.lock.Unlock()

p.trackedPeers.Add(nodeID)
// NOTE(review): the tracked-peers metric update below is commented out —
// confirm whether it should be restored or removed.
// p.numTrackedPeers.Set(float64(p.trackedPeers.Len()))
}

// Record that we observed that [nodeID]'s bandwidth is [bandwidth].
// Adds the peer's bandwidth averager to the bandwidth heap.
func (p *peerTracker) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {
p.lock.Lock()
defer p.lock.Unlock()

peer := p.peers[nodeID]
if peer == nil {
// we're not connected to this peer, nothing to do here
Expand Down Expand Up @@ -184,6 +196,9 @@ func (p *peerTracker) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {

// Connected should be called when [nodeID] connects to this node
func (p *peerTracker) Connected(nodeID ids.NodeID, nodeVersion *version.Application) {
p.lock.Lock()
defer p.lock.Unlock()

peer := p.peers[nodeID]
if peer == nil {
p.peers[nodeID] = &peerInfo{
Expand Down Expand Up @@ -216,6 +231,9 @@ func (p *peerTracker) Connected(nodeID ids.NodeID, nodeVersion *version.Applicat

// Disconnected should be called when [nodeID] disconnects from this node
func (p *peerTracker) Disconnected(nodeID ids.NodeID) {
p.lock.Lock()
defer p.lock.Unlock()

p.bandwidthHeap.Remove(nodeID)
p.trackedPeers.Remove(nodeID)
// p.numTrackedPeers.Set(float64(p.trackedPeers.Len()))
Expand All @@ -226,5 +244,8 @@ func (p *peerTracker) Disconnected(nodeID ids.NodeID) {

// Size reports the number of peers the node is connected to, i.e. the
// number of entries in [p.peers]. The count is read while holding [p.lock].
func (p *peerTracker) Size() int {
	p.lock.Lock()
	numPeers := len(p.peers)
	p.lock.Unlock()

	return numPeers
}