Skip to content

Commit

Permalink
eth/catalyst: ensure period zero mode leaves no pending txs in pool (e…
Browse files Browse the repository at this point in the history
…thereum#30264)

closes ethereum#29475, replaces ethereum#29657, ethereum#30104 

Fixes two issues. First is a deadlock where the txpool attempts to reorg, but can't complete because there are no readers left for the new txs subscription. Second, resolves a problem with on demand mode where txs may be left pending when there are more pending txs than block space.

Co-authored-by: Martin Holst Swende <martin@swende.se>
  • Loading branch information
2 people authored and zfy0701 committed Dec 3, 2024
1 parent b399475 commit 0533c8e
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 70 deletions.
21 changes: 4 additions & 17 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, engine.InvalidParams.With(errors.New("forkChoiceUpdateV1 called post-shanghai"))
}
}
return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1, false)
return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1)
}

// ForkchoiceUpdatedV2 is equivalent to V1 with the addition of withdrawals in the payload
Expand All @@ -207,7 +207,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, pa
return engine.STATUS_INVALID, engine.UnsupportedFork.With(errors.New("forkchoiceUpdatedV2 must only be called with paris and shanghai payloads"))
}
}
return api.forkchoiceUpdated(update, params, engine.PayloadV2, false)
return api.forkchoiceUpdated(update, params, engine.PayloadV2)
}

// ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root
Expand All @@ -228,10 +228,10 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, pa
// hash, even if params are wrong. To do this we need to split up
// forkchoiceUpdate into a function that only updates the head and then a
// function that kicks off block construction.
return api.forkchoiceUpdated(update, params, engine.PayloadV3, false)
return api.forkchoiceUpdated(update, params, engine.PayloadV3)
}

func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion, simulatorMode bool) (engine.ForkChoiceResponse, error) {
func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion) (engine.ForkChoiceResponse, error) {
api.forkchoiceLock.Lock()
defer api.forkchoiceLock.Unlock()

Expand Down Expand Up @@ -374,19 +374,6 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
if api.localBlocks.has(id) {
return valid(&id), nil
}
// If the beacon chain is ran by a simulator, then transaction insertion,
// block insertion and block production will happen without any timing
// delay between them. This will cause flaky simulator executions due to
// the transaction pool running its internal reset operation on a back-
// ground thread. To avoid the racey behavior - in simulator mode - the
// pool will be explicitly blocked on its reset before continuing to the
// block production below.
if simulatorMode {
if err := api.eth.TxPool().Sync(); err != nil {
log.Error("Failed to sync transaction pool", "err", err)
return valid(nil), engine.InvalidPayloadAttributes.With(err)
}
}
payload, err := api.eth.Miner().BuildPayload(args)
if err != nil {
log.Error("Failed to build payload", "err", err)
Expand Down
87 changes: 51 additions & 36 deletions eth/catalyst/simulated_beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"crypto/rand"
"crypto/sha256"
"errors"
"fmt"
"math/big"
"sync"
"time"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
Expand All @@ -41,36 +43,46 @@ const devEpochLength = 32
// withdrawalQueue implements a FIFO queue which holds withdrawals that are
// pending inclusion.
type withdrawalQueue struct {
pending chan *types.Withdrawal
pending types.Withdrawals
mu sync.Mutex
feed event.Feed
subs event.SubscriptionScope
}

type newWithdrawalsEvent struct{ Withdrawals types.Withdrawals }

// add queues a withdrawal for future inclusion.
func (w *withdrawalQueue) add(withdrawal *types.Withdrawal) error {
select {
case w.pending <- withdrawal:
break
default:
return errors.New("withdrawal queue full")
}
w.mu.Lock()
w.pending = append(w.pending, withdrawal)
w.mu.Unlock()

w.feed.Send(newWithdrawalsEvent{types.Withdrawals{withdrawal}})
return nil
}

// gatherPending returns a number of queued withdrawals up to a maximum count.
func (w *withdrawalQueue) gatherPending(maxCount int) []*types.Withdrawal {
withdrawals := []*types.Withdrawal{}
for {
select {
case withdrawal := <-w.pending:
withdrawals = append(withdrawals, withdrawal)
if len(withdrawals) == maxCount {
return withdrawals
}
default:
return withdrawals
}
}
// pop dequeues the specified number of withdrawals from the queue.
func (w *withdrawalQueue) pop(count int) types.Withdrawals {
w.mu.Lock()
defer w.mu.Unlock()

count = min(count, len(w.pending))
popped := w.pending[0:count]
w.pending = w.pending[count:]

return popped
}

// subscribe allows a listener to be updated when new withdrawals are added to
// the queue.
func (w *withdrawalQueue) subscribe(ch chan<- newWithdrawalsEvent) event.Subscription {
sub := w.feed.Subscribe(ch)
return w.subs.Track(sub)
}

// SimulatedBeacon drives an Ethereum instance as if it were a real beacon
// client. It can run in period mode where it mines a new block every period
// (seconds) or on every transaction via Commit, Fork and AdjustTime.
type SimulatedBeacon struct {
shutdownCh chan struct{}
eth *eth.Ethereum
Expand All @@ -86,10 +98,6 @@ type SimulatedBeacon struct {
}

// NewSimulatedBeacon constructs a new simulated beacon chain.
// Period sets the period in which blocks should be produced.
//
// - If period is set to 0, a block is produced on every transaction.
// via Commit, Fork and AdjustTime.
func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, error) {
block := eth.BlockChain().CurrentBlock()
current := engine.ForkchoiceStateV1{
Expand All @@ -112,7 +120,6 @@ func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, err
engineAPI: engineAPI,
lastBlockTime: block.Time,
curForkchoiceState: current,
withdrawals: withdrawalQueue{make(chan *types.Withdrawal, 20)},
}, nil
}

Expand Down Expand Up @@ -156,6 +163,16 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
c.setCurrentState(header.Hash(), *finalizedHash)
}

// Because transaction insertion, block insertion, and block production will
// happen without any timing delay between them in simulator mode and the
// transaction pool will be running its internal reset operation on a
// background thread, flaky executions can happen. To avoid the racey
// behavior, the pool will be explicitly blocked on its reset before
// continuing to the block production below.
if err := c.eth.APIBackend.TxPool().Sync(); err != nil {
return fmt.Errorf("failed to sync txpool: %w", err)
}

var random [32]byte
rand.Read(random[:])
fcResponse, err := c.engineAPI.forkchoiceUpdated(c.curForkchoiceState, &engine.PayloadAttributes{
Expand All @@ -164,13 +181,14 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
Withdrawals: withdrawals,
Random: random,
BeaconRoot: &common.Hash{},
}, engine.PayloadV3, true)
}, engine.PayloadV3)
if err != nil {
return err
}
if fcResponse == engine.STATUS_SYNCING {
return errors.New("chain rewind prevented invocation of payload creation")
}

envelope, err := c.engineAPI.getPayload(*fcResponse.PayloadID, true)
if err != nil {
return err
Expand Down Expand Up @@ -223,8 +241,7 @@ func (c *SimulatedBeacon) loop() {
case <-c.shutdownCh:
return
case <-timer.C:
withdrawals := c.withdrawals.gatherPending(10)
if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
if err := c.sealBlock(c.withdrawals.pop(10), uint64(time.Now().Unix())); err != nil {
log.Warn("Error performing sealing work", "err", err)
} else {
timer.Reset(time.Second * time.Duration(c.period))
Expand Down Expand Up @@ -260,7 +277,7 @@ func (c *SimulatedBeacon) setCurrentState(headHash, finalizedHash common.Hash) {

// Commit seals a block on demand.
func (c *SimulatedBeacon) Commit() common.Hash {
withdrawals := c.withdrawals.gatherPending(10)
withdrawals := c.withdrawals.pop(10)
if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
log.Warn("Error performing sealing work", "err", err)
}
Expand Down Expand Up @@ -301,16 +318,14 @@ func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
if parent == nil {
return errors.New("parent not found")
}
withdrawals := c.withdrawals.gatherPending(10)
withdrawals := c.withdrawals.pop(10)
return c.sealBlock(withdrawals, parent.Time+uint64(adjustment/time.Second))
}

// RegisterSimulatedBeaconAPIs registers the simulated beacon's API with the
// stack.
func RegisterSimulatedBeaconAPIs(stack *node.Node, sim *SimulatedBeacon) {
api := &api{sim}
if sim.period == 0 {
// mine on demand if period is set to 0
go api.loop()
}
api := newSimulatedBeaconAPI(sim)
stack.RegisterAPIs([]rpc.API{
{
Namespace: "dev",
Expand Down
72 changes: 58 additions & 14 deletions eth/catalyst/simulated_beacon_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,88 @@ package catalyst

import (
"context"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

type api struct {
// simulatedBeaconAPI provides a RPC API for SimulatedBeacon.
type simulatedBeaconAPI struct {
sim *SimulatedBeacon
}

func (a *api) loop() {
// newSimulatedBeaconAPI returns an instance of simulatedBeaconAPI with a
// buffered commit channel. If period is zero, it starts a goroutine to handle
// new tx events.
func newSimulatedBeaconAPI(sim *SimulatedBeacon) *simulatedBeaconAPI {
api := &simulatedBeaconAPI{sim: sim}
if sim.period == 0 {
// mine on demand if period is set to 0
go api.loop()
}
return api
}

// loop is the main loop for the API when it's running in period = 0 mode. It
// ensures that block production is triggered as soon as a new withdrawal or
// transaction is received.
func (a *simulatedBeaconAPI) loop() {
var (
newTxs = make(chan core.NewTxsEvent)
sub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true)
newTxs = make(chan core.NewTxsEvent)
newWxs = make(chan newWithdrawalsEvent)
newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true)
newWxsSub = a.sim.withdrawals.subscribe(newWxs)
doCommit = make(chan struct{}, 1)
)
defer sub.Unsubscribe()
defer newTxsSub.Unsubscribe()
defer newWxsSub.Unsubscribe()

// A background thread which signals to the simulator when to commit
// based on messages over doCommit.
go func() {
for range doCommit {
a.sim.Commit()
a.sim.eth.TxPool().Sync()

// It's worth noting that in case a tx ends up in the pool listed as
// "executable", but for whatever reason the miner does not include it in
// a block -- maybe the miner is enforcing a higher tip than the pool --
// this code will spinloop.
for {
if executable, _ := a.sim.eth.TxPool().Stats(); executable == 0 {
break
}
a.sim.Commit()
}
}
}()

for {
select {
case <-a.sim.shutdownCh:
close(doCommit)
return
case w := <-a.sim.withdrawals.pending:
withdrawals := append(a.sim.withdrawals.gatherPending(9), w)
if err := a.sim.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil {
log.Warn("Error performing sealing work", "err", err)
case <-newWxs:
select {
case doCommit <- struct{}{}:
default:
}
case <-newTxs:
a.sim.Commit()
select {
case doCommit <- struct{}{}:
default:
}
}
}
}

func (a *api) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error {
// AddWithdrawal adds a withdrawal to the pending queue.
func (a *simulatedBeaconAPI) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error {
return a.sim.withdrawals.add(withdrawal)
}

func (a *api) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) {
// SetFeeRecipient sets the fee recipient for block building purposes.
func (a *simulatedBeaconAPI) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) {
a.sim.setFeeRecipient(feeRecipient)
}
Loading

0 comments on commit 0533c8e

Please sign in to comment.