Skip to content

Commit 6eb8f32

Browse files
authored
eth/catalyst: add locking around newpayload (#25816)
Sometimes we get stuck on db compaction, and the CL re-issues the "same" command to us multiple times. Each request get stuck on the same place, in the middle of the handler. This changes makes it so we do not reprocess the same payload, but instead detects it early.
1 parent 5d11d38 commit 6eb8f32

File tree

2 files changed

+124
-5
lines changed

2 files changed

+124
-5
lines changed

eth/catalyst/api.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ type ConsensusAPI struct {
120120
lastNewPayloadLock sync.Mutex
121121

122122
forkchoiceLock sync.Mutex // Lock for the forkChoiceUpdated method
123+
newPayloadLock sync.Mutex // Lock for the NewPayload method
123124
}
124125

125126
// NewConsensusAPI creates a new consensus api for the given backend.
@@ -342,6 +343,22 @@ func (api *ConsensusAPI) GetPayloadV1(payloadID beacon.PayloadID) (*beacon.Execu
342343

343344
// NewPayloadV1 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
344345
func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.PayloadStatusV1, error) {
346+
// The locking here is, strictly, not required. Without these locks, this can happen:
347+
//
348+
// 1. NewPayload( execdata-N ) is invoked from the CL. It goes all the way down to
349+
// api.eth.BlockChain().InsertBlockWithoutSetHead, where it is blocked on
350+
// e.g database compaction.
351+
// 2. The call times out on the CL layer, which issues another NewPayload (execdata-N) call.
352+
// Similarly, this also get stuck on the same place. Importantly, since the
353+
// first call has not gone through, the early checks for "do we already have this block"
354+
// will all return false.
355+
// 3. When the db compaction ends, then N calls inserting the same payload are processed
356+
// sequentially.
357+
// Hence, we use a lock here, to be sure that the previous call has finished before we
358+
// check whether we already have the block locally.
359+
api.newPayloadLock.Lock()
360+
defer api.newPayloadLock.Unlock()
361+
345362
log.Trace("Engine API request received", "method", "ExecutePayload", "number", params.Number, "hash", params.BlockHash)
346363
block, err := beacon.ExecutableDataToBlock(params)
347364
if err != nil {

eth/catalyst/api_test.go

Lines changed: 107 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"fmt"
2222
"math/big"
23+
"sync"
2324
"testing"
2425
"time"
2526

@@ -277,10 +278,12 @@ func TestEth2NewBlock(t *testing.T) {
277278
t.Fatalf("Failed to convert executable data to block %v", err)
278279
}
279280
newResp, err := api.NewPayloadV1(*execData)
280-
if err != nil || newResp.Status != "VALID" {
281+
switch {
282+
case err != nil:
281283
t.Fatalf("Failed to insert block: %v", err)
282-
}
283-
if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64()-1 {
284+
case newResp.Status != "VALID":
285+
t.Fatalf("Failed to insert block: %v", newResp.Status)
286+
case ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64()-1:
284287
t.Fatalf("Chain head shouldn't be updated")
285288
}
286289
checkLogEvents(t, newLogCh, rmLogsCh, 0, 0)
@@ -292,8 +295,8 @@ func TestEth2NewBlock(t *testing.T) {
292295
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
293296
t.Fatalf("Failed to insert block: %v", err)
294297
}
295-
if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64() {
296-
t.Fatalf("Chain head should be updated")
298+
if have, want := ethservice.BlockChain().CurrentBlock().NumberU64(), block.NumberU64(); have != want {
299+
t.Fatalf("Chain head should be updated, have %d want %d", have, want)
297300
}
298301
checkLogEvents(t, newLogCh, rmLogsCh, 1, 0)
299302

@@ -855,3 +858,102 @@ func TestNewPayloadOnInvalidTerminalBlock(t *testing.T) {
855858
t.Fatalf("error sending invalid forkchoice, invalid status: %v", resp.PayloadStatus.Status)
856859
}
857860
}
861+
862+
// TestSimultaneousNewBlock does several parallel inserts, both as
863+
// newPayLoad and forkchoiceUpdate. This is to test that the api behaves
864+
// well even of the caller is not being 'serial'.
865+
func TestSimultaneousNewBlock(t *testing.T) {
866+
genesis, preMergeBlocks := generatePreMergeChain(10)
867+
n, ethservice := startEthService(t, genesis, preMergeBlocks)
868+
defer n.Close()
869+
870+
var (
871+
api = NewConsensusAPI(ethservice)
872+
parent = preMergeBlocks[len(preMergeBlocks)-1]
873+
)
874+
for i := 0; i < 10; i++ {
875+
statedb, _ := ethservice.BlockChain().StateAt(parent.Root())
876+
ethservice.TxPool().AddLocal(types.MustSignNewTx(testKey, types.LatestSigner(ethservice.BlockChain().Config()),
877+
&types.DynamicFeeTx{
878+
Nonce: statedb.GetNonce(testAddr),
879+
Value: big.NewInt(0),
880+
GasFeeCap: big.NewInt(2 * params.InitialBaseFee),
881+
GasTipCap: big.NewInt(2 * params.InitialBaseFee),
882+
ChainID: genesis.Config.ChainID,
883+
Gas: 1000000,
884+
To: &common.Address{99},
885+
}))
886+
execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{
887+
Timestamp: parent.Time() + 5,
888+
})
889+
if err != nil {
890+
t.Fatalf("Failed to create the executable data %v", err)
891+
}
892+
// Insert it 10 times in parallel. Should be ignored.
893+
{
894+
var (
895+
wg sync.WaitGroup
896+
testErr error
897+
errMu sync.Mutex
898+
)
899+
wg.Add(10)
900+
for ii := 0; ii < 10; ii++ {
901+
go func() {
902+
defer wg.Done()
903+
if newResp, err := api.NewPayloadV1(*execData); err != nil {
904+
errMu.Lock()
905+
testErr = fmt.Errorf("Failed to insert block: %w", err)
906+
errMu.Unlock()
907+
} else if newResp.Status != "VALID" {
908+
errMu.Lock()
909+
testErr = fmt.Errorf("Failed to insert block: %v", newResp.Status)
910+
errMu.Unlock()
911+
}
912+
}()
913+
}
914+
wg.Wait()
915+
if testErr != nil {
916+
t.Fatal(testErr)
917+
}
918+
}
919+
block, err := beacon.ExecutableDataToBlock(*execData)
920+
if err != nil {
921+
t.Fatalf("Failed to convert executable data to block %v", err)
922+
}
923+
if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64()-1 {
924+
t.Fatalf("Chain head shouldn't be updated")
925+
}
926+
fcState := beacon.ForkchoiceStateV1{
927+
HeadBlockHash: block.Hash(),
928+
SafeBlockHash: block.Hash(),
929+
FinalizedBlockHash: block.Hash(),
930+
}
931+
{
932+
var (
933+
wg sync.WaitGroup
934+
testErr error
935+
errMu sync.Mutex
936+
)
937+
wg.Add(10)
938+
// Do each FCU 10 times
939+
for ii := 0; ii < 10; ii++ {
940+
go func() {
941+
defer wg.Done()
942+
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
943+
errMu.Lock()
944+
testErr = fmt.Errorf("Failed to insert block: %w", err)
945+
errMu.Unlock()
946+
}
947+
}()
948+
}
949+
wg.Wait()
950+
if testErr != nil {
951+
t.Fatal(testErr)
952+
}
953+
}
954+
if have, want := ethservice.BlockChain().CurrentBlock().NumberU64(), block.NumberU64(); have != want {
955+
t.Fatalf("Chain head should be updated, have %d want %d", have, want)
956+
}
957+
parent = block
958+
}
959+
}

0 commit comments

Comments
 (0)