Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-node: Remove Unsafe Sync Target & Direct Engine Manipulation #8856

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions op-e2e/actions/l2_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,6 @@ func (s *L2Verifier) L2Unsafe() eth.L2BlockRef {
return s.derivation.UnsafeL2Head()
}

func (s *L2Verifier) EngineSyncTarget() eth.L2BlockRef {
return s.derivation.EngineSyncTarget()
}

func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
return &eth.SyncStatus{
CurrentL1: s.derivation.Origin(),
Expand All @@ -163,7 +159,6 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus {
FinalizedL2: s.L2Finalized(),
PendingSafeL2: s.L2PendingSafe(),
UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(),
EngineSyncTarget: s.EngineSyncTarget(),
}
}

Expand Down
24 changes: 12 additions & 12 deletions op-e2e/actions/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func FinalizeWhileSyncing(gt *testing.T, deltaTimeOffset *hexutil.Uint64) {
require.Less(t, verifierStartStatus.FinalizedL2.Number, verifier.SyncStatus().FinalizedL2.Number, "verifier finalized L2 blocks during sync")
}

// TestUnsafeSync tests that a verifier properly imports unsafe blocks via gossip.
func TestUnsafeSync(gt *testing.T) {
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
Expand All @@ -155,15 +156,15 @@ func TestUnsafeSync(gt *testing.T) {
verifier.ActL2UnsafeGossipReceive(seqHead)(t)
// Handle unsafe payload
verifier.ActL2PipelineFull(t)
// Verifier must advance its unsafe head and engine sync target.
// Verifier must advance its unsafe head.
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
// Check engine sync target updated.
require.Equal(t, sequencer.L2Unsafe().Hash, sequencer.EngineSyncTarget().Hash)
require.Equal(t, verifier.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
}
}

func TestEngineP2PSync(gt *testing.T) {
// TestELSync tests that a verifier will have the EL import the full chain from the sequencer
// when passed a single unsafe block. op-geth can either snap sync or full sync here.
func TestELSync(gt *testing.T) {
gt.Skip("not implemented yet")
t := NewDefaultTesting(gt)
dp := e2eutils.MakeDeployParams(t, defaultRollupTestParams)
sd := e2eutils.Setup(t, dp, defaultAlloc)
Expand All @@ -179,8 +180,6 @@ func TestEngineP2PSync(gt *testing.T) {
sequencer.ActL2PipelineFull(t)
verifier.ActL2PipelineFull(t)

verifierUnsafeHead := verifier.L2Unsafe()

// Build a L2 block. This block will not be gossiped to verifier, so verifier can not advance chain by itself.
sequencer.ActL2StartBlock(t)
sequencer.ActL2EndBlock(t)
Expand All @@ -195,12 +194,13 @@ func TestEngineP2PSync(gt *testing.T) {
verifier.ActL2UnsafeGossipReceive(seqHead)(t)
// Handle unsafe payload
verifier.ActL2PipelineFull(t)
// Verifier must advance only engine sync target.
require.NotEqual(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
require.NotEqual(t, verifier.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
require.Equal(t, verifier.L2Unsafe().Hash, verifierUnsafeHead.Hash)
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.EngineSyncTarget().Hash)
// Verifier must advance unsafe head after unsafe gossip.
require.Equal(t, sequencer.L2Unsafe().Hash, verifier.L2Unsafe().Hash)
}
// Actual test flow should be as follows:
// 1. Build a chain on the sequencer.
// 2. Gossip only a single final L2 block from the sequencer to the verifier.
// 3. Assert that the verifier has the full chain.
}

func TestInvalidPayloadInSpanBatch(gt *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion op-node/node/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus {
FinalizedL2: testutils.RandomL2BlockRef(rng),
PendingSafeL2: testutils.RandomL2BlockRef(rng),
UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng),
EngineSyncTarget: testutils.RandomL2BlockRef(rng),
}
}

Expand Down
134 changes: 112 additions & 22 deletions op-node/rollup/derive/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package derive

import (
"context"
"errors"
"fmt"

"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/sync"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)

var errNoFCUNeeded = errors.New("no FCU call was needed")

var _ EngineControl = (*EngineController)(nil)
var _ LocalEngineControl = (*EngineController)(nil)

Expand All @@ -20,17 +24,18 @@ type ExecEngine interface {
}

type EngineController struct {
engine ExecEngine // Underlying execution engine RPC
log log.Logger
metrics Metrics
genesis *rollup.Genesis
engine ExecEngine // Underlying execution engine RPC
log log.Logger
metrics Metrics
genesis *rollup.Genesis
syncMode sync.Mode

// Block Head State
syncTarget eth.L2BlockRef
unsafeHead eth.L2BlockRef
pendingSafeHead eth.L2BlockRef
safeHead eth.L2BlockRef
finalizedHead eth.L2BlockRef
needFCUCall bool

// Building State
buildingOnto eth.L2BlockRef
Expand All @@ -39,21 +44,18 @@ type EngineController struct {
safeAttrs *AttributesWithParent
}

func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, genesis rollup.Genesis) *EngineController {
func NewEngineController(engine ExecEngine, log log.Logger, metrics Metrics, genesis rollup.Genesis, syncMode sync.Mode) *EngineController {
return &EngineController{
engine: engine,
log: log,
metrics: metrics,
genesis: &genesis,
engine: engine,
log: log,
metrics: metrics,
genesis: &genesis,
syncMode: syncMode,
}
}

// State Getters

func (e *EngineController) EngineSyncTarget() eth.L2BlockRef {
return e.syncTarget
}

func (e *EngineController) UnsafeL2Head() eth.L2BlockRef {
return e.unsafeHead
}
Expand All @@ -75,21 +77,16 @@ func (e *EngineController) BuildingPayload() (eth.L2BlockRef, eth.PayloadID, boo
}

func (e *EngineController) IsEngineSyncing() bool {
return e.unsafeHead.Hash != e.syncTarget.Hash
return false
}

// Setters

// SetEngineSyncTarget implements LocalEngineControl.
func (e *EngineController) SetEngineSyncTarget(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_engineSyncTarget", r)
e.syncTarget = r
}

// SetFinalizedHead implements LocalEngineControl.
func (e *EngineController) SetFinalizedHead(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_finalized", r)
e.finalizedHead = r
e.needFCUCall = true
}

// SetPendingSafeL2Head implements LocalEngineControl.
Expand All @@ -102,12 +99,14 @@ func (e *EngineController) SetPendingSafeL2Head(r eth.L2BlockRef) {
func (e *EngineController) SetSafeHead(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_safe", r)
e.safeHead = r
e.needFCUCall = true
}

// SetUnsafeHead implements LocalEngineControl.
func (e *EngineController) SetUnsafeHead(r eth.L2BlockRef) {
e.metrics.RecordL2Ref("l2_unsafe", r)
e.unsafeHead = r
e.needFCUCall = true
}

// Engine Methods
Expand Down Expand Up @@ -165,7 +164,6 @@ func (e *EngineController) ConfirmPayload(ctx context.Context) (out *eth.Executi
}

e.unsafeHead = ref
e.syncTarget = ref

e.metrics.RecordL2Ref("l2_unsafe", ref)
e.metrics.RecordL2Ref("l2_engineSyncTarget", ref)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to remove this as well.

Expand Down Expand Up @@ -208,6 +206,98 @@ func (e *EngineController) resetBuildingState() {

// Misc Setters only used by the engine queue

// checkNewPayloadStatus checks returned status of engine_newPayloadV1 request for next unsafe payload.
// It returns true if the status is acceptable.
func (e *EngineController) checkNewPayloadStatus(status eth.ExecutePayloadStatus) bool {
if e.syncMode == sync.ELSync {
// Allow SYNCING and ACCEPTED if engine EL sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing || status == eth.ExecutionAccepted
}
return status == eth.ExecutionValid
}

// checkForkchoiceUpdatedStatus checks returned status of engine_forkchoiceUpdatedV1 request for next unsafe payload.
// It returns true if the status is acceptable.
func (e *EngineController) checkForkchoiceUpdatedStatus(status eth.ExecutePayloadStatus) bool {
if e.syncMode == sync.ELSync {
// Allow SYNCING if engine P2P sync is enabled
return status == eth.ExecutionValid || status == eth.ExecutionSyncing
}
return status == eth.ExecutionValid
}

// TryUpdateEngine attempts to update the engine with the current forkchoice state of the rollup node,
// this is a no-op if the nodes already agree on the forkchoice state.
func (e *EngineController) TryUpdateEngine(ctx context.Context) error {
if !e.needFCUCall {
return errNoFCUNeeded
}
if e.IsEngineSyncing() {
e.log.Warn("Attempting to update forkchoice state while engine is P2P syncing")
}
fc := eth.ForkchoiceState{
HeadBlockHash: e.unsafeHead.Hash,
SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash,
}
_, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
if errors.As(err, &inputErr) {
switch inputErr.Code {
case eth.InvalidForkchoiceState:
return NewResetError(fmt.Errorf("forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
default:
return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
}
} else {
return NewTemporaryError(fmt.Errorf("failed to sync forkchoice with engine: %w", err))
}
}
e.needFCUCall = false
return nil
}

func (e *EngineController) InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayload, ref eth.L2BlockRef) error {
status, err := e.engine.NewPayload(ctx, payload)
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to update insert payload: %w", err))
}
if !e.checkNewPayloadStatus(status.Status) {
return NewTemporaryError(fmt.Errorf("cannot process unsafe payload: new - %v; parent: %v; err: %w",
payload.ID(), payload.ParentID(), eth.NewPayloadErr(payload, status)))
}

// Mark the new payload as valid
fc := eth.ForkchoiceState{
HeadBlockHash: payload.BlockHash,
SafeBlockHash: e.safeHead.Hash,
FinalizedBlockHash: e.finalizedHead.Hash,
}
fcRes, err := e.engine.ForkchoiceUpdate(ctx, &fc, nil)
if err != nil {
var inputErr eth.InputError
if errors.As(err, &inputErr) {
switch inputErr.Code {
case eth.InvalidForkchoiceState:
return NewResetError(fmt.Errorf("pre-unsafe-block forkchoice update was inconsistent with engine, need reset to resolve: %w", inputErr.Unwrap()))
default:
return NewTemporaryError(fmt.Errorf("unexpected error code in forkchoice-updated response: %w", err))
}
} else {
return NewTemporaryError(fmt.Errorf("failed to update forkchoice to prepare for new unsafe payload: %w", err))
}
}
if !e.checkForkchoiceUpdatedStatus(fcRes.PayloadStatus.Status) {
return NewTemporaryError(fmt.Errorf("cannot prepare unsafe chain for new payload: new - %v; parent: %v; err: %w",
payload.ID(), payload.ParentID(), eth.ForkchoiceUpdateErr(fcRes.PayloadStatus)))
}
e.unsafeHead = ref
e.needFCUCall = false

return nil
}

// ResetBuildingState implements LocalEngineControl.
func (e *EngineController) ResetBuildingState() {
e.resetBuildingState()
Expand Down
Loading
Loading