Skip to content

Commit

Permalink
op-node: Remove Unsafe Sync Target & Direct Engine Manipulation
Browse files Browse the repository at this point in the history
This fully moves all execution engine manipulation into the Engine Controller
from the Engine Queue. It does not remove the proxy methods however.
  • Loading branch information
trianglesphere committed Jan 9, 2024
1 parent 320fe93 commit 714a127
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 179 deletions.
5 changes: 0 additions & 5 deletions op-e2e/actions/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,6 @@ func (s *L2Verifier) L2Unsafe() eth.L2BlockRef {
return s.derivation.UnsafeL2Head()
}

func (s *L2Verifier) EngineSyncTarget() eth.L2BlockRef {
return s.derivation.EngineSyncTarget()
}

func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
return &eth.SyncStatus{
CurrentL1: s.derivation.Origin(),
Expand All @@ -163,7 +159,6 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
FinalizedL2: s.L2Finalized(),
PendingSafeL2: s.L2PendingSafe(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
EngineSyncTarget: s.EngineSyncTarget(),
}
}

Expand Down
19 changes: 8 additions & 11 deletions op-e2e/actions/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,11 @@ func TestUnsafeSync(gt *testing.T) {
verifier.ActL2PipelineFull(t)
// Verifier must advance its unsafe head and engine sync target.
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
// Check engine sync target updated.
require.Equal(t, sequencer.L2Unsafe().Hash, sequencer.EngineSyncTarget().Hash)
require.Equal(t, verifier.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
}
}

func TestEngineP2PSync(gt *testing.T) {
func TestELSync(gt *testing.T) {
gt.Skip("not implemented yet")
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
sd := e2eutils.Setup(t, dp, defaultAlloc)
Expand All @@ -179,8 +177,6 @@ func TestEngineP2PSync(gt *testing.T) {
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)

verifierUnsafeHead := verifier.L2Unsafe()

// Build a L2 block. This block will not be gossiped to verifier, so verifier can not advance chain by itself.
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
Expand All @@ -195,12 +191,13 @@ func TestEngineP2PSync(gt *testing.T) {
verifier.ActL2UnsafeGossipReceive(seqHead)(t)
// Handle unsafe payload
verifier.ActL2PipelineFull(t)
// Verifier must advance only engine sync target.
require.NotEqual(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
require.NotEqual(t, verifier.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
require.Equal(t, verifier.L2Unsafe().Hash, verifierUnsafeHead.Hash)
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
// Verifier must advance unsafe head after unsafe gossip.
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
}
// Actual test flow should be as follows:
// 1. Build a chain on the sequencer.
// 2. Gossip only a single final L2 block from the sequencer to the verifier.
// 3. Assert that the verifier has the full chain.
}

func TestInvalidPayloadInSpanBatch(gt *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion op-node/node/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus {
FinalizedL2: testutils.RandomL2BlockRef(rng),
PendingSafeL2: testutils.RandomL2BlockRef(rng),
UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng),
EngineSyncTarget: testutils.RandomL2BlockRef(rng),
}
}

Expand Down
134 changes: 112 additions & 22 deletions op-node/rollup/derive/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package derive

import (
"context"
"errors"
"fmt"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)

var errNoFCUNeeded = errors.New("no FCU call was needed")

var _ EngineControl = (*EngineController)(nil)
var _ LocalEngineControl = (*EngineController)(nil)

Expand All @@ -20,17 +24,18 @@ type ExecEngine interface {
}

type EngineController struct {
engine ExecEngine // Underlying execution engine RPC
log log.Logger
metrics Metrics
genesis *rollup.Genesis
engine ExecEngine // Underlying execution engine RPC
log log.Logger
metrics Metrics
genesis *rollup.Genesis
syncMode sync.Mode

// Block Head State
syncTarget eth.L2BlockRef
unsafeHead eth.L2BlockRef
pendingSafeHead eth.L2BlockRef
safeHead eth.L2BlockRef
finalizedHead eth.L2BlockRef
needFCUCall bool

// Building State
buildingOnto eth.L2BlockRef
Expand All @@ -39,21 +44,18 @@ type EngineController struct {
safeAttrs *AttributesWithParent
}

func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, genesis rollup.Genesis) *EngineController {
func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, genesis rollup.Genesis, syncMode sync.Mode) *EngineController {
return &EngineController{
engine: engine,
log: log,
metrics: metrics,
genesis: &genesis,
engine: engine,
log: log,
metrics: metrics,
genesis: &genesis,
syncMode: syncMode,
}
}

// State Getters

func (e *EngineController) EngineSyncTarget() eth.L2BlockRef {
return e.syncTarget
}

func (e *EngineController) UnsafeL2Head() eth.L2BlockRef {
return e.unsafeHead
}
Expand All @@ -75,21 +77,16 @@ func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, boo
}

func (e *EngineController) IsEngineSyncing() bool {
return e.unsafeHead.Hash != e.syncTarget.Hash
return false
}

// Setters

// SetEngineSyncTarget implements LocalEngineControl.
func (e *EngineController) SetEngineSyncTarget(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_engineSyncTarget", r)
e.syncTarget = r
}

// SetFinalizedHead implements LocalEngineControl.
func (e *EngineController) SetFinalizedHead(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_finalized", r)
e.finalizedHead = r
e.needFCUCall = true
}

// SetPendingSafeL2Head implements LocalEngineControl.
Expand All @@ -102,12 +99,14 @@ func (e *EngineController) SetPendingSafeL2Head(r eth.L2BlockRef) {
func (e *EngineController) SetSafeHead(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_safe", r)
e.safeHead = r
e.needFCUCall = true
}

// SetUnsafeHead implements LocalEngineControl.
func (e *EngineController) SetUnsafeHead(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_unsafe", r)
e.unsafeHead = r
e.needFCUCall = true
}

// Engine Methods
Expand Down Expand Up @@ -165,7 +164,6 @@ func (e *EngineController) ConfirmPayload(ctx context.Context) (out *eth.Executi
}

e.unsafeHead = ref
e.syncTarget = ref

e.metrics.RecordL2Ref("l2_unsafe", ref)
e.metrics.RecordL2Ref("l2_engineSyncTarget", ref)
Expand Down Expand Up @@ -208,6 +206,98 @@ func (e *EngineController) resetBuildingState() {

// Misc Setters only used by the engine queue

// checkNewPayloadStatus checks returned status of engine_newPayloadV1 request for next unsafe payload.
// It returns true if the status is acceptable.
func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool {
if e.syncMode == sync.ELSync {
// Allow SYNCING and ACCEPTED if engine EL sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted
}
return status == eth.ExecutionValid
}

// checkForkchoiceUpdatedStatus checks returned status of engine_forkchoiceUpdatedV1 request for next unsafe payload.
// It returns true if the status is acceptable.
func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool {
if e.syncMode == sync.ELSync {
// Allow SYNCING if engine P2P sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing
}
return status == eth.ExecutionValid
}

// TryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node,
// this is a no-op if the nodes already agree on the forkchoice state.
func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
if !e.needFCUCall {
return errNoFCUNeeded
}
if e.IsEngineSyncing() {
e.log.Warn("Attempting to update forkchoice state while engine is P2P syncing")
}
fc := eth.ForkchoiceState{
HeadBlockHash: e.unsafeHead.Hash,
SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash,
}
_, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
if errors.As(err, &inputErr) {
switch inputErr.Code {
case eth.InvalidForkchoiceState:
return NewResetError(fmt.Errorf("forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
default:
return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
}
} else {
return NewTemporaryError(fmt.Errorf("failed to sync forkchoice with engine: %w", err))
}
}
e.needFCUCall = false
return nil
}

func (e *EngineController) InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload, ref eth.L2BlockRef) error {
status, err := e.engine.NewPayload(ctx, payload)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
}
if !e.checkNewPayloadStatus(status.Status) {
return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w",
payload.ID(), payload.ParentID(), eth.NewPayloadErr(payload, status)))
}

// Mark the new payload as valid
fc := eth.ForkchoiceState{
HeadBlockHash: payload.BlockHash,
SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash,
}
fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
if errors.As(err, &inputErr) {
switch inputErr.Code {
case eth.InvalidForkchoiceState:
return NewResetError(fmt.Errorf("pre-unsafe-block forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
default:
return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
}
} else {
return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err))
}
}
if !e.checkForkchoiceUpdatedStatus(fcRes.PayloadStatus.Status) {
return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w",
payload.ID(), payload.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
e.unsafeHead = ref
e.needFCUCall = false

return nil
}

// ResetBuildingState implements LocalEngineControl.
func (e *EngineController) ResetBuildingState() {
e.resetBuildingState()
Expand Down
Loading

0 comments on commit 714a127

Please sign in to comment.