Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 45 additions & 193 deletions consensus/dbft/dbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ package dbft

import (
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"math/big"
"sort"
"sync"
"sync/atomic"
"time"
Expand All @@ -35,15 +33,12 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/antimev"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/dbft/dbftutil"
"github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/consensus/misc/eip1559"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/systemcontracts"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -87,11 +82,6 @@ const (
// msgsChCap is a capacity of channel that accepts consensus messages from
// dBFT protocol.
msgsChCap = 100
// validatorsCacheCap is a capacity of validators cache. It's enough to store
// validators for only three potentially subsequent heights, i.e. three latest
// blocks to effectivaly verify dBFT payloads travelling through the network and
// properly initialize dBFT at the latest height.
validatorsCacheCap = 3
// crossEpochDecryptionStartRound is the number of DKG round (as denoted in KeyManagement
// system contract) starting from which continuous cross-epoch Envelopes decryption is supported.
// First DKG round setups sharing key, second DKG round setups resharing key, hence resharing
Expand Down Expand Up @@ -131,6 +121,9 @@ var (

// errUnauthorizedSigner is returned if a header is signed by a non-authorized entity.
errUnauthorizedSigner = errors.New("unauthorized signer")

// errNotInitializedBackend is returned if the eth API backend is not initialized
errNotInitializedBackend = errors.New("eth API backend is not initialized, DKG can't function properly")
)

// errShutdown represents an error caused by engine shutdown initiated by user.
Expand Down Expand Up @@ -224,11 +217,8 @@ type DBFT struct {
dkgTaskWatcherToCloseCh chan struct{}

// various native contract APIs that dBFT uses.
backend *ethapi.Backend
txAPI *ethapi.TransactionAPI
validatorsCache *lru.Cache[uint64, []common.Address]
// dkgIndexCache is a cache for storing the index array of the ordered validators
dkgIndexCache *lru.Cache[uint64, []int]
backend ethapi.Backend
txAPI *ethapi.TransactionAPI
// staticPool is a legacy pool instance for decrypted transaction verification,
// which is initialized once per height at postBlock callback. It should be reset
// before any reusage at the same height.
Expand Down Expand Up @@ -320,9 +310,6 @@ func New(chainCfg *params.ChainConfig, _ ethdb.Database, statisticsCfg Statistic
dkgTaskExecutorToCloseCh: make(chan struct{}),
dkgTaskWatcherToCloseCh: make(chan struct{}),

validatorsCache: lru.NewCache[uint64, []common.Address](validatorsCacheCap),
dkgIndexCache: lru.NewCache[uint64, []int](validatorsCacheCap),

dkgSnapshot: NewSnapshot(),
dkgTaskExecutorCh: make(chan *taskList, 2), // The maximum number of task lists per epoch is 3
dkgTaskWatcherCh: make(chan *taskList, 2),
Expand Down Expand Up @@ -445,12 +432,16 @@ func (c *DBFT) getValidatorsCb(txs ...dbft.Transaction[common.Hash]) []dbft.Publ
err error
)
if txs == nil {
// getValidatorsSorted with empty args is used by dbft to fill the list of
// GetValidatorsSortedByBlockNumber is used by dbft to fill the list of
// block's validators, thus should return validators from the current
// epoch without recalculation.
pKeys, err = c.getValidatorsSorted(&c.lastIndex, nil, nil)
if c.backend != nil {
pKeys, err = c.backend.GetValidatorsSortedByBlockNumber(c.lastIndex)
} else {
err = errNotInitializedBackend
}
}
// getValidatorsSorted with non-empty args is used by dbft to fill block's
// GetValidatorsSortedByState is used by dbft to fill block's
// NextConsensus field, but DBFT doesn't provide WithGetConsensusAddress
// callback and fills NextConsensus by itself via WithNewBlockFromContext
// callback. Thus, leave pKeys empty if txes != nil.
Expand Down Expand Up @@ -1064,10 +1055,16 @@ func (c *DBFT) processPreBlockCb(b dbft.PreBlock[common.Hash]) error {
sharesCurr = make(map[int][]*tpke.DecryptionShare, ctx.M())
sharesPrev = make(map[int][]*tpke.DecryptionShare)
blockNum = pre.header.Number.Uint64() - 1
dkgIndex int
err error
)
for _, preC := range ctx.PreCommitPayloads {
if preC != nil && preC.ViewNumber() == ctx.ViewNumber {
dkgIndex, err := c.getDKGIndex(int(preC.ValidatorIndex()), blockNum)
if c.backend != nil {
dkgIndex, err = c.backend.GetDKGIndex(blockNum, int(preC.ValidatorIndex()))
} else {
err = errNotInitializedBackend
}
if err != nil {
return fmt.Errorf("get DKG index failed: ValidatorIndex %d, block height %d", int(preC.ValidatorIndex()), blockNum)
}
Expand Down Expand Up @@ -1413,8 +1410,13 @@ func (c *DBFT) getBlockWitness(pub *tpke.PublicKey, block *Block) ([]byte, error
for i := 0; i < len(vals); i++ {
if p := dctx.CommitPayloads[i]; p != nil && p.ViewNumber() == dctx.ViewNumber {
var err error
var dkgIndex int
blockNum := block.header.Number.Uint64() - 1
dkgIndex, err := c.getDKGIndex(i, blockNum)
if c.backend != nil {
dkgIndex, err = c.backend.GetDKGIndex(blockNum, i)
} else {
err = errNotInitializedBackend
}
if err != nil {
return nil, fmt.Errorf("get DKG index failed: ValidatorIndex %d, block height %d", i, blockNum)
}
Expand Down Expand Up @@ -1447,7 +1449,7 @@ func (c *DBFT) getBlockWitness(pub *tpke.PublicKey, block *Block) ([]byte, error
// WithEthAPIBackend initializes Eth API backend and transaction API for
// proper consensus module work.
func (c *DBFT) WithEthAPIBackend(b ethapi.Backend) {
c.backend = &b
c.backend = b
c.txAPI = ethapi.NewTransactionAPI(b, new(ethapi.AddrLocker))
}

Expand Down Expand Up @@ -1487,47 +1489,6 @@ func (c *DBFT) WithRequestTxs(f func(hashed []common.Hash)) {
func (c *DBFT) WithMux(mux *event.TypeMux) {
c.mux = mux
c.blockQueue.SetMux(mux)

go c.syncWatcher()
}

// syncWatcher is a standalone loop aimed to be active irrespectively of dBFT engine
// activity. It tracks the first chain sync attempt till its end.
func (c *DBFT) syncWatcher() {
downloaderEvents := c.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
defer func() {
if !downloaderEvents.Closed() {
downloaderEvents.Unsubscribe()
}
}()
dlEventCh := downloaderEvents.Chan()

events:
for {
select {
case <-c.quit:
break events
case ev := <-dlEventCh:
if ev == nil {
// Unsubscription done, stop listening.
dlEventCh = nil
break events
}
switch ev.Data.(type) {
case downloader.StartEvent:
c.syncing.Store(true)

case downloader.FailedEvent:
c.syncing.Store(false)

case downloader.DoneEvent:
c.syncing.Store(false)

// Stop reacting to downloader events.
downloaderEvents.Unsubscribe()
}
}
}
}

// WithTxPool initializes transaction pool API for DBFT interactions with memory pool
Expand Down Expand Up @@ -2234,7 +2195,7 @@ func (c *DBFT) eventLoop() {
// been broadcasted the events are unregistered and the loop is exited. This to
// prevent a major security vuln where external parties can DOS you with blocks
// and halt your dBFT operation for as long as the DOS continues.
downloaderEvents := c.mux.Subscribe(downloader.DoneEvent{}, downloader.FailedEvent{})
downloaderEvents := c.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
dlEventCh := downloaderEvents.Chan()

events:
Expand Down Expand Up @@ -2294,10 +2255,16 @@ events:
continue
}
switch ev.Data.(type) {
case downloader.StartEvent:
c.syncing.Store(true)
case downloader.FailedEvent:
c.syncing.Store(false)

latest := c.chain.CurrentHeader()
c.handleChainBlock(latest, false)
case downloader.DoneEvent:
c.syncing.Store(false)

// Stop reacting to downloader events.
downloaderEvents.Unsubscribe()

Expand Down Expand Up @@ -2430,7 +2397,10 @@ func payloadFromMessage(ep *dbftproto.Message, getBlockExtraVersion func(*big.In

func (c *DBFT) validatePayload(p *Payload) error {
h := c.chain.CurrentBlock().Number.Uint64()
validators, err := c.getValidatorsSorted(&h, nil, nil)
if c.backend == nil {
return errNotInitializedBackend
}
validators, err := c.backend.GetValidatorsSortedByBlockNumber(h)
if err != nil {
return fmt.Errorf("failed to get next block validators: %w", err)
}
Expand All @@ -2446,25 +2416,6 @@ func (c *DBFT) validatePayload(p *Payload) error {
return nil
}

// IsExtensibleAllowed determines if address is allowed to send extensible payloads
// (only consensus payloads for now) at the specified height.
func (c *DBFT) IsExtensibleAllowed(h uint64, u common.Address) error {
// Can't verify extensible sender if the node has an outdated state.
if c.syncing.Load() {
return dbftproto.ErrSyncing
}
// Only validators are included into extensible whitelist for now.
validators, err := c.getValidatorsSorted(&h, nil, nil)
if err != nil {
return fmt.Errorf("failed to get validators: %w", err)
}
_, found := slices.BinarySearchFunc(validators, u, common.Address.Cmp)
if !found {
return fmt.Errorf("address is not a validator")
}
return nil
}

func (c *DBFT) newConsensusPayloadCb(ctx *dbft.Context[common.Hash], t dbft.MessageType, msg any) dbft.ConsensusPayload[common.Hash] {
var cp = new(Payload)
cp.BlockIndex = uint64(ctx.BlockIndex)
Expand Down Expand Up @@ -2533,7 +2484,10 @@ func (c *DBFT) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, pa

func (c *DBFT) calcDifficulty(signer common.Address, parent *types.Header) *big.Int {
h := parent.Number.Uint64()
vals, err := c.getValidatorsSorted(&h, nil, nil)
if c.backend == nil {
return nil
}
vals, err := c.backend.GetValidatorsSortedByBlockNumber(h)
if err != nil {
return nil
}
Expand Down Expand Up @@ -2722,111 +2676,6 @@ func (c *DBFT) APIs(chain consensus.ChainHeaderReader) []rpc.API {
}}
}

// getValidatorsSorted returns validators chosen in the result of the latest
// finalized voting epoch. It calls Governance contract under the hood. The call
// is based on the provided state or (if not provided) on the state of the block
// with the specified height. Validators returned from this method are always
// sorted by bytes order (even if the list returned from governance contract is
// sorted in another way). This method uses cached values in case of validators
// requested by block height.
func (c *DBFT) getValidatorsSorted(blockNum *uint64, state *state.StateDB, header *types.Header) ([]common.Address, error) {
res, err := c.getValidators(blockNum, state, header)
if err != nil {
return nil, err
}

sortedList := slices.Clone(res)
slices.SortFunc(sortedList, common.Address.Cmp)
return sortedList, err
}

// getValidators returns validators chosen in the result of the latest finalized
// voting epoch. It calls Governance contract under the hood. The call is based
// on the provided state or (if not provided) on the state of the block with the
// specified height. Validators returned from this method are sorted in the original
// order used by Governance contract. This method uses cached values in case of
// validators requested by block height.
func (c *DBFT) getValidators(blockNum *uint64, state *state.StateDB, header *types.Header) ([]common.Address, error) {
if c.backend == nil {
return nil, errors.New("eth API backend is not initialized, dBFT can't function properly")
}

if state == nil && blockNum != nil {
vals, ok := c.validatorsCache.Get(*blockNum)
if ok {
return vals, nil
}
}

// Perform smart contract call.
method := "getCurrentConsensus" // latest finalized epoch validators.
data, err := systemcontracts.GovernanceABI.Pack(method)
if err != nil {
return nil, fmt.Errorf("failed to pack '%s': %w", method, err)
}
msgData := hexutil.Bytes(data)
gas := hexutil.Uint64(50_000_000) // more than enough for validators call processing.
args := ethapi.TransactionArgs{
Gas: &gas,
To: &systemcontracts.GovernanceProxyHash,
Data: &msgData,
}

ctx, cancel := context.WithCancel(context.Background())
// Cancel when we are finished consuming integers.
defer cancel()
var result *core.ExecutionResult
if state != nil {
result, err = ethapi.DoCallAtState(ctx, *c.backend, args, state, header, nil, nil, 0, 0)
} else if blockNum != nil {
blockNr := rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(*blockNum))
result, err = ethapi.DoCall(ctx, *c.backend, args, blockNr, nil, nil, 0, 0)
} else {
return nil, fmt.Errorf("failed to compute validators: both block number and state are nil")
}
if err != nil {
return nil, fmt.Errorf("failed to perform '%s' call: %w", method, err)
}
var res []common.Address
err = unpackContractExecutionResult(&res, result, systemcontracts.GovernanceABI, method)
if err != nil {
return nil, err
}

// Update cache in case if existing state was used for validators retrieval.
if state == nil && blockNum != nil {
_ = c.validatorsCache.Add(*blockNum, res)
}

return res, err
}

// getDKGIndex returns validator dkg index (original validator index +1) by validatorIndex (ordered validator index).
func (c *DBFT) getDKGIndex(validatorIndex int, blockNum uint64) (int, error) {
indices, ok := c.dkgIndexCache.Get(blockNum)
if !ok {
originValidators, err := c.getValidators(&blockNum, nil, nil)
if err != nil {
return -1, err
}

indices = make([]int, len(originValidators))
for i := range indices {
indices[i] = i
}
sort.Slice(indices, func(i, j int) bool {
return originValidators[indices[i]].Cmp(originValidators[indices[j]]) < 0
})
_ = c.dkgIndexCache.Add(blockNum, indices)
}

if validatorIndex < 0 || validatorIndex >= len(indices) {
return -1, fmt.Errorf("invalid validator index: validators count is %d, requested %d", len(indices), validatorIndex)
}
dkgIndex := indices[validatorIndex] + 1
return dkgIndex, nil
}

// getGlobalPublicKey returns TPKE global public key. If state is provided, then this state
// is used to recalculate local key based on the KeyManagement contract state. If state is
// not provided, then the node's local keystore is used to retrieve global public key.
Expand Down Expand Up @@ -2860,7 +2709,10 @@ func (c *DBFT) getGlobalPublicKey(h *types.Header, s *state.StateDB) (*tpke.Publ
func (c *DBFT) getNextConsensus(h *types.Header, s *state.StateDB) (common.Hash, common.Hash) {
var multisig, threshold common.Hash

nextVals, err := c.getValidatorsSorted(nil, s.Copy(), h)
if c.backend == nil {
log.Crit("Can't calculate next consensus", "err", errNotInitializedBackend)
}
nextVals, err := c.backend.GetValidatorsSortedByState(s.Copy(), h)
if err != nil {
log.Crit("Failed to compute next block validators",
"err", err)
Expand Down
Loading
Loading