Skip to content

Commit 10b7209

Browse files
authored
Merge branch 'master' into amlandeep/add-random-EN-lookup-on-recipt-lookup-failure
2 parents 802959b + 8ddc01e commit 10b7209

File tree

18 files changed

+241
-13
lines changed

18 files changed

+241
-13
lines changed

cmd/access/node_builder/access_node_builder.go

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/spf13/pflag"
1313

1414
"github.com/onflow/flow-go/crypto"
15+
"github.com/onflow/flow-go/module/compliance"
1516

1617
"github.com/onflow/flow-go/cmd"
1718
"github.com/onflow/flow-go/consensus"
@@ -309,6 +310,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerEngine() *FlowAccessNodeBuild
309310
builder.FollowerCore,
310311
builder.SyncCore,
311312
node.Tracer,
313+
compliance.WithSkipNewProposalsThreshold(builder.ComplianceConfig.SkipNewProposalsThreshold),
312314
)
313315
if err != nil {
314316
return nil, fmt.Errorf("could not create follower engine: %w", err)

cmd/bootstrap/cmd/constraints.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@ func checkConstraints(partnerNodes, internalNodes []model.NodeInfo) {
5050
if _, exists := internals.ByNodeID(node.NodeID); exists {
5151
clusterInternalCount++
5252
}
53-
if clusterInternalCount <= clusterPartnerCount*2 {
54-
log.Fatal().Msgf(
55-
"will not bootstrap configuration without Byzantine majority within cluster: "+
56-
"(partners=%d, internals=%d, min_internals=%d)",
57-
clusterPartnerCount, clusterInternalCount, clusterPartnerCount*2+1)
58-
}
53+
}
54+
if clusterInternalCount <= clusterPartnerCount*2 {
55+
log.Fatal().Msgf(
56+
"will not bootstrap configuration without Byzantine majority within cluster: "+
57+
"(partners=%d, internals=%d, min_internals=%d)",
58+
clusterPartnerCount, clusterInternalCount, clusterPartnerCount*2+1)
5959
}
6060
partnerCOLCount += clusterPartnerCount
6161
internalCOLCount += clusterInternalCount

cmd/collection/main.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/onflow/flow-go/cmd/util/cmd/common"
1010
"github.com/onflow/flow-go/model/bootstrap"
11+
modulecompliance "github.com/onflow/flow-go/module/compliance"
1112
"github.com/onflow/flow-go/module/mempool/herocache"
1213

1314
"github.com/onflow/flow-go-sdk/client"
@@ -68,9 +69,10 @@ func main() {
6869
startupTimeString string
6970
startupTime time.Time
7071

71-
followerState protocol.MutableState
72-
ingestConf = ingest.DefaultConfig()
73-
rpcConf rpc.Config
72+
followerState protocol.MutableState
73+
ingestConf = ingest.DefaultConfig()
74+
rpcConf rpc.Config
75+
clusterComplianceConfig modulecompliance.Config
7476

7577
pools *epochpool.TransactionPools // epoch-scoped transaction pools
7678
followerBuffer *buffer.PendingBlocks // pending block cache for follower
@@ -140,6 +142,8 @@ func main() {
140142
"additional fraction of replica timeout that the primary will wait for votes")
141143
flags.DurationVar(&blockRateDelay, "block-rate-delay", 250*time.Millisecond,
142144
"the delay to broadcast block proposal in order to control block production rate")
145+
flags.Uint64Var(&clusterComplianceConfig.SkipNewProposalsThreshold,
146+
"cluster-compliance-skip-proposals-threshold", modulecompliance.DefaultConfig().SkipNewProposalsThreshold, "threshold at which new proposals are discarded rather than cached, if their height is this much above local finalized height (cluster compliance engine)")
143147
flags.StringVar(&startupTimeString, "hotstuff-startup-time", cmd.NotSet, "specifies date and time (in ISO 8601 format) after which the consensus participant may enter the first view (e.g (e.g 1996-04-24T15:04:05-07:00))")
144148

145149
// epoch qc contract flags
@@ -306,6 +310,7 @@ func main() {
306310
followerCore,
307311
mainChainSyncCore,
308312
node.Tracer,
313+
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
309314
)
310315
if err != nil {
311316
return nil, fmt.Errorf("could not create follower engine: %w", err)
@@ -425,6 +430,7 @@ func main() {
425430
node.Metrics.Mempool,
426431
node.State,
427432
node.Storage.Transactions,
433+
modulecompliance.WithSkipNewProposalsThreshold(clusterComplianceConfig.SkipNewProposalsThreshold),
428434
)
429435
if err != nil {
430436
return nil, err

cmd/consensus/main.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"github.com/onflow/flow-go/module/buffer"
4848
builder "github.com/onflow/flow-go/module/builder/consensus"
4949
chmodule "github.com/onflow/flow-go/module/chunks"
50+
modulecompliance "github.com/onflow/flow-go/module/compliance"
5051
dkgmodule "github.com/onflow/flow-go/module/dkg"
5152
"github.com/onflow/flow-go/module/epochs"
5253
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
@@ -657,13 +658,21 @@ func main() {
657658
mutableState,
658659
proposals,
659660
syncCore,
660-
hotstuffModules.Aggregator)
661+
hotstuffModules.Aggregator,
662+
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
663+
)
661664
if err != nil {
662665
return nil, fmt.Errorf("could not initialize compliance core: %w", err)
663666
}
664667

665668
// initialize the compliance engine
666-
comp, err = compliance.NewEngine(node.Logger, node.Network, node.Me, prov, complianceCore)
669+
comp, err = compliance.NewEngine(
670+
node.Logger,
671+
node.Network,
672+
node.Me,
673+
prov,
674+
complianceCore,
675+
)
667676
if err != nil {
668677
return nil, fmt.Errorf("could not initialize compliance engine: %w", err)
669678
}

cmd/execution/main.go

+2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ import (
6161
"github.com/onflow/flow-go/model/flow/filter"
6262
"github.com/onflow/flow-go/module"
6363
"github.com/onflow/flow-go/module/buffer"
64+
"github.com/onflow/flow-go/module/compliance"
6465
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
6566
"github.com/onflow/flow-go/module/metrics"
6667
"github.com/onflow/flow-go/module/state_synchronization"
@@ -674,6 +675,7 @@ func main() {
674675
followerCore,
675676
syncCore,
676677
node.Tracer,
678+
compliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
677679
)
678680
if err != nil {
679681
return nil, fmt.Errorf("could not create follower engine: %w", err)

cmd/node_builder.go

+5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/spf13/pflag"
1515

1616
"github.com/onflow/flow-go/crypto"
17+
"github.com/onflow/flow-go/module/compliance"
1718

1819
"github.com/onflow/flow-go/admin/commands"
1920
"github.com/onflow/flow-go/fvm"
@@ -151,6 +152,9 @@ type BaseConfig struct {
151152
topologyEdgeProbability float64
152153
HeroCacheMetricsEnable bool
153154
SyncCoreConfig synchronization.Config
155+
// ComplianceConfig configures either the compliance engine (consensus nodes)
156+
// or the follower engine (all other node roles)
157+
ComplianceConfig compliance.Config
154158
}
155159

156160
// NodeConfig contains all the derived parameters such the NodeID, private keys etc. and initialized instances of
@@ -232,5 +236,6 @@ func DefaultBaseConfig() *BaseConfig {
232236
topologyEdgeProbability: topology.MaximumEdgeProbability,
233237
HeroCacheMetricsEnable: false,
234238
SyncCoreConfig: synchronization.DefaultConfig(),
239+
ComplianceConfig: compliance.DefaultConfig(),
235240
}
236241
}

cmd/scaffold.go

+2
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ func (fnb *FlowNodeBuilder) BaseFlags() {
162162
fnb.flags.UintVar(&fnb.BaseConfig.SyncCoreConfig.MaxAttempts, "sync-max-attempts", defaultConfig.SyncCoreConfig.MaxAttempts, "the maximum number of attempts we make for each requested block/height before discarding")
163163
fnb.flags.UintVar(&fnb.BaseConfig.SyncCoreConfig.MaxSize, "sync-max-size", defaultConfig.SyncCoreConfig.MaxSize, "the maximum number of blocks we request in the same block request message")
164164
fnb.flags.UintVar(&fnb.BaseConfig.SyncCoreConfig.MaxRequests, "sync-max-requests", defaultConfig.SyncCoreConfig.MaxRequests, "the maximum number of requests we send during each scanning period")
165+
166+
fnb.flags.Uint64Var(&fnb.BaseConfig.ComplianceConfig.SkipNewProposalsThreshold, "compliance-skip-proposals-threshold", defaultConfig.ComplianceConfig.SkipNewProposalsThreshold, "threshold at which new proposals are discarded rather than cached, if their height is this much above local finalized height")
165167
}
166168

167169
func (fnb *FlowNodeBuilder) EnqueuePingService() {

cmd/verification/main.go

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/onflow/flow-go/module"
2727
"github.com/onflow/flow-go/module/buffer"
2828
"github.com/onflow/flow-go/module/chunks"
29+
"github.com/onflow/flow-go/module/compliance"
2930
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
3031
"github.com/onflow/flow-go/module/mempool"
3132
"github.com/onflow/flow-go/module/mempool/stdmap"
@@ -328,6 +329,7 @@ func main() {
328329
followerCore,
329330
syncCore,
330331
node.Tracer,
332+
compliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
331333
)
332334
if err != nil {
333335
return nil, fmt.Errorf("could not create follower engine: %w", err)

engine/collection/compliance/core.go

+23
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/onflow/flow-go/model/flow"
1616
"github.com/onflow/flow-go/model/messages"
1717
"github.com/onflow/flow-go/module"
18+
"github.com/onflow/flow-go/module/compliance"
1819
"github.com/onflow/flow-go/module/metrics"
1920
"github.com/onflow/flow-go/state"
2021
clusterkv "github.com/onflow/flow-go/state/cluster"
@@ -28,6 +29,7 @@ import (
2829
// user of this object needs to ensure single thread access.
2930
type Core struct {
3031
log zerolog.Logger // used to log relevant actions with context
32+
config compliance.Config
3133
metrics module.EngineMetrics
3234
mempoolMetrics module.MempoolMetrics
3335
collectionMetrics module.CollectionMetrics
@@ -49,10 +51,17 @@ func NewCore(
4951
state clusterkv.MutableState,
5052
pending module.PendingClusterBlockBuffer,
5153
voteAggregator hotstuff.VoteAggregator,
54+
opts ...compliance.Opt,
5255
) (*Core, error) {
5356

57+
config := compliance.DefaultConfig()
58+
for _, apply := range opts {
59+
apply(&config)
60+
}
61+
5462
c := &Core{
5563
log: log.With().Str("cluster_compliance", "core").Logger(),
64+
config: config,
5665
metrics: collector,
5766
mempoolMetrics: mempool,
5867
collectionMetrics: collectionMetrics,
@@ -108,6 +117,20 @@ func (c *Core) OnBlockProposal(originID flow.Identifier, proposal *messages.Clus
108117
return fmt.Errorf("could not check proposal: %w", err)
109118
}
110119

120+
// ignore proposals which are too far ahead of our local finalized state
121+
// instead, rely on sync engine to catch up finalization more effectively, and avoid
122+
// large subtree of blocks to be cached.
123+
final, err := c.state.Final().Head()
124+
if err != nil {
125+
return fmt.Errorf("could not get latest finalized header: %w", err)
126+
}
127+
if header.Height > final.Height && header.Height-final.Height > c.config.SkipNewProposalsThreshold {
128+
log.Debug().
129+
Uint64("final_height", final.Height).
130+
Msg("dropping block too far ahead of locally finalized height")
131+
return nil
132+
}
133+
111134
// there are two possibilities if the proposal is neither already pending
112135
// processing in the cache, nor has already been processed:
113136
// 1) the proposal is unverifiable because parent or ancestor is unknown

engine/collection/compliance/core_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/onflow/flow-go/model/flow"
1818
"github.com/onflow/flow-go/model/messages"
1919
realbuffer "github.com/onflow/flow-go/module/buffer"
20+
"github.com/onflow/flow-go/module/compliance"
2021
"github.com/onflow/flow-go/module/metrics"
2122
module "github.com/onflow/flow-go/module/mock"
2223
clusterint "github.com/onflow/flow-go/state/cluster"
@@ -197,6 +198,22 @@ func (cs *ComplianceCoreSuite) TestOnBlockProposalValidParent() {
197198
cs.hotstuff.AssertExpectations(cs.T())
198199
}
199200

201+
func (cs *ComplianceCoreSuite) TestOnBlockProposalSkipProposalThreshold() {
202+
203+
// create a proposal which is far enough ahead to be dropped
204+
originID := unittest.IdentifierFixture()
205+
block := unittest.ClusterBlockFixture()
206+
block.Header.Height = cs.head.Header.Height + compliance.DefaultConfig().SkipNewProposalsThreshold + 1
207+
proposal := unittest.ClusterProposalFromBlock(&block)
208+
209+
err := cs.core.OnBlockProposal(originID, proposal)
210+
require.NoError(cs.T(), err)
211+
212+
// block should be dropped - not added to state or cache
213+
cs.state.AssertNotCalled(cs.T(), "Extend", mock.Anything)
214+
cs.pending.AssertNotCalled(cs.T(), "Add", originID, mock.Anything)
215+
}
216+
200217
func (cs *ComplianceCoreSuite) TestOnBlockProposalValidAncestor() {
201218

202219
// create a proposal that has two ancestors in the cache

engine/collection/epochmgr/factories/proposal.go

+5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/onflow/flow-go/engine/collection/compliance"
1010
"github.com/onflow/flow-go/module"
1111
"github.com/onflow/flow-go/module/buffer"
12+
modulecompliance "github.com/onflow/flow-go/module/compliance"
1213
"github.com/onflow/flow-go/network"
1314
"github.com/onflow/flow-go/state/cluster"
1415
"github.com/onflow/flow-go/state/protocol"
@@ -24,6 +25,7 @@ type ProposalEngineFactory struct {
2425
mempoolMetrics module.MempoolMetrics
2526
protoState protocol.State
2627
transactions storage.Transactions
28+
complianceOpts []modulecompliance.Opt
2729
}
2830

2931
// NewFactory returns a new collection proposal engine factory.
@@ -36,6 +38,7 @@ func NewProposalEngineFactory(
3638
mempoolMetrics module.MempoolMetrics,
3739
protoState protocol.State,
3840
transactions storage.Transactions,
41+
complianceOpts ...modulecompliance.Opt,
3942
) (*ProposalEngineFactory, error) {
4043

4144
factory := &ProposalEngineFactory{
@@ -47,6 +50,7 @@ func NewProposalEngineFactory(
4750
mempoolMetrics: mempoolMetrics,
4851
protoState: protoState,
4952
transactions: transactions,
53+
complianceOpts: complianceOpts,
5054
}
5155
return factory, nil
5256
}
@@ -68,6 +72,7 @@ func (f *ProposalEngineFactory) Create(
6872
clusterState,
6973
cache,
7074
voteAggregator,
75+
f.complianceOpts...,
7176
)
7277
if err != nil {
7378
return nil, fmt.Errorf("could create cluster compliance core: %w", err)

engine/common/follower/engine.go

+23
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/onflow/flow-go/model/flow"
1414
"github.com/onflow/flow-go/model/messages"
1515
"github.com/onflow/flow-go/module"
16+
"github.com/onflow/flow-go/module/compliance"
1617
"github.com/onflow/flow-go/module/metrics"
1718
"github.com/onflow/flow-go/module/trace"
1819
"github.com/onflow/flow-go/network"
@@ -25,6 +26,7 @@ import (
2526
type Engine struct {
2627
unit *engine.Unit
2728
log zerolog.Logger
29+
config compliance.Config
2830
me module.Local
2931
engMetrics module.EngineMetrics
3032
mempoolMetrics module.MempoolMetrics
@@ -53,11 +55,18 @@ func New(
5355
follower module.HotStuffFollower,
5456
sync module.BlockRequester,
5557
tracer module.Tracer,
58+
opts ...compliance.Opt,
5659
) (*Engine, error) {
5760

61+
config := compliance.DefaultConfig()
62+
for _, apply := range opts {
63+
apply(&config)
64+
}
65+
5866
e := &Engine{
5967
unit: engine.NewUnit(),
6068
log: log.With().Str("engine", "follower").Logger(),
69+
config: config,
6170
me: me,
6271
engMetrics: engMetrics,
6372
mempoolMetrics: mempoolMetrics,
@@ -215,6 +224,20 @@ func (e *Engine) onBlockProposal(originID flow.Identifier, proposal *messages.Bl
215224
return fmt.Errorf("could not check proposal: %w", err)
216225
}
217226

227+
// ignore proposals which are too far ahead of our local finalized state
228+
// instead, rely on sync engine to catch up finalization more effectively, and avoid
229+
// large subtree of blocks to be cached.
230+
final, err := e.state.Final().Head()
231+
if err != nil {
232+
return fmt.Errorf("could not get latest finalized header: %w", err)
233+
}
234+
if header.Height > final.Height && header.Height-final.Height > e.config.SkipNewProposalsThreshold {
235+
log.Debug().
236+
Uint64("final_height", final.Height).
237+
Msg("dropping block too far ahead of locally finalized height")
238+
return nil
239+
}
240+
218241
// there are two possibilities if the proposal is neither already pending
219242
// processing in the cache, nor has already been processed:
220243
// 1) the proposal is unverifiable because parent or ancestor is unknown

0 commit comments

Comments
 (0)