Skip to content

Commit

Permalink
Dynamic consensus protocol selection
Browse files Browse the repository at this point in the history
  • Loading branch information
pinebit committed Oct 11, 2024
1 parent 9960df1 commit 3d6080a
Show file tree
Hide file tree
Showing 15 changed files with 445 additions and 55 deletions.
45 changes: 32 additions & 13 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ import (
"github.com/obolnetwork/charon/core/aggsigdb"
"github.com/obolnetwork/charon/core/bcast"
"github.com/obolnetwork/charon/core/consensus"
cprotocols "github.com/obolnetwork/charon/core/consensus/protocols"
cqbft "github.com/obolnetwork/charon/core/consensus/qbft"
"github.com/obolnetwork/charon/core/consensus/protocols"
"github.com/obolnetwork/charon/core/consensus/qbft"
"github.com/obolnetwork/charon/core/dutydb"
"github.com/obolnetwork/charon/core/fetcher"
"github.com/obolnetwork/charon/core/infosync"
Expand Down Expand Up @@ -258,8 +258,6 @@ func Run(ctx context.Context, conf Config) (err error) {

wirePeerInfo(life, tcpNode, peerIDs, cluster.GetInitialMutationHash(), sender, conf.BuilderAPI)

consensusDebugger := consensus.NewDebugger()

// seenPubkeys channel to send seen public keys from validatorapi to monitoringapi.
seenPubkeys := make(chan core.PubKey)
seenPubkeysFunc := func(pk core.PubKey) {
Expand All @@ -282,6 +280,8 @@ func Run(ctx context.Context, conf Config) (err error) {
return err
}

consensusDebugger := consensus.NewDebugger()

Check warning on line 284 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L283-L284

Added lines #L283 - L284 were not covered by tests
wireMonitoringAPI(ctx, life, conf.MonitoringAddr, conf.DebugAddr, tcpNode, eth2Cl, peerIDs,
promRegistry, consensusDebugger, pubkeys, seenPubkeys, vapiCalls, len(cluster.GetValidators()))

Expand Down Expand Up @@ -527,18 +527,18 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,

retryer := retry.New[core.Duty](deadlineFunc)

consensusFactory, err := consensus.NewConsensusFactory(
tcpNode, sender, peers, p2pKey,
deadlinerFunc("consensus"), gaterFunc, consensusDebugger.AddInstance)
consensusFactory, err := consensus.NewConsensusFactory(tcpNode, sender, peers, p2pKey, deadlinerFunc, gaterFunc, consensusDebugger)

Check warning on line 530 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L530

Added line #L530 was not covered by tests
if err != nil {
return err
}

defaultConsensus := consensusFactory.DefaultConsensus()
startDefaultConsensus := lifecycle.HookFuncCtx(defaultConsensus.Start)
coreConsensus := consensusFactory.CurrentConsensus()
startConsensus := lifecycle.HookFuncCtx(defaultConsensus.Start)

// Priority protocol always uses QBFTv2.

Check warning on line 539 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L535-L539

Added lines #L535 - L539 were not covered by tests
err = wirePrioritise(ctx, conf, life, tcpNode, peerIDs, int(cluster.GetThreshold()),
sender.SendReceive, defaultConsensus, sched, p2pKey, deadlineFunc)
sender.SendReceive, defaultConsensus, sched, p2pKey, deadlineFunc, consensusFactory)

Check warning on line 541 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L541

Added line #L541 was not covered by tests
if err != nil {
return err
}
Expand All @@ -558,12 +558,13 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

// Core always uses the "current" consensus that is changed dynamically.
opts := []core.WireOption{
core.WithTracing(),
core.WithTracking(track, inclusion),
core.WithAsyncRetry(retryer),
}
core.Wire(sched, fetch, defaultConsensus, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, opts...)
core.Wire(sched, fetch, coreConsensus, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, opts...)

Check warning on line 567 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L567

Added line #L567 was not covered by tests

err = wireValidatorMock(ctx, conf, eth2Cl, pubshares, sched)
if err != nil {
Expand All @@ -575,7 +576,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
}

life.RegisterStart(lifecycle.AsyncBackground, lifecycle.StartScheduler, lifecycle.HookFuncErr(sched.Run))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PConsensus, startDefaultConsensus)
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PConsensus, startConsensus)

Check warning on line 579 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L579

Added line #L579 was not covered by tests
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartAggSigDB, lifecycle.HookFuncCtx(aggSigDB.Run))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartParSigDB, lifecycle.HookFuncCtx(parSigDB.Trim))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartTracker, lifecycle.HookFuncCtx(inclusion.Run))
Expand All @@ -590,8 +591,9 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
func wirePrioritise(ctx context.Context, conf Config, life *lifecycle.Manager, tcpNode host.Host,
peers []peer.ID, threshold int, sendFunc p2p.SendReceiveFunc, coreCons core.Consensus,
sched core.Scheduler, p2pKey *k1.PrivateKey, deadlineFunc func(duty core.Duty) (time.Time, bool),
consensusFactory core.ConsensusFactory,
) error {
cons, ok := coreCons.(*cqbft.Consensus)
cons, ok := coreCons.(*qbft.Consensus)

Check warning on line 596 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L596

Added line #L596 was not covered by tests
if !ok {
// Priority protocol not supported for leader cast.
return nil
Expand Down Expand Up @@ -626,6 +628,23 @@ func wirePrioritise(ctx context.Context, conf Config, life *lifecycle.Manager, t
prio.Subscribe(conf.TestConfig.PrioritiseCallback)
}

prio.Subscribe(func(ctx context.Context, _ core.Duty, tr []priority.TopicResult) error {
for _, t := range tr {
if t.Topic == infosync.TopicProtocol {
allProtocols := t.PrioritiesOnly()
preferredConsensusProtocol := protocols.MostPreferredConsensusProtocol(allProtocols)

if err := consensusFactory.SetCurrentConsensusForProtocol(protocol.ID(preferredConsensusProtocol)); err != nil {
log.Error(ctx, "Failed to set current consensus for protocol", err, z.Str("protocol", preferredConsensusProtocol))
} else {
log.Info(ctx, "Set current consensus for protocol", z.Str("protocol", preferredConsensusProtocol))
}

Check warning on line 641 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L631-L641

Added lines #L631 - L641 were not covered by tests
}
}

return nil

Check warning on line 645 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L645

Added line #L645 was not covered by tests
})

life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartPeerInfo, lifecycle.HookFuncCtx(prio.Start))

return nil
Expand Down Expand Up @@ -1066,7 +1085,7 @@ func (h httpServeHook) Call(context.Context) error {
// Protocols returns the list of supported Protocols in order of precedence.
func Protocols() []protocol.ID {
var resp []protocol.ID
resp = append(resp, cprotocols.Protocols()...)
resp = append(resp, protocols.Protocols()...)

Check warning on line 1088 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L1088

Added line #L1088 was not covered by tests
resp = append(resp, parsigex.Protocols()...)
resp = append(resp, peerinfo.Protocols()...)
resp = append(resp, priority.Protocols()...)
Expand Down
2 changes: 2 additions & 0 deletions core/consensus/debugger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
pbv1 "github.com/obolnetwork/charon/core/corepb/v1"
)

//go:generate mockery --name=Debugger --output=mocks --outpkg=mocks --case=underscore

const maxDebuggerBuffer = 50 * (1 << 20) // 50 MB.

// Debugger is an interface for debugging consensus messages.
Expand Down
40 changes: 27 additions & 13 deletions core/consensus/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,31 @@ import (

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/core/consensus/protocols"
"github.com/obolnetwork/charon/core/consensus/qbft"
pbv1 "github.com/obolnetwork/charon/core/corepb/v1"
"github.com/obolnetwork/charon/p2p"
)

type DeadlinerFunc func(label string) core.Deadliner

type consensusFactory struct {
tcpNode host.Host
sender *p2p.Sender
peers []p2p.Peer
p2pKey *k1.PrivateKey
deadliner core.Deadliner
deadlinerFunc DeadlinerFunc
gaterFunc core.DutyGaterFunc
snifferFunc func(*pbv1.SniffedConsensusInstance)
debugger Debugger
defaultConsensus core.Consensus
wrappedConsensus *consensusWrapper
}

// NewConsensusFactory creates a new consensus factory with the default consensus protocol.
func NewConsensusFactory(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKey *k1.PrivateKey,
deadliner core.Deadliner, gaterFunc core.DutyGaterFunc, snifferFunc func(*pbv1.SniffedConsensusInstance),
deadlinerFunc DeadlinerFunc, gaterFunc core.DutyGaterFunc, debugger Debugger,
) (core.ConsensusFactory, error) {
defaultConsensus, err := qbft.NewConsensus(tcpNode, sender, peers, p2pKey, deadliner, gaterFunc, snifferFunc)
qbftDeadliner := deadlinerFunc("consensus.qbft")
defaultConsensus, err := qbft.NewConsensus(tcpNode, sender, peers, p2pKey, qbftDeadliner, gaterFunc, debugger.AddInstance)
if err != nil {
return nil, err
}

Check warning on line 39 in core/consensus/factory.go

View check run for this annotation

Codecov / codecov/patch

core/consensus/factory.go#L38-L39

Added lines #L38 - L39 were not covered by tests
Expand All @@ -39,10 +43,11 @@ func NewConsensusFactory(tcpNode host.Host, sender *p2p.Sender, peers []p2p.Peer
sender: sender,
peers: peers,
p2pKey: p2pKey,
deadliner: deadliner,
deadlinerFunc: deadlinerFunc,
gaterFunc: gaterFunc,
snifferFunc: snifferFunc,
debugger: debugger,
defaultConsensus: defaultConsensus,
wrappedConsensus: newConsensusWrapper(defaultConsensus),
}, nil
}

Expand All @@ -51,13 +56,22 @@ func (f *consensusFactory) DefaultConsensus() core.Consensus {
return f.defaultConsensus
}

// ConsensusByProtocolID returns a consensus instance for the specified protocol ID.
func (f *consensusFactory) ConsensusByProtocolID(protocol protocol.ID) (core.Consensus, error) {
if f.defaultConsensus.ProtocolID() == protocol {
return f.defaultConsensus, nil
// CurrentConsensus returns the current consensus instance.
func (f *consensusFactory) CurrentConsensus() core.Consensus {
return f.wrappedConsensus
}

// SetCurrentConsensusForProtocol sets the current consensus instance for the given protocol id.
func (f *consensusFactory) SetCurrentConsensusForProtocol(protocol protocol.ID) error {
if f.wrappedConsensus.ProtocolID() == protocol {
return nil
}

Check warning on line 68 in core/consensus/factory.go

View check run for this annotation

Codecov / codecov/patch

core/consensus/factory.go#L67-L68

Added lines #L67 - L68 were not covered by tests

// TODO: support for more protocols, add map[protocol.ID]core.Consensus with a lock, etc.
if protocol == protocols.QBFTv2ProtocolID {
f.wrappedConsensus.SetImpl(f.defaultConsensus)

return nil
}

Check warning on line 74 in core/consensus/factory.go

View check run for this annotation

Codecov / codecov/patch

core/consensus/factory.go#L71-L74

Added lines #L71 - L74 were not covered by tests

return nil, errors.New("unknown consensus protocol")
return errors.New("unsupported protocol id")
}
42 changes: 16 additions & 26 deletions core/consensus/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
"github.com/obolnetwork/charon/cluster"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/core/consensus"
csmocks "github.com/obolnetwork/charon/core/consensus/mocks"
"github.com/obolnetwork/charon/core/consensus/protocols"
pbv1 "github.com/obolnetwork/charon/core/corepb/v1"
coremocks "github.com/obolnetwork/charon/core/mocks"
"github.com/obolnetwork/charon/eth2util/enr"
"github.com/obolnetwork/charon/p2p"
"github.com/obolnetwork/charon/testutil"
Expand All @@ -31,7 +32,6 @@ func TestConsensusFactory(t *testing.T) {
random := rand.New(rand.NewSource(int64(seed)))
lock, p2pkeys, _ := cluster.NewForT(t, 1, 3, 3, seed, random)

snifferFunc := func(msgs *pbv1.SniffedConsensusInstance) {}
gaterFunc := func(core.Duty) bool { return true }

for i := range 3 {
Expand All @@ -54,33 +54,23 @@ func TestConsensusFactory(t *testing.T) {
hosts = append(hosts, h)
}

factory, err := consensus.NewConsensusFactory(hosts[0], new(p2p.Sender), peers, p2pkeys[0], testDeadliner{}, gaterFunc, snifferFunc)
deadlinerFunc := func(string) core.Deadliner {
return coremocks.NewDeadliner(t)
}
debugger := csmocks.NewDebugger(t)
factory, err := consensus.NewConsensusFactory(hosts[0], new(p2p.Sender), peers, p2pkeys[0], deadlinerFunc, gaterFunc, debugger)
require.NoError(t, err)
require.NotNil(t, factory)

defaultConsensus := factory.DefaultConsensus()
require.NotNil(t, defaultConsensus)
require.EqualValues(t, protocols.QBFTv2ProtocolID, defaultConsensus.ProtocolID())

sameConsesus, err := factory.ConsensusByProtocolID(protocols.QBFTv2ProtocolID)
require.NoError(t, err)
require.Equal(t, defaultConsensus, sameConsesus)

t.Run("unknown protocol", func(t *testing.T) {
_, err = factory.ConsensusByProtocolID("boo")
require.ErrorContains(t, err, "unknown consensus protocol")
t.Run("default and current consensus", func(t *testing.T) {
defaultConsensus := factory.DefaultConsensus()
require.NotNil(t, defaultConsensus)
require.EqualValues(t, protocols.QBFTv2ProtocolID, defaultConsensus.ProtocolID())
require.NotEqual(t, defaultConsensus, factory.CurrentConsensus()) // because the current is wrapped
})
}

// testDeadliner is a mock deadliner implementation.
type testDeadliner struct {
deadlineChan chan core.Duty
}

func (testDeadliner) Add(core.Duty) bool {
return true
}

func (t testDeadliner) C() <-chan core.Duty {
return t.deadlineChan
t.Run("unsupported protocol id", func(t *testing.T) {
err := factory.SetCurrentConsensusForProtocol("boo")
require.ErrorContains(t, err, "unsupported protocol id")
})
}
41 changes: 41 additions & 0 deletions core/consensus/mocks/debugger.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions core/consensus/protocols/protocols.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ func Protocols() []protocol.ID {
return []protocol.ID{QBFTv2ProtocolID}
}

// MostPreferredConsensusProtocol returns the most preferred consensus protocol from the given list.
func MostPreferredConsensusProtocol(protocols []string) string {
for _, p := range protocols {
if strings.HasPrefix(p, protocolIDPrefix) {
return p
}
}

return QBFTv2ProtocolID
}

// IsSupportedProtocolName returns true if the protocol name is supported.
func IsSupportedProtocolName(name string) bool {
for _, p := range Protocols() {
Expand Down
16 changes: 16 additions & 0 deletions core/consensus/protocols/protocols_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,19 @@ func TestProtocols(t *testing.T) {
protocols.QBFTv2ProtocolID,
}, protocols.Protocols())
}

func TestMostPreferredConsensusProtocol(t *testing.T) {
t.Run("default is qbft", func(t *testing.T) {
require.Equal(t, protocols.QBFTv2ProtocolID, protocols.MostPreferredConsensusProtocol([]string{"unreal"}))
require.Equal(t, protocols.QBFTv2ProtocolID, protocols.MostPreferredConsensusProtocol([]string{}))
})

t.Run("latest abft is preferred", func(t *testing.T) {
pp := []string{
"/charon/consensus/abft/3.0.0",
"/charon/consensus/abft/1.0.0",
"/charon/consensus/qbft/1.0.0",
}
require.Equal(t, "/charon/consensus/abft/3.0.0", protocols.MostPreferredConsensusProtocol(pp))
})
}
Loading

0 comments on commit 3d6080a

Please sign in to comment.