Skip to content

Commit

Permalink
FLIP 204: Update cruisectl.BlockTimeController to use `TargetEndTim…
Browse files Browse the repository at this point in the history
…e`, `TargetDuration` (#5023)

* unit tests, integration tests using buildjet-4vcpu runner

* listTargetPackages() returns map of CI runners

* test for custom runners

* generateTestMatrix() stores CI runner

* more testing with custom CI runners

* CI test - 2 unit tests running buildjet runners

* increase engine unit tests to 8 vCPUs

* resource manager full load test (8 vCPUs)

* network/test tests unquarantined, increased runners

* increased to 16 vCPUs

* increased engine tests to 16 vCPUs

* insecure module uses buildjet 4 vcpu runner

* bft framework integration test using buildjet

* engine tests split up to 6 jobs with default runners

* added back all integration tests

* upgraded runners for some integration tests, engine/execution

* dummy commit to kick off CI

* split up TestScriptExecutionAndGetAccounts into 3 tests

* updated runners for Epoch Cohort2, Access Integration Tests

* AN integration tests split up into 3 cohorts (skip unit, other integration tests)

* docker build using GitHub cache

* remove saving Docker image locally

* cache v3, saving local tar before caching, 8vcpu

* add 11 docker images to docker-images.tar, 16 vcpu

* docker save multiline fix; skip localnet-test

* docker build increase to 32 vcpu

* buildjet cache instead of GitHub cache

* lint fix

* activate more integration tests with default runners, using cached Docker

* docker-build downsized to 16 vcpu

* integration test without relic build

* put back relic build for integration tests

* put back all remaining integration tests using BuildJet runners

* integration tests reverted to ubuntu-latest runners

* re-activated unit-test, unit-test-modules jobs

* added (failing) test for sub-sub packages

* sub-sub packages support

* sub-sub package test enhanced

* updated other tests to support new sub-sub packages

* split up engine/execution unit tests into 3 jobs

* engine/execution/ingestion upgraded to 4 vcpu

* engine/execution/ingestion:buildjet-8vcpu

* network/test, network/p2p increased to 8 vcpu; added remaining network tests

* BFT Framework, Epoch Cohort 2 - increased to 4 vcpu

* upgraded runners for Access Cohort 1, Epoch Cohort 1, 2

* epoch cohort2 upgraded to 16 vcpu

* BFT Framework upgrade to 8 vcpu, epoch cohort2 removed flaky test, downgrade to 4 vcpu

* quarantined flaky test - TestEpochJoinAndLeaveVN

* quarantined flaky test - TestSealingAndVerificationPassThrough

* lint fix

* network/test split up into network/test/cohort1, network/test/cohort2

* network/p2p split up into 4 subpackages

network/p2p/connection network/p2p/scoring network/p2p/p2pnode network/p2p

* extracted network/alsp into separate job

* extracted module/dkg into separate job

* engine/execution/ingestion dowgraded to stock runner

* added engine parent package as separate job

* added storage parent package as separate job

* added state package as separate job

* removed localnet-test job

* engine/execution/ingestion upgraded to 2 vcpu

* network/p2p/p2pnode upgraded to 2 vcpu

* module, engine upgraded to 2 vcpu

* network/p2p/p2pnode upgraded to 4 vcpu

* network/test/cohort2 upgraded to 4 vcpu

* engine upgraded to 4 vcpu

* engine/execution/ingestion upgraded to 4 vcpu, network/test/cohort1 upgraded to 2 vcpu

* network/test/cohort1 upgraded to 4 vcpu

* engine/execution/ingestion upgraded to 8 vcpu

* network/test/cohort1 upgraded to 8 vcpu

* BFT (Protocol) upgraded to 4 vcpu

* network/test/cohort1 upgraded to 16 vcpu

* BFT (Protocol), Epoch Cohort1 upgraded to 8 vcpu

* noop push to kick off CI

* Module (integration) upgraded to 4 vcpu

* TestUnicastRateLimit_Messages flaky test quarantined

* noop push to kick off CI

* buildjet/cache@v3 => actions/cache@v3

* test retries increased to 5, timeout increased to 35 mins

* module job increased to 4 vcpu

* unquarantined epoch flaky test after it was fixed

#4975

* unlink flaky test monitor workflow from running when ci changes

* clean up

* lint fix

* add TargetEndTime to epoch models

* update service event parsing

* fix some typos and godocs

* update mocks

* lint

* lint

* pin new version of core-contracts

- adds TargetEndTime field
- pinned to branch - needs to be updated when release is created

* update expected state commitment

* update another state commitment constant

* update another state commit constant

* use TargetEndTime in cruisectl

* represent target end time as uint64

* lint / mocks

* use unix time in block time controller

* add todod for epoch fallback flag

* wip

* wip - begin adding duration

* add TargetDuration to models, conversion

* bump core-contracts version

got this error:
```
go: module github.com/onflow/flow-core-contracts@9e8417b found (v0.0.0-20231120143830-9e8417b56122), but does not contain package github.com/onflow/flow-core-contracts/lib/go/templates
```

Maybe because templates did not change since previous version?

* update state commitment constants in tests

* tidy

* update mocks

* track target duration in block time ctl

* remove transition_time

* update tests

* fix last 2 tests

- add helper functions for concisely converting between various
  time/duration types.
- use ns-level precision in view measurement, primarily so that we can
  verify this implementation is consistent with the Python simulation.
  For real-world, second-level precision is likely fine.

* Update consensus/hotstuff/cruisectl/block_time_controller.go

Co-authored-by: Alexander Hentschel <alex.hentschel@axiomzen.co>

* Update consensus/hotstuff/cruisectl/block_time_controller_test.go

* Update consensus/hotstuff/cruisectl/block_time_controller_test.go

* lint

* Apply suggestions from code review

Co-authored-by: Alexander Hentschel <alex.hentschel@axiomzen.co>

* add docs

* add docs for config setter/getters

* gofmt

---------

Co-authored-by: Misha <15269764+gomisha@users.noreply.github.com>
Co-authored-by: Alexander Hentschel <alex.hentschel@axiomzen.co>
  • Loading branch information
3 people authored Nov 24, 2023
1 parent 445e0ab commit de721aa
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 404 deletions.
10 changes: 0 additions & 10 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func main() {
dkgControllerConfig dkgmodule.ControllerConfig
dkgMessagingEngineConfig = dkgeng.DefaultMessagingEngineConfig()
cruiseCtlConfig = cruisectl.DefaultConfig()
cruiseCtlTargetTransitionTimeFlag = cruiseCtlConfig.TargetTransition.String()
cruiseCtlFallbackProposalDurationFlag time.Duration
cruiseCtlMinViewDurationFlag time.Duration
cruiseCtlMaxViewDurationFlag time.Duration
Expand Down Expand Up @@ -150,7 +149,6 @@ func main() {
flags.DurationVar(&hotstuffMinTimeout, "hotstuff-min-timeout", 2500*time.Millisecond, "the lower timeout bound for the hotstuff pacemaker, this is also used as initial timeout")
flags.Float64Var(&hotstuffTimeoutAdjustmentFactor, "hotstuff-timeout-adjustment-factor", timeout.DefaultConfig.TimeoutAdjustmentFactor, "adjustment of timeout duration in case of time out event")
flags.Uint64Var(&hotstuffHappyPathMaxRoundFailures, "hotstuff-happy-path-max-round-failures", timeout.DefaultConfig.HappyPathMaxRoundFailures, "number of failed rounds before first timeout increase")
flags.StringVar(&cruiseCtlTargetTransitionTimeFlag, "cruise-ctl-target-epoch-transition-time", cruiseCtlTargetTransitionTimeFlag, "the target epoch switchover schedule")
flags.DurationVar(&cruiseCtlFallbackProposalDurationFlag, "cruise-ctl-fallback-proposal-duration", cruiseCtlConfig.FallbackProposalDelay.Load(), "the proposal duration value to use when the controller is disabled, or in epoch fallback mode. In those modes, this value has the same as the old `--block-rate-delay`")
flags.DurationVar(&cruiseCtlMinViewDurationFlag, "cruise-ctl-min-view-duration", cruiseCtlConfig.MinViewDuration.Load(), "the lower bound of authority for the controller, when active. This is the smallest amount of time a view is allowed to take.")
flags.DurationVar(&cruiseCtlMaxViewDurationFlag, "cruise-ctl-max-view-duration", cruiseCtlConfig.MaxViewDuration.Load(), "the upper bound of authority for the controller when active. This is the largest amount of time a view is allowed to take.")
Expand Down Expand Up @@ -179,14 +177,6 @@ func main() {
startupTime = t
nodeBuilder.Logger.Info().Time("startup_time", startupTime).Msg("got startup_time")
}
// parse target transition time string, if set
if cruiseCtlTargetTransitionTimeFlag != cruiseCtlConfig.TargetTransition.String() {
transitionTime, err := cruisectl.ParseTransition(cruiseCtlTargetTransitionTimeFlag)
if err != nil {
return fmt.Errorf("invalid epoch transition time string: %w", err)
}
cruiseCtlConfig.TargetTransition = *transitionTime
}
// convert local flag variables to atomic config variables, for dynamically updatable fields
if cruiseCtlEnabledFlag != cruiseCtlConfig.Enabled.Load() {
cruiseCtlConfig.Enabled.Store(cruiseCtlEnabledFlag)
Expand Down
113 changes: 84 additions & 29 deletions consensus/hotstuff/cruisectl/block_time_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,20 @@ type TimedBlock struct {
// epochInfo stores data about the current and next epoch. It is updated when we enter
// the first view of a new epoch, or the EpochSetup phase of the current epoch.
type epochInfo struct {
curEpochFirstView uint64
curEpochFinalView uint64 // F[v] - the final view of the epoch
curEpochTargetEndTime time.Time // T[v] - the target end time of the current epoch
nextEpochFinalView *uint64
curEpochFirstView uint64
curEpochFinalView uint64 // F[v] - the final view of the current epoch
curEpochTargetDuration uint64 // desired total duration of current epoch in seconds
curEpochTargetEndTime uint64 // T[v] - the target end time of the current epoch, represented as Unix Time [seconds]
nextEpochFinalView *uint64 // the final view of the next epoch
nextEpochTargetDuration *uint64 // desired total duration of next epoch in seconds, or nil if epoch has not yet been set up
nextEpochTargetEndTime *uint64 // the target end time of the next epoch, represented as Unix Time [seconds]
}

// targetViewTime returns τ[v], the ideal, steady-state view time for the current epoch.
// For numerical stability, we avoid repetitive conversions between seconds and time.Duration.
// Instead, internally within the controller, we work with float64 in units of seconds.
func (epoch *epochInfo) targetViewTime() float64 {
return epochLength.Seconds() / float64(epoch.curEpochFinalView-epoch.curEpochFirstView+1)
}

// fractionComplete returns the percentage of views completed of the epoch for the given curView.
// curView must be within the range [curEpochFirstView, curEpochFinalView]
// Returns the completion percentage as a float between [0, 1]
func (epoch *epochInfo) fractionComplete(curView uint64) float64 {
return float64(curView-epoch.curEpochFirstView) / float64(epoch.curEpochFinalView-epoch.curEpochFirstView)
return float64(epoch.curEpochTargetDuration) / float64(epoch.curEpochFinalView-epoch.curEpochFirstView+1)
}

// BlockTimeController dynamically adjusts the ProposalTiming of this node,
Expand All @@ -67,8 +63,8 @@ func (epoch *epochInfo) fractionComplete(curView uint64) float64 {
// This low-level controller output `(B0, x0, d)` is wrapped into a `ProposalTiming`
// interface, specifically `happyPathBlockTime` on the happy path. The purpose of the
// `ProposalTiming` wrapper is to translate the raw controller output into a form
// that is useful for the event handler. Edge cases, such as initialization or
// EECC are implemented by other implementations of `ProposalTiming`.
// that is useful for the EventHandler. Edge cases, such as initialization or
// epoch fallback are implemented by other implementations of `ProposalTiming`.
type BlockTimeController struct {
component.Component
protocol.Consumer // consumes protocol state events
Expand All @@ -79,7 +75,9 @@ type BlockTimeController struct {
log zerolog.Logger
metrics module.CruiseCtlMetrics

epochInfo // scheduled transition view for current/next epoch
epochInfo // scheduled transition view for current/next epoch
// Currently, the only possible state transition for `epochFallbackTriggered` is false → true.
// TODO for 'leaving Epoch Fallback via special service event' this might need to change.
epochFallbackTriggered bool

incorporatedBlocks chan TimedBlock // OnBlockIncorporated events, we desire these blocks to be processed in a timely manner and therefore use a small channel capacity
Expand Down Expand Up @@ -162,6 +160,18 @@ func (ctl *BlockTimeController) initEpochInfo(curView uint64) error {
}
ctl.curEpochFinalView = curEpochFinalView

curEpochTargetDuration, err := curEpoch.TargetDuration()
if err != nil {
return fmt.Errorf("could not initialize current epoch target duration: %w", err)
}
ctl.curEpochTargetDuration = curEpochTargetDuration

curEpochTargetEndTime, err := curEpoch.TargetEndTime()
if err != nil {
return fmt.Errorf("could not initialize current epoch target end time: %w", err)
}
ctl.curEpochTargetEndTime = curEpochTargetEndTime

phase, err := finalSnapshot.Phase()
if err != nil {
return fmt.Errorf("could not check snapshot phase: %w", err)
Expand All @@ -172,9 +182,19 @@ func (ctl *BlockTimeController) initEpochInfo(curView uint64) error {
return fmt.Errorf("could not initialize next epoch final view: %w", err)
}
ctl.epochInfo.nextEpochFinalView = &nextEpochFinalView
}

ctl.curEpochTargetEndTime = ctl.config.TargetTransition.inferTargetEndTime(time.Now().UTC(), ctl.epochInfo.fractionComplete(curView))
nextEpochTargetDuration, err := finalSnapshot.Epochs().Next().TargetDuration()
if err != nil {
return fmt.Errorf("could not initialize next epoch target duration: %w", err)
}
ctl.nextEpochTargetDuration = &nextEpochTargetDuration

nextEpochTargetEndTime, err := finalSnapshot.Epochs().Next().TargetEndTime()
if err != nil {
return fmt.Errorf("could not initialize next epoch target end time: %w", err)
}
ctl.nextEpochTargetEndTime = &nextEpochTargetEndTime
}

epochFallbackTriggered, err := ctl.state.Params().EpochFallbackTriggered()
if err != nil {
Expand All @@ -197,8 +217,7 @@ func (ctl *BlockTimeController) initProposalTiming(curView uint64) {
ctl.storeProposalTiming(newPublishImmediately(curView, time.Now().UTC()))
}

// storeProposalTiming stores the latest ProposalTiming
// Concurrency safe.
// storeProposalTiming stores the latest ProposalTiming. Concurrency safe.
func (ctl *BlockTimeController) storeProposalTiming(proposalTiming ProposalTiming) {
ctl.latestProposalTiming.Store(&proposalTiming)
}
Expand Down Expand Up @@ -242,7 +261,7 @@ func (ctl *BlockTimeController) processEventsWorkerLogic(ctx irrecoverable.Signa
case <-ctl.epochFallbacks:
err := ctl.processEpochFallbackTriggered()
if err != nil {
ctl.log.Err(err).Msgf("fatal error processing epoch EECC event")
ctl.log.Err(err).Msgf("fatal error processing epoch fallback event")
ctx.Throw(err)
}
default:
Expand Down Expand Up @@ -270,7 +289,7 @@ func (ctl *BlockTimeController) processEventsWorkerLogic(ctx irrecoverable.Signa
case <-ctl.epochFallbacks:
err := ctl.processEpochFallbackTriggered()
if err != nil {
ctl.log.Err(err).Msgf("fatal error processing epoch EECC event")
ctl.log.Err(err).Msgf("fatal error processing epoch fallback event")
ctx.Throw(err)
return
}
Expand Down Expand Up @@ -321,15 +340,24 @@ func (ctl *BlockTimeController) checkForEpochTransition(tb TimedBlock) error {
if ctl.nextEpochFinalView == nil { // final view of epoch we are entering should be known
return fmt.Errorf("cannot transition without nextEpochFinalView set")
}
if ctl.nextEpochTargetEndTime == nil {
return fmt.Errorf("cannot transition without nextEpochTargetEndTime set")
}
if ctl.nextEpochTargetDuration == nil {
return fmt.Errorf("cannot transition without nextEpochTargetDuration set")
}
if view > *ctl.nextEpochFinalView { // the block's view should be within the upcoming epoch
return fmt.Errorf("sanity check failed: curView %d is beyond both current epoch (final view %d) and next epoch (final view %d)",
view, ctl.curEpochFinalView, *ctl.nextEpochFinalView)
}

ctl.curEpochFirstView = ctl.curEpochFinalView + 1
ctl.curEpochFinalView = *ctl.nextEpochFinalView
ctl.curEpochTargetDuration = *ctl.nextEpochTargetDuration
ctl.curEpochTargetEndTime = *ctl.nextEpochTargetEndTime
ctl.nextEpochFinalView = nil
ctl.curEpochTargetEndTime = ctl.config.TargetTransition.inferTargetEndTime(tb.Block.Timestamp, ctl.epochInfo.fractionComplete(view))
ctl.nextEpochTargetDuration = nil
ctl.nextEpochTargetEndTime = nil
return nil
}

Expand Down Expand Up @@ -362,9 +390,9 @@ func (ctl *BlockTimeController) measureViewDuration(tb TimedBlock) error {
// In accordance with this convention, observing the proposal for the last view of an epoch, marks the start of the last view.
// By observing the proposal, nodes enter the last view, verify the block, vote for it, the primary aggregates the votes,
// constructs the child (for first view of new epoch). The last view of the epoch ends, when the child proposal is published.
tau := ctl.targetViewTime() // τ - idealized target view time in units of seconds
viewDurationsRemaining := ctl.curEpochFinalView + 1 - view // k[v] - views remaining in current epoch
durationRemaining := ctl.curEpochTargetEndTime.Sub(tb.TimeObserved)
tau := ctl.targetViewTime() // τ: idealized target view time in units of seconds
viewDurationsRemaining := ctl.curEpochFinalView + 1 - view // k[v]: views remaining in current epoch
durationRemaining := u2t(ctl.curEpochTargetEndTime).Sub(tb.TimeObserved) // Γ[v] = T[v] - t[v], with t[v] ≡ tb.TimeObserved the time when observing the block that trigged the view change

// Compute instantaneous error term: e[v] = k[v]·τ - T[v] i.e. the projected difference from target switchover
// and update PID controller's error terms. All UNITS in SECOND.
Expand All @@ -377,7 +405,7 @@ func (ctl *BlockTimeController) measureViewDuration(tb TimedBlock) error {
u := propErr*ctl.config.KP + itgErr*ctl.config.KI + drivErr*ctl.config.KD

// compute the controller output for this observation
unconstrainedBlockTime := time.Duration((tau - u) * float64(time.Second)) // desired time between parent and child block, in units of seconds
unconstrainedBlockTime := f2d(tau - u) // desired time between parent and child block, in units of seconds
proposalTiming := newHappyPathBlockTime(tb, unconstrainedBlockTime, ctl.config.TimingConfig)
constrainedBlockTime := proposalTiming.ConstrainedBlockTime()

Expand All @@ -390,13 +418,13 @@ func (ctl *BlockTimeController) measureViewDuration(tb TimedBlock) error {
Float64("proportional_err", propErr).
Float64("integral_err", itgErr).
Float64("derivative_err", drivErr).
Dur("controller_output", time.Duration(u*float64(time.Second))).
Dur("controller_output", f2d(u)).
Dur("unconstrained_block_time", unconstrainedBlockTime).
Dur("constrained_block_time", constrainedBlockTime).
Msg("measured error upon view change")

ctl.metrics.PIDError(propErr, itgErr, drivErr)
ctl.metrics.ControllerOutput(time.Duration(u * float64(time.Second)))
ctl.metrics.ControllerOutput(f2d(u))
ctl.metrics.TargetProposalDuration(proposalTiming.ConstrainedBlockTime())

ctl.storeProposalTiming(proposalTiming)
Expand All @@ -416,9 +444,20 @@ func (ctl *BlockTimeController) processEpochSetupPhaseStarted(snapshot protocol.
nextEpoch := snapshot.Epochs().Next()
finalView, err := nextEpoch.FinalView()
if err != nil {
return fmt.Errorf("could not get next epochInfo final view: %w", err)
return fmt.Errorf("could not get next epoch final view: %w", err)
}
targetDuration, err := nextEpoch.TargetDuration()
if err != nil {
return fmt.Errorf("could not get next epoch target duration: %w", err)
}
targetEndTime, err := nextEpoch.TargetEndTime()
if err != nil {
return fmt.Errorf("could not get next epoch target end time: %w", err)
}

ctl.epochInfo.nextEpochFinalView = &finalView
ctl.epochInfo.nextEpochTargetDuration = &targetDuration
ctl.epochInfo.nextEpochTargetEndTime = &targetEndTime
return nil
}

Expand Down Expand Up @@ -460,3 +499,19 @@ func (ctl *BlockTimeController) EpochSetupPhaseStarted(_ uint64, first *flow.Hea
func (ctl *BlockTimeController) EpochEmergencyFallbackTriggered() {
ctl.epochFallbacks <- struct{}{}
}

// t2u converts a time.Time to UNIX time represented as a uint64.
// Returned timestamp is precise to within one second of input.
func t2u(t time.Time) uint64 {
return uint64(t.Unix())
}

// u2t converts a UNIX timestamp represented as a uint64 to a time.Time.
func u2t(unix uint64) time.Time {
return time.Unix(int64(unix), 0)
}

// f2d converts a floating-point number of seconds to a time.Duration.
func f2d(sec float64) time.Duration {
return time.Duration(int64(sec * float64(time.Second)))
}
Loading

0 comments on commit de721aa

Please sign in to comment.