Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ const (
// ChainSplitHalt compares locally fetched attestation's target and source to leader's proposed target and source attestation.
// In case they differ, Charon does not sign the attestation.
ChainSplitHalt = "chain_split_halt"

// PrepareProposer enables scheduling and processing of prepare proposer duties.
PrepareProposer = "prepare_proposer"
)

var (
Expand All @@ -88,6 +91,7 @@ var (
QUIC: statusAlpha,
FetchOnlyCommIdx0: statusAlpha,
ChainSplitHalt: statusAlpha,
PrepareProposer: statusAlpha,
// Add all features and their status here.
}

Expand Down
135 changes: 128 additions & 7 deletions core/consensus/qbft/qbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

k1 "github.com/decred/dcrd/dcrec/secp256k1/v4"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -40,15 +41,15 @@ var supportedCompareDuties = []core.DutyType{core.DutyAttester}

// newDefinition returns a qbft definition (this is constant across all consensus instances).
func newDefinition(nodes int, subs func() []subscriber, roundTimer timer.RoundTimer,
decideCallback func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message]), compareAttestations bool,
decideCallback func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message], value proto.Message),
isLeader func(duty core.Duty, round, process int64) bool,
compareAttestations bool,
) qbft.Definition[core.Duty, [32]byte, proto.Message] {
quorum := qbft.Definition[core.Duty, [32]byte, proto.Message]{Nodes: nodes}.Quorum()

return qbft.Definition[core.Duty, [32]byte, proto.Message]{
// IsLeader is a deterministic leader election function.
IsLeader: func(duty core.Duty, round, process int64) bool {
return leader(duty, round, nodes) == process
},
IsLeader: isLeader,

// Decide sends consensus output to subscribers.
Decide: func(ctx context.Context, duty core.Duty, _ [32]byte, qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message]) {
Expand All @@ -70,7 +71,7 @@ func newDefinition(nodes int, subs func() []subscriber, roundTimer timer.RoundTi
return
}

decideCallback(qcommit)
decideCallback(qcommit, value)

for _, sub := range subs() {
if err := sub(ctx, duty, value); err != nil {
Expand Down Expand Up @@ -279,6 +280,7 @@ func NewConsensus(p2pNode host.Host, sender *p2p.Sender, peers []p2p.Peer, p2pKe
compareAttestations: compareAttestations,
}
c.mutable.instances = make(map[core.Duty]*instance.IO[Msg])
c.prepareParticipation.data = make(map[uint64][]int64)

return c, nil
}
Expand Down Expand Up @@ -307,6 +309,15 @@ type Consensus struct {

instances map[core.Duty]*instance.IO[Msg]
}

// prepareParticipation stores peer indices that participated in DutyPrepareProposer consensus.
// This is used to adjust leader election for DutyProposer to skip offline/malicious nodes.
// Key is the slot of DutyPrepareProposer (which is slot-1 of the corresponding DutyProposer).
prepareParticipation struct {
sync.RWMutex

data map[uint64][]int64
}
}

// ProtocolID returns the protocol ID.
Expand Down Expand Up @@ -374,6 +385,26 @@ func (c *Consensus) Start(ctx context.Context) {
// waits until it completes, in both cases it returns the resulting error.
// Note this errors if called multiple times for the same duty.
func (c *Consensus) Propose(ctx context.Context, duty core.Duty, data core.UnsignedDataSet) error {
// Inject visible peers for PrepareProposer duty.
if duty.Type == core.DutyPrepareProposer {
var visible []uint64

for i, p := range c.peers {
// Include self and connected peers.
if p.ID == c.p2pNode.ID() || c.p2pNode.Network().Connectedness(p.ID) == network.Connected {
visible = append(visible, uint64(i))
}
}

// Update the data set.
for k, v := range data {
if p, ok := v.(core.PrepareProposerData); ok {
p.VisiblePeers = visible
data[k] = p
}
}
}

// Hash the proposed data, since qbft only supports simple comparable values.
value, err := core.UnsignedDataSetToProto(data)
if err != nil {
Expand Down Expand Up @@ -545,7 +576,7 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err
span.End()
}()

decideCallback := func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message]) {
decideCallback := func(qcommit []qbft.Msg[core.Duty, [32]byte, proto.Message], value proto.Message) {
round := qcommit[0].Round()
decided = true

Expand All @@ -568,12 +599,23 @@ func (c *Consensus) runInstance(parent context.Context, duty core.Duty) (err err
span.SetAttributes(attribute.String("leader_name", leaderName))
span.AddEvent("qbft.Decided")

// Store participation for DutyPrepareProposer to be used in DutyProposer leader election.
if duty.Type == core.DutyPrepareProposer {
c.storeParticipation(duty.Slot, value)
}

// qbft.Run() is stopped by cancelling the context, or if an error occurred.
cancel()
}

// isLeader returns true if the given process is the leader for the given duty and round.
// For DutyProposer, it uses participation data from DutyPrepareProposer (slot-1) if available.
isLeader := func(d core.Duty, round, process int64) bool {
return c.leaderWithParticipation(d, round, nodes) == process
}

// Create a new qbft definition for this instance.
def := newDefinition(len(c.peers), c.subscribers, roundTimer, decideCallback, c.compareAttestations)
def := newDefinition(len(c.peers), c.subscribers, roundTimer, decideCallback, isLeader, c.compareAttestations)
origLogRoundChange := def.LogRoundChange
def.LogRoundChange = func(ctx context.Context, instance core.Duty, process, round, newRound int64, uponRule qbft.UponRule, msgs []qbft.Msg[core.Duty, [32]byte, proto.Message]) {
if origLogRoundChange != nil {
Expand Down Expand Up @@ -881,6 +923,85 @@ func fmtStepPeers(step roundStep) string {
return strings.Join(resp, "")
}

// storeParticipation stores peer indices that participated in DutyPrepareProposer consensus.
// It also cleans up entries older than 2 slots to prevent memory growth.
func (c *Consensus) storeParticipation(slot uint64, value proto.Message) {
// Convert proto message back to UnsignedDataSet
unsignedPB, ok := value.(*pbv1.UnsignedDataSet)
if !ok {
return
}

unsignedSet, err := core.UnsignedDataSetFromProto(core.DutyPrepareProposer, unsignedPB)
if err != nil {
return
}

var participants []int64

// Extract visible peers from the dataset
for _, v := range unsignedSet {
if p, ok := v.(core.PrepareProposerData); ok {
for _, peerIdx := range p.VisiblePeers {
participants = append(participants, int64(peerIdx))
}
// All entries in the set should have the same VisiblePeers since they come from the same leader proposal
break
}
}

// Sort for deterministic leader election across all nodes.
slices.Sort(participants)

c.prepareParticipation.Lock()
defer c.prepareParticipation.Unlock()

// Store participation for this slot.
c.prepareParticipation.data[slot] = participants

// Clean up entries older than 2 slots.
for s := range c.prepareParticipation.data {
if slot > 1 && s < slot-1 {
delete(c.prepareParticipation.data, s)
}
}
}

// getParticipants returns the list of peer indices that participated in DutyPrepareProposer
// for the given slot. Returns nil if no participation data is available.
func (c *Consensus) getParticipants(prepareSlot uint64) []int64 {
c.prepareParticipation.RLock()
defer c.prepareParticipation.RUnlock()

return c.prepareParticipation.data[prepareSlot]
}

// leaderWithParticipation returns the leader index for the given duty and round.
// For DutyProposer, it uses participation data from DutyPrepareProposer (slot-1) if available,
// which allows skipping offline/malicious nodes in leader election.
// For all other duties, it falls back to the standard leader election.
func (c *Consensus) leaderWithParticipation(duty core.Duty, round int64, nodes int) int64 {
if duty.Type != core.DutyProposer {
return leader(duty, round, nodes)
}

if duty.Slot == 0 {
return leader(duty, round, nodes)
}

participants := c.getParticipants(duty.Slot - 1)

// If no participation data (e.g., DutyPrepareProposer didn't reach consensus),
// fall back to standard leader election.
if len(participants) == 0 {
return leader(duty, round, nodes)
}

idx := (int64(duty.Slot) + int64(duty.Type) + round) % int64(len(participants))

return participants[idx]
}

// leader return the deterministic leader index.
func leader(duty core.Duty, round int64, nodes int) int64 {
return (int64(duty.Slot) + int64(duty.Type) + round) % int64(nodes)
Expand Down
Loading
Loading