diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go index cd0d548e3c0..21bc9e5de6c 100644 --- a/network/p2p/tracer/gossipSubMeshTracer.go +++ b/network/p2p/tracer/gossipSubMeshTracer.go @@ -24,6 +24,9 @@ const ( // MeshLogIntervalWarnMsg is the message logged by the tracer every logInterval if there are unknown peers in the mesh. MeshLogIntervalWarnMsg = "unknown peers in topic mesh peers of local node since last heartbeat" + + // defaultLastHighestIHaveRPCSizeResetInterval is the default interval at which lastHighestIHaveRPCSize is reset; it should always be equal to the gossipsub heartbeat interval. + defaultLastHighestIHaveRPCSizeResetInterval = time.Minute ) // The GossipSubMeshTracer component in the GossipSub pubsub.RawTracer that is designed to track the local @@ -69,12 +72,13 @@ type GossipSubMeshTracerConfig struct { func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTracer { lg := config.Logger.With().Str("component", "gossipsub_topology_tracer").Logger() rpcSentTracker := internal.NewRPCSentTracker(&internal.RPCSentTrackerConfig{ - Logger: lg, - RPCSentCacheSize: config.RpcSentTrackerCacheSize, - RPCSentCacheCollector: config.RpcSentTrackerCacheCollector, - WorkerQueueCacheCollector: config.RpcSentTrackerWorkerQueueCacheCollector, - WorkerQueueCacheSize: config.RpcSentTrackerWorkerQueueCacheSize, - NumOfWorkers: config.RpcSentTrackerNumOfWorkers, + Logger: lg, + RPCSentCacheSize: config.RpcSentTrackerCacheSize, + RPCSentCacheCollector: config.RpcSentTrackerCacheCollector, + WorkerQueueCacheCollector: config.RpcSentTrackerWorkerQueueCacheCollector, + WorkerQueueCacheSize: config.RpcSentTrackerWorkerQueueCacheSize, + NumOfWorkers: config.RpcSentTrackerNumOfWorkers, + LastHighestIhavesSentResetInterval: defaultLastHighestIHaveRPCSizeResetInterval, }) g := &GossipSubMeshTracer{ RawTracer: NewGossipSubNoopTracer(), diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go index 
09f29436c49..63ae18edb81 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker.go @@ -3,6 +3,7 @@ package internal import ( "crypto/rand" "fmt" + "time" pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/libp2p/go-libp2p-pubsub/pb" @@ -12,6 +13,7 @@ import ( "github.com/onflow/flow-go/engine/common/worker" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/mempool/queue" p2pmsg "github.com/onflow/flow-go/network/p2p/message" ) @@ -30,7 +32,8 @@ type RPCSentTracker struct { cache *rpcSentCache workerPool *worker.Pool[trackRPC] // lastHighestIHaveRPCSize tracks the size of the last largest iHave rpc control message sent. - lastHighestIHaveRPCSize *atomic.Int64 + lastHighestIHaveRPCSize *atomic.Int64 + lastHighestIHaveRPCSizeResetInterval time.Duration } // RPCSentTrackerConfig configuration for the RPCSentTracker. @@ -46,6 +49,8 @@ type RPCSentTrackerConfig struct { WorkerQueueCacheSize uint32 // NumOfWorkers number of workers in the worker pool. NumOfWorkers int + // LastHighestIhavesSentResetInterval is the refresh interval used to reset the lastHighestIHaveRPCSize. + LastHighestIhavesSentResetInterval time.Duration } // NewRPCSentTracker returns a new *NewRPCSentTracker. 
@@ -62,8 +67,9 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { config.WorkerQueueCacheCollector) tracker := &RPCSentTracker{ - cache: newRPCSentCache(cacheConfig), - lastHighestIHaveRPCSize: atomic.NewInt64(0), + cache: newRPCSentCache(cacheConfig), + lastHighestIHaveRPCSize: atomic.NewInt64(0), + lastHighestIHaveRPCSizeResetInterval: config.LastHighestIhavesSentResetInterval, } tracker.workerPool = worker.NewWorkerPoolBuilder[trackRPC]( config.Logger, @@ -71,6 +77,10 @@ func NewRPCSentTracker(config *RPCSentTrackerConfig) *RPCSentTracker { tracker.rpcSent).Build() tracker.Component = component.NewComponentManagerBuilder(). + AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + tracker.lastHighestIHaveRPCSizeResetLoop(ctx) + }). AddWorkers(config.NumOfWorkers, tracker.workerPool.WorkerLogic()). Build() @@ -134,6 +144,26 @@ func (t *RPCSentTracker) LastHighestIHaveRPCSize() int64 { return t.lastHighestIHaveRPCSize.Load() } +// lastHighestIHaveRPCSizeResetLoop resets the lastHighestIHaveRPCSize to 0 on each interval tick. +func (t *RPCSentTracker) lastHighestIHaveRPCSizeResetLoop(ctx irrecoverable.SignalerContext) { + ticker := time.NewTicker(t.lastHighestIHaveRPCSizeResetInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + default: + } + + select { + case <-ctx.Done(): + return + case <-ticker.C: + t.lastHighestIHaveRPCSize.Store(0) + } + } +} + // nonce returns random string that is used to store unique items in herocache. func nonce() ([]byte, error) { b := make([]byte, 16) diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go index 47e897442f8..7d2e9d57a6a 100644 --- a/network/p2p/tracer/internal/rpc_sent_tracker_test.go +++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go @@ -19,7 +19,7 @@ import ( // TestNewRPCSentTracker ensures *RPCSenTracker is created as expected. 
func TestNewRPCSentTracker(t *testing.T) { - tracker := mockTracker(t) + tracker := mockTracker(t, time.Minute) require.NotNil(t, tracker) } @@ -28,7 +28,7 @@ func TestRPCSentTracker_IHave(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) - tracker := mockTracker(t) + tracker := mockTracker(t, time.Minute) require.NotNil(t, tracker) tracker.Start(signalerCtx) @@ -81,7 +81,7 @@ func TestRPCSentTracker_LastHighestIHaveRPCSize(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx) - tracker := mockTracker(t) + tracker := mockTracker(t, 3*time.Second) require.NotNil(t, tracker) tracker.Start(signalerCtx) @@ -116,6 +116,17 @@ func TestRPCSentTracker_LastHighestIHaveRPCSize(t *testing.T) { }, time.Second, 100*time.Millisecond) require.Equal(t, int64(expectedLastHighestSize), tracker.LastHighestIHaveRPCSize()) + + // after sending a large RPC, lastHighestIHaveRPCSize should reset to 0 after the next reset loop tick + largeIhave := 50000 + require.NoError(t, tracker.RPCSent(rpcFixture(withIhaves(mockIHaveFixture(largeIhave, numOfMessageIds))))) + require.Eventually(t, func() bool { + return tracker.LastHighestIHaveRPCSize() == int64(largeIhave) + }, 1*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return tracker.LastHighestIHaveRPCSize() == 0 + }, 4*time.Second, 100*time.Millisecond) } // mockIHaveFixture generate list of iHaves of size n. Each iHave will be created with m number of random message ids. 
@@ -132,16 +143,17 @@ func mockIHaveFixture(n, m int) []*pb.ControlIHave { return iHaves } -func mockTracker(t *testing.T) *RPCSentTracker { +func mockTracker(t *testing.T, lastHighestIhavesSentResetInterval time.Duration) *RPCSentTracker { cfg, err := config.DefaultConfig() require.NoError(t, err) tracker := NewRPCSentTracker(&RPCSentTrackerConfig{ - Logger: zerolog.Nop(), - RPCSentCacheSize: cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, - RPCSentCacheCollector: metrics.NewNoopCollector(), - WorkerQueueCacheCollector: metrics.NewNoopCollector(), - WorkerQueueCacheSize: cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, - NumOfWorkers: 1, + Logger: zerolog.Nop(), + RPCSentCacheSize: cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, + RPCSentCacheCollector: metrics.NewNoopCollector(), + WorkerQueueCacheCollector: metrics.NewNoopCollector(), + WorkerQueueCacheSize: cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerQueueCacheSize, + NumOfWorkers: 1, + LastHighestIhavesSentResetInterval: lastHighestIhavesSentResetInterval, }) return tracker }