Skip to content

Commit

Permalink
Merge pull request #4559 from onflow/yurii/5529-dynamic-protocol-stat…
Browse files Browse the repository at this point in the history
…e-storage-layer
  • Loading branch information
durkmurder authored Sep 7, 2023
2 parents 17c66b8 + 628d61f commit 620d901
Show file tree
Hide file tree
Showing 15 changed files with 1,031 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,7 @@ func TestStakingVoteProcessorV2_BuildVerifyQC(t *testing.T) {
}).Sort(order.Canonical)

leader := stakingSigners[0]

block := helper.MakeBlock(helper.WithBlockView(view),
helper.WithBlockProposer(leader.NodeID))
block := helper.MakeBlock(helper.WithBlockView(view), helper.WithBlockProposer(leader.NodeID))

committee := &mockhotstuff.DynamicCommittee{}
committee.On("IdentitiesByEpoch", block.View).Return(stakingSigners.ToSkeleton(), nil)
Expand Down
5 changes: 5 additions & 0 deletions model/flow/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,11 @@ type EventIDs struct {
CommitID Identifier
}

// ID returns hash of the event IDs.
func (e *EventIDs) ID() Identifier {
return MakeID(e)
}

func NewEpochStatus(previousSetup, previousCommit, currentSetup, currentCommit, nextSetup, nextCommit Identifier) (*EpochStatus, error) {
status := &EpochStatus{
PreviousEpoch: EventIDs{
Expand Down
3 changes: 2 additions & 1 deletion model/flow/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,8 @@ func (il IdentityList) SamplePct(pct float64) (IdentityList, error) {
// Union returns a new identity list containing every identity that occurs in
// either `il`, or `other`, or both. There are no duplicates in the output,
// where duplicates are identities with the same node ID.
// The returned IdentityList is sorted
// Receiver `il` and/or method input `other` can be nil or empty.
// The returned IdentityList is sorted in canonical order.
func (il IdentityList) Union(other IdentityList) IdentityList {
maxLen := len(il) + len(other)

Expand Down
230 changes: 230 additions & 0 deletions model/flow/protocol_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package flow

import "fmt"

// DynamicIdentityEntry encapsulates nodeID and dynamic portion of identity.
type DynamicIdentityEntry struct {
NodeID Identifier
Dynamic DynamicIdentity
}

type DynamicIdentityEntryList []*DynamicIdentityEntry

// ProtocolStateEntry represents a snapshot of the identity table (i.e. the set of all notes authorized to
// be part of the network) at some point in time. It allows to reconstruct the state of identity table using
// epoch setup events and dynamic identities. It tracks attempts of invalid state transitions.
// It also holds information about the next epoch, if it has been already committed.
// This structure is used to persist protocol state in the database.
//
// Note that the current implementation does not store the identity table directly. Instead, we store
// the original events that constituted the _initial_ identity table at the beginning of the epoch
// plus some modifiers. We intend to restructure this code soon.
// TODO: https://github.com/onflow/flow-go/issues/4649
type ProtocolStateEntry struct {
// setup and commit event IDs for current epoch.
CurrentEpochEventIDs EventIDs
// setup and commit event IDs for previous epoch.
PreviousEpochEventIDs EventIDs
// Part of identity table that can be changed during the epoch.
// Always sorted in canonical order.
Identities DynamicIdentityEntryList
// InvalidStateTransitionAttempted encodes whether an invalid state transition
// has been detected in this fork. When this happens, epoch fallback is triggered
// AFTER the fork is finalized.
InvalidStateTransitionAttempted bool
// NextEpochProtocolState describes protocol state of the next epoch
NextEpochProtocolState *ProtocolStateEntry
}

// RichProtocolStateEntry is a ProtocolStateEntry which has additional fields that are cached
// from storage layer for convenience.
// Using this structure instead of ProtocolStateEntry allows us to avoid querying
// the database for epoch setups and commits and full identity table.
// It holds several invariants, such as:
// - CurrentEpochSetup and CurrentEpochCommit are for the same epoch. Never nil.
// - PreviousEpochSetup and PreviousEpochCommit are for the same epoch. Can be nil.
// - Identities is a full identity table for the current epoch.
// Identities are sorted in canonical order. Without duplicates. Never nil.
// - NextEpochProtocolState is a protocol state for the next epoch. Can be nil.
type RichProtocolStateEntry struct {
*ProtocolStateEntry

CurrentEpochSetup *EpochSetup
CurrentEpochCommit *EpochCommit
PreviousEpochSetup *EpochSetup
PreviousEpochCommit *EpochCommit
Identities IdentityList

NextEpochProtocolState *RichProtocolStateEntry
}

// NewRichProtocolStateEntry constructs a rich protocol state entry from a protocol state entry and additional data.
// No errors are expected during normal operation.
func NewRichProtocolStateEntry(
protocolState *ProtocolStateEntry,
previousEpochSetup *EpochSetup,
previousEpochCommit *EpochCommit,
currentEpochSetup *EpochSetup,
currentEpochCommit *EpochCommit,
nextEpochSetup *EpochSetup,
nextEpochCommit *EpochCommit,
) (*RichProtocolStateEntry, error) {
result := &RichProtocolStateEntry{
ProtocolStateEntry: protocolState,
CurrentEpochSetup: currentEpochSetup,
CurrentEpochCommit: currentEpochCommit,
PreviousEpochSetup: previousEpochSetup,
PreviousEpochCommit: previousEpochCommit,
Identities: nil,
NextEpochProtocolState: nil,
}

var err error
// if next epoch has been already committed, fill in data for it as well.
if protocolState.NextEpochProtocolState != nil {
// sanity check consistency of input data
if protocolState.NextEpochProtocolState.CurrentEpochEventIDs.SetupID != nextEpochSetup.ID() {
return nil, fmt.Errorf("inconsistent EpochSetup for constucting RichProtocolStateEntry, next protocol state states ID %v while input event has ID %v",
protocolState.NextEpochProtocolState.CurrentEpochEventIDs.SetupID, nextEpochSetup.ID())
}
if protocolState.NextEpochProtocolState.CurrentEpochEventIDs.CommitID != ZeroID {
if protocolState.NextEpochProtocolState.CurrentEpochEventIDs.CommitID != nextEpochCommit.ID() {
return nil, fmt.Errorf("inconsistent EpochCommit for constucting RichProtocolStateEntry, next protocol state states ID %v while input event has ID %v",
protocolState.NextEpochProtocolState.CurrentEpochEventIDs.CommitID, nextEpochCommit.ID())
}
}

// if next epoch is available, it means that we have observed epoch setup event and we are not anymore in staking phase,
// so we need to build the identity table using current and next epoch setup events.
// so we need to build the identity table using current and next epoch setup events.
result.Identities, err = buildIdentityTable(
protocolState.Identities,
currentEpochSetup.Participants,
nextEpochSetup.Participants,
)
if err != nil {
return nil, fmt.Errorf("could not build identity table for setup/commit phase: %w", err)
}

nextEpochProtocolState := protocolState.NextEpochProtocolState
nextEpochIdentityTable, err := buildIdentityTable(
nextEpochProtocolState.Identities,
nextEpochSetup.Participants,
currentEpochSetup.Participants,
)
if err != nil {
return nil, fmt.Errorf("could not build next epoch identity table: %w", err)
}

// fill identities for next epoch
result.NextEpochProtocolState = &RichProtocolStateEntry{
ProtocolStateEntry: nextEpochProtocolState,
CurrentEpochSetup: nextEpochSetup,
CurrentEpochCommit: nextEpochCommit,
PreviousEpochSetup: result.CurrentEpochSetup, // previous epoch setup is current epoch setup
PreviousEpochCommit: result.CurrentEpochCommit, // previous epoch setup is current epoch setup
Identities: nextEpochIdentityTable,
NextEpochProtocolState: nil, // always nil
}
} else {
// if next epoch is not yet created, it means that we are in staking phase,
// so we need to build the identity table using previous and current epoch setup events.
var otherIdentities IdentityList
if previousEpochSetup != nil {
otherIdentities = previousEpochSetup.Participants
}
result.Identities, err = buildIdentityTable(
protocolState.Identities,
currentEpochSetup.Participants,
otherIdentities,
)
if err != nil {
return nil, fmt.Errorf("could not build identity table for staking phase: %w", err)
}
}

return result, nil
}

// ID returns hash of entry by hashing all fields.
func (e *ProtocolStateEntry) ID() Identifier {
if e == nil {
return ZeroID
}
body := struct {
CurrentEpochEventIDs Identifier
PreviousEpochEventIDs Identifier
Identities DynamicIdentityEntryList
InvalidStateTransitionAttempted bool
NextEpochProtocolStateID Identifier
}{
CurrentEpochEventIDs: e.CurrentEpochEventIDs.ID(),
PreviousEpochEventIDs: e.PreviousEpochEventIDs.ID(),
Identities: e.Identities,
InvalidStateTransitionAttempted: e.InvalidStateTransitionAttempted,
NextEpochProtocolStateID: e.NextEpochProtocolState.ID(),
}
return MakeID(body)
}

// Sorted returns whether the list is sorted by the input ordering.
func (ll DynamicIdentityEntryList) Sorted(less IdentifierOrder) bool {
for i := 0; i < len(ll)-1; i++ {
a := ll[i]
b := ll[i+1]
if !less(a.NodeID, b.NodeID) {
return false
}
}
return true
}

// buildIdentityTable constructs the full identity table for the target epoch by combining data from:
// 1. The target epoch's Dynamic Identities.
// 2. The target epoch's IdentitySkeletons
// (recorded in EpochSetup event and immutable throughout the epoch).
// 3. [optional] An adjacent epoch's IdentitySkeletons (can be empty or nil), as recorded in the
// adjacent epoch's setup event. For a target epoch N, the epochs N-1 and N+1 are defined to be
// adjacent. Adjacent epochs do not _necessarily_ exist (e.g. consider a spork comprising only
// a single epoch), in which case this input is nil or empty.
//
// It also performs sanity checks to make sure that the data is consistent.
// No errors are expected during normal operation.
func buildIdentityTable(
targetEpochDynamicIdentities DynamicIdentityEntryList,
targetEpochIdentitySkeletons IdentityList, // TODO: change to `IdentitySkeletonList`
adjacentEpochIdentitySkeletons IdentityList, // TODO: change to `IdentitySkeletonList`
) (IdentityList, error) {
// produce a unique set for current and previous epoch participants
allEpochParticipants := targetEpochIdentitySkeletons.Union(adjacentEpochIdentitySkeletons)
// sanity check: size of identities should be equal to previous and current epoch participants combined
if len(allEpochParticipants) != len(targetEpochDynamicIdentities) {
return nil, fmt.Errorf("invalid number of identities in protocol state: expected %d, got %d", len(allEpochParticipants), len(targetEpochDynamicIdentities))
}

// build full identity table for current epoch
var result IdentityList
for i, identity := range targetEpochDynamicIdentities {
// sanity check: identities should be sorted in canonical order
if identity.NodeID != allEpochParticipants[i].NodeID {
return nil, fmt.Errorf("identites in protocol state are not in canonical order: expected %s, got %s", allEpochParticipants[i].NodeID, identity.NodeID)
}
result = append(result, &Identity{
IdentitySkeleton: allEpochParticipants[i].IdentitySkeleton,
DynamicIdentity: identity.Dynamic,
})
}
return result, nil
}

// DynamicIdentityEntryListFromIdentities converts IdentityList to DynamicIdentityEntryList.
func DynamicIdentityEntryListFromIdentities(identities IdentityList) DynamicIdentityEntryList {
dynamicIdentities := make(DynamicIdentityEntryList, 0, len(identities))
for _, identity := range identities {
dynamicIdentities = append(dynamicIdentities, &DynamicIdentityEntry{
NodeID: identity.NodeID,
Dynamic: identity.DynamicIdentity,
})
}
return dynamicIdentities
}
119 changes: 119 additions & 0 deletions model/flow/protocol_state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package flow_test

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/unittest"
)

// TestNewRichProtocolStateEntry checks that NewRichProtocolStateEntry creates valid identity tables depending on the state
// of epoch which is derived from the protocol state entry.
func TestNewRichProtocolStateEntry(t *testing.T) {
// Conditions right after a spork:
// * no previous epoch exists from the perspective of the freshly-sporked protocol state
// * network is currently in the staking phase for the next epoch, hence no service events for the next epoch exist
t.Run("staking-root-protocol-state", func(t *testing.T) {
currentEpochSetup := unittest.EpochSetupFixture()
currentEpochCommit := unittest.EpochCommitFixture()
stateEntry := &flow.ProtocolStateEntry{
CurrentEpochEventIDs: flow.EventIDs{
SetupID: currentEpochSetup.ID(),
CommitID: currentEpochCommit.ID(),
},
PreviousEpochEventIDs: flow.EventIDs{},
Identities: flow.DynamicIdentityEntryListFromIdentities(currentEpochSetup.Participants),
InvalidStateTransitionAttempted: false,
NextEpochProtocolState: nil,
}
entry, err := flow.NewRichProtocolStateEntry(
stateEntry,
nil,
nil,
currentEpochSetup,
currentEpochCommit,
nil,
nil,
)
assert.NoError(t, err)
assert.Equal(t, currentEpochSetup.Participants, entry.Identities, "should be equal to current epoch setup participants")
})

// Common situation during the staking phase for epoch N+1
// * we are currently in Epoch N
// * previous epoch N-1 is known (specifically EpochSetup and EpochCommit events)
// * network is currently in the staking phase for the next epoch, hence no service events for the next epoch exist
t.Run("staking-phase", func(t *testing.T) {
stateEntry := unittest.ProtocolStateFixture()
richEntry, err := flow.NewRichProtocolStateEntry(
stateEntry.ProtocolStateEntry,
stateEntry.PreviousEpochSetup,
stateEntry.PreviousEpochCommit,
stateEntry.CurrentEpochSetup,
stateEntry.CurrentEpochCommit,
nil,
nil,
)
assert.NoError(t, err)
expectedIdentities := stateEntry.CurrentEpochSetup.Participants.Union(stateEntry.PreviousEpochSetup.Participants)
assert.Equal(t, expectedIdentities, richEntry.Identities, "should be equal to current epoch setup participants + previous epoch setup participants")
assert.Nil(t, richEntry.NextEpochProtocolState)
})

// Common situation during the epoch setup phase for epoch N+1
// * we are currently in Epoch N
// * previous epoch N-1 is known (specifically EpochSetup and EpochCommit events)
// * network is currently in the setup phase for the next epoch, i.e. EpochSetup event (starting setup phase) has already been observed
t.Run("setup-phase", func(t *testing.T) {
stateEntry := unittest.ProtocolStateFixture(unittest.WithNextEpochProtocolState(), func(entry *flow.RichProtocolStateEntry) {
entry.NextEpochProtocolState.CurrentEpochCommit = nil
entry.NextEpochProtocolState.CurrentEpochEventIDs.CommitID = flow.ZeroID
})

richEntry, err := flow.NewRichProtocolStateEntry(
stateEntry.ProtocolStateEntry,
stateEntry.PreviousEpochSetup,
stateEntry.PreviousEpochCommit,
stateEntry.CurrentEpochSetup,
stateEntry.CurrentEpochCommit,
stateEntry.NextEpochProtocolState.CurrentEpochSetup,
nil,
)
assert.NoError(t, err)
expectedIdentities := stateEntry.CurrentEpochSetup.Participants.Union(stateEntry.NextEpochProtocolState.CurrentEpochSetup.Participants)
assert.Equal(t, expectedIdentities, richEntry.Identities, "should be equal to current epoch setup participants + next epoch setup participants")
assert.Nil(t, richEntry.NextEpochProtocolState.CurrentEpochCommit)
expectedIdentities = stateEntry.NextEpochProtocolState.CurrentEpochSetup.Participants.Union(stateEntry.CurrentEpochSetup.Participants)
assert.Equal(t, expectedIdentities, richEntry.NextEpochProtocolState.Identities, "should be equal to next epoch setup participants + current epoch setup participants")
})

// TODO: include test for epoch setup phase where no prior epoch exist (i.e. first epoch setup phase after spork)

// Common situation during the epoch commit phase for epoch N+1
// * we are currently in Epoch N
// * previous epoch N-1 is known (specifically EpochSetup and EpochCommit events)
// * The network has completed the epoch setup phase, i.e. published the EpochSetup and EpochCommit events for epoch N+1.
t.Run("commit-phase", func(t *testing.T) {
stateEntry := unittest.ProtocolStateFixture(unittest.WithNextEpochProtocolState())

richEntry, err := flow.NewRichProtocolStateEntry(
stateEntry.ProtocolStateEntry,
stateEntry.PreviousEpochSetup,
stateEntry.PreviousEpochCommit,
stateEntry.CurrentEpochSetup,
stateEntry.CurrentEpochCommit,
stateEntry.NextEpochProtocolState.CurrentEpochSetup,
stateEntry.NextEpochProtocolState.CurrentEpochCommit,
)
assert.NoError(t, err)
expectedIdentities := stateEntry.CurrentEpochSetup.Participants.Union(stateEntry.NextEpochProtocolState.CurrentEpochSetup.Participants)
assert.Equal(t, expectedIdentities, richEntry.Identities, "should be equal to current epoch setup participants + next epoch setup participants")
expectedIdentities = stateEntry.NextEpochProtocolState.CurrentEpochSetup.Participants.Union(stateEntry.CurrentEpochSetup.Participants)
assert.Equal(t, expectedIdentities, richEntry.NextEpochProtocolState.Identities, "should be equal to next epoch setup participants + current epoch setup participants")
})

// TODO: include test for epoch commit phase where no prior epoch exist (i.e. first epoch commit phase after spork)

}
1 change: 1 addition & 0 deletions module/metrics/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
ResourceQC = "qc"
ResourceMyReceipt = "my_receipt"
ResourceCollection = "collection"
ResourceProtocolState = "protocol_state"
ResourceApproval = "approval"
ResourceSeal = "seal"
ResourcePendingIncorporatedSeal = "pending_incorporated_seal"
Expand Down
Loading

0 comments on commit 620d901

Please sign in to comment.