Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blockManager): refactor and use state as single source of truth for height #847

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
e36a9a5
removed pending block as submit to SL retries forever
mtsitrin May 5, 2024
0d4c641
removed BlockBatchSize and fix UT
mtsitrin May 5, 2024
5803cda
moved accumulated count to produce to be mutex protected
mtsitrin May 6, 2024
aef3bc4
refactored error handling
mtsitrin May 6, 2024
1e766ab
removed healthEvents from layers. set by manager on submission skew
mtsitrin May 6, 2024
8c4df90
cleanup
mtsitrin May 6, 2024
f3592e5
fixed defaults
mtsitrin May 6, 2024
6973282
fix UT
mtsitrin May 6, 2024
81baf80
changed accumulated counter to be atomic
mtsitrin May 6, 2024
24d8b85
fixed UT
mtsitrin May 6, 2024
6c1741a
spelling, typo, format
danwt May 7, 2024
b1c0131
spelling
danwt May 7, 2024
9364c7c
feat: block progress to support ibc should be managed by produceloop …
mtsitrin May 8, 2024
9f76e0b
refactored the signaling
mtsitrin May 8, 2024
c7ffbf1
added ctx support for blocking signals
mtsitrin May 8, 2024
a501fa2
cleanup. comments
mtsitrin May 8, 2024
72a8893
moved indexers to own package
mtsitrin May 9, 2024
9815da3
renamed state and some struct fields
mtsitrin May 9, 2024
5c7c454
moved store back to store package
mtsitrin May 9, 2024
f53ddc4
moving height-related management to be based on State
mtsitrin May 9, 2024
8402076
fixed store pruning
mtsitrin May 9, 2024
90031e7
updated block manager to use state. rpc needs fix
mtsitrin May 9, 2024
d2be2ce
cleanup
mtsitrin May 9, 2024
a095024
Merge branch 'main' into mtsitrin/634-refactor-use-state-as-single-so…
mtsitrin May 12, 2024
d9698f8
simplified commit
mtsitrin May 12, 2024
051a4e7
moved gossip methods to gossip.go
mtsitrin May 12, 2024
cecf106
reverted executer key
mtsitrin May 12, 2024
b896eaa
fixed publishEvents
mtsitrin May 12, 2024
bb66c2c
removed unused fields. saving state post commit
mtsitrin May 12, 2024
efcf193
removed unused params. cleaned the state flow
mtsitrin May 12, 2024
3992f2a
fixed LastBlockHeight to be atomic
mtsitrin May 13, 2024
5b3cda0
avoid copying state and pass by reference
mtsitrin May 13, 2024
953a9b8
fixed PR comments
mtsitrin May 15, 2024
576325e
simplified genesis check on produce block
mtsitrin May 15, 2024
403ec9e
Merge branch 'main' into mtsitrin/634-refactor-use-state-as-single-so…
mtsitrin May 15, 2024
31c8906
pr comments
mtsitrin May 15, 2024
5ab2f15
Merge branch 'main' into mtsitrin/634-refactor-use-state-as-single-so…
mtsitrin May 15, 2024
74bfba0
linter
mtsitrin May 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ proto/pb
.go-version
build

vendor/
vendor/
da/grpc/mockserv/db/
141 changes: 40 additions & 101 deletions block/block.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
package block

import (
"context"
"fmt"

errorsmod "cosmossdk.io/errors"

"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/types"
tmtypes "github.com/tendermint/tendermint/types"
)

// applyBlock applies the block to the store and the abci app.
// Contract: block and commit must be validated before calling this function!
// steps: save block -> execute block with app -> update state -> commit block to app -> update store height and state hash.
// steps: save block -> execute block with app -> update state -> commit block to app -> update state's height and commit result.
// As the entire process can't be atomic, we need to make sure the following conditions apply before proceeding:
// - block height is the expected block height on the store (height + 1).
// - block height is the expected block height on the app (last block height + 1).
func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMetaData blockMetaData) error {
// TODO (#330): allow genesis block with height > 0 to be applied.
// TODO: add switch case to have defined behavior for each case.
// validate block height
if block.Header.Height != m.Store.NextHeight() {
if block.Header.Height != m.State.NextHeight() {
return types.ErrInvalidBlockHeight
}

Expand All @@ -35,6 +31,8 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
// If the following is true, it means we crashed after the commit and before updating the store height.
// In that case we'll want to align the store with the app state and continue to the next block.
if isBlockAlreadyApplied {
// In this case, where the app was committed but the state wasn't updated,
// we update the state from appInfo, the saved responses and validators.
err := m.UpdateStateFromApp()
if err != nil {
return fmt.Errorf("update state from app: %w", err)
Expand All @@ -48,83 +46,82 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return fmt.Errorf("save block: %w", err)
}

responses, err := m.Executor.ExecuteBlock(m.LastState, block)
responses, err := m.Executor.ExecuteBlock(m.State, block)
if err != nil {
return fmt.Errorf("execute block: %w", err)
}

newState, err := m.Executor.UpdateStateFromResponses(responses, m.LastState, block)
dbBatch := m.Store.NewBatch()
dbBatch, err = m.Store.SaveBlockResponses(block.Header.Height, responses, dbBatch)
if err != nil {
return fmt.Errorf("update state from responses: %w", err)
}

batch := m.Store.NewBatch()

batch, err = m.Store.SaveBlockResponses(block.Header.Height, responses, batch)
if err != nil {
batch.Discard()
dbBatch.Discard()
return fmt.Errorf("save block responses: %w", err)
}

m.LastState = newState
batch, err = m.Store.UpdateState(m.LastState, batch)
// Get the validator changes from the app
validators, err := m.Executor.NextValSetFromResponses(m.State, responses, block)
if err != nil {
batch.Discard()
return fmt.Errorf("update state: %w", err)
return fmt.Errorf("update state from responses: %w", err)
}
batch, err = m.Store.SaveValidators(block.Header.Height, m.LastState.Validators, batch)

dbBatch, err = m.Store.SaveValidators(block.Header.Height, validators, dbBatch)
if err != nil {
batch.Discard()
dbBatch.Discard()
return fmt.Errorf("save validators: %w", err)
}

err = batch.Commit()
err = dbBatch.Commit()
if err != nil {
return fmt.Errorf("commit batch to disk: %w", err)
}

// Commit block to app
retainHeight, err := m.Executor.Commit(&newState, block, responses)
appHash, retainHeight, err := m.Executor.Commit(m.State, block, responses)
if err != nil {
return fmt.Errorf("commit block: %w", err)
}

// If we fail here, after the app has committed but before the state is updated, the state will be updated by
// UpdateStateFromApp using the saved responses and validators.

// Update the state with the new app hash, last validators and store height from the commit.
// Each of these, if done before the commit, would prevent us from re-executing the block in case of a failure during commit.
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height, validators)
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
return fmt.Errorf("update state: %w", err)
}

// Prune old heights, if requested by ABCI app.
if retainHeight > 0 {
pruned, err := m.pruneBlocks(retainHeight)
_, err := m.pruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
} else {
m.logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
}
}
return nil
}

// Update the state with the new app hash, last validators and store height from the commit.
// Each of these, if done before the commit, would prevent us from re-executing the block in case of a failure during commit.
newState.LastValidators = m.LastState.Validators.Copy()
newState.LastStoreHeight = block.Header.Height
newState.BaseHeight = m.Store.Base()

_, err = m.Store.UpdateState(newState, nil)
// isHeightAlreadyApplied checks if the block height is already applied to the app.
func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
proxyAppInfo, err := m.Executor.GetAppInfo()
if err != nil {
return fmt.Errorf("final update state: %w", err)
return false, errorsmod.Wrap(err, "get app info")
}
m.LastState = newState

if ok := m.Store.SetHeight(block.Header.Height); !ok {
return fmt.Errorf("store set height: %d", block.Header.Height)
}
isBlockAlreadyApplied := uint64(proxyAppInfo.LastBlockHeight) == blockHeight

return nil
// TODO: add switch case to better validate the current app state

return isBlockAlreadyApplied, nil
}

// TODO: move to gossip.go
func (m *Manager) attemptApplyCachedBlocks() error {
m.retrieverMutex.Lock()
defer m.retrieverMutex.Unlock()

for {
expectedHeight := m.Store.NextHeight()
expectedHeight := m.State.NextHeight()

cachedBlock, blockExists := m.blockCache[expectedHeight]
if !blockExists {
Expand All @@ -148,68 +145,10 @@ func (m *Manager) attemptApplyCachedBlocks() error {
return nil
}

// isHeightAlreadyApplied checks if the block height is already applied to the app.
func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
proxyAppInfo, err := m.Executor.GetAppInfo()
if err != nil {
return false, errorsmod.Wrap(err, "get app info")
}

isBlockAlreadyApplied := uint64(proxyAppInfo.LastBlockHeight) == blockHeight

// TODO: add switch case to better validate the current app state

return isBlockAlreadyApplied, nil
}

// UpdateStateFromApp is responsible for aligning the state of the store from the abci app
func (m *Manager) UpdateStateFromApp() error {
proxyAppInfo, err := m.Executor.GetAppInfo()
if err != nil {
return errorsmod.Wrap(err, "get app info")
}

appHeight := uint64(proxyAppInfo.LastBlockHeight)

// update the state with the hash, last store height and last validators.
m.LastState.AppHash = *(*[32]byte)(proxyAppInfo.LastBlockAppHash)
m.LastState.LastStoreHeight = appHeight
m.LastState.LastValidators = m.LastState.Validators.Copy()

resp, err := m.Store.LoadBlockResponses(appHeight)
if err != nil {
return errorsmod.Wrap(err, "load block responses")
}
copy(m.LastState.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

_, err = m.Store.UpdateState(m.LastState, nil)
if err != nil {
return errorsmod.Wrap(err, "update state")
}
if ok := m.Store.SetHeight(appHeight); !ok {
return fmt.Errorf("store set height: %d", appHeight)
}
return nil
}

func (m *Manager) validateBlock(block *types.Block, commit *types.Commit) error {
// Currently we're assuming proposer is never nil as it's a pre-condition for
// dymint to start
proposer := m.SLClient.GetProposer()

return types.ValidateProposedTransition(m.LastState, block, commit, proposer)
}

func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit types.Commit) error {
gossipedBlock := p2p.GossipedBlock{Block: block, Commit: commit}
gossipedBlockBytes, err := gossipedBlock.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal binary: %w: %w", err, ErrNonRecoverable)
}
if err := m.p2pClient.GossipBlock(ctx, gossipedBlockBytes); err != nil {
// Although this boils down to publishing on a topic, we don't want to speculate too much on what
// could cause that to fail, so we assume recoverable.
return fmt.Errorf("p2p gossip block: %w: %w", err, ErrRecoverable)
}
return nil
return types.ValidateProposedTransition(m.State, block, commit, proposer)
}
20 changes: 8 additions & 12 deletions block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (e *Executor) InitChain(genesis *tmtypes.GenesisDoc, validators []*tmtypes.
}

// CreateBlock reaps transactions from mempool and builds a block.
func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash [32]byte, state types.State, maxBytes uint64) *types.Block {
func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash [32]byte, state *types.State, maxBytes uint64) *types.Block {
if state.ConsensusParams.Block.MaxBytes > 0 {
maxBytes = min(maxBytes, uint64(state.ConsensusParams.Block.MaxBytes))
}
Expand Down Expand Up @@ -134,21 +134,18 @@ func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHead
}

// Commit commits the block
func (e *Executor) Commit(state *types.State, block *types.Block, resp *tmstate.ABCIResponses) (int64, error) {
func (e *Executor) Commit(state *types.State, block *types.Block, resp *tmstate.ABCIResponses) ([]byte, int64, error) {
appHash, retainHeight, err := e.commit(state, block, resp.DeliverTxs)
if err != nil {
return 0, err
return nil, 0, err
}

copy(state.AppHash[:], appHash[:])
copy(state.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

err = e.publishEvents(resp, block, *state)
err = e.publishEvents(resp, block)
if err != nil {
e.logger.Error("fire block events", "error", err)
return 0, err
return nil, 0, err
}
return retainHeight, nil
return appHash, retainHeight, nil
}

// GetAppInfo returns the latest AppInfo from the proxyApp.
Expand Down Expand Up @@ -183,7 +180,7 @@ func (e *Executor) commit(state *types.State, block *types.Block, deliverTxs []*
}

// ExecuteBlock executes the block and returns the ABCIResponses. Block should be valid (passed validation checks).
func (e *Executor) ExecuteBlock(state types.State, block *types.Block) (*tmstate.ABCIResponses, error) {
func (e *Executor) ExecuteBlock(state *types.State, block *types.Block) (*tmstate.ABCIResponses, error) {
abciResponses := new(tmstate.ABCIResponses)
abciResponses.DeliverTxs = make([]*abci.ResponseDeliverTx, len(block.Data.Txs))

Expand Down Expand Up @@ -252,13 +249,12 @@ func (e *Executor) getDataHash(block *types.Block) []byte {
return abciData.Hash()
}

func (e *Executor) publishEvents(resp *tmstate.ABCIResponses, block *types.Block, state types.State) error {
func (e *Executor) publishEvents(resp *tmstate.ABCIResponses, block *types.Block) error {
if e.eventBus == nil {
return nil
}

abciBlock, err := types.ToABCIBlock(block)
abciBlock.Header.ValidatorsHash = state.Validators.Hash()
if err != nil {
return err
}
Expand Down
32 changes: 14 additions & 18 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestCreateBlock(t *testing.T) {

maxBytes := uint64(100)

state := types.State{}
state := &types.State{}
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000
state.Validators = tmtypes.NewValidatorSet(nil)
Expand Down Expand Up @@ -140,13 +140,12 @@ func TestApplyBlock(t *testing.T) {
require.NotNil(headerSub)

// Init state
state := types.State{
state := &types.State{
NextValidators: tmtypes.NewValidatorSet(nil),
Validators: tmtypes.NewValidatorSet(nil),
LastValidators: tmtypes.NewValidatorSet(nil),
}
state.InitialHeight = 1
state.LastBlockHeight = 0
state.LastBlockHeight.Store(0)
maxBytes := uint64(100)
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000
Expand Down Expand Up @@ -182,21 +181,18 @@ func TestApplyBlock(t *testing.T) {
resp, err := executor.ExecuteBlock(state, block)
require.NoError(err)
require.NotNil(resp)
newState, err := executor.UpdateStateFromResponses(resp, state, block)
appHash, _, err := executor.Commit(state, block, resp)
require.NoError(err)
require.NotNil(newState)
assert.Equal(int64(1), newState.LastBlockHeight)
_, err = executor.Commit(&newState, block, resp)
require.NoError(err)
assert.Equal(mockAppHash, newState.AppHash)
newState.LastStoreHeight = uint64(newState.LastBlockHeight)
executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height, state.Validators)
assert.Equal(uint64(1), state.Height())
assert.Equal(mockAppHash, state.AppHash)

// Create another block with multiple Tx from mempool
require.NoError(mpool.CheckTx([]byte{0, 1, 2, 3, 4}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx([]byte{5, 6, 7, 8, 9}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx([]byte{1, 2, 3, 4, 5}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx(make([]byte, 90), func(r *abci.Response) {}, mempool.TxInfo{}))
block = executor.CreateBlock(2, commit, [32]byte{}, newState, maxBytes)
block = executor.CreateBlock(2, commit, [32]byte{}, state, maxBytes)
require.NotNil(block)
assert.Equal(uint64(2), block.Header.Height)
assert.Len(block.Data.Txs, 3)
Expand All @@ -217,7 +213,7 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block with an invalid commit
err = types.ValidateProposedTransition(newState, block, invalidCommit, proposer)
err = types.ValidateProposedTransition(state, block, invalidCommit, proposer)

require.ErrorIs(err, types.ErrInvalidSignature)

Expand All @@ -231,17 +227,17 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block
err = types.ValidateProposedTransition(newState, block, commit, proposer)
err = types.ValidateProposedTransition(state, block, commit, proposer)
require.NoError(err)
resp, err = executor.ExecuteBlock(state, block)
require.NoError(err)
require.NotNil(resp)
newState, err = executor.UpdateStateFromResponses(resp, state, block)
vals, err := executor.NextValSetFromResponses(state, resp, block)
require.NoError(err)
require.NotNil(newState)
assert.Equal(int64(2), newState.LastBlockHeight)
_, err = executor.Commit(&newState, block, resp)
_, _, err = executor.Commit(state, block, resp)
require.NoError(err)
executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height, vals)
assert.Equal(uint64(2), state.Height())

// wait for at least 4 Tx events, for up to 3 seconds.
// 3 seconds is a fail-scenario only
Expand Down
Loading
Loading