Skip to content

Commit

Permalink
get subsets of internal collectors and partner collectors from snapshot
Browse files Browse the repository at this point in the history
- emit fatal log if identity is present in internal node info from disc but missing from snap shot identities list
  • Loading branch information
kc1116 committed Apr 1, 2024
1 parent c82ffbc commit 321da97
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 16 deletions.
4 changes: 2 additions & 2 deletions cmd/bootstrap/cmd/finalize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ func TestClusterAssignment(t *testing.T) {

log := zerolog.Nop()
// should not error
_, clusters, err := common.ConstructClusterAssignment(log, partners, internals, int(flagCollectionClusters))
_, clusters, err := common.ConstructClusterAssignment(log, model.ToIdentityList(partners), model.ToIdentityList(internals), int(flagCollectionClusters))
require.NoError(t, err)
require.True(t, checkClusterConstraint(clusters, partners, internals))

// unhappy Path
internals = internals[:21] // reduce one internal node
// should error
_, _, err = common.ConstructClusterAssignment(log, partners, internals, int(flagCollectionClusters))
_, _, err = common.ConstructClusterAssignment(log, model.ToIdentityList(partners), model.ToIdentityList(internals), int(flagCollectionClusters))
require.Error(t, err)
// revert the flag value
flagCollectionClusters = tmp
Expand Down
2 changes: 1 addition & 1 deletion cmd/bootstrap/cmd/rootblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func rootBlock(cmd *cobra.Command, args []string) {
participants := model.ToIdentityList(stakingNodes).Sort(flow.Canonical[flow.Identity])

log.Info().Msg("computing collection node clusters")
assignments, clusters, err := common.ConstructClusterAssignment(log, partnerNodes, internalNodes, int(flagCollectionClusters))
assignments, clusters, err := common.ConstructClusterAssignment(log, model.ToIdentityList(partnerNodes), model.ToIdentityList(internalNodes), int(flagCollectionClusters))
if err != nil {
log.Fatal().Err(err).Msg("unable to generate cluster assignment")
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/util/cmd/common/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
// satisfied, an exception is returned.
// Note that if an exception is returned with a certain number of internal/partner nodes, there is no chance
// of succeeding the assignment by re-running the function without increasing the internal nodes ratio.
func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes []bootstrap.NodeInfo, numCollectionClusters int) (flow.AssignmentList, flow.ClusterList, error) {
func ConstructClusterAssignment(log zerolog.Logger, partnerNodes, internalNodes flow.IdentityList, numCollectionClusters int) (flow.AssignmentList, flow.ClusterList, error) {

partners := bootstrap.ToIdentityList(partnerNodes).Filter(filter.HasRole[flow.Identity](flow.RoleCollection))
internals := bootstrap.ToIdentityList(internalNodes).Filter(filter.HasRole[flow.Identity](flow.RoleCollection))
partners := partnerNodes.Filter(filter.HasRole[flow.Identity](flow.RoleCollection))
internals := internalNodes.Filter(filter.HasRole[flow.Identity](flow.RoleCollection))
nCollectors := len(partners) + len(internals)

// ensure we have at least as many collection nodes as clusters
Expand Down
38 changes: 28 additions & 10 deletions cmd/util/cmd/epochs/cmd/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,26 +108,44 @@ func generateRecoverEpochTxArgs(getSnapshot func() *inmem.Snapshot) func(cmd *co
// extractResetEpochArgs extracts the required transaction arguments for the `resetEpoch` transaction
func extractRecoverEpochArgs(snapshot *inmem.Snapshot) []cadence.Value {
epoch := snapshot.Epochs().Current()

ids, err := snapshot.Identities(filter.IsValidProtocolParticipant)
if err != nil {
log.Fatal().Err(err).Msg("failed to get initial identities for current epoch")
log.Fatal().Err(err).Msg("failed to get valid protocol participants from snapshot")
}

currentEpochDKG, err := epoch.DKG()
if err != nil {
log.Fatal().Err(err).Msg("failed to get DKG for current epoch")
}

log.Info().Msg("collecting partner network and staking keys")
partnerNodes := common.ReadPartnerNodeInfos(log, flagPartnerWeights, flagPartnerNodeInfoDir)
log.Info().Msg("")
// separate collector nodes by internal and partner nodes
collectors := ids.Filter(filter.HasRole[flow.Identity](flow.RoleCollection))
internalCollectors := make(flow.IdentityList, 0)
partnerCollectors := make(flow.IdentityList, 0)

log.Info().Msg("collecting internal node network and staking keys")
internalNodes := common.ReadInternalNodeInfos(log, flagInternalNodePrivInfoDir, flagNodeConfigJson)
internalNodesMap := make(map[flow.Identifier]struct{})
for _, node := range internalNodes {
if !ids.Exists(node.Identity()) {
log.Fatal().Msg(fmt.Sprintf("node ID found in internal node infos missing from protocol snapshot identities: %s", node.NodeID))
}
internalNodesMap[node.NodeID] = struct{}{}
}
log.Info().Msg("")

collectors.Map(func(identity flow.Identity) flow.Identity {
if _, ok := internalNodesMap[identity.NodeID]; ok {
internalCollectors = append(internalCollectors, &identity)
} else {
partnerCollectors = append(partnerCollectors, &identity)
}
return identity
})

currentEpochDKG, err := epoch.DKG()
if err != nil {
log.Fatal().Err(err).Msg("failed to get DKG for current epoch")
}

log.Info().Msg("computing collection node clusters")
_, clusters, err := common.ConstructClusterAssignment(log, partnerNodes, internalNodes, flagCollectionClusters)
_, clusters, err := common.ConstructClusterAssignment(log, partnerCollectors, internalCollectors, flagCollectionClusters)
if err != nil {
log.Fatal().Err(err).Msg("unable to generate cluster assignment")
}
Expand Down

0 comments on commit 321da97

Please sign in to comment.