reset LastHighestIhavesSent every 1 minute
kc1116 committed Jul 12, 2023
1 parent e38c31e commit 20b917e
Showing 3 changed files with 65 additions and 19 deletions.
16 changes: 10 additions & 6 deletions network/p2p/tracer/gossipSubMeshTracer.go
@@ -24,6 +24,9 @@ const (
 
 	// MeshLogIntervalWarnMsg is the message logged by the tracer every logInterval if there are unknown peers in the mesh.
 	MeshLogIntervalWarnMsg = "unknown peers in topic mesh peers of local node since last heartbeat"
+
+	// defaultLastHighestIHaveRPCSizeResetInterval is the default interval at which lastHighestIHaveRPCSize is reset; it should always be equal to the gossipsub heartbeat interval.
+	defaultLastHighestIHaveRPCSizeResetInterval = time.Minute
 )
 
 // The GossipSubMeshTracer is a GossipSub pubsub.RawTracer designed to track the local
@@ -69,12 +72,13 @@ type GossipSubMeshTracerConfig struct {
 func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTracer {
 	lg := config.Logger.With().Str("component", "gossipsub_topology_tracer").Logger()
 	rpcSentTracker := internal.NewRPCSentTracker(&internal.RPCSentTrackerConfig{
-		Logger:                    lg,
-		RPCSentCacheSize:          config.RpcSentTrackerCacheSize,
-		RPCSentCacheCollector:     config.RpcSentTrackerCacheCollector,
-		WorkerQueueCacheCollector: config.RpcSentTrackerWorkerQueueCacheCollector,
-		WorkerQueueCacheSize:      config.RpcSentTrackerWorkerQueueCacheSize,
-		NumOfWorkers:              config.RpcSentTrackerNumOfWorkers,
+		Logger:                             lg,
+		RPCSentCacheSize:                   config.RpcSentTrackerCacheSize,
+		RPCSentCacheCollector:              config.RpcSentTrackerCacheCollector,
+		WorkerQueueCacheCollector:          config.RpcSentTrackerWorkerQueueCacheCollector,
+		WorkerQueueCacheSize:               config.RpcSentTrackerWorkerQueueCacheSize,
+		NumOfWorkers:                       config.RpcSentTrackerNumOfWorkers,
+		LastHighestIhavesSentResetInterval: defaultLastHighestIHaveRPCSizeResetInterval,
 	})
 	g := &GossipSubMeshTracer{
 		RawTracer: NewGossipSubNoopTracer(),
36 changes: 33 additions & 3 deletions network/p2p/tracer/internal/rpc_sent_tracker.go
@@ -3,6 +3,7 @@ package internal
 import (
 	"crypto/rand"
 	"fmt"
+	"time"
 
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	pb "github.com/libp2p/go-libp2p-pubsub/pb"
@@ -12,6 +13,7 @@ import (
 	"github.com/onflow/flow-go/engine/common/worker"
 	"github.com/onflow/flow-go/module"
 	"github.com/onflow/flow-go/module/component"
+	"github.com/onflow/flow-go/module/irrecoverable"
 	"github.com/onflow/flow-go/module/mempool/queue"
 	p2pmsg "github.com/onflow/flow-go/network/p2p/message"
 )
@@ -30,7 +32,8 @@ type RPCSentTracker struct {
 	cache      *rpcSentCache
 	workerPool *worker.Pool[trackRPC]
 	// lastHighestIHaveRPCSize tracks the size of the last largest iHave rpc control message sent.
-	lastHighestIHaveRPCSize *atomic.Int64
+	lastHighestIHaveRPCSize              *atomic.Int64
+	lastHighestIHaveRPCSizeResetInterval time.Duration
 }
 
 // RPCSentTrackerConfig configuration for the RPCSentTracker.
@@ -46,6 +49,8 @@ type RPCSentTrackerConfig struct {
 	WorkerQueueCacheSize uint32
 	// NumOfWorkers number of workers in the worker pool.
 	NumOfWorkers int
+	// LastHighestIhavesSentResetInterval the interval at which lastHighestIHaveRPCSize is reset.
+	LastHighestIhavesSentResetInterval time.Duration
 }
 
 // NewRPCSentTracker returns a new *RPCSentTracker.
@@ -62,15 +67,20 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker {
 		config.WorkerQueueCacheCollector)
 
 	tracker := &RPCSentTracker{
-		cache:                   newRPCSentCache(cacheConfig),
-		lastHighestIHaveRPCSize: atomic.NewInt64(0),
+		cache:                                newRPCSentCache(cacheConfig),
+		lastHighestIHaveRPCSize:              atomic.NewInt64(0),
+		lastHighestIHaveRPCSizeResetInterval: config.LastHighestIhavesSentResetInterval,
 	}
 	tracker.workerPool = worker.NewWorkerPoolBuilder[trackRPC](
 		config.Logger,
 		store,
 		tracker.rpcSent).Build()
 
 	tracker.Component = component.NewComponentManagerBuilder().
+		AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
+			ready()
+			tracker.lastHighestIHaveRPCSizeResetLoop(ctx)
+		}).
 		AddWorkers(config.NumOfWorkers, tracker.workerPool.WorkerLogic()).
 		Build()
 
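The AddWorker registration above follows flow-go's component worker convention: the callback signals readiness, then blocks in a long-running loop until its context is done. Below is a minimal sketch of that shape using only the standard library, with context.Context and a plain func() standing in for irrecoverable.SignalerContext and component.ReadyFunc (both substitutions, and the worker/readyCh names, are illustrative assumptions, not flow-go APIs):

package main

import (
	"context"
	"fmt"
	"time"
)

// worker mirrors the shape of the AddWorker callback above: it signals
// readiness first, then blocks in its long-running loop until shutdown.
func worker(ctx context.Context, ready func()) {
	ready()
	<-ctx.Done() // the real worker blocks in lastHighestIHaveRPCSizeResetLoop here
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	readyCh := make(chan struct{})

	go worker(ctx, func() { close(readyCh) })

	<-readyCh // startup proceeds only once the worker has reported ready
	fmt.Println("worker ready")

	time.AfterFunc(50*time.Millisecond, cancel)
	<-ctx.Done()
	time.Sleep(10 * time.Millisecond) // let the worker observe cancellation
}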
@@ -134,6 +144,26 @@ func (t *RPCSentTracker) LastHighestIHaveRPCSize() int64 {
 	return t.lastHighestIHaveRPCSize.Load()
 }
 
+// lastHighestIHaveRPCSizeResetLoop resets the lastHighestIHaveRPCSize to 0 on each interval tick.
+func (t *RPCSentTracker) lastHighestIHaveRPCSizeResetLoop(ctx irrecoverable.SignalerContext) {
+	ticker := time.NewTicker(t.lastHighestIHaveRPCSizeResetInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			t.lastHighestIHaveRPCSize.Store(0)
+		}
+	}
+}
+
 // nonce returns a random string that is used to store unique items in herocache.
 func nonce() ([]byte, error) {
 	b := make([]byte, 16)
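Worth noting in the loop above: when several select cases are ready, Go picks one pseudo-randomly, so a single select over ctx.Done() and ticker.C could keep resetting after cancellation. The leading non-blocking select guarantees shutdown takes priority. A self-contained sketch of the same pattern, assuming the standard library's context.Context and sync/atomic in place of irrecoverable.SignalerContext and go.uber.org/atomic:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// resetLoop clears v on every tick until ctx is cancelled, giving
// cancellation priority over the ticker just as the tracker's loop does.
func resetLoop(ctx context.Context, v *atomic.Int64, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		// Non-blocking cancellation check: if both channels were ready,
		// a single select could pick the tick at random; this ensures
		// shutdown wins.
		select {
		case <-ctx.Done():
			return
		default:
		}

		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			v.Store(0) // reset on each interval tick
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var size atomic.Int64
	size.Store(42)
	go resetLoop(ctx, &size, 100*time.Millisecond)

	time.Sleep(150 * time.Millisecond) // at least one tick elapses
	fmt.Println(size.Load())           // prints 0
}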
32 changes: 22 additions & 10 deletions network/p2p/tracer/internal/rpc_sent_tracker_test.go
@@ -19,7 +19,7 @@ import (
 
 // TestNewRPCSentTracker ensures *RPCSentTracker is created as expected.
 func TestNewRPCSentTracker(t *testing.T) {
-	tracker := mockTracker(t)
+	tracker := mockTracker(t, time.Minute)
 	require.NotNil(t, tracker)
 }

@@ -28,7 +28,7 @@ func TestRPCSentTracker_IHave(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
 
-	tracker := mockTracker(t)
+	tracker := mockTracker(t, time.Minute)
 	require.NotNil(t, tracker)
 
 	tracker.Start(signalerCtx)
@@ -81,7 +81,7 @@ func TestRPCSentTracker_LastHighestIHaveRPCSize(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
 
-	tracker := mockTracker(t)
+	tracker := mockTracker(t, 3*time.Second)
 	require.NotNil(t, tracker)
 
 	tracker.Start(signalerCtx)
@@ -116,6 +116,17 @@ func TestRPCSentTracker_LastHighestIHaveRPCSize(t *testing.T) {
 	}, time.Second, 100*time.Millisecond)
 
 	require.Equal(t, int64(expectedLastHighestSize), tracker.LastHighestIHaveRPCSize())
+
+	// after sending a large RPC, lastHighestIHaveRPCSize should reset to 0 on the next reset loop tick
+	largeIhave := 50000
+	require.NoError(t, tracker.RPCSent(rpcFixture(withIhaves(mockIHaveFixture(largeIhave, numOfMessageIds)))))
+	require.Eventually(t, func() bool {
+		return tracker.LastHighestIHaveRPCSize() == int64(largeIhave)
+	}, 1*time.Second, 100*time.Millisecond)
+
+	require.Eventually(t, func() bool {
+		return tracker.LastHighestIHaveRPCSize() == 0
+	}, 4*time.Second, 100*time.Millisecond)
 }
 
 // mockIHaveFixture generates a list of n iHaves. Each iHave is created with m random message ids.
@@ -132,16 +143,17 @@ func mockIHaveFixture(n, m int) []*pb.ControlIHave {
 	return iHaves
 }
 
-func mockTracker(t *testing.T) *RPCSentTracker {
+func mockTracker(t *testing.T, lastHighestIhavesSentResetInterval time.Duration) *RPCSentTracker {
 	cfg, err := config.DefaultConfig()
 	require.NoError(t, err)
 	tracker := NewRPCSentTracker(&RPCSentTrackerConfig{
-		Logger:                    zerolog.Nop(),
-		RPCSentCacheSize:          cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
-		RPCSentCacheCollector:     metrics.NewNoopCollector(),
-		WorkerQueueCacheCollector: metrics.NewNoopCollector(),
-		WorkerQueueCacheSize:      cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
-		NumOfWorkers:              1,
+		Logger:                             zerolog.Nop(),
+		RPCSentCacheSize:                   cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
+		RPCSentCacheCollector:              metrics.NewNoopCollector(),
+		WorkerQueueCacheCollector:          metrics.NewNoopCollector(),
+		WorkerQueueCacheSize:               cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize,
+		NumOfWorkers:                       1,
+		LastHighestIhavesSentResetInterval: lastHighestIhavesSentResetInterval,
 	})
 	return tracker
 }
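A note on the timing in TestRPCSentTracker_LastHighestIHaveRPCSize: the tracker is built with a 3-second reset interval, the first Eventually polls for at most 1 second so the large iHave size stays visible ahead of the next reset tick, and the second polls for up to 4 seconds, which always spans at least one full tick, guaranteeing the reset is observed. A small sketch of that headroom arithmetic (the windowsFor helper is hypothetical; the constants mirror the test):

package main

import (
	"fmt"
	"time"
)

// windowsFor returns polling windows for asserting against a periodic
// reset: observe the value well inside one interval, and allow the reset
// at least one full interval plus slack. Hypothetical helper for illustration.
func windowsFor(resetInterval time.Duration) (observeValue, observeReset time.Duration) {
	observeValue = resetInterval / 3           // 1s for a 3s interval: before the first tick
	observeReset = resetInterval + time.Second // 4s for a 3s interval: covers one full tick
	return
}

func main() {
	v, r := windowsFor(3 * time.Second)
	fmt.Println(v, r) // 1s 4s
}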
