Skip to content

Commit

Permalink
fix block queue edge case
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Apr 24, 2024
1 parent eff9395 commit b9679c8
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 1 deletion.
43 changes: 42 additions & 1 deletion engine/execution/ingestion/block_queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,52 @@ func (q *BlockQueue) HandleBlock(block *flow.Block, parentFinalState *flow.State
// check if the block already exists
blockID := block.ID()
executable, ok := q.blocks[blockID]

q.log.Debug().
Str("blockID", blockID.String()).
Uint64("height", block.Header.Height).
Bool("parent executed", parentFinalState != nil).
Msg("handle block")

if ok {
// handle the case where the block has seen before
return q.handleKnownBlock(executable, parentFinalState)
}

// handling a new block

_, parentExists := q.blocks[block.Header.ParentID]
// if parentFinalState is not provided, then its parent block must exists in the queue
// otherwise it's an exception
if parentFinalState == nil {
_, parentExists := q.blocks[block.Header.ParentID]
if !parentExists {
return nil, nil,
fmt.Errorf("block %s has no parent commitment, but its parent block %s does not exist in the queue: %w",
blockID, block.Header.ParentID, ErrMissingParent)
}
} else {
if parentExists {
// this is an edge case where A <- B, and B is received with A's final state, however,
// A is not executed yet.
// the reason this could happen is that there is a race condition in `OnBlockExecuted` and
// `HandleBlock`, when A's execution result (which contains the final state) has been
// saved to database, and B is received before `blockQueue.OnBlockExecuted(A)` is called, so
// `blockQueue.HandleBlock(B, A's final state)` will be called, which run into this case.
// In this case, if we consider A is executed, then return B as executables, and later
// when `OnBlockExecuted(A)` is called, it will return B as executables again, which
// will cause B to be executed twice.
// In order to prevent B to be executed twice, we will simply ignore A's final state,
// as if its parent has not been executed yet, then B is not executable. And when `OnBlockExecuted(A)`
// is called, it will return B as executables, so that both A and B will be executed only once.
// See test case: TestHandleBlockChildCalledBeforeOnBlockExecutedParent
q.log.Warn().
Str("blockID", blockID.String()).
Uint64("height", block.Header.Height).
Msgf("edge case: receiving block with parent commitment, but its parent block %s still exists",
block.Header.ParentID)

parentFinalState = nil
}
}

executable = &entity.ExecutableBlock{
Expand Down Expand Up @@ -226,6 +256,11 @@ func (q *BlockQueue) OnBlockExecuted(
q.Lock()
defer q.Unlock()

q.log.Debug().
Str("blockID", blockID.String()).
Hex("commit", commit[:]).
Msg("block executed")

return q.onBlockExecuted(blockID, commit)
}

Expand All @@ -244,6 +279,12 @@ func (q *BlockQueue) handleKnownBlock(executable *entity.ExecutableBlock, parent
// in this case, we will internally call OnBlockExecuted(parentBlockID, parentFinalState).
// there is no need to create the executable block again, since it's already created.
if executable.StartState == nil && parentFinalState != nil {
q.log.Warn().
Str("blockID", executable.ID().String()).
Uint64("height", executable.Block.Header.Height).
Hex("parentID", executable.Block.Header.ParentID[:]).
Msg("edge case: receiving block with no parent commitment, but its parent block actually has been executed")

executables, err := q.onBlockExecuted(executable.Block.Header.ParentID, *parentFinalState)
if err != nil {
return nil, nil, fmt.Errorf("receiving block %v with parent commitment %v, but parent block %v already exists with no commitment, fail to call mark parent as executed: %w",
Expand Down
41 changes: 41 additions & 0 deletions engine/execution/ingestion/block_queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,47 @@ func TestSingleBlockBecomeReady(t *testing.T) {
requireQueueIsEmpty(t, q)
}

func TestHandleBlockChildCalledBeforeOnBlockExecutedParent(t *testing.T) {
t.Parallel()
// Given a chain
// R <- A(C1) <- B(C2,C3) <- C() <- D()
// - ^------- E(C4,C5) <- F(C6)
// - ^-----------G()
block, _, commitFor := makeChainABCDEFG()
// take block C and D
blockC, blockD := block("C"), block("D")

q := NewBlockQueue(unittest.Logger())

// Given block B has been executed, and block C is received,
// block C becomes executable
missing, executables, err := q.HandleBlock(blockC, commitFor("B"))
require.NoError(t, err)
requireExecutableHas(t, executables, blockC)
require.Empty(t, missing)

// Now we received blockD, with block C's commit, however,
// the block queue state shows block D's parent (C) has not been executed yet,
// because OnBlockExecuted(C) is not called.
// In this case, we will ignore block C's commit, as if block C has not
// been executed yet.
missing, executables, err = q.HandleBlock(blockD, commitFor("C"))
require.NoError(t, err)
requireExecutableHas(t, executables)
require.Empty(t, missing)

// later block C is executed, which will make block D to be executable
executables, err = q.OnBlockExecuted(blockC.ID(), *commitFor("C"))
require.NoError(t, err)
requireExecutableHas(t, executables, blockD)

// once block D is executed, the queue should be empty
executables, err = q.OnBlockExecuted(blockD.ID(), *commitFor("D"))
require.NoError(t, err)
requireExecutableHas(t, executables)
requireQueueIsEmpty(t, q)
}

func TestMultipleBlockBecomesReady(t *testing.T) {
t.Parallel()
// Given a chain
Expand Down
6 changes: 6 additions & 0 deletions engine/execution/ingestion/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (e *Core) onProcessableBlock(blockID flow.Identifier) error {
return fmt.Errorf("failed to enqueue block %v: %w", blockID, err)
}

e.log.Debug().Int("executables", len(executables)).Msgf("executeConcurrently block is executable")
e.executeConcurrently(executables)

err = e.fetch(missingColls)
Expand Down Expand Up @@ -304,6 +305,8 @@ func (e *Core) onBlockExecuted(
return fmt.Errorf("cannot persist execution state: %w", err)
}

e.log.Debug().Uint64("height", block.Block.Header.Height).Msgf("execution state saved")

// must call OnBlockExecuted AFTER saving the execution result to storage
// because when enqueuing a block, we rely on execState.StateCommitmentByBlockID
// to determine whether a block has been executed or not.
Expand Down Expand Up @@ -340,6 +343,8 @@ func (e *Core) onBlockExecuted(
// its parent block has been successfully saved to storage.
// this ensures OnBlockExecuted would not be called with blocks in a wrong order, such as
// OnBlockExecuted(childBlock) being called before OnBlockExecuted(parentBlock).

e.log.Debug().Int("executables", len(executables)).Msgf("executeConcurrently: parent block is executed")
e.executeConcurrently(executables)

return nil
Expand All @@ -365,6 +370,7 @@ func (e *Core) onCollection(col *flow.Collection) error {
return fmt.Errorf("unexpected error while adding collection to block queue")
}

e.log.Debug().Int("executables", len(executables)).Msgf("executeConcurrently: collection is handled, ready to execute block")
e.executeConcurrently(executables)

return nil
Expand Down

0 comments on commit b9679c8

Please sign in to comment.