Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: consensus abstraction #3327

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3a36696
*: initial consensus factory impl
pinebit Oct 8, 2024
a70587a
Merge branch 'main' into pinebit/consensus-factory
pinebit Oct 8, 2024
109e350
Fixed vapi test
pinebit Oct 9, 2024
2df6290
More of QBFT refactoring
pinebit Oct 9, 2024
8da6e82
Updated protoc version
pinebit Oct 9, 2024
d168ebb
Merge branch 'main' into pinebit/consensus-factory
pinebit Oct 10, 2024
33a9b90
Consensus package refactoring
pinebit Oct 10, 2024
5b4d360
Merge branch 'main' into pinebit/consensus-factory
pinebit Oct 10, 2024
f758914
ConsensusMetrics interface
pinebit Oct 10, 2024
9324fa7
ConsensusMetrics interface
pinebit Oct 10, 2024
6accee3
More of refactoring and tests
pinebit Oct 11, 2024
9960df1
Merge branch 'main' into pinebit/consensus-factory
pinebit Oct 11, 2024
3d6080a
Dynamic consensus protocol selection
pinebit Oct 11, 2024
cc24de6
Consensus protocol runtime selection
pinebit Oct 11, 2024
d70cdee
Improved wording
pinebit Oct 11, 2024
f6bd661
Unexported odd symbols
pinebit Oct 14, 2024
31e90fa
Added docs/consensus.md draft
pinebit Oct 14, 2024
4b1f239
Addressed PR feedback
pinebit Oct 16, 2024
99ea803
Merge branch 'main' into pinebit/consensus-factory
pinebit Oct 16, 2024
75321cb
Addressed PR feedback
pinebit Oct 16, 2024
bbc08f3
Addressed PR feedback
pinebit Oct 16, 2024
8a897d8
Updated metrics.md
pinebit Oct 16, 2024
cb832ed
More of refactoring
pinebit Oct 16, 2024
23dc53e
Merge branch 'main' into pinebit/consensus-factory
pinebit Oct 16, 2024
4d432f2
More of refactoring
pinebit Oct 17, 2024
078de38
Better deadliner usage
pinebit Oct 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 58 additions & 30 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
"github.com/obolnetwork/charon/core/aggsigdb"
"github.com/obolnetwork/charon/core/bcast"
"github.com/obolnetwork/charon/core/consensus"
pbv1 "github.com/obolnetwork/charon/core/corepb/v1"
"github.com/obolnetwork/charon/core/consensus/protocols"
"github.com/obolnetwork/charon/core/consensus/qbft"
"github.com/obolnetwork/charon/core/dutydb"
"github.com/obolnetwork/charon/core/fetcher"
"github.com/obolnetwork/charon/core/infosync"
Expand Down Expand Up @@ -92,6 +93,7 @@
SimnetBMockFuzz bool
TestnetConfig eth2util.Network
ProcDirectory string
ConsensusProtocol string

TestConfig TestConfig
}
Expand Down Expand Up @@ -257,8 +259,6 @@

wirePeerInfo(life, tcpNode, peerIDs, cluster.GetInitialMutationHash(), sender, conf.BuilderAPI)

consensusDebugger := consensus.NewDebugger()

// seenPubkeys channel to send seen public keys from validatorapi to monitoringapi.
seenPubkeys := make(chan core.PubKey)
seenPubkeysFunc := func(pk core.PubKey) {
Expand All @@ -281,6 +281,8 @@
return err
}

consensusDebugger := consensus.NewDebugger()

Check warning on line 285 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L284-L285

Added lines #L284 - L285 were not covered by tests
wireMonitoringAPI(ctx, life, conf.MonitoringAddr, conf.DebugAddr, tcpNode, eth2Cl, peerIDs,
promRegistry, consensusDebugger, pubkeys, seenPubkeys, vapiCalls, len(cluster.GetValidators()))

Expand Down Expand Up @@ -524,16 +526,25 @@
return err
}

retryer := retry.New[core.Duty](deadlineFunc)
retryer := retry.New(deadlineFunc)

Check warning on line 529 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L529

Added line #L529 was not covered by tests

cons, startCons, err := newConsensus(cluster, tcpNode, p2pKey, sender,
deadlinerFunc("consensus"), gaterFunc, consensusDebugger.AddInstance)
// Consensus
consensusController, err := consensus.NewConsensusController(
ctx, tcpNode, sender, peers, p2pKey,
deadlineFunc, gaterFunc, consensusDebugger)

Check warning on line 534 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L531-L534

Added lines #L531 - L534 were not covered by tests
if err != nil {
return err
}

defaultConsensus := consensusController.DefaultConsensus()
startConsensusCtrl := lifecycle.HookFuncCtx(consensusController.Start)

coreConsensus := consensusController.CurrentConsensus() // initially points to DefaultConsensus()

// Priority protocol always uses QBFTv2.

Check warning on line 544 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L539-L544

Added lines #L539 - L544 were not covered by tests
err = wirePrioritise(ctx, conf, life, tcpNode, peerIDs, int(cluster.GetThreshold()),
sender.SendReceive, cons, sched, p2pKey, deadlineFunc)
sender.SendReceive, defaultConsensus, sched, p2pKey, deadlineFunc,
consensusController, cluster.GetConsensusProtocol())

Check warning on line 547 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L546-L547

Added lines #L546 - L547 were not covered by tests
if err != nil {
return err
}
Expand All @@ -553,12 +564,13 @@
return err
}

// Core always uses the "current" consensus that is changed dynamically.
opts := []core.WireOption{
core.WithTracing(),
core.WithTracking(track, inclusion),
core.WithAsyncRetry(retryer),
}
core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, opts...)
core.Wire(sched, fetch, coreConsensus, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, opts...)

Check warning on line 573 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L573

Added line #L573 was not covered by tests

err = wireValidatorMock(ctx, conf, eth2Cl, pubshares, sched)
if err != nil {
Expand All @@ -570,7 +582,7 @@
}

life.RegisterStart(lifecycle.AsyncBackground, lifecycle.StartScheduler, lifecycle.HookFuncErr(sched.Run))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PConsensus, startCons)
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PConsensus, startConsensusCtrl)

Check warning on line 585 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L585

Added line #L585 was not covered by tests
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartAggSigDB, lifecycle.HookFuncCtx(aggSigDB.Run))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartParSigDB, lifecycle.HookFuncCtx(parSigDB.Trim))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartTracker, lifecycle.HookFuncCtx(inclusion.Run))
Expand All @@ -585,8 +597,9 @@
func wirePrioritise(ctx context.Context, conf Config, life *lifecycle.Manager, tcpNode host.Host,
peers []peer.ID, threshold int, sendFunc p2p.SendReceiveFunc, coreCons core.Consensus,
sched core.Scheduler, p2pKey *k1.PrivateKey, deadlineFunc func(duty core.Duty) (time.Time, bool),
consensusController core.ConsensusController, clusterPreferredProtocol string,
) error {
cons, ok := coreCons.(*consensus.Component)
cons, ok := coreCons.(*qbft.Consensus)

Check warning on line 602 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L602

Added line #L602 was not covered by tests
KaloyanTanev marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
// Priority protocol not supported for leader cast.
return nil
Expand All @@ -602,9 +615,22 @@
return err
}

// The initial protocols order as defined by implementation is altered by:
// 1. Prioritizing the cluster (lock) preferred protocol to the top.
// 2. Prioritizing the protocol specified by CLI flag (cluster run) to the top.
// In all cases this prioritizes all versions of the protocol identified by name.
// The order of all these operations are important.
allProtocols := Protocols()
if clusterPreferredProtocol != "" {
allProtocols = protocols.PrioritizeProtocolsByName(clusterPreferredProtocol, allProtocols)
}
if conf.ConsensusProtocol != "" {
allProtocols = protocols.PrioritizeProtocolsByName(conf.ConsensusProtocol, allProtocols)
}

Check warning on line 629 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L623-L629

Added lines #L623 - L629 were not covered by tests
KaloyanTanev marked this conversation as resolved.
Show resolved Hide resolved

isync := infosync.New(prio,
version.Supported(),
Protocols(),
allProtocols,

Check warning on line 633 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L633

Added line #L633 was not covered by tests
ProposalTypes(conf.BuilderAPI, conf.SyntheticBlockProposals),
)

Expand All @@ -621,6 +647,26 @@
prio.Subscribe(conf.TestConfig.PrioritiseCallback)
}

prio.Subscribe(func(ctx context.Context, _ core.Duty, tr []priority.TopicResult) error {
for _, t := range tr {
if t.Topic == infosync.TopicProtocol {
allProtocols := t.PrioritiesOnly()
preferredConsensusProtocol := protocols.MostPreferredConsensusProtocol(allProtocols)
preferredConsensusProtocolID := protocol.ID(preferredConsensusProtocol)

if err := consensusController.SetCurrentConsensusForProtocol(ctx, preferredConsensusProtocolID); err != nil {
log.Error(ctx, "Failed to set current consensus protocol", err, z.Str("protocol", preferredConsensusProtocol))
} else {
log.Info(ctx, "Current consensus protocol changed", z.Str("protocol", preferredConsensusProtocol))
}

Check warning on line 661 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L650-L661

Added lines #L650 - L661 were not covered by tests

break

Check warning on line 663 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L663

Added line #L663 was not covered by tests
}
}

return nil

Check warning on line 667 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L667

Added line #L667 was not covered by tests
})

life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartPeerInfo, lifecycle.HookFuncCtx(prio.Start))

return nil
Expand Down Expand Up @@ -918,24 +964,6 @@
return eth2Cl, nil
}

// newConsensus returns a new consensus component and its start lifecycle hook.
func newConsensus(cluster *manifestpb.Cluster, tcpNode host.Host, p2pKey *k1.PrivateKey,
sender *p2p.Sender, deadliner core.Deadliner, gaterFunc core.DutyGaterFunc,
qbftSniffer func(*pbv1.SniffedConsensusInstance),
) (core.Consensus, lifecycle.IHookFunc, error) {
peers, err := manifest.ClusterPeers(cluster)
if err != nil {
return nil, nil, err
}

comp, err := consensus.New(tcpNode, sender, peers, p2pKey, deadliner, gaterFunc, qbftSniffer)
if err != nil {
return nil, nil, err
}

return comp, lifecycle.HookFuncCtx(comp.Start), nil
}

// createMockValidators creates mock validators identified by their public shares.
func createMockValidators(pubkeys []eth2p0.BLSPubKey) beaconmock.ValidatorSet {
resp := make(beaconmock.ValidatorSet)
Expand Down Expand Up @@ -1079,7 +1107,7 @@
// Protocols returns the list of supported Protocols in order of precedence.
func Protocols() []protocol.ID {
var resp []protocol.ID
resp = append(resp, consensus.Protocols()...)
resp = append(resp, protocols.Protocols()...)

Check warning on line 1110 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L1110

Added line #L1110 was not covered by tests
resp = append(resp, parsigex.Protocols()...)
resp = append(resp, peerinfo.Protocols()...)
resp = append(resp, priority.Protocols()...)
Expand Down
13 changes: 7 additions & 6 deletions cluster/manifest/mutationlegacylock.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,13 @@ func transformLegacyLock(input *manifestpb.Cluster, signed *manifestpb.SignedMut
}

return &manifestpb.Cluster{
Name: lock.Name,
Threshold: int32(lock.Threshold),
DkgAlgorithm: lock.DKGAlgorithm,
ForkVersion: lock.ForkVersion,
Validators: vals,
Operators: ops,
Name: lock.Name,
Threshold: int32(lock.Threshold),
DkgAlgorithm: lock.DKGAlgorithm,
ForkVersion: lock.ForkVersion,
ConsensusProtocol: lock.ConsensusProtocol,
Validators: vals,
Operators: ops,
}, nil
}

Expand Down
Loading
Loading