Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix block queue edge case #5753

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion engine/execution/ingestion/block_queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,52 @@ func (q *BlockQueue) HandleBlock(block *flow.Block, parentFinalState *flow.State
// check if the block already exists
blockID := block.ID()
executable, ok := q.blocks[blockID]

q.log.Debug().
Str("blockID", blockID.String()).
Uint64("height", block.Header.Height).
Bool("parent executed", parentFinalState != nil).
Msg("handle block")

if ok {
// handle the case where the block has seen before
return q.handleKnownBlock(executable, parentFinalState)
}

// handling a new block

_, parentExists := q.blocks[block.Header.ParentID]
// if parentFinalState is not provided, then its parent block must exists in the queue
// otherwise it's an exception
if parentFinalState == nil {
_, parentExists := q.blocks[block.Header.ParentID]
if !parentExists {
return nil, nil,
fmt.Errorf("block %s has no parent commitment, but its parent block %s does not exist in the queue: %w",
blockID, block.Header.ParentID, ErrMissingParent)
}
} else {
if parentExists {
// this is an edge case where A <- B, and B is received with A's final state, however,
// A is not executed yet.
// the reason this could happen is that there is a race condition in `OnBlockExecuted` and
// `HandleBlock`, when A's execution result (which contains the final state) has been
// saved to database, and B is received before `blockQueue.OnBlockExecuted(A)` is called, so
// `blockQueue.HandleBlock(B, A's final state)` will be called, which run into this case.
// In this case, if we consider A is executed, then return B as executables, and later
// when `OnBlockExecuted(A)` is called, it will return B as executables again, which
// will cause B to be executed twice.
// In order to prevent B to be executed twice, we will simply ignore A's final state,
// as if its parent has not been executed yet, then B is not executable. And when `OnBlockExecuted(A)`
// is called, it will return B as executables, so that both A and B will be executed only once.
// See test case: TestHandleBlockChildCalledBeforeOnBlockExecutedParent
q.log.Warn().
Str("blockID", blockID.String()).
Uint64("height", block.Header.Height).
Msgf("edge case: receiving block with parent commitment, but its parent block %s still exists",
block.Header.ParentID)

parentFinalState = nil
}
}

executable = &entity.ExecutableBlock{
Expand Down Expand Up @@ -226,6 +256,11 @@ func (q *BlockQueue) OnBlockExecuted(
q.Lock()
defer q.Unlock()

q.log.Debug().
Str("blockID", blockID.String()).
Hex("commit", commit[:]).
Msg("block executed")

return q.onBlockExecuted(blockID, commit)
}

Expand All @@ -244,6 +279,12 @@ func (q *BlockQueue) handleKnownBlock(executable *entity.ExecutableBlock, parent
// in this case, we will internally call OnBlockExecuted(parentBlockID, parentFinalState).
// there is no need to create the executable block again, since it's already created.
if executable.StartState == nil && parentFinalState != nil {
q.log.Warn().
Str("blockID", executable.ID().String()).
Uint64("height", executable.Block.Header.Height).
Hex("parentID", executable.Block.Header.ParentID[:]).
Msg("edge case: receiving block with no parent commitment, but its parent block actually has been executed")

executables, err := q.onBlockExecuted(executable.Block.Header.ParentID, *parentFinalState)
if err != nil {
return nil, nil, fmt.Errorf("receiving block %v with parent commitment %v, but parent block %v already exists with no commitment, fail to call mark parent as executed: %w",
Expand Down
41 changes: 41 additions & 0 deletions engine/execution/ingestion/block_queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,47 @@ func TestSingleBlockBecomeReady(t *testing.T) {
requireQueueIsEmpty(t, q)
}

func TestHandleBlockChildCalledBeforeOnBlockExecutedParent(t *testing.T) {
t.Parallel()
// Given a chain
// R <- A(C1) <- B(C2,C3) <- C() <- D()
// - ^------- E(C4,C5) <- F(C6)
// - ^-----------G()
block, _, commitFor := makeChainABCDEFG()
// take block C and D
blockC, blockD := block("C"), block("D")

q := NewBlockQueue(unittest.Logger())

// Given block B has been executed, and block C is received,
// block C becomes executable
missing, executables, err := q.HandleBlock(blockC, commitFor("B"))
require.NoError(t, err)
requireExecutableHas(t, executables, blockC)
require.Empty(t, missing)

// Now we received blockD, with block C's commit, however,
// the block queue state shows block D's parent (C) has not been executed yet,
// because OnBlockExecuted(C) is not called.
// In this case, we will ignore block C's commit, as if block C has not
// been executed yet.
missing, executables, err = q.HandleBlock(blockD, commitFor("C"))
require.NoError(t, err)
requireExecutableHas(t, executables)
require.Empty(t, missing)

// later block C is executed, which will make block D to be executable
executables, err = q.OnBlockExecuted(blockC.ID(), *commitFor("C"))
require.NoError(t, err)
requireExecutableHas(t, executables, blockD)

// once block D is executed, the queue should be empty
executables, err = q.OnBlockExecuted(blockD.ID(), *commitFor("D"))
require.NoError(t, err)
requireExecutableHas(t, executables)
requireQueueIsEmpty(t, q)
}

func TestMultipleBlockBecomesReady(t *testing.T) {
t.Parallel()
// Given a chain
Expand Down
6 changes: 6 additions & 0 deletions engine/execution/ingestion/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (e *Core) onProcessableBlock(blockID flow.Identifier) error {
return fmt.Errorf("failed to enqueue block %v: %w", blockID, err)
}

e.log.Debug().Int("executables", len(executables)).Msgf("executeConcurrently block is executable")
e.executeConcurrently(executables)

err = e.fetch(missingColls)
Expand Down Expand Up @@ -304,6 +305,8 @@ func (e *Core) onBlockExecuted(
return fmt.Errorf("cannot persist execution state: %w", err)
}

e.log.Debug().Uint64("height", block.Block.Header.Height).Msgf("execution state saved")

// must call OnBlockExecuted AFTER saving the execution result to storage
// because when enqueuing a block, we rely on execState.StateCommitmentByBlockID
// to determine whether a block has been executed or not.
Expand Down Expand Up @@ -340,6 +343,8 @@ func (e *Core) onBlockExecuted(
// its parent block has been successfully saved to storage.
// this ensures OnBlockExecuted would not be called with blocks in a wrong order, such as
// OnBlockExecuted(childBlock) being called before OnBlockExecuted(parentBlock).

e.log.Debug().Int("executables", len(executables)).Msgf("executeConcurrently: parent block is executed")
e.executeConcurrently(executables)

return nil
Expand All @@ -365,6 +370,7 @@ func (e *Core) onCollection(col *flow.Collection) error {
return fmt.Errorf("unexpected error while adding collection to block queue")
}

e.log.Debug().Int("executables", len(executables)).Msgf("executeConcurrently: collection is handled, ready to execute block")
e.executeConcurrently(executables)

return nil
Expand Down
Loading