Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions x/sync/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func sendRangeRequest(
sender := common.NewMockSender(ctrl)
handler := NewNetworkServer(sender, db, logging.NoLog{})
clientNodeID, serverNodeID := ids.GenerateTestNodeID(), ids.GenerateTestNodeID()
networkClient := NewNetworkClient(sender, clientNodeID, 1, logging.NoLog{})
networkClient, err := NewNetworkClient(sender, clientNodeID, 1, logging.NoLog{}, "", prometheus.NewRegistry())
require.NoError(err)
require.NoError(networkClient.Connected(context.Background(), serverNodeID, version.CurrentApp))
client := NewClient(&ClientConfig{
NetworkClient: networkClient,
Expand Down Expand Up @@ -315,7 +316,8 @@ func sendChangeRequest(
sender := common.NewMockSender(ctrl)
handler := NewNetworkServer(sender, db, logging.NoLog{})
clientNodeID, serverNodeID := ids.GenerateTestNodeID(), ids.GenerateTestNodeID()
networkClient := NewNetworkClient(sender, clientNodeID, 1, logging.NoLog{})
networkClient, err := NewNetworkClient(sender, clientNodeID, 1, logging.NoLog{}, "", prometheus.NewRegistry())
require.NoError(err)
require.NoError(networkClient.Connected(context.Background(), serverNodeID, version.CurrentApp))
client := NewClient(&ClientConfig{
NetworkClient: networkClient,
Expand Down
15 changes: 12 additions & 3 deletions x/sync/network_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"go.uber.org/zap"

"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -73,15 +75,22 @@ func NewNetworkClient(
myNodeID ids.NodeID,
maxActiveRequests int64,
log logging.Logger,
) NetworkClient {
metricsNamespace string,
registerer prometheus.Registerer,
) (NetworkClient, error) {
peerTracker, err := newPeerTracker(log, metricsNamespace, registerer)
if err != nil {
return nil, fmt.Errorf("failed to create peer tracker: %w", err)
}

return &networkClient{
appSender: appSender,
myNodeID: myNodeID,
outstandingRequestHandlers: make(map[uint32]ResponseHandler),
activeRequests: semaphore.NewWeighted(maxActiveRequests),
peers: newPeerTracker(log),
peers: peerTracker,
log: log,
}
}, nil
}

// Always returns nil because the engine considers errors
Expand Down
66 changes: 49 additions & 17 deletions x/sync/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

stdmath "math"

"go.uber.org/zap"
Expand All @@ -16,6 +18,7 @@ import (
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/math"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/utils/wrappers"
"github.com/ava-labs/avalanchego/version"
)

Expand Down Expand Up @@ -52,27 +55,56 @@ type peerTracker struct {
// Peers that we're connected to that responded to the last request they were sent.
responsivePeers set.Set[ids.NodeID]
// Max heap that contains the average bandwidth of peers.
bandwidthHeap math.AveragerHeap
averageBandwidth math.Averager
log logging.Logger
// numTrackedPeers prometheus.Gauge
// numResponsivePeers prometheus.Gauge
// averageBandwidthMetric prometheus.Gauge
bandwidthHeap math.AveragerHeap
averageBandwidth math.Averager
log logging.Logger
numTrackedPeers prometheus.Gauge
numResponsivePeers prometheus.Gauge
averageBandwidthMetric prometheus.Gauge
}

// NOTE(review): this span is a rendered diff hunk. The three lines directly
// below appear to be the removed pre-change header (single logger argument,
// no error return, metrics left as a TODO); the replacement definition
// follows them.
func newPeerTracker(log logging.Logger) *peerTracker {
// TODO: initialize metrics
return &peerTracker{
// newPeerTracker builds a peerTracker with empty peer, tracked-peer and
// responsive-peer collections, a max averager heap for per-peer bandwidth,
// and an averager constructed with a first argument of 0, the
// bandwidthHalflife constant and the current time.
//
// It also creates three gauges under metricsNamespace (num_tracked_peers,
// num_responsive_peers and average_bandwidth) and registers each of them
// with registerer.
//
// The tracker is returned together with errs.Err. Note that the tracker
// pointer is non-nil even when the error is non-nil, so callers must check
// the error before using the result.
func newPeerTracker(
log logging.Logger,
metricsNamespace string,
registerer prometheus.Registerer,
) (*peerTracker, error) {
t := &peerTracker{
peers: make(map[ids.NodeID]*peerInfo),
trackedPeers: make(set.Set[ids.NodeID]),
responsivePeers: make(set.Set[ids.NodeID]),
bandwidthHeap: math.NewMaxAveragerHeap(),
averageBandwidth: math.NewAverager(0, bandwidthHalflife, time.Now()),
log: log,
// numTrackedPeers: metrics.GetOrRegisterGauge("net_tracked_peers", nil),
// numResponsivePeers: metrics.GetOrRegisterGauge("net_responsive_peers", nil),
// averageBandwidthMetric: metrics.GetOrRegisterGaugeFloat64("net_average_bandwidth", nil),
// Gauge holding the size of trackedPeers.
numTrackedPeers: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "num_tracked_peers",
Help: "number of tracked peers",
},
),
// Gauge holding the size of responsivePeers.
numResponsivePeers: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "num_responsive_peers",
Help: "number of responsive peers",
},
),
// Gauge holding the latest reading of averageBandwidth.
averageBandwidthMetric: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "average_bandwidth",
Help: "average sync bandwidth used by peers",
},
),
}

// All three Register calls are evaluated before errs.Add runs, so a failure
// on one gauge does not stop the others from being registered.
// NOTE(review): errs.Err presumably exposes the first non-nil error that was
// added; confirm against wrappers.Errs.
errs := wrappers.Errs{}
errs.Add(
registerer.Register(t.numTrackedPeers),
registerer.Register(t.numResponsivePeers),
registerer.Register(t.averageBandwidthMetric),
)
return t, errs.Err
}

// Returns true if we're not connected to enough peers.
Expand Down Expand Up @@ -158,7 +190,7 @@ func (p *peerTracker) TrackPeer(nodeID ids.NodeID) {
defer p.lock.Unlock()

p.trackedPeers.Add(nodeID)
// p.numTrackedPeers.Set(float64(p.trackedPeers.Len()))
p.numTrackedPeers.Set(float64(p.trackedPeers.Len()))
}

// Record that we observed that [nodeID]'s bandwidth is [bandwidth].
Expand Down Expand Up @@ -189,9 +221,9 @@ func (p *peerTracker) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {
// TODO danlaine: shouldn't we add the observation of 0
// to the average bandwidth in the if statement?
p.averageBandwidth.Observe(bandwidth, now)
// p.averageBandwidthMetric.Set(p.averageBandwidth.Read())
p.averageBandwidthMetric.Set(p.averageBandwidth.Read())
}
// p.numResponsivePeers.Set(float64(p.responsivePeers.Len()))
p.numResponsivePeers.Set(float64(p.responsivePeers.Len()))
}

// Connected should be called when [nodeID] connects to this node
Expand Down Expand Up @@ -236,9 +268,9 @@ func (p *peerTracker) Disconnected(nodeID ids.NodeID) {

p.bandwidthHeap.Remove(nodeID)
p.trackedPeers.Remove(nodeID)
// p.numTrackedPeers.Set(float64(p.trackedPeers.Len()))
p.numTrackedPeers.Set(float64(p.trackedPeers.Len()))
p.responsivePeers.Remove(nodeID)
// p.numResponsivePeers.Set(float64(p.responsivePeers.Len()))
p.numResponsivePeers.Set(float64(p.responsivePeers.Len()))
delete(p.peers, nodeID)
}

Expand Down