Skip to content

Commit

Permalink
move NotEjected filter to filter package
Browse files Browse the repository at this point in the history
- add filter for valid protocol participant
  • Loading branch information
kc1116 committed Apr 1, 2024
1 parent 18f7aec commit c82ffbc
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1307,7 +1307,7 @@ func (builder *FlowAccessNodeBuilder) InitIDProviders() {
filter.And(
filter.HasRole[flow.Identity](flow.RoleConsensus),
filter.Not(filter.HasNodeID[flow.Identity](node.Me.NodeID())),
underlay.NotEjectedFilter,
filter.NotEjectedFilter,
),
builder.IdentityProvider,
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -1253,7 +1253,7 @@ func (fnb *FlowNodeBuilder) InitIDProviders() {
filter.And(
filter.HasRole[flow.Identity](flow.RoleConsensus),
filter.Not(filter.HasNodeID[flow.Identity](node.Me.NodeID())),
underlay.NotEjectedFilter,
filter.NotEjectedFilter,
),
node.IdentityProvider,
)
Expand Down
19 changes: 10 additions & 9 deletions cmd/util/cmd/epochs/cmd/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/onflow/flow-go/cmd/util/cmd/common"
epochcmdutil "github.com/onflow/flow-go/cmd/util/cmd/epochs/utils"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/state/protocol/inmem"
)

Expand Down Expand Up @@ -107,7 +108,7 @@ func generateRecoverEpochTxArgs(getSnapshot func() *inmem.Snapshot) func(cmd *co
// extractResetEpochArgs extracts the required transaction arguments for the `resetEpoch` transaction
func extractRecoverEpochArgs(snapshot *inmem.Snapshot) []cadence.Value {
epoch := snapshot.Epochs().Current()
ids, err := epoch.InitialIdentities()
ids, err := snapshot.Identities(filter.IsValidProtocolParticipant)
if err != nil {
log.Fatal().Err(err).Msg("failed to get initial identities for current epoch")
}
Expand Down Expand Up @@ -156,24 +157,24 @@ func extractRecoverEpochArgs(snapshot *inmem.Snapshot) []cadence.Value {

dkgPubKeys := make([]cadence.Value, 0)
nodeIds := make([]cadence.Value, 0)
ids.Map(func(skeleton flow.IdentitySkeleton) flow.IdentitySkeleton {
if skeleton.GetRole() == flow.RoleConsensus {
dkgPubKey, keyShareErr := currentEpochDKG.KeyShare(skeleton.GetNodeID())
ids.Map(func(identity flow.Identity) flow.Identity {
if identity.GetRole() == flow.RoleConsensus {
dkgPubKey, keyShareErr := currentEpochDKG.KeyShare(identity.GetNodeID())
if keyShareErr != nil {
log.Fatal().Err(keyShareErr).Msg(fmt.Sprintf("failed to get dkg pub key share for node: %s", skeleton.GetNodeID()))
log.Fatal().Err(keyShareErr).Msg(fmt.Sprintf("failed to get dkg pub key share for node: %s", identity.GetNodeID()))
}
dkgPubKeyCdc, cdcErr := cadence.NewString(dkgPubKey.String())
if cdcErr != nil {
log.Fatal().Err(cdcErr).Msg(fmt.Sprintf("failed to get dkg pub key cadence string for node: %s", skeleton.GetNodeID()))
log.Fatal().Err(cdcErr).Msg(fmt.Sprintf("failed to get dkg pub key cadence string for node: %s", identity.GetNodeID()))
}
dkgPubKeys = append(dkgPubKeys, dkgPubKeyCdc)
}
nodeIdCdc, err := cadence.NewString(skeleton.GetNodeID().String())
nodeIdCdc, err := cadence.NewString(identity.GetNodeID().String())
if err != nil {
log.Fatal().Err(err).Msg(fmt.Sprintf("failed to convert node ID to cadence string: %s", skeleton.GetNodeID()))
log.Fatal().Err(err).Msg(fmt.Sprintf("failed to convert node ID to cadence string: %s", identity.GetNodeID()))
}
nodeIds = append(nodeIds, nodeIdCdc)
return skeleton
return identity
})

// @TODO: cluster qcs are converted into flow.ClusterQCVoteData types,
Expand Down
17 changes: 17 additions & 0 deletions model/flow/filter/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,20 @@ var IsVotingConsensusCommitteeMember = And[flow.Identity](
// equivalent to the filter for consensus committee members, as these are
// the same group for now.
var IsValidDKGParticipant = IsConsensusCommitteeMember

// NotEjectedFilter is an identity filter for peers that are not ejected.
var NotEjectedFilter = Not(HasParticipationStatus(flow.EpochParticipationStatusEjected))

// HasWeightGreaterThanZero returns a filter for nodes with a weight greater than zero.
func HasWeightGreaterThanZero[T flow.GenericIdentity](identity *T) bool {
return (*identity).GetInitialWeight() > 0
}

// IsValidProtocolParticipant is an identity filter for all valid protocol participants.
// A protocol participant is considered valid if and only if the following are both true.
// 1. The node is not ejected.
// 2. The node has a weight greater than 0.
var IsValidProtocolParticipant = And[flow.Identity](
NotEjectedFilter, // enforces 1
HasWeightGreaterThanZero[flow.Identity], // enforces 2
)
3 changes: 1 addition & 2 deletions network/p2p/cache/node_blocklist_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/mocknetwork"
"github.com/onflow/flow-go/network/p2p/cache"
"github.com/onflow/flow-go/network/underlay"
"github.com/onflow/flow-go/utils/unittest"
)

Expand Down Expand Up @@ -177,7 +176,7 @@ func (s *NodeDisallowListWrapperTestSuite) TestDisallowListNode() {

s.provider.On("Identities", mock.Anything).Return(combinedIdentities)

identities := s.wrapper.Identities(underlay.NotEjectedFilter)
identities := s.wrapper.Identities(filter.NotEjectedFilter)

require.Equal(s.T(), len(honestIdentities), len(identities)) // expected only honest nodes to be returned
for _, i := range identities {
Expand Down
10 changes: 1 addition & 9 deletions network/underlay/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,6 @@ var (
ErrUnicastMsgWithoutSub = errors.New("networking layer does not have subscription for the channel ID indicated in the unicast message received")
)

// NotEjectedFilter is an identity filter that, when applied to the identity
// table at a given snapshot, returns all nodes that we should communicate with
// over the networking layer.
//
// NOTE: The protocol state includes nodes from the previous/next epoch that should
// be included in network communication. We omit any nodes that have been ejected.
var NotEjectedFilter = filter.Not(filter.HasParticipationStatus(flow.EpochParticipationStatusEjected))

// Network serves as the comprehensive networking layer that integrates three interfaces within Flow; Underlay, EngineRegistry, and ConduitAdapter.
// It is responsible for creating conduits through which engines can send and receive messages to and from other engines on the network, as well as registering other services
// such as BlobService and PingService. It also provides a set of APIs that can be used to send messages to other nodes on the network.
Expand Down Expand Up @@ -545,7 +537,7 @@ func (n *Network) UnRegisterChannel(channel channels.Channel) error {
}

func (n *Network) Identities() flow.IdentityList {
return n.identityProvider.Identities(NotEjectedFilter)
return n.identityProvider.Identities(filter.NotEjectedFilter)
}

func (n *Network) Identity(pid peer.ID) (*flow.Identity, bool) {
Expand Down

0 comments on commit c82ffbc

Please sign in to comment.