diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go
index 390cad33d43..5edd2629ee2 100644
--- a/cmd/access/node_builder/access_node_builder.go
+++ b/cmd/access/node_builder/access_node_builder.go
@@ -1191,11 +1191,15 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
 		return nil, fmt.Errorf("could not create connection manager: %w", err)
 	}
 
-	meshTracer := tracer.NewGossipSubMeshTracer(
-		builder.Logger,
-		networkMetrics,
-		builder.IdentityProvider,
-		builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)
+	meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
+		Logger:                       builder.Logger,
+		Metrics:                      networkMetrics,
+		IDProvider:                   builder.IdentityProvider,
+		LoggerInterval:               builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
+		RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
+		RpcSentTrackerCacheSize:      builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
+	}
+	meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)
 
 	libp2pNode, err := p2pbuilder.NewNodeBuilder(
 		builder.Logger,
diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go
index 6f825421278..78ddc464fb7 100644
--- a/cmd/observer/node_builder/observer_builder.go
+++ b/cmd/observer/node_builder/observer_builder.go
@@ -702,11 +702,15 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
 		pis = append(pis, pi)
 	}
 
-	meshTracer := tracer.NewGossipSubMeshTracer(
-		builder.Logger,
-		builder.Metrics.Network,
-		builder.IdentityProvider,
-		builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)
+	meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
+		Logger:                       builder.Logger,
+		Metrics:                      builder.Metrics.Network,
+		IDProvider:                   builder.IdentityProvider,
+		LoggerInterval:               builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
+		RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
+		RpcSentTrackerCacheSize:      builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
+	}
+	meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)
 
 	node, err := p2pbuilder.NewNodeBuilder(
 		builder.Logger,
diff --git a/config/default-config.yml b/config/default-config.yml
index 371fc4c385c..9834694b0e2 100644
--- a/config/default-config.yml
+++ b/config/default-config.yml
@@ -63,6 +63,9 @@ network-config:
   # The default interval at which the gossipsub score tracer logs the peer scores. This is used for debugging and forensics purposes.
   # Note that we purposefully choose this logging interval high enough to avoid spamming the logs.
   gossipsub-score-tracer-interval: 1m
+  # The default RPC sent tracker cache size. The RPC sent tracker is used to track RPC control messages sent from the local node.
+  # Note: this cache size must be large enough to retain the messages sent over a reasonable time window of recent history.
+  gossipsub-rpc-sent-tracker-cache-size: 1_000_000
   # Peer scoring is the default value for enabling peer scoring
   gossipsub-peer-scoring-enabled: true
   # Gossipsub rpc inspectors configs
diff --git a/engine/Readme.md b/engine/Readme.md
index 8faebe0b332..cd082cdf557 100644
--- a/engine/Readme.md
+++ b/engine/Readme.md
@@ -1,5 +1,4 @@
 # Notifier
-
 The Notifier implements the following state machine
 ![Notifier State Machine](/docs/NotifierStateMachine.png)
diff --git a/engine/verification/Readme.md b/engine/verification/Readme.md
new file mode 100644
index 00000000000..ff527a432b0
--- /dev/null
+++ b/engine/verification/Readme.md
@@ -0,0 +1,170 @@
+# Verification Node
+The Verification Node in the Flow blockchain network is a critical component responsible for
+verifying `ExecutionResult`s and generating `ResultApproval`s.
+Its primary role is to ensure the integrity and validity of block execution by performing verification processes.
+In a nutshell, the Verification Node is responsible for the following:
+1. Following the chain for new finalized blocks (`Follower` engine).
+2. Processing the execution results in the finalized blocks and determining the chunks assigned to the node (`Assigner` engine).
+3. Requesting chunk data packs from Execution Nodes for the assigned chunks (`Fetcher` and `Requester` engines).
+4. Verifying the assigned chunks and emitting `ResultApproval`s for the verified chunks to Consensus Nodes (`Verifier` engine).
+![architecture.png](architecture.png)
+
+
+## Block Consumer ([consumer.go](assigner/blockconsumer/consumer.go))
+The `blockconsumer` package manages the processing of finalized blocks in the Verification Node of the Flow blockchain.
+Specifically, it listens for notifications from the `Follower` engine regarding finalized blocks, and systematically
+queues these blocks for processing. The package employs parallel workers, each an instance of the `Assigner` engine,
+to fetch and process blocks from the queue. The `BlockConsumer` coordinates this process by only assigning
+a new block to a worker once it has completed processing its current block and signaled its availability.
+This ensures that the processing is not only methodical but also resilient to node crashes.
+In case of a crash, the `BlockConsumer` resumes from where it left off by reading the processed block index from storage and reassigning blocks from the queue to workers,
+thereby guaranteeing no loss of data.
+
+## Assigner Engine
+The `Assigner` [engine](assigner/engine.go) is an integral part of the verification process in Flow,
+focusing on processing the execution results in the finalized blocks, performing chunk assignments on the results, and
+queuing the assigned chunks for further processing. The Assigner engine is a worker of the `BlockConsumer`,
+which assigns finalized blocks to the Assigner engine for processing.
+This engine reads the execution receipts included in each finalized block,
+determines which chunks are assigned to the node for verification,
+and stores the assigned chunks in the chunks queue for further processing (by the `Fetcher` engine).
+
+The core behavior of the Assigner engine is implemented in the `ProcessFinalizedBlock` function.
+This function initiates the process of execution receipt indexing, chunk assignment, and processing the assigned chunks.
+For every receipt in the block, the engine determines chunk assignments using the verifiable chunk assignment algorithm of Flow.
+Each assigned chunk is then processed by the `processChunk` method. This method is responsible for storing a chunk locator in the chunks queue,
+which is a crucial step for further processing of the chunks by the `Fetcher` engine.
+Deduplication of chunk locators is handled by the chunks queue.
+The Assigner engine provides robustness by handling the situation where a node is not authorized at a specific block ID.
+It verifies the role of the result executor, checks whether the node has been ejected, and assesses the node's staked weight before granting authorization.
+Lastly, once the Assigner engine has completed processing the receipts in a block, it sends a notification to the block consumer. This is in line with
+the Assigner engine's role as a worker of the block consumer: the notification informs the consumer that the worker is ready to process the next block.
+This ensures a smooth and efficient flow of data in the system, promoting consistency across different parts of the Flow architecture.
+
+### Chunk Locator
+A chunk locator in the Flow blockchain is an internal structure of the Verification Nodes that points to a specific chunk
+within a specific execution result of a block. It is an important part of the verification process in the Flow network,
+allowing verification nodes to efficiently identify, retrieve, and verify individual chunks of computation.
+
+```go
+type ChunkLocator struct {
+	ResultID flow.Identifier // The identifier of the ExecutionResult
+	Index    uint64          // Index of the chunk
+}
+```
+- `ResultID`: This is the identifier of the execution result that the chunk is a part of. The execution result contains a list of chunks, each representing a portion of the computation carried out by execution nodes. Each execution result is linked to a specific block in the blockchain.
+- `Index`: This is the index of the chunk within the execution result's list of chunks. It is an easy way to refer to a specific chunk within a specific execution result.
+
+**Note-1**: The `ChunkLocator` doesn't contain the chunk itself but points to where the chunk can be found. In the context of the `Assigner` engine, the `ChunkLocator` is stored in a queue after chunk assignment is done, so the `Fetcher` engine can later retrieve the chunk for verification.
+**Note-2**: The `ChunkLocator` is never meant to be sent over the networking layer to another Flow node. It's an internal structure of the verification nodes, and it's only used for internal communication between the `Assigner` and `Fetcher` engines.
+
+
+## ChunkConsumer
+The `ChunkConsumer` ([consumer.go](fetcher/chunkconsumer/consumer.go)) package orchestrates the processing of chunks in the Verification Node of the Flow blockchain.
+Specifically, it keeps tabs on chunks that are assigned for processing by the `Assigner` engine and systematically enqueues these chunks for further handling.
+To expedite the processing, the package deploys parallel workers, with each worker being an instance of the `Fetcher` engine, which retrieves and processes the chunks from the queue.
+The `ChunkConsumer` administers this process by ensuring that a new chunk is assigned to a worker only after it has finalized processing its current chunk and signaled that it is ready for more.
+This systematic approach guarantees not only efficiency but also robustness against node failures. In the event of a node crash,
+the `ChunkConsumer` picks up right where it left off, redistributing chunks from the queue to the workers, ensuring that there is no loss of data or progress.
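+
+Both the `BlockConsumer` and the `ChunkConsumer` are instances of the same queue-and-workers pattern:
+a consumer hands one job at a time to each idle worker, and a worker picks up its next job only after finishing the current one.
+The sketch below distills this pattern into a minimal, self-contained program. All names are hypothetical and it is a simplification,
+not the actual implementation, which additionally persists the index of the last processed job so that processing resumes after a crash.
+```go
+package main
+
+import (
+	"fmt"
+	"sync"
+)
+
+// job stands in for a unit of work, e.g., a finalized block or a chunk locator.
+type job struct{ index uint64 }
+
+// consume hands out jobs to a fixed pool of workers; each worker receives a
+// new job only once it is done with the current one, mirroring how the
+// consumers assign a new block (or chunk) to a worker that signals availability.
+func consume(jobs []job, workers int) {
+	queue := make(chan job)
+	var wg sync.WaitGroup
+	wg.Add(workers)
+	for w := 0; w < workers; w++ {
+		go func(worker int) {
+			defer wg.Done()
+			for j := range queue {
+				// process the job (assign chunks, fetch a chunk data pack, ...)
+				fmt.Printf("worker %d processed job %d\n", worker, j.index)
+			}
+		}(w)
+	}
+	for _, j := range jobs {
+		queue <- j // blocks until some worker is ready for its next job
+	}
+	close(queue)
+	wg.Wait()
+}
+
+func main() {
+	jobs := make([]job, 10)
+	for i := range jobs {
+		jobs[i] = job{index: uint64(i)}
+	}
+	consume(jobs, 3)
+}
+```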
+
+## Fetcher Engine - The Journey of a `ChunkLocator` to a `VerifiableChunkData`
+The `Fetcher` [engine](fetcher/engine.go) of the Verification Node focuses on the lifecycle of a `ChunkLocator` as it transitions into a `VerifiableChunkData`.
+
+### `VerifiableChunkData`
+`VerifiableChunkData` refers to a data structure that encapsulates all the necessary components and resources required to
+verify a chunk within the Flow blockchain network. It represents a chunk that has undergone processing and is ready for verification.
+
+The `VerifiableChunkData` object contains the following key elements:
+```go
+type VerifiableChunkData struct {
+	IsSystemChunk     bool                  // indicates whether this is a system chunk
+	Chunk             *flow.Chunk           // the chunk to be verified
+	Header            *flow.Header          // BlockHeader that contains this chunk
+	Result            *flow.ExecutionResult // execution result of this block
+	ChunkDataPack     *flow.ChunkDataPack   // chunk data package needed to verify this chunk
+	EndState          flow.StateCommitment  // state commitment at the end of this chunk
+	TransactionOffset uint32                // index of the first transaction in a chunk within a block
+}
+```
+1. `IsSystemChunk`: A boolean value that indicates whether the chunk is a system chunk. The system chunk is a special chunk, typically the last chunk within an execution result.
+2. `Chunk`: The actual chunk that needs to be verified. It contains the relevant data and instructions related to the execution of transactions within the blockchain.
+3. `Header`: The `BlockHeader` associated with the chunk. It provides important contextual information about the block that the chunk belongs to.
+4. `Result`: The `ExecutionResult` object that corresponds to the execution of the block containing the chunk. It contains information about the execution status, including any errors or exceptions encountered during the execution process.
+5. `ChunkDataPack`: The `ChunkDataPack`, which is a package containing additional data and resources specific to the chunk being verified. It provides supplementary information required for the verification process.
+6. `EndState`: The state commitment at the end of the chunk. It represents the final state of the blockchain after executing all the transactions within the chunk.
+7. `TransactionOffset`: An index indicating the position of the first transaction within the chunk in relation to the entire block. This offset helps in locating and tracking individual transactions within the blockchain.
+
+By combining these elements, the `VerifiableChunkData` object forms a comprehensive representation of a chunk ready for verification. It serves as an input to the `Verifier` engine, which utilizes this data to perform the necessary checks and validations to ensure the integrity and correctness of the chunk within the Flow blockchain network.
+
+### The Journey of a `ChunkLocator` to a `VerifiableChunkData`
+Upon receiving the `ChunkLocator`, the `Fetcher` engine's `validateAuthorizedExecutionNodeAtBlockID` function is responsible
+for validating the authenticity of the sender. It evaluates whether the sender is an authorized execution node for the respective block.
+The function cross-references the sender's credentials against the state snapshot of the specific block.
+In the case of unauthorized or invalid credentials, an error is logged and the `ChunkLocator` is rejected.
+For authorized credentials, the processing continues.
+
+Once authenticated, the `ChunkLocator` is utilized to retrieve the associated Chunk Data Pack, as sketched below.
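+
+The authorization check above can be pictured with the following minimal sketch (hypothetical types and names;
+the real check runs against Flow's protocol state snapshot at the given block ID):
+```go
+package main
+
+import (
+	"errors"
+	"fmt"
+)
+
+// identity is a stripped-down stand-in for a Flow identity record.
+type identity struct {
+	role    string
+	ejected bool
+	weight  uint64
+}
+
+// validateAuthorizedExecutionNode mirrors the checks described above: the
+// sender must have the execution role, must not be ejected, and must have
+// positive staked weight.
+func validateAuthorizedExecutionNode(id *identity) error {
+	switch {
+	case id == nil:
+		return errors.New("unknown identity at block")
+	case id.role != "execution":
+		return fmt.Errorf("expected execution role, got %s", id.role)
+	case id.ejected:
+		return errors.New("node has been ejected")
+	case id.weight == 0:
+		return errors.New("node has zero staked weight")
+	}
+	return nil
+}
+
+func main() {
+	sender := &identity{role: "execution", weight: 100}
+	if err := validateAuthorizedExecutionNode(sender); err != nil {
+		fmt.Println("chunk locator rejected:", err)
+		return
+	}
+	fmt.Println("sender authorized; requesting the chunk data pack")
+}
+```
+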
+The `requestChunkDataPack` function takes the Chunk Locator and generates a `ChunkDataPackRequest`.
+During this stage, the function segregates execution nodes into two categories: those that agree with the execution result (`agrees`) and those that do not (`disagrees`).
+This information is encapsulated within the `ChunkDataPackRequest` and is forwarded to the `Requester` engine.
+The `Requester` engine handles the retrieval of the `ChunkDataPack` from the network of execution nodes.
+
+After the Chunk Data Pack is successfully retrieved by the `Requester` engine,
+the next phase involves structuring this data for verification and constructing a `VerifiableChunkData`.
+This construction must be performed accurately, so that the data reaches the verifier in a state that can be properly verified.
+
+The final step in the lifecycle is forwarding the `VerifiableChunkData` to the `Verifier` engine. The `Verifier` engine is tasked with the critical function
+of thoroughly analyzing and verifying the data. Depending on the outcome of this verification process,
+the chunk may either pass verification successfully or be rejected due to discrepancies.
+
+### Handling Sealed Chunks
+In parallel, the `Fetcher` engine monitors the sealed status of chunks through the `NotifyChunkDataPackSealed` function.
+If the Consensus Nodes seal a chunk, this function ensures that the `Fetcher` engine acknowledges this update and discards the respective
+`ChunkDataPack` from its processing pipeline, as it is now sealed (i.e., has been verified by an acceptable quota of Verification Nodes).
+
+## Requester Engine - Retrieving the `ChunkDataPack`
+The `Requester` [engine](requester/requester.go) is responsible for handling the request and retrieval of chunk data packs in the Flow blockchain network.
+It acts as an intermediary between the `Fetcher` engine and the Execution Nodes, facilitating the communication and coordination required
+to obtain the necessary `ChunkDataPack` for verification.
+
+The `Requester` engine receives `ChunkDataPackRequest`s from the `Fetcher`.
+These requests contain information such as the chunk ID, block height, agree and disagree executors, and other relevant details.
+Upon receiving a `ChunkDataPackRequest`, the `Requester` engine adds it to the pending requests cache for tracking and further processing.
+The `Requester` engine periodically checks the pending chunk data pack requests and dispatches them to the Execution Nodes for retrieval.
+It ensures that only qualified requests are dispatched, based on criteria such as the chunk ID and request history.
+The dispatching process involves creating a `ChunkDataRequest` message and publishing it to the network.
+The request is sent to a selected number of Execution Nodes, determined by the `requestTargets` parameter.
+
+When an Execution Node receives a `ChunkDataPackRequest`, it processes the request and generates a `ChunkDataResponse`
+message containing the requested chunk data pack. The execution node sends this response back to the `Requester` engine.
+The `Requester` engine receives the chunk data pack response, verifies its integrity, and passes it to the registered `ChunkDataPackHandler`,
+i.e., the `Fetcher` engine.
+
+### Retry and Backoff Mechanism
+In case a `ChunkDataPackRequest` does not receive a response within a certain period, the `Requester` engine retries the request to ensure data retrieval.
+It implements an exponential backoff mechanism for retrying failed requests.
+The retry interval and backoff multiplier can be customized using the respective configuration parameters.
+
+### Handling Sealed Blocks
+If a `ChunkDataPackRequest` pertains to a block that has already been sealed, the `Requester` engine recognizes this and
+removes the corresponding request from the pending requests cache.
+It notifies the `ChunkDataPackHandler` (i.e., the `Fetcher` engine) about the sealing of the block to ensure proper handling.
+
+### Parallel Chunk Data Pack Retrieval
+The `Requester` processes a number of chunk data pack requests in parallel,
+dispatching them to execution nodes and handling the received responses.
+However, if a chunk data pack request never receives a response from the execution nodes,
+the `Requester` engine could become stuck waiting for the missing chunk data pack.
+To mitigate this, the engine applies the retry and backoff mechanism described above, ensuring that requests are retried and backed off as necessary.
+This prevents prolonged waiting and allows the engine to continue processing other requests while waiting for the missing chunk data pack response.
+
+## Verifier Engine - Verifying Chunks
+The `Verifier` [engine](verifier/engine.go) is responsible for verifying chunks, generating `ResultApproval`s, and maintaining a cache of `ResultApproval`s.
+It receives verifiable chunks along with the necessary data for verification, and verifies each chunk by constructing a partial trie,
+executing its transactions, and checking the final state commitment and other chunk metadata.
+If the verification is successful, it generates a `ResultApproval` and broadcasts it to the consensus nodes.
+
+The `Verifier` engine offers the following key features:
+1. **Verification of Chunks**: The engine receives verifiable chunks, which include the chunk to be verified, the associated header, execution result, and chunk data pack. It performs the verification process, which involves constructing a partial trie, executing transactions, and checking the final state commitment. The verification process ensures the integrity and validity of the chunk.
+2. **Generation of Result Approvals**: If the verification process is successful, the engine generates a result approval for the verified chunk. The result approval includes the block ID, execution result ID, chunk index, attestation, approver ID, attestation signature, and SPoCK (Specialized Proof of Confidential Knowledge) signature. The result approval provides a cryptographic proof of the chunk's validity and is used to seal the block.
+3. **Cache of Result Approvals**: The engine maintains a cache of result approvals for efficient retrieval and lookup. The result approvals are stored in a storage module, allowing quick access to the approvals associated with specific chunks and execution results.
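+
+The essence of the verification check can be illustrated with a toy, self-contained sketch.
+It is illustrative only: the real `Verifier` recomputes a commitment over a partial Merkle trie by re-executing
+the chunk's transactions in the execution environment, not over a flat key-value map as below.
+```go
+package main
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"fmt"
+	"sort"
+)
+
+// tx is a toy transaction: a single key/value write.
+type tx struct{ key, value string }
+
+// commit computes a toy "state commitment": a hash over the sorted state entries.
+func commit(state map[string]string) []byte {
+	keys := make([]string, 0, len(state))
+	for k := range state {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	h := sha256.New()
+	for _, k := range keys {
+		h.Write([]byte(k))
+		h.Write([]byte(state[k]))
+	}
+	return h.Sum(nil)
+}
+
+// verifyChunk re-executes the chunk's transactions on the start state and
+// compares the resulting commitment with the end state claimed by the
+// execution result; a mismatch means the chunk must not be approved.
+func verifyChunk(start map[string]string, txs []tx, claimedEndState []byte) bool {
+	for _, t := range txs {
+		start[t.key] = t.value
+	}
+	return bytes.Equal(commit(start), claimedEndState)
+}
+
+func main() {
+	txs := []tx{{"alice", "7"}, {"bob", "3"}}
+
+	// an honest result claims exactly the end state the verifier recomputes
+	honestEnd := commit(map[string]string{"alice": "7", "bob": "3"})
+	if verifyChunk(map[string]string{"alice": "10"}, txs, honestEnd) {
+		fmt.Println("chunk verified: emit a ResultApproval")
+	}
+
+	// a tampered result claims a different end state and is rejected
+	tamperedEnd := commit(map[string]string{"alice": "700", "bob": "3"})
+	if !verifyChunk(map[string]string{"alice": "10"}, txs, tamperedEnd) {
+		fmt.Println("chunk rejected: end state mismatch")
+	}
+}
+```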
diff --git a/engine/verification/architecture.png b/engine/verification/architecture.png
new file mode 100644
index 00000000000..a1a16dec61b
Binary files /dev/null and b/engine/verification/architecture.png differ
diff --git a/follower/follower_builder.go b/follower/follower_builder.go
index b8f3eaa5bc4..e2eb43cb49c 100644
--- a/follower/follower_builder.go
+++ b/follower/follower_builder.go
@@ -604,11 +604,15 @@ func (builder *FollowerServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
 		pis = append(pis, pi)
 	}
 
-	meshTracer := tracer.NewGossipSubMeshTracer(
-		builder.Logger,
-		builder.Metrics.Network,
-		builder.IdentityProvider,
-		builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)
+	meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
+		Logger:                       builder.Logger,
+		Metrics:                      builder.Metrics.Network,
+		IDProvider:                   builder.IdentityProvider,
+		LoggerInterval:               builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
+		RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
+		RpcSentTrackerCacheSize:      builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
+	}
+	meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)
 
 	node, err := p2pbuilder.NewNodeBuilder(
 		builder.Logger,
diff --git a/module/metrics/herocache.go b/module/metrics/herocache.go
index 54e287bdb1b..f3a88341c87 100644
--- a/module/metrics/herocache.go
+++ b/module/metrics/herocache.go
@@ -146,6 +146,15 @@ func GossipSubRPCInspectorQueueMetricFactory(f HeroCacheMetricsFactory, networkT
 	return f(namespaceNetwork, r)
 }
 
+func GossipSubRPCSentTrackerMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics {
+	// we don't use the public prefix for the metrics here for the sake of backward compatibility of the metric name.
+	r := ResourceNetworkingRPCSentTrackerCache
+	if networkType == network.PublicNetwork {
+		r = PrependPublicPrefix(r)
+	}
+	return f(namespaceNetwork, r)
+}
+
 func RpcInspectorNotificationQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics {
 	r := ResourceNetworkingRpcInspectorNotificationQueue
 	if networkType == network.PublicNetwork {
diff --git a/module/metrics/labels.go b/module/metrics/labels.go
index 353e1b3ca25..9febc9ab391 100644
--- a/module/metrics/labels.go
+++ b/module/metrics/labels.go
@@ -92,6 +92,7 @@ const (
 	ResourceNetworkingApplicationLayerSpamReportQueue = "application_layer_spam_report_queue"
 	ResourceNetworkingRpcClusterPrefixReceivedCache   = "rpc_cluster_prefixed_received_cache"
 	ResourceNetworkingDisallowListCache               = "disallow_list_cache"
+	ResourceNetworkingRPCSentTrackerCache             = "gossipsub_rpc_sent_tracker_cache"
 	ResourceFollowerPendingBlocksCache                = "follower_pending_block_cache"           // follower engine
 	ResourceFollowerLoopCertifiedBlocksChannel        = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel
diff --git a/network/internal/p2pfixtures/fixtures.go b/network/internal/p2pfixtures/fixtures.go
index 40229337dfa..29ee0509fbb 100644
--- a/network/internal/p2pfixtures/fixtures.go
+++ b/network/internal/p2pfixtures/fixtures.go
@@ -102,11 +102,16 @@ func CreateNode(t *testing.T, networkKey crypto.PrivateKey, sporkID flow.Identif
 	idProvider := id.NewFixedIdentityProvider(nodeIds)
 	defaultFlowConfig, err := config.DefaultConfig()
 	require.NoError(t, err)
-	meshTracer := tracer.NewGossipSubMeshTracer(
-		logger,
-		metrics.NewNoopCollector(),
-		idProvider,
-		defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)
+
+	meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
+		Logger:                       logger,
+		Metrics:                      metrics.NewNoopCollector(),
+		IDProvider:                   idProvider,
+		LoggerInterval:               defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
+		RpcSentTrackerCacheCollector: metrics.NewNoopCollector(),
+		RpcSentTrackerCacheSize:      defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
+	}
+	meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)
 
 	builder := p2pbuilder.NewNodeBuilder(
 		logger,
diff --git a/network/netconf/flags.go b/network/netconf/flags.go
index 3d8e6357e76..bdf821aa60b 100644
--- a/network/netconf/flags.go
+++ b/network/netconf/flags.go
@@ -37,9 +37,10 @@ const (
 	gracePeriod   = "libp2p-grace-period"
 	silencePeriod = "libp2p-silence-period"
 	// gossipsub
-	peerScoring          = "gossipsub-peer-scoring-enabled"
-	localMeshLogInterval = "gossipsub-local-mesh-logging-interval"
-	scoreTracerInterval  = "gossipsub-score-tracer-interval"
+	peerScoring             = "gossipsub-peer-scoring-enabled"
+	localMeshLogInterval    = "gossipsub-local-mesh-logging-interval"
+	rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size"
+	scoreTracerInterval     = "gossipsub-score-tracer-interval"
 	// gossipsub validation inspector
 	gossipSubRPCInspectorNotificationCacheSize = "gossipsub-rpc-inspector-notification-cache-size"
 	validationInspectorNumberOfWorkers         = "gossipsub-rpc-validation-inspector-workers"
@@ -66,7 +67,7 @@ func AllFlagNames() []string {
 	return []string{
 		networkingConnectionPruning, preferredUnicastsProtocols, receivedMessageCacheSize, peerUpdateInterval, unicastMessageTimeout, unicastCreateStreamRetryDelay,
 		dnsCacheTTL, disallowListNotificationCacheSize, dryRun, lockoutDuration, messageRateLimit, bandwidthRateLimit, bandwidthBurstLimit, memoryLimitRatio,
-		fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, scoreTracerInterval,
+		fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, scoreTracerInterval,
 		gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize,
 		validationInspectorClusterPrefixedTopicsReceivedCacheDecay, validationInspectorClusterPrefixHardThreshold,
 		ihaveSyncSampleSizePercentage, ihaveAsyncSampleSizePercentage, ihaveMaxSampleSize, metricsInspectorNumberOfWorkers, metricsInspectorCacheSize,
 		alspDisabled, alspSpamRecordCacheSize, alspSpamRecordQueueSize, alspHearBeatInterval,
@@ -107,6 +108,7 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) {
 	flags.Bool(peerScoring, config.GossipSubConfig.PeerScoring, "enabling peer scoring on pubsub network")
 	flags.Duration(localMeshLogInterval, config.GossipSubConfig.LocalMeshLogInterval, "logging interval for local mesh in gossipsub")
 	flags.Duration(scoreTracerInterval, config.GossipSubConfig.ScoreTracerInterval, "logging interval for peer score tracer in gossipsub, set to 0 to disable")
+	flags.Uint32(rpcSentTrackerCacheSize, config.GossipSubConfig.RPCSentTrackerCacheSize, "cache size of the rpc sent tracker used by the gossipsub mesh tracer.")
 	// gossipsub RPC control message validation limits used for validation configuration and rate limiting
 	flags.Int(validationInspectorNumberOfWorkers, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.NumberOfWorkers, "number of gossupsub RPC control message validation inspector component workers")
 	flags.Uint32(validationInspectorInspectMessageQueueCacheSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.CacheSize, "cache size for gossipsub RPC validation inspector events worker pool queue.")
diff --git a/network/p2p/inspector/internal/cache/cache.go b/network/p2p/inspector/internal/cache/cache.go
index 133fd0a9ac7..82d8f781a98 100644
--- a/network/p2p/inspector/internal/cache/cache.go
+++ b/network/p2p/inspector/internal/cache/cache.go
@@ -40,9 +40,7 @@ type RecordCache struct {
 
 // NewRecordCache creates a new *RecordCache.
 // Args:
-// - sizeLimit: the maximum number of records that the cache can hold.
-// - logger: the logger used by the cache.
-// - collector: the metrics collector used by the cache.
+// - config: record cache config.
 // - recordEntityFactory: a factory function that creates a new spam record.
 // Returns:
 // - *RecordCache, the created cache.
diff --git a/network/p2p/p2pbuilder/libp2pNodeBuilder.go b/network/p2p/p2pbuilder/libp2pNodeBuilder.go
index c0a6412297c..8e550b4fa94 100644
--- a/network/p2p/p2pbuilder/libp2pNodeBuilder.go
+++ b/network/p2p/p2pbuilder/libp2pNodeBuilder.go
@@ -26,6 +26,7 @@ import (
 	"github.com/onflow/flow-go/module"
 	"github.com/onflow/flow-go/module/component"
 	"github.com/onflow/flow-go/module/irrecoverable"
+	"github.com/onflow/flow-go/module/metrics"
 	flownet "github.com/onflow/flow-go/network"
 	"github.com/onflow/flow-go/network/netconf"
 	"github.com/onflow/flow-go/network/p2p"
@@ -494,7 +495,16 @@ func DefaultNodeBuilder(
 		builder.EnableGossipSubPeerScoring(nil)
 	}
 
-	meshTracer := tracer.NewGossipSubMeshTracer(logger, metricsCfg.Metrics, idProvider, gossipCfg.LocalMeshLogInterval)
+	meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
+		Logger:                       logger,
+		Metrics:                      metricsCfg.Metrics,
+		IDProvider:                   idProvider,
+		LoggerInterval:               gossipCfg.LocalMeshLogInterval,
+		RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork),
+		RpcSentTrackerCacheSize:      gossipCfg.RPCSentTrackerCacheSize,
+	}
+	meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)
+
 	builder.SetGossipSubTracer(meshTracer)
 	builder.SetGossipSubScoreTracerInterval(gossipCfg.ScoreTracerInterval)
diff --git a/network/p2p/p2pconf/gossipsub.go b/network/p2p/p2pconf/gossipsub.go
index f9155129efd..d297f5cba8b 100644
--- a/network/p2p/p2pconf/gossipsub.go
+++ b/network/p2p/p2pconf/gossipsub.go
@@ -21,6 +21,8 @@ type GossipSubConfig struct {
 	LocalMeshLogInterval time.Duration `mapstructure:"gossipsub-local-mesh-logging-interval"`
 	// ScoreTracerInterval is the interval at which the score tracer logs the peer scores.
 	ScoreTracerInterval time.Duration `mapstructure:"gossipsub-score-tracer-interval"`
+	// RPCSentTrackerCacheSize is the cache size of the RPC sent tracker used by the gossipsub mesh tracer.
+	RPCSentTrackerCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-cache-size"`
 	// PeerScoring is whether to enable GossipSub peer scoring.
 	PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"`
 }
diff --git a/network/p2p/tracer/gossipSubMeshTracer.go b/network/p2p/tracer/gossipSubMeshTracer.go
index 7cd4dd2b692..1cc25fd2565 100644
--- a/network/p2p/tracer/gossipSubMeshTracer.go
+++ b/network/p2p/tracer/gossipSubMeshTracer.go
@@ -14,6 +14,7 @@ import (
 	"github.com/onflow/flow-go/module/component"
 	"github.com/onflow/flow-go/module/irrecoverable"
 	"github.com/onflow/flow-go/network/p2p"
+	"github.com/onflow/flow-go/network/p2p/tracer/internal"
 	"github.com/onflow/flow-go/utils/logging"
 )
@@ -43,23 +44,35 @@ type GossipSubMeshTracer struct {
 	idProvider     module.IdentityProvider
 	loggerInterval time.Duration
 	metrics        module.GossipSubLocalMeshMetrics
+	rpcSentTracker *internal.RPCSentTracker
 }
 
 var _ p2p.PubSubTracer = (*GossipSubMeshTracer)(nil)
 
-func NewGossipSubMeshTracer(
-	logger zerolog.Logger,
-	metrics module.GossipSubLocalMeshMetrics,
-	idProvider module.IdentityProvider,
-	loggerInterval time.Duration) *GossipSubMeshTracer {
+// GossipSubMeshTracerConfig is the config used to create a new GossipSubMeshTracer.
+type GossipSubMeshTracerConfig struct {
+	Logger                       zerolog.Logger
+	Metrics                      module.GossipSubLocalMeshMetrics
+	IDProvider                   module.IdentityProvider
+	LoggerInterval               time.Duration
+	RpcSentTrackerCacheCollector module.HeroCacheMetrics
+	RpcSentTrackerCacheSize      uint32
+}
+
+// NewGossipSubMeshTracer creates a new *GossipSubMeshTracer.
+// Args:
+// - config: the mesh tracer config.
+// Returns:
+// - *GossipSubMeshTracer: new mesh tracer.
+func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTracer {
+	rpcSentTracker := internal.NewRPCSentTracker(config.Logger, config.RpcSentTrackerCacheSize, config.RpcSentTrackerCacheCollector)
 	g := &GossipSubMeshTracer{
 		RawTracer:    NewGossipSubNoopTracer(),
 		topicMeshMap: make(map[string]map[peer.ID]struct{}),
-		idProvider:     idProvider,
-		metrics:        metrics,
-		logger:         logger.With().Str("component", "gossip_sub_topology_tracer").Logger(),
-		loggerInterval: loggerInterval,
+		idProvider:     config.IDProvider,
+		metrics:        config.Metrics,
+		logger:         config.Logger.With().Str("component", "gossipsub_topology_tracer").Logger(),
+		loggerInterval: config.LoggerInterval,
+		rpcSentTracker: rpcSentTracker,
 	}
 
 	g.Component = component.NewComponentManagerBuilder().
@@ -139,6 +152,15 @@ func (t *GossipSubMeshTracer) Prune(p peer.ID, topic string) {
 	lg.Info().Hex("flow_id", logging.ID(id.NodeID)).Str("role", id.Role.String()).Msg("pruned peer")
 }
 
+// SendRPC is called when an RPC is sent. Currently, the GossipSubMeshTracer tracks iHave RPC messages that have been sent.
+// This function can be updated to track other control messages in the future as required.
+func (t *GossipSubMeshTracer) SendRPC(rpc *pubsub.RPC, _ peer.ID) {
+	switch {
+	case len(rpc.GetControl().GetIhave()) > 0:
+		t.rpcSentTracker.OnIHaveRPCSent(rpc.GetControl().GetIhave())
+	}
+}
+
 // logLoop logs the mesh peers of the local node for each topic at a regular interval.
 func (t *GossipSubMeshTracer) logLoop(ctx irrecoverable.SignalerContext) {
 	ticker := time.NewTicker(t.loggerInterval)
diff --git a/network/p2p/tracer/gossipSubMeshTracer_test.go b/network/p2p/tracer/gossipSubMeshTracer_test.go
index fc14b280282..a2da0584f94 100644
--- a/network/p2p/tracer/gossipSubMeshTracer_test.go
+++ b/network/p2p/tracer/gossipSubMeshTracer_test.go
@@ -11,8 +11,10 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
 
+	"github.com/onflow/flow-go/config"
 	"github.com/onflow/flow-go/model/flow"
 	"github.com/onflow/flow-go/module/irrecoverable"
+	"github.com/onflow/flow-go/module/metrics"
 	mockmodule "github.com/onflow/flow-go/module/mock"
 	"github.com/onflow/flow-go/network/channels"
 	"github.com/onflow/flow-go/network/p2p"
@@ -29,6 +31,8 @@ import (
 // One of the nodes is running with an unknown peer id, which the identity provider is mocked to return an error and
 // the mesh tracer should log a warning message.
 func TestGossipSubMeshTracer(t *testing.T) {
+	defaultConfig, err := config.DefaultConfig()
+	require.NoError(t, err)
 	ctx, cancel := context.WithCancel(context.Background())
 	signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
 	sporkId := unittest.IdentifierFixture()
@@ -61,7 +65,15 @@ func TestGossipSubMeshTracer(t *testing.T) {
 	// we only need one node with a meshTracer to test the meshTracer.
 	// meshTracer logs at 1 second intervals for sake of testing.
 	collector := mockmodule.NewGossipSubLocalMeshMetrics(t)
-	meshTracer := tracer.NewGossipSubMeshTracer(logger, collector, idProvider, 1*time.Second)
+	meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
+		Logger:                       logger,
+		Metrics:                      collector,
+		IDProvider:                   idProvider,
+		LoggerInterval:               time.Second,
+		RpcSentTrackerCacheCollector: metrics.NewNoopCollector(),
+		RpcSentTrackerCacheSize:      defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
+	}
+	meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)
 	tracerNode, tracerId := p2ptest.NodeFixture(
 		t,
 		sporkId,
diff --git a/network/p2p/tracer/internal/cache.go b/network/p2p/tracer/internal/cache.go
new file mode 100644
index 00000000000..b916133b270
--- /dev/null
+++ b/network/p2p/tracer/internal/cache.go
@@ -0,0 +1,86 @@
+package internal
+
+import (
+	"fmt"
+
+	"github.com/rs/zerolog"
+
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/module"
+	herocache "github.com/onflow/flow-go/module/mempool/herocache/backdata"
+	"github.com/onflow/flow-go/module/mempool/herocache/backdata/heropool"
+	"github.com/onflow/flow-go/module/mempool/stdmap"
+	p2pmsg "github.com/onflow/flow-go/network/p2p/message"
+)
+
+// rpcCtrlMsgSentCacheConfig is the configuration for the RPC sent cache.
+type rpcCtrlMsgSentCacheConfig struct {
+	logger    zerolog.Logger
+	sizeLimit uint32
+	collector module.HeroCacheMetrics
+}
+
+// rpcSentCache is a cache that stores rpcSentEntity records. These entities represent RPC control messages sent from the local node.
+type rpcSentCache struct {
+	// c is the underlying cache.
+	c *stdmap.Backend
+}
+
+// newRPCSentCache creates a new *rpcSentCache.
+// Args:
+// - config: the record cache config.
+// Returns:
+// - *rpcSentCache: the created cache.
+// Note that this cache is intended to track control messages sent by the local node;
+// it stores each rpcSentEntity under an ID that uniquely identifies the message being tracked.
+func newRPCSentCache(config *rpcCtrlMsgSentCacheConfig) *rpcSentCache {
+	backData := herocache.NewCache(config.sizeLimit,
+		herocache.DefaultOversizeFactor,
+		heropool.LRUEjection,
+		config.logger.With().Str("mempool", "gossipsub-rpc-control-messages-sent").Logger(),
+		config.collector)
+	return &rpcSentCache{
+		c: stdmap.NewBackend(stdmap.WithBackData(backData)),
+	}
+}
+
+// add initializes a record for the given combination of topic, message ID, and control message type if one does not already exist.
+// Args:
+// - topic: the topic ID.
+// - messageId: the message ID.
+// - controlMsgType: the rpc control message type.
+// Returns:
+// - bool: true if the record is initialized, false otherwise (i.e.: the record already exists).
+// Note that if add is called multiple times for the same record, the record is initialized only once, and the
+// subsequent calls return false and do not change the record (i.e.: the record is not re-initialized).
+func (r *rpcSentCache) add(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool {
+	return r.c.Add(newRPCSentEntity(r.rpcSentEntityID(topic, messageId, controlMsgType), controlMsgType))
+}
+
+// has checks if the RPC message has been cached, indicating it has been sent.
+// Args:
+// - topic: the topic ID.
+// - messageId: the message ID.
+// - controlMsgType: the rpc control message type.
+// Returns:
+// - bool: true if the RPC has been cached, indicating it was sent from the local node.
+func (r *rpcSentCache) has(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool {
+	return r.c.Has(r.rpcSentEntityID(topic, messageId, controlMsgType))
+}
+
+// size returns the number of records in the cache.
+func (r *rpcSentCache) size() uint {
+	return r.c.Size()
+}
+
+// rpcSentEntityID creates an entity ID from the topic, message ID and control message type.
+// Args:
+// - topic: the topic ID.
+// - messageId: the message ID.
+// - controlMsgType: the rpc control message type.
+// Returns:
+// - flow.Identifier: the entity ID.
+func (r *rpcSentCache) rpcSentEntityID(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) flow.Identifier {
+	return flow.MakeIDFromFingerPrint([]byte(fmt.Sprintf("%s%s%s", topic, messageId, controlMsgType)))
+}
diff --git a/network/p2p/tracer/internal/cache_test.go b/network/p2p/tracer/internal/cache_test.go
new file mode 100644
index 00000000000..c92b42b5e02
--- /dev/null
+++ b/network/p2p/tracer/internal/cache_test.go
@@ -0,0 +1,122 @@
+package internal
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
+
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/module"
+	"github.com/onflow/flow-go/module/metrics"
+	"github.com/onflow/flow-go/network/channels"
+	p2pmsg "github.com/onflow/flow-go/network/p2p/message"
+	"github.com/onflow/flow-go/utils/unittest"
+)
+
+// TestCache_Add tests the add method of the rpcSentCache.
+// It ensures that the method returns true when a new record is added
+// and false when an existing record is added again.
+func TestCache_Add(t *testing.T) {
+	cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector())
+	controlMsgType := p2pmsg.CtrlMsgIHave
+	topic := channels.PushBlocks.String()
+	messageID1 := unittest.IdentifierFixture().String()
+	messageID2 := unittest.IdentifierFixture().String()
+
+	// test adding a record for an ID that doesn't exist in the cache
+	initialized := cache.add(topic, messageID1, controlMsgType)
+	require.True(t, initialized, "expected record to be initialized")
+	require.True(t, cache.has(topic, messageID1, controlMsgType), "expected record to exist")
+
+	// test adding a record for an ID that already exists in the cache
+	initialized = cache.add(topic, messageID1, controlMsgType)
+	require.False(t, initialized, "expected record not to be initialized")
+	require.True(t, cache.has(topic, messageID1, controlMsgType), "expected record to exist")
+
+	// test adding a record for another ID
+	initialized = cache.add(topic, messageID2, controlMsgType)
+	require.True(t, initialized, "expected record to be initialized")
+	require.True(t, cache.has(topic, messageID2, controlMsgType), "expected record to exist")
+}
+
+// TestCache_ConcurrentAdd tests the concurrent addition of records.
+// The test covers the following scenarios:
+// 1. Multiple goroutines adding records for different ids.
+// 2. Ensuring that all records are correctly added.
+func TestCache_ConcurrentAdd(t *testing.T) {
+	cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector())
+	controlMsgType := p2pmsg.CtrlMsgIHave
+	topic := channels.PushBlocks.String()
+	messageIds := unittest.IdentifierListFixture(10)
+
+	var wg sync.WaitGroup
+	wg.Add(len(messageIds))
+
+	for _, id := range messageIds {
+		go func(id flow.Identifier) {
+			defer wg.Done()
+			cache.add(topic, id.String(), controlMsgType)
+		}(id)
+	}
+
+	unittest.RequireReturnsBefore(t, wg.Wait, 100*time.Millisecond, "timed out waiting for goroutines to finish")
+
+	// ensure that all records are correctly added
+	for _, id := range messageIds {
+		require.True(t, cache.has(topic, id.String(), controlMsgType))
+	}
+}
+
+// TestCache_ConcurrentSameRecordAdd tests the concurrent addition of the same record.
+// The test covers the following scenarios:
+// 1. Multiple goroutines attempting to add the same record concurrently.
+// 2. Only one goroutine successfully adds the record, and the others receive false.
+// 3. The record is correctly stored in the cache, and its presence can be confirmed using the has method.
+func TestCache_ConcurrentSameRecordAdd(t *testing.T) {
+	cache := cacheFixture(t, 100, zerolog.Nop(), metrics.NewNoopCollector())
+	controlMsgType := p2pmsg.CtrlMsgIHave
+	topic := channels.PushBlocks.String()
+	messageID := unittest.IdentifierFixture().String()
+	const concurrentAttempts = 10
+
+	var wg sync.WaitGroup
+	wg.Add(concurrentAttempts)
+
+	successGauge := atomic.Int32{}
+
+	for i := 0; i < concurrentAttempts; i++ {
+		go func() {
+			defer wg.Done()
+			initSuccess := cache.add(topic, messageID, controlMsgType)
+			if initSuccess {
+				successGauge.Inc()
+			}
+		}()
+	}
+
+	unittest.RequireReturnsBefore(t, wg.Wait, 100*time.Millisecond, "timed out waiting for goroutines to finish")
+
+	// ensure that only one goroutine successfully added the record
+	require.Equal(t, int32(1), successGauge.Load())
+
+	// ensure that the record is correctly stored in the cache
+	require.True(t, cache.has(topic, messageID, controlMsgType))
+}
+
+// cacheFixture returns a new *rpcSentCache.
+func cacheFixture(t *testing.T, sizeLimit uint32, logger zerolog.Logger, collector module.HeroCacheMetrics) *rpcSentCache {
+	config := &rpcCtrlMsgSentCacheConfig{
+		sizeLimit: sizeLimit,
+		logger:    logger,
+		collector: collector,
+	}
+	r := newRPCSentCache(config)
+	require.NotNil(t, r)
+	// expect cache to be empty
+	require.Equalf(t, uint(0), r.size(), "cache size must be 0")
+	return r
+}
diff --git a/network/p2p/tracer/internal/rpc_send_entity.go b/network/p2p/tracer/internal/rpc_send_entity.go
new file mode 100644
index 00000000000..b3f56ce0b55
--- /dev/null
+++ b/network/p2p/tracer/internal/rpc_send_entity.go
@@ -0,0 +1,37 @@
+package internal
+
+import (
+	"github.com/onflow/flow-go/model/flow"
+	p2pmsg "github.com/onflow/flow-go/network/p2p/message"
+)
+
+// rpcSentEntity represents an RPC control message sent from the local node.
+// This struct implements the flow.Entity interface and uses the messageID field for deduplication.
+type rpcSentEntity struct {
+	// messageID is the messageID of the rpc control message.
+	messageID flow.Identifier
+	// controlMsgType is the control message type.
+	controlMsgType p2pmsg.ControlMessageType
+}
+
+var _ flow.Entity = (*rpcSentEntity)(nil)
+
+// ID returns the message ID of the tracked control message, which is used as the unique identifier of the entity for maintenance and
+// deduplication purposes in the cache.
+func (r rpcSentEntity) ID() flow.Identifier {
+	return r.messageID
+}
+
+// Checksum returns the message ID of the tracked control message; it does not have any purpose in the cache.
+// It is implemented to satisfy the flow.Entity interface.
+func (r rpcSentEntity) Checksum() flow.Identifier {
+	return r.messageID
+}
+
+// newRPCSentEntity returns a new rpcSentEntity.
+func newRPCSentEntity(id flow.Identifier, controlMessageType p2pmsg.ControlMessageType) rpcSentEntity {
+	return rpcSentEntity{
+		messageID:      id,
+		controlMsgType: controlMessageType,
+	}
+}
diff --git a/network/p2p/tracer/internal/rpc_sent_tracker.go b/network/p2p/tracer/internal/rpc_sent_tracker.go
new file mode 100644
index 00000000000..6d44ac984a3
--- /dev/null
+++ b/network/p2p/tracer/internal/rpc_sent_tracker.go
@@ -0,0 +1,47 @@
+package internal
+
+import (
+	pb "github.com/libp2p/go-libp2p-pubsub/pb"
+	"github.com/rs/zerolog"
+
+	"github.com/onflow/flow-go/module"
+	p2pmsg "github.com/onflow/flow-go/network/p2p/message"
+)
+
+// RPCSentTracker tracks RPC messages that are sent.
+type RPCSentTracker struct {
+	cache *rpcSentCache
+}
+
+// NewRPCSentTracker returns a new *RPCSentTracker.
+func NewRPCSentTracker(logger zerolog.Logger, sizeLimit uint32, collector module.HeroCacheMetrics) *RPCSentTracker {
+	config := &rpcCtrlMsgSentCacheConfig{
+		sizeLimit: sizeLimit,
+		logger:    logger,
+		collector: collector,
+	}
+	return &RPCSentTracker{cache: newRPCSentCache(config)}
+}
+
+// OnIHaveRPCSent caches a unique entity message ID for each message ID included in each sent iHave control message.
+// Args:
+// - iHaves: the iHave control messages included in the sent RPC.
+func (t *RPCSentTracker) OnIHaveRPCSent(iHaves []*pb.ControlIHave) {
+	controlMsgType := p2pmsg.CtrlMsgIHave
+	for _, iHave := range iHaves {
+		topicID := iHave.GetTopicID()
+		for _, messageID := range iHave.GetMessageIDs() {
+			t.cache.add(topicID, messageID, controlMsgType)
+		}
+	}
+}
+
+// WasIHaveRPCSent checks if an iHave control message with the provided message ID was sent.
+// Args:
+// - topicID: the topic ID of the iHave RPC.
+// - messageID: the message ID of the iHave RPC.
+// Returns:
+// - bool: true if the iHave rpc with the provided message ID was sent.
+func (t *RPCSentTracker) WasIHaveRPCSent(topicID, messageID string) bool {
+	return t.cache.has(topicID, messageID, p2pmsg.CtrlMsgIHave)
+}
diff --git a/network/p2p/tracer/internal/rpc_sent_tracker_test.go b/network/p2p/tracer/internal/rpc_sent_tracker_test.go
new file mode 100644
index 00000000000..7b9c4ec9acb
--- /dev/null
+++ b/network/p2p/tracer/internal/rpc_sent_tracker_test.go
@@ -0,0 +1,89 @@
+package internal
+
+import (
+	"testing"
+
+	pubsub "github.com/libp2p/go-libp2p-pubsub"
+	pb "github.com/libp2p/go-libp2p-pubsub/pb"
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/require"
+
+	"github.com/onflow/flow-go/config"
+	"github.com/onflow/flow-go/module/metrics"
+	"github.com/onflow/flow-go/network/channels"
+	"github.com/onflow/flow-go/utils/unittest"
+)
+
+// TestNewRPCSentTracker ensures *RPCSentTracker is created as expected.
+func TestNewRPCSentTracker(t *testing.T) {
+	tracker := mockTracker(t)
+	require.NotNil(t, tracker)
+}
+
+// TestRPCSentTracker_IHave ensures *RPCSentTracker tracks sent iHave control messages as expected.
+func TestRPCSentTracker_IHave(t *testing.T) {
+	tracker := mockTracker(t)
+	require.NotNil(t, tracker)
+
+	t.Run("WasIHaveRPCSent should return false for iHave message Id that has not been tracked", func(t *testing.T) {
+		require.False(t, tracker.WasIHaveRPCSent("topic_id", "message_id"))
+	})
+
+	t.Run("WasIHaveRPCSent should return true for iHave message after it is tracked with OnIHaveRPCSent", func(t *testing.T) {
+		numOfMsgIds := 100
+		testCases := []struct {
+			topic      string
+			messageIDs []string
+		}{
+			{channels.PushBlocks.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()},
+			{channels.ReceiveApprovals.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()},
+			{channels.SyncCommittee.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()},
+			{channels.RequestChunks.String(), unittest.IdentifierListFixture(numOfMsgIds).Strings()},
+		}
+		iHaves := make([]*pb.ControlIHave, len(testCases))
+		for i, testCase := range testCases {
+			testCase := testCase
+			iHaves[i] = &pb.ControlIHave{
+				TopicID:    &testCase.topic,
+				MessageIDs: testCase.messageIDs,
+			}
+		}
+		rpc := rpcFixture(withIhaves(iHaves))
+		tracker.OnIHaveRPCSent(rpc.GetControl().GetIhave())
+
+		for _, testCase := range testCases {
+			for _, messageID := range testCase.messageIDs {
+				require.True(t, tracker.WasIHaveRPCSent(testCase.topic, messageID))
+			}
+		}
+	})
+}
+
+// mockTracker returns a new *RPCSentTracker configured with the default cache size and a noop metrics collector for testing.
+func mockTracker(t *testing.T) *RPCSentTracker {
+	logger := zerolog.Nop()
+	cfg, err := config.DefaultConfig()
+	require.NoError(t, err)
+	collector := metrics.NewNoopCollector()
+	tracker := NewRPCSentTracker(logger, cfg.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize, collector)
+	return tracker
+}
+
+// rpcFixtureOpt is an option for customizing a pubsub.RPC fixture.
+type rpcFixtureOpt func(*pubsub.RPC)
+
+// withIhaves sets the iHave control messages on the RPC fixture.
+func withIhaves(iHave []*pb.ControlIHave) rpcFixtureOpt {
+	return func(rpc *pubsub.RPC) {
+		rpc.Control.Ihave = iHave
+	}
+}
+
+// rpcFixture returns a minimal pubsub.RPC with an empty control message, customized by the given options.
+func rpcFixture(opts ...rpcFixtureOpt) *pubsub.RPC {
+	rpc := &pubsub.RPC{
+		RPC: pb.RPC{
+			Control: &pb.ControlMessage{},
+		},
+	}
+	for _, opt := range opts {
+		opt(rpc)
+	}
+	return rpc
+}