Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Networking] GossipSub iWant Flooding Mitigation #4574

Merged
merged 49 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
0093a47
add iWant control message validation
kc1116 Jul 19, 2023
41ef40b
Update component.go
kc1116 Jul 25, 2023
5cf8b9d
Merge branch 'master' of github.com:onflow/flow-go into khalil/6472-i…
kc1116 Jul 25, 2023
322aee5
Update network/p2p/inspector/validation/control_message_validation_in…
kc1116 Aug 1, 2023
03a4423
add rpc control tracking to control message validation inspector
kc1116 Aug 16, 2023
85f63c6
distribute invalid control message notification for iwant errs
kc1116 Aug 16, 2023
3e99842
sample all message IDs
kc1116 Aug 16, 2023
2e6c88a
add duplicate messageID and cache miss threshold tests
kc1116 Aug 16, 2023
b745ea3
Delete export_report.json
kc1116 Aug 16, 2023
acf9486
Merge branch 'master' into khalil/6472-iwant-flooding-detection
kc1116 Aug 16, 2023
3ff2eaf
update WithIWant godoc and var names
kc1116 Aug 22, 2023
eccb7a1
allow a threshold of duplicate message ids
kc1116 Aug 22, 2023
38d5caf
update threshold comparisons
kc1116 Aug 22, 2023
704a20d
Update network/p2p/inspector/validation/control_message_validation_in…
kc1116 Aug 22, 2023
26a7159
Update network/p2p/inspector/validation/errors.go
kc1116 Aug 22, 2023
c815a12
Update network/p2p/inspector/validation/errors.go
kc1116 Aug 22, 2023
9c09799
Merge branch 'khalil/6472-iwant-flooding-detection' of github.com:onf…
kc1116 Aug 22, 2023
d9c7970
add round trip tests for new errors
kc1116 Aug 22, 2023
bb87e90
add happy path tests for util dup str tracker
kc1116 Aug 22, 2023
e3f7fb8
Update network/p2p/tracer/internal/rpc_sent_tracker.go
kc1116 Aug 22, 2023
3b7aa43
Merge branch 'khalil/6472-iwant-flooding-detection' of github.com:onf…
kc1116 Aug 22, 2023
7721c00
distribute notif only for expected errors
kc1116 Aug 22, 2023
48b8fa1
Update rpc_sent_tracker.go
kc1116 Aug 22, 2023
c0b999a
Merge branch 'master' into khalil/6472-iwant-flooding-detection
kc1116 Aug 22, 2023
848a708
add trace level log to improve debugging
kc1116 Aug 24, 2023
efdeb97
add debug log with cache miss and duplicate counts
kc1116 Aug 24, 2023
c9e1dab
fix flag
kc1116 Aug 24, 2023
fb521fc
add logging.KeyNetworkingSecurity to all error logs
kc1116 Aug 24, 2023
ba7406e
Update config/default-config.yml
kc1116 Aug 24, 2023
6d226ad
Update insecure/corruptlibp2p/fixtures.go
kc1116 Aug 24, 2023
acb84a0
rename inspectDisseminatedNotif -> inspectDisseminatedNotifyFunc
kc1116 Aug 24, 2023
4090cb0
move utils to utils file
kc1116 Aug 24, 2023
45e77ff
Update validation_inspector_test.go
kc1116 Aug 24, 2023
a6d9eaa
move utils to utils file
kc1116 Aug 24, 2023
130fc6b
Merge branch 'khalil/6472-iwant-flooding-detection' of github.com:onf…
kc1116 Aug 24, 2023
e249547
Update network/p2p/inspector/validation/control_message_validation_in…
kc1116 Aug 24, 2023
1f71d13
Merge branch 'master' into khalil/6472-iwant-flooding-detection
kc1116 Aug 24, 2023
eb6434d
fix test
kc1116 Aug 24, 2023
e530edc
Merge branch 'master' into khalil/6472-iwant-flooding-detection
kc1116 Aug 24, 2023
897cb98
remove old test
kc1116 Aug 24, 2023
96c1d4f
improve geometric test flakiness
kc1116 Aug 27, 2023
2e3a710
improve echoengine test flakiness
kc1116 Aug 27, 2023
a865809
add 500 millisecond sleep before lauching second batch of goroutines
kc1116 Aug 27, 2023
88ab754
quarantine flaky tests
kc1116 Aug 28, 2023
8150c52
Update decay_test.go
kc1116 Aug 28, 2023
367b94c
Update decay_test.go
kc1116 Aug 28, 2023
37cfa4d
Update echoengine_test.go
kc1116 Aug 29, 2023
7c1d963
Update validation_inspector_test.go
kc1116 Aug 29, 2023
a4fcdb5
Update validation_inspector_test.go
kc1116 Aug 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/default-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ network-config:
ihave-async-inspection-sample-size-percentage: .10
# Max number of ihave messages in a sample to be inspected
ihave-max-sample-size: 100

# Max number of iwant messages in a sample to be inspected
gossipsub-rpc-iwant-max-sample-size: 1_000_000
# The allowed threshold of iWant messages receievd without a corresponding tracked iHave message that was sent
kc1116 marked this conversation as resolved.
Show resolved Hide resolved
gossipsub-rpc-iwant-cache-miss-threshold: .5

# RPC metrics observer inspector configs
# The number of metrics inspector pool workers
gossipsub-rpc-metrics-inspector-workers: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func TestValidationInspector_DuplicateTopicId_Detection(t *testing.T) {
notification, ok := args[0].(*p2p.InvCtrlMsgNotif)
require.True(t, ok)
require.Equal(t, spammer.SpammerNode.Host().ID(), notification.PeerID)
require.True(t, validation.IsErrDuplicateTopic(notification.Err))
require.True(t, validation.IsDuplicateFoundErr(notification.Err))
require.Equal(t, messageCount, notification.Count)
switch notification.MsgType {
case p2pmsg.CtrlMsgGraft:
Expand Down
7 changes: 7 additions & 0 deletions network/netconf/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@
metricsInspectorNumberOfWorkers = "gossipsub-rpc-metrics-inspector-workers"
metricsInspectorCacheSize = "gossipsub-rpc-metrics-inspector-cache-size"

iwantMaxSampleSize = "gossipsub-rpc-iwant-max-sample-size"
iwantCacheMissThreshold = "gossipsub-rpc-iwant-cache-miss-threshold"

alspDisabled = "alsp-disable-penalty"
alspSpamRecordCacheSize = "alsp-spam-record-cache-size"
alspSpamRecordQueueSize = "alsp-spam-report-queue-size"
Expand All @@ -73,6 +76,7 @@
scoreTracerInterval, gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize,
validationInspectorClusterPrefixedTopicsReceivedCacheDecay, validationInspectorClusterPrefixHardThreshold, ihaveSyncSampleSizePercentage, ihaveAsyncSampleSizePercentage,
ihaveMaxSampleSize, metricsInspectorNumberOfWorkers, metricsInspectorCacheSize, alspDisabled, alspSpamRecordCacheSize, alspSpamRecordQueueSize, alspHearBeatInterval,
iwantMaxSampleSize, iwantCacheMissThreshold,
}
}

Expand Down Expand Up @@ -133,6 +137,9 @@
flags.Float64(ihaveSyncSampleSizePercentage, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IHaveSyncInspectSampleSizePercentage, "percentage of ihave messages to sample during synchronous validation")
flags.Float64(ihaveAsyncSampleSizePercentage, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IHaveAsyncInspectSampleSizePercentage, "percentage of ihave messages to sample during asynchronous validation")
flags.Float64(ihaveMaxSampleSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IHaveInspectionMaxSampleSize, "max number of ihaves to sample when performing validation")

flags.Uint64(iwantMaxSampleSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.MaxSampleSize, "max number of ihaves to sample when performing validation")

Check failure on line 141 in network/netconf/flags.go

View workflow job for this annotation

GitHub Actions / Lint (./)

cannot use config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.MaxSampleSize (variable of type uint) as uint64 value in argument to flags.Uint64) (typecheck)

Check failure on line 141 in network/netconf/flags.go

View workflow job for this annotation

GitHub Actions / Lint (./)

cannot use config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.MaxSampleSize (variable of type uint) as uint64 value in argument to flags.Uint64) (typecheck)

Check failure on line 141 in network/netconf/flags.go

View workflow job for this annotation

GitHub Actions / Lint (./)

cannot use config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.MaxSampleSize (variable of type uint) as uint64 value in argument to flags.Uint64) (typecheck)

Check failure on line 141 in network/netconf/flags.go

View workflow job for this annotation

GitHub Actions / Lint (./)

cannot use config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.MaxSampleSize (variable of type uint) as uint64 value in argument to flags.Uint64 (typecheck)
flags.Float64(iwantCacheMissThreshold, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.IWantRPCInspectionConfig.CacheMissThreshold, "max number of ihaves to sample when performing validation")
}

// rpcInspectorValidationLimits utility func that adds flags for each of the validation limits for each control message type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ControlMsgValidationInspector struct {
tracker *cache.ClusterPrefixedMessagesReceivedTracker
idProvider module.IdentityProvider
rateLimiters map[p2pmsg.ControlMessageType]p2p.BasicRateLimiter
rpcTracker p2p.RPCControlTracking
}

var _ component.Component = (*ControlMsgValidationInspector)(nil)
Expand Down Expand Up @@ -196,6 +197,57 @@ func (c *ControlMsgValidationInspector) Inspect(from peer.ID, rpc *pubsub.RPC) e
return nil
}

// inspectIWant inspects RPC iWant control messages. This func will sample the iWants and perform validation on each iWant in the sample.
// Ensuring that the following are true:
// - Each iWant corresponds to an iHave that was sent.
// - Each topic in the iWant sample is a valid topic.
// If the number of iWants that do not have a corresponding iHave exceed the configured threshold an error is returned.
// Args:
// - iWant: the list of iWant control messages.
// Returns:
// - DuplicateFoundErr: if there are any duplicate message ids found in across any of the iWants.
kc1116 marked this conversation as resolved.
Show resolved Hide resolved
// - IWantCacheMissThresholdErr: if the rate of cache misses exceeds the configured allowed threshold.
// - error: if any error occurs while sampling the iWants
kc1116 marked this conversation as resolved.
Show resolved Hide resolved
func (c *ControlMsgValidationInspector) inspectIWant(iWants []*pubsub_pb.ControlIWant) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is better to also receive the peer.ID of the sender of iWants and use the peer.ID in all the logs and errors generated by the method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sampleSize := uint(10 * c.rpcTracker.LastHighestIHaveRPCSize())
if sampleSize == 0 || sampleSize > c.config.IWantRPCInspectionConfig.MaxSampleSize {
sampleSize = c.config.IWantRPCInspectionConfig.MaxSampleSize
}
kc1116 marked this conversation as resolved.
Show resolved Hide resolved

swap := func(i, j uint) {
iWants[i], iWants[j] = iWants[j], iWants[i]
}
yhassanzadeh13 marked this conversation as resolved.
Show resolved Hide resolved
err := c.sampleCtrlMessages(uint(len(iWants)), sampleSize, swap)
if err != nil {
return fmt.Errorf("failed to sample iwant messages: %w", err)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks fine and is a fail-safe approach, i.e., we don't want to continue the execution if we cannot sample a control message properly. But please note that whatever error a worker processFunc returns will result in the node to crash, hence we may need to be more cautious on what to return as error and what to log.


tracker := make(duplicateStrTracker)
cacheMisses := float64(0)
totalMessageIDS := float64(0)
for i := uint(0); i < sampleSize; i++ {
iWant := iWants[i]
for _, messageID := range iWant.GetMessageIDs() {
yhassanzadeh13 marked this conversation as resolved.
Show resolved Hide resolved
if tracker.isDuplicate(messageID) {
return NewDuplicateFoundErr(fmt.Errorf("duplicate message ID found: %s", messageID))
yhassanzadeh13 marked this conversation as resolved.
Show resolved Hide resolved
}

if !c.rpcTracker.WasIHaveRPCSent(messageID) {
cacheMisses++
}
tracker.set(messageID)
totalMessageIDS++
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before returning the method, please add a debug level log logging the previous log's content as well as the duplicate and cacheMisses counters. It would be instrumental for forensics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// check cache miss rate
if cacheMisses/totalMessageIDS > c.config.IWantRPCInspectionConfig.CacheMissThreshold {
return NewIWantCacheMissThresholdErr(cacheMisses, totalMessageIDS, c.config.IWantRPCInspectionConfig.CacheMissThreshold)
yhassanzadeh13 marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
}

// Name returns the name of the rpc inspector.
func (c *ControlMsgValidationInspector) Name() string {
return rpcInspectorComponentName
Expand Down Expand Up @@ -267,9 +319,16 @@ func (c *ControlMsgValidationInspector) blockingIHaveSamplePreprocessing(from pe
// If the RPC control message count exceeds the configured hard threshold we perform synchronous topic validation on a subset
// of the control messages. This is used for control message types that do not have an upper bound on the amount of messages a node can send.
func (c *ControlMsgValidationInspector) blockingPreprocessingSampleRpc(from peer.ID, validationConfig *p2pconf.CtrlMsgValidationConfig, controlMessage *pubsub_pb.ControlMessage, sampleSize uint) error {
if validationConfig.ControlMsg != p2pmsg.CtrlMsgIHave && validationConfig.ControlMsg != p2pmsg.CtrlMsgIWant {
return fmt.Errorf("unexpected control message type %s encountered during blocking pre-processing sample rpc, expected %s or %s", validationConfig.ControlMsg, p2pmsg.CtrlMsgIHave, p2pmsg.CtrlMsgIWant)
if validationConfig.ControlMsg != p2pmsg.CtrlMsgIHave {
return fmt.Errorf("unexpected control message type %s encountered during blocking pre-processing sample rpc, expected %s", validationConfig.ControlMsg, p2pmsg.CtrlMsgIHave)
}

iHaves := controlMessage.GetIhave()
totalIhaves := uint(len(iHaves))
swap := func(i, j uint) {
iHaves[i], iHaves[j] = iHaves[j], iHaves[i]
}

activeClusterIDS := c.tracker.GetActiveClusterIds()
count := c.getCtrlMsgCount(validationConfig.ControlMsg, controlMessage)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have observed that the current implementation of getCtrlMsgCount does not meet our specific requirements. Instead of returning the total count of control message types within the RPC, it should provide the total number of message-ids for a specific message type. Take the iHave message as an example; an RPC may contain several iHave messages, each with a distinct set of message-ids.

This situation creates a problem, particularly in the evaluation of count > validationConfig.HardThreshold. In this context, count is representative of the total number of iHave messages, each containing a distinct message-id count, whereas validationConfig.HardThreshold denotes the total number of iHave message-ids. Consequently, the condition within the if statement is often incorrectly assessed as false in many cases. This inconsistency must be addressed to align the logic with our intended functionality.

I kindly request that you refactor this function to accurately reflect the message-id count for the given message type, aligning with the discussion found at this GitHub comment. Thank you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is covered in the follow up PR that adapts this strategy to the other control messages #4642

lg := c.logger.With().
Expand All @@ -280,7 +339,7 @@ func (c *ControlMsgValidationInspector) blockingPreprocessingSampleRpc(from peer
if count > validationConfig.HardThreshold {
// for iHave control message topic validation we only validate a random subset of the messages
// shuffle the ihave messages to perform random validation on a subset of size sampleSize
err := c.sampleCtrlMessages(p2pmsg.CtrlMsgIHave, controlMessage, sampleSize)
err := c.sampleCtrlMessages(totalIhaves, sampleSize, swap)
if err != nil {
return fmt.Errorf("failed to sample ihave messages: %w", err)
}
Expand All @@ -306,7 +365,7 @@ func (c *ControlMsgValidationInspector) blockingPreprocessingSampleRpc(from peer
// to randomize async validation to avoid data race that can occur when
// performing the sampling asynchronously.
// for iHave control message topic validation we only validate a random subset of the messages
err := c.sampleCtrlMessages(p2pmsg.CtrlMsgIHave, controlMessage, sampleSize)
err := c.sampleCtrlMessages(totalIhaves, sampleSize, swap)
if err != nil {
return fmt.Errorf("failed to sample ihave messages: %w", err)
}
Expand All @@ -315,17 +374,10 @@ func (c *ControlMsgValidationInspector) blockingPreprocessingSampleRpc(from peer

// sampleCtrlMessages performs sampling on the specified control message that will randomize
// the items in the control message slice up to index sampleSize-1.
func (c *ControlMsgValidationInspector) sampleCtrlMessages(ctrlMsgType p2pmsg.ControlMessageType, ctrlMsg *pubsub_pb.ControlMessage, sampleSize uint) error {
switch ctrlMsgType {
case p2pmsg.CtrlMsgIHave:
iHaves := ctrlMsg.GetIhave()
swap := func(i, j uint) {
iHaves[i], iHaves[j] = iHaves[j], iHaves[i]
}
err := flowrand.Samples(uint(len(iHaves)), sampleSize, swap)
if err != nil {
return fmt.Errorf("failed to get random sample of ihave control messages: %w", err)
}
func (c *ControlMsgValidationInspector) sampleCtrlMessages(totalSize, sampleSize uint, swap func(i, j uint)) error {
err := flowrand.Samples(totalSize, sampleSize, swap)
if err != nil {
return fmt.Errorf("failed to get random sample of ihave control messages: %w", err)
}
return nil
}
Expand All @@ -339,6 +391,12 @@ func (c *ControlMsgValidationInspector) processInspectMsgReq(req *InspectMsgRequ
c.metrics.AsyncProcessingFinished(req.validationConfig.ControlMsg.String(), time.Since(start))
}()

// iWant validation uses new sample size validation. This will be updated for all other control message types.
switch req.validationConfig.ControlMsg {
case p2pmsg.CtrlMsgIWant:
return c.inspectIWant(req.ctrlMsg.GetIwant())
yhassanzadeh13 marked this conversation as resolved.
Show resolved Hide resolved
}

count := c.getCtrlMsgCount(req.validationConfig.ControlMsg, req.ctrlMsg)
lg := c.logger.With().
Str("peer_id", req.Peer.String()).
Expand Down Expand Up @@ -392,7 +450,7 @@ func (c *ControlMsgValidationInspector) getCtrlMsgCount(ctrlMsgType p2pmsg.Contr
// validateTopics ensures all topics in the specified control message are valid flow topic/channel and no duplicate topics exist.
// Expected error returns during normal operations:
// - channels.InvalidTopicErr: if topic is invalid.
// - ErrDuplicateTopic: if a duplicate topic ID is encountered.
// - DuplicateFoundErr: if a duplicate topic ID is encountered.
func (c *ControlMsgValidationInspector) validateTopics(from peer.ID, validationConfig *p2pconf.CtrlMsgValidationConfig, ctrlMsg *pubsub_pb.ControlMessage) error {
activeClusterIDS := c.tracker.GetActiveClusterIds()
switch validationConfig.ControlMsg {
Expand All @@ -413,13 +471,13 @@ func (c *ControlMsgValidationInspector) validateTopics(from peer.ID, validationC

// validateGrafts performs topic validation on all grafts in the control message using the provided validateTopic func while tracking duplicates.
func (c *ControlMsgValidationInspector) validateGrafts(from peer.ID, ctrlMsg *pubsub_pb.ControlMessage, activeClusterIDS flow.ChainIDList) error {
tracker := make(duplicateTopicTracker)
tracker := make(duplicateStrTracker)
for _, graft := range ctrlMsg.GetGraft() {
topic := channels.Topic(graft.GetTopicID())
if tracker.isDuplicate(topic) {
return NewDuplicateTopicErr(topic)
if tracker.isDuplicate(topic.String()) {
return NewDuplicateFoundErr(fmt.Errorf("duplicate topic found: %s", topic.String()))
}
tracker.set(topic)
tracker.set(topic.String())
err := c.validateTopic(from, topic, activeClusterIDS)
if err != nil {
return err
Expand All @@ -430,13 +488,13 @@ func (c *ControlMsgValidationInspector) validateGrafts(from peer.ID, ctrlMsg *pu

// validatePrunes performs topic validation on all prunes in the control message using the provided validateTopic func while tracking duplicates.
func (c *ControlMsgValidationInspector) validatePrunes(from peer.ID, ctrlMsg *pubsub_pb.ControlMessage, activeClusterIDS flow.ChainIDList) error {
tracker := make(duplicateTopicTracker)
tracker := make(duplicateStrTracker)
for _, prune := range ctrlMsg.GetPrune() {
topic := channels.Topic(prune.GetTopicID())
if tracker.isDuplicate(topic) {
return NewDuplicateTopicErr(topic)
if tracker.isDuplicate(topic.String()) {
return NewDuplicateFoundErr(fmt.Errorf("duplicate topic found: %s", topic.String()))
}
tracker.set(topic)
tracker.set(topic.String())
err := c.validateTopic(from, topic, activeClusterIDS)
if err != nil {
return err
Expand All @@ -455,15 +513,15 @@ func (c *ControlMsgValidationInspector) validateIhaves(from peer.ID, validationC
// Sample size ensures liveness of the network when validating messages with no upper bound on the amount of messages that may be received.
// All errors returned from this function can be considered benign.
func (c *ControlMsgValidationInspector) validateTopicsSample(from peer.ID, validationConfig *p2pconf.CtrlMsgValidationConfig, ctrlMsg *pubsub_pb.ControlMessage, activeClusterIDS flow.ChainIDList, sampleSize uint) error {
tracker := make(duplicateTopicTracker)
tracker := make(duplicateStrTracker)
switch validationConfig.ControlMsg {
case p2pmsg.CtrlMsgIHave:
for i := uint(0); i < sampleSize; i++ {
topic := channels.Topic(ctrlMsg.Ihave[i].GetTopicID())
if tracker.isDuplicate(topic) {
return NewDuplicateTopicErr(topic)
if tracker.isDuplicate(topic.String()) {
return NewDuplicateFoundErr(fmt.Errorf("duplicate topic found: %s", topic.String()))
}
tracker.set(topic)
tracker.set(topic.String())
err := c.validateTopic(from, topic, activeClusterIDS)
if err != nil {
return err
Expand Down
44 changes: 33 additions & 11 deletions network/p2p/inspector/validation/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,28 @@ import (
p2pmsg "github.com/onflow/flow-go/network/p2p/message"
)

// IWantCacheMissThresholdErr indicates that the amount of cache misses exceeds the allowed threshold.
kc1116 marked this conversation as resolved.
Show resolved Hide resolved
type IWantCacheMissThresholdErr struct {
misses float64
kc1116 marked this conversation as resolved.
Show resolved Hide resolved
totalMessageIDS float64
yhassanzadeh13 marked this conversation as resolved.
Show resolved Hide resolved
threshold float64
}

func (e IWantCacheMissThresholdErr) Error() string {
return fmt.Sprintf("%f/%f iWant cache misses exceeds the allowed threshold: %f", e.misses, e.totalMessageIDS, e.threshold)
}

// NewIWantCacheMissThresholdErr returns a new IWantCacheMissThresholdErr.
func NewIWantCacheMissThresholdErr(misses, totalMessageIDS, threshold float64) IWantCacheMissThresholdErr {
return IWantCacheMissThresholdErr{misses, totalMessageIDS, threshold}
}

// IsIWantCacheMissThresholdErr returns true if an error is IWantCacheMissThresholdErr
func IsIWantCacheMissThresholdErr(err error) bool {
var e IWantCacheMissThresholdErr
return errors.As(err, &e)
}

// ErrHardThreshold indicates that the amount of RPC messages received exceeds hard threshold.
type ErrHardThreshold struct {
// controlMsg the control message type.
Expand Down Expand Up @@ -53,23 +75,23 @@ func IsErrRateLimitedControlMsg(err error) bool {
return errors.As(err, &e)
}

// ErrDuplicateTopic error that indicates a duplicate topic in control message has been detected.
type ErrDuplicateTopic struct {
topic channels.Topic
// DuplicateFoundErr error that indicates a duplicate has been detected. This can be duplicate topic or message ID tracking.
type DuplicateFoundErr struct {
err error
}

func (e ErrDuplicateTopic) Error() string {
return fmt.Errorf("duplicate topic %s", e.topic).Error()
func (e DuplicateFoundErr) Error() string {
return e.err.Error()
}

// NewDuplicateTopicErr returns a new ErrDuplicateTopic.
func NewDuplicateTopicErr(topic channels.Topic) ErrDuplicateTopic {
return ErrDuplicateTopic{topic: topic}
// NewDuplicateFoundErr returns a new DuplicateFoundErr.
func NewDuplicateFoundErr(err error) DuplicateFoundErr {
return DuplicateFoundErr{err}
}

// IsErrDuplicateTopic returns true if an error is ErrDuplicateTopic.
func IsErrDuplicateTopic(err error) bool {
var e ErrDuplicateTopic
// IsDuplicateFoundErr returns true if an error is DuplicateFoundErr.
func IsDuplicateFoundErr(err error) bool {
var e DuplicateFoundErr
return errors.As(err, &e)
}

Expand Down
15 changes: 8 additions & 7 deletions network/p2p/inspector/validation/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,20 @@ func TestErrRateLimitedControlMsgRoundTrip(t *testing.T) {
assert.False(t, IsErrRateLimitedControlMsg(dummyErr), "IsErrRateLimitedControlMsg should return false for non-ErrRateLimitedControlMsg error")
}

// TestErrDuplicateTopicRoundTrip ensures correct error formatting for ErrDuplicateTopic.
// TestErrDuplicateTopicRoundTrip ensures correct error formatting for DuplicateFoundErr.
yhassanzadeh13 marked this conversation as resolved.
Show resolved Hide resolved
func TestErrDuplicateTopicRoundTrip(t *testing.T) {
topic := channels.Topic("topic")
err := NewDuplicateTopicErr(topic)
e := fmt.Errorf("duplicate topic found: %s", topic.String())
err := NewDuplicateFoundErr(e)

// tests the error message formatting.
expectedErrMsg := fmt.Errorf("duplicate topic %s", topic).Error()
expectedErrMsg := e.Error()
assert.Equal(t, expectedErrMsg, err.Error(), "the error message should be correctly formatted")

// tests the IsErrDuplicateTopic function.
assert.True(t, IsErrDuplicateTopic(err), "IsErrDuplicateTopic should return true for ErrDuplicateTopic error")
// tests the IsDuplicateFoundErr function.
assert.True(t, IsDuplicateFoundErr(err), "IsDuplicateFoundErr should return true for DuplicateFoundErr error")

// test IsErrDuplicateTopic with a different error type.
// test IsDuplicateFoundErr with a different error type.
dummyErr := fmt.Errorf("dummy error")
assert.False(t, IsErrDuplicateTopic(dummyErr), "IsErrDuplicateTopic should return false for non-ErrDuplicateTopic error")
assert.False(t, IsDuplicateFoundErr(dummyErr), "IsDuplicateFoundErr should return false for non-DuplicateFoundErr error")
}
12 changes: 5 additions & 7 deletions network/p2p/inspector/validation/utils.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package validation

import "github.com/onflow/flow-go/network/channels"
type duplicateStrTracker map[string]struct{}
yhassanzadeh13 marked this conversation as resolved.
Show resolved Hide resolved

type duplicateTopicTracker map[channels.Topic]struct{}

func (d duplicateTopicTracker) set(topic channels.Topic) {
d[topic] = struct{}{}
func (d duplicateStrTracker) set(s string) {
d[s] = struct{}{}
}

func (d duplicateTopicTracker) isDuplicate(topic channels.Topic) bool {
_, ok := d[topic]
func (d duplicateStrTracker) isDuplicate(s string) bool {
_, ok := d[s]
return ok
}
Loading
Loading