Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 102 additions & 43 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
)

// Label key and label values attached to the gossip metrics, plus the default
// capacity used when pre-sizing a batch of gossipables.
const (
	// typeLabel is the metric label key; the remaining *Type constants are
	// the values it takes.
	typeLabel  = "type"
	pushType   = "push"
	pullType   = "pull"
	unsentType = "unsent"
	sentType   = "sent"

	// defaultGossipableCount is the initial capacity of the gossip batch
	// slice.
	defaultGossipableCount = 64
)
Expand All @@ -45,6 +47,12 @@ var (
pullLabels = prometheus.Labels{
typeLabel: pullType,
}
unsentLabels = prometheus.Labels{
typeLabel: unsentType,
}
sentLabels = prometheus.Labels{
typeLabel: sentType,
}

ErrInvalidDiscardedSize = errors.New("discarded size cannot be negative")
ErrInvalidTargetGossipSize = errors.New("target gossip size cannot be negative")
Expand All @@ -70,11 +78,12 @@ type ValidatorGossiper struct {
// Metrics that are tracked across a gossip protocol. A given protocol should
// only use a single instance of Metrics.
type Metrics struct {
	sentCount     *prometheus.CounterVec
	sentBytes     *prometheus.CounterVec
	receivedCount *prometheus.CounterVec
	receivedBytes *prometheus.CounterVec
	// tracking is the number of gossipables being tracked. It is a vector so
	// that PushGossiper can report unsent and previously sent gossipables
	// under separate values of the type label.
	tracking *prometheus.GaugeVec
	// trackingLifetimeAverage is the average duration, in nanoseconds, that
	// the currently tracked gossipables have been tracked.
	trackingLifetimeAverage prometheus.Gauge
}

// NewMetrics returns a common set of metrics
Expand Down Expand Up @@ -103,10 +112,15 @@ func NewMetrics(
Name: "gossip_received_bytes",
Help: "amount of gossip received (bytes)",
}, metricLabels),
tracking: prometheus.NewGauge(prometheus.GaugeOpts{
tracking: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "gossip_tracking",
Help: "number of gossipables being tracked",
}, metricLabels),
trackingLifetimeAverage: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "gossip_tracking_lifetime_average",
Help: "average duration a gossipable has been tracked (ns)",
}),
}
err := utils.Err(
Expand All @@ -115,6 +129,7 @@ func NewMetrics(
metrics.Register(m.receivedCount),
metrics.Register(m.receivedBytes),
metrics.Register(m.tracking),
metrics.Register(m.trackingLifetimeAverage),
)
return m, err
}
Expand Down Expand Up @@ -265,10 +280,10 @@ func NewPushGossiper[T Gossipable](
targetGossipSize: targetGossipSize,
maxRegossipFrequency: maxRegossipFrequency,

tracking: make(map[ids.ID]time.Time),
pending: buffer.NewUnboundedDeque[T](0),
issued: buffer.NewUnboundedDeque[T](0),
discarded: &cache.LRU[ids.ID, struct{}]{Size: discardedSize},
tracking: make(map[ids.ID]*tracking),
toGossip: buffer.NewUnboundedDeque[T](0),
toRegossip: buffer.NewUnboundedDeque[T](0),
discarded: &cache.LRU[ids.ID, struct{}]{Size: discardedSize},
}, nil
}

Expand All @@ -282,17 +297,31 @@ type PushGossiper[T Gossipable] struct {
targetGossipSize int
maxRegossipFrequency time.Duration

lock sync.Mutex
tracking map[ids.ID]time.Time
pending buffer.Deque[T]
issued buffer.Deque[T]
discarded *cache.LRU[ids.ID, struct{}] // discarded attempts to avoid overgossiping transactions that are frequently dropped
lock sync.Mutex
tracking map[ids.ID]*tracking
addedTimeSum float64 // unix nanoseconds
toGossip buffer.Deque[T]
toRegossip buffer.Deque[T]
discarded *cache.LRU[ids.ID, struct{}] // discarded attempts to avoid overgossiping transactions that are frequently dropped
}

// tracking is the per-gossipable bookkeeping stored in PushGossiper.tracking.
type tracking struct {
	// addedTime is when the gossipable was passed to Add. It is kept as a
	// float64 so it can be added to and subtracted from
	// PushGossiper.addedTimeSum directly.
	addedTime float64 // unix nanoseconds
	// lastGossiped is the time the gossipable was last included in a gossip
	// batch. Add leaves it as the zero time for gossipables that have never
	// been sent, and sets it to the current time for recently discarded ones
	// so that they wait out the regossip interval.
	lastGossiped time.Time
}

// Gossip flushes any queued gossipables.
func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
var (
now = time.Now()
nowUnixNano = float64(now.UnixNano())
)

p.lock.Lock()
defer p.lock.Unlock()
defer func() {
p.updateMetrics(nowUnixNano)
p.lock.Unlock()
}()

if len(p.tracking) == 0 {
return nil
Expand All @@ -301,75 +330,78 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
var (
sentBytes = 0
gossip = make([][]byte, 0, defaultGossipableCount)
now = time.Now()
)

// Iterate over all pending gossipables (never been sent before).
// Iterate over all unsent gossipables.
for sentBytes < p.targetGossipSize {
gossipable, ok := p.pending.PopLeft()
gossipable, ok := p.toGossip.PopLeft()
if !ok {
break
}

// Ensure item is still in the set before we gossip.
gossipID := gossipable.GossipID()
tracking := p.tracking[gossipID]
if !p.set.Has(gossipID) {
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
continue
}

bytes, err := p.marshaller.MarshalGossip(gossipable)
if err != nil {
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
return err
}

gossip = append(gossip, bytes)
sentBytes += len(bytes)
p.issued.PushRight(gossipable)
p.tracking[gossipID] = now
p.toRegossip.PushRight(gossipable)
tracking.lastGossiped = now
}

maxLastGossipTimeToRegossip := now.Add(-p.maxRegossipFrequency)

// Iterate over all issued gossipables (have been sent before) to fill any
// remaining space in gossip batch.
// Iterate over all previously sent gossipables to fill any remaining space
// in the gossip batch.
for sentBytes < p.targetGossipSize {
gossipable, ok := p.issued.PopLeft()
gossipable, ok := p.toRegossip.PopLeft()
if !ok {
break
}

// Ensure item is still in the set before we gossip.
gossipID := gossipable.GossipID()
tracking := p.tracking[gossipID]
if !p.set.Has(gossipID) {
delete(p.tracking, gossipID)
p.discarded.Put(gossipID, struct{}{}) // only add to discarded if issued once
p.addedTimeSum -= tracking.addedTime
p.discarded.Put(gossipID, struct{}{}) // only add to discarded if previously sent
continue
}

// Ensure we don't attempt to send a gossipable too frequently.
lastGossipTime := p.tracking[gossipID]
if maxLastGossipTimeToRegossip.Before(lastGossipTime) {
if maxLastGossipTimeToRegossip.Before(tracking.lastGossiped) {
// Put the gossipable on the front of the queue to keep items sorted
// by last issuance time.
p.issued.PushLeft(gossipable)
p.toRegossip.PushLeft(gossipable)
break
}

bytes, err := p.marshaller.MarshalGossip(gossipable)
if err != nil {
// Should never happen because we've already issued this once.
// Should never happen because we've already sent this once.
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
return err
}

gossip = append(gossip, bytes)
sentBytes += len(bytes)
p.issued.PushRight(gossipable)
p.tracking[gossipID] = now
p.toRegossip.PushRight(gossipable)
tracking.lastGossiped = now
}
p.metrics.tracking.Set(float64(len(p.tracking)))

// If there is nothing to gossip, we can exit early.
if len(gossip) == 0 {
Expand Down Expand Up @@ -400,26 +432,53 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
// Add enqueues new gossipables to be pushed. If a gossipable is already
// tracked, it is not added again.
//
// A gossipable whose ID is present in the discarded cache is queued for
// regossip with its last-gossiped time set to now, so it is not sent again
// until the regossip interval has elapsed. Every other gossipable is queued
// to be sent on the next call to Gossip.
//
// The tracking metrics are refreshed before the lock is released.
func (p *PushGossiper[T]) Add(gossipables ...T) {
	var (
		now         = time.Now()
		nowUnixNano = float64(now.UnixNano())
	)

	p.lock.Lock()
	// A single deferred closure keeps the ordering explicit: metrics are
	// updated while the lock is still held, then the lock is released once.
	defer func() {
		p.updateMetrics(nowUnixNano)
		p.lock.Unlock()
	}()

	// Add new gossipables to be sent.
	for _, gossipable := range gossipables {
		gossipID := gossipable.GossipID()
		if _, ok := p.tracking[gossipID]; ok {
			continue
		}

		// Named entry rather than tracking to avoid shadowing the type.
		entry := &tracking{
			addedTime: nowUnixNano,
		}
		if _, ok := p.discarded.Get(gossipID); ok {
			// Pretend that recently discarded transactions were just gossiped.
			entry.lastGossiped = now
			p.toRegossip.PushRight(gossipable)
		} else {
			p.toGossip.PushRight(gossipable)
		}
		p.tracking[gossipID] = entry
		// Keep the running sum in step with the map so updateMetrics can
		// compute the average lifetime without iterating.
		p.addedTimeSum += nowUnixNano
	}
}

func (p *PushGossiper[_]) updateMetrics(nowUnixNano float64) {
var (
numUnsent = float64(p.toGossip.Len())
numSent = float64(p.toRegossip.Len())
numTracking = numUnsent + numSent
averageLifetime float64
)
if numTracking != 0 {
averageLifetime = nowUnixNano - p.addedTimeSum/numTracking
}
p.metrics.tracking.Set(float64(len(p.tracking)))

p.metrics.tracking.With(unsentLabels).Set(numUnsent)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool

p.metrics.tracking.With(sentLabels).Set(numSent)
p.metrics.trackingLifetimeAverage.Set(averageLifetime)
}

// Every calls [Gossip] every [frequency] amount of time.
Expand Down