Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blockManager): refactor and use state as single source of truth for height #847

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
e36a9a5
removed pending block as submit to SL retries forever
mtsitrin May 5, 2024
0d4c641
removed BlockBatchSize and fix UT
mtsitrin May 5, 2024
5803cda
moved accumulated count to produce to be mutex protected
mtsitrin May 6, 2024
aef3bc4
refactored error handling
mtsitrin May 6, 2024
1e766ab
removed healthEvents from layers. set by manager on submission skew
mtsitrin May 6, 2024
8c4df90
cleanup
mtsitrin May 6, 2024
f3592e5
fixed defaults
mtsitrin May 6, 2024
6973282
fix UT
mtsitrin May 6, 2024
81baf80
changed accumaletd counter to be atomic
mtsitrin May 6, 2024
24d8b85
fixed UT
mtsitrin May 6, 2024
6c1741a
spelling, typo, format
danwt May 7, 2024
b1c0131
spelling
danwt May 7, 2024
9364c7c
feat: block progress to support ibc should be managed by produceloop …
mtsitrin May 8, 2024
9f76e0b
refactored the signaling
mtsitrin May 8, 2024
c7ffbf1
added ctx support for blocking signals
mtsitrin May 8, 2024
a501fa2
cleanup. comments
mtsitrin May 8, 2024
72a8893
moved indexers to own package
mtsitrin May 9, 2024
9815da3
renamed state and some struct fields
mtsitrin May 9, 2024
5c7c454
moved store back to store package
mtsitrin May 9, 2024
f53ddc4
moving height related managment to be based on State
mtsitrin May 9, 2024
8402076
fixed store pruning
mtsitrin May 9, 2024
90031e7
updated block manager to use state. rpc needs fix
mtsitrin May 9, 2024
d2be2ce
cleanup
mtsitrin May 9, 2024
a095024
Merge branch 'main' into mtsitrin/634-refactor-use-state-as-single-so…
mtsitrin May 12, 2024
d9698f8
simplified commit
mtsitrin May 12, 2024
051a4e7
moved gossip methods to gossip.go
mtsitrin May 12, 2024
cecf106
reverted executer key
mtsitrin May 12, 2024
b896eaa
fixed publishEvents
mtsitrin May 12, 2024
bb66c2c
removed unused fields. saving state post commit
mtsitrin May 12, 2024
efcf193
removed unused params. cleaned the state flow
mtsitrin May 12, 2024
3992f2a
fixed LastBlockHeight to be atomic
mtsitrin May 13, 2024
5b3cda0
avoid copying state and pass by reference
mtsitrin May 13, 2024
953a9b8
fixed PR comments
mtsitrin May 15, 2024
576325e
simplified genesis check on produce block
mtsitrin May 15, 2024
403ec9e
Merge branch 'main' into mtsitrin/634-refactor-use-state-as-single-so…
mtsitrin May 15, 2024
31c8906
pr comments
mtsitrin May 15, 2024
5ab2f15
Merge branch 'main' into mtsitrin/634-refactor-use-state-as-single-so…
mtsitrin May 15, 2024
74bfba0
linter
mtsitrin May 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
removed unused params. cleaned the state flow
  • Loading branch information
mtsitrin committed May 12, 2024
commit efcf1931a72e2a0a2a61402b1b97707e8ae0bbfb
35 changes: 17 additions & 18 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
// In case the following true, it means we crashed after the commit and before updating the store height.
// In that case we'll want to align the store with the app state and continue to the next block.
if isBlockAlreadyApplied {
// In this case, where the app was committed, but the state wasn't updated
// it will update the state from appInfo, saved responses and validators.
err := m.UpdateStateFromApp()
if err != nil {
return fmt.Errorf("update state from app: %w", err)
Expand All @@ -49,20 +51,20 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return fmt.Errorf("execute block: %w", err)
}

// Updates the state with validator changes and consensus params changes from the app
err = m.Executor.UpdateStateFromResponses(&m.State, responses, block)
if err != nil {
return fmt.Errorf("update state from responses: %w", err)
}

dbBatch := m.Store.NewBatch()
dbBatch, err = m.Store.SaveBlockResponses(block.Header.Height, responses, dbBatch)
if err != nil {
dbBatch.Discard()
return fmt.Errorf("save block responses: %w", err)
}

dbBatch, err = m.Store.SaveValidators(block.Header.Height, m.State.Validators, dbBatch)
// Updates the state with validator changes and consensus params changes from the app
validators, err := m.Executor.NextValSetFromResponses(m.State, responses, block)
if err != nil {
return fmt.Errorf("update state from responses: %w", err)
}

dbBatch, err = m.Store.SaveValidators(block.Header.Height, validators, dbBatch)
if err != nil {
dbBatch.Discard()
return fmt.Errorf("save validators: %w", err)
Expand All @@ -79,26 +81,23 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return fmt.Errorf("commit block: %w", err)
}

// If failed here, after the app committed, but before the state is updated, we'll update the state on
// UpdateStateFromApp using the saved responses and validators.

// Update the state with the new app hash, last validators and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
m.Executor.UpdateStateFromCommitResponse(&m.State, responses, appHash, block.Header.Height)
_, err = m.Store.SaveState(m.State, nil)
newState := m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height, validators)
_, err = m.Store.SaveState(newState, nil)
if err != nil {
return fmt.Errorf("final update state: %w", err)
return fmt.Errorf("update state: %w", err)
}
m.State = newState

// Prune old heights, if requested by ABCI app.
if retainHeight > 0 {
pruned, err := m.pruneBlocks(uint64(retainHeight))
_, err := m.pruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
} else {
m.logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
}
m.State.BaseHeight = m.State.BaseHeight
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
return fmt.Errorf("final update state: %w", err)
}
}
return nil
Expand Down
15 changes: 6 additions & 9 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func TestApplyBlock(t *testing.T) {
state := types.State{
NextValidators: tmtypes.NewValidatorSet(nil),
Validators: tmtypes.NewValidatorSet(nil),
LastValidators: tmtypes.NewValidatorSet(nil),
}
state.InitialHeight = 1
state.LastBlockHeight = 0
Expand Down Expand Up @@ -182,13 +181,10 @@ func TestApplyBlock(t *testing.T) {
resp, err := executor.ExecuteBlock(state, block)
require.NoError(err)
require.NotNil(resp)
err = executor.UpdateStateFromResponses(&state, resp, block)
require.NoError(err)
require.NotNil(state)
assert.Equal(uint64(1), state.LastBlockHeight)
appHash, _, err := executor.Commit(state, block, resp)
require.NoError(err)
executor.UpdateStateFromCommitResponse(&state, resp, appHash, block.Header.Height)
state = executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height, state.Validators)
assert.Equal(uint64(1), state.Height())
assert.Equal(mockAppHash, state.AppHash)

// Create another block with multiple Tx from mempool
Expand Down Expand Up @@ -236,12 +232,13 @@ func TestApplyBlock(t *testing.T) {
resp, err = executor.ExecuteBlock(state, block)
require.NoError(err)
require.NotNil(resp)
err = executor.UpdateStateFromResponses(&state, resp, block)
vals, err := executor.NextValSetFromResponses(state, resp, block)
require.NoError(err)
require.NotNil(state)
assert.Equal(uint64(2), state.LastBlockHeight)
_, _, err = executor.Commit(state, block, resp)
require.NoError(err)
state = executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height, vals)

assert.Equal(uint64(2), state.Height())

// wait for at least 4 Tx events, for up to 3 second.
// 3 seconds is a fail-scenario only
Expand Down
112 changes: 54 additions & 58 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,16 +202,17 @@ func TestProducePendingBlock(t *testing.T) {
manager, err := testutil.GetManager(testutil.GetManagerConfig(), nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(t, err)
// Generate block and commit and save it to the store
blocks, err := testutil.GenerateBlocks(1, 1, manager.ProposerKey)
require.NoError(t, err)
block := blocks[0]
block := testutil.GetRandomBlock(1, 3)
_, err = manager.Store.SaveBlock(block, &block.LastCommit, nil)
require.NoError(t, err)
// Produce block
_, _, err = manager.ProduceAndGossipBlock(context.Background(), true)
require.NoError(t, err)
// Validate state is updated with the block that was saved in the store
assert.Equal(t, block.Header.Hash(), *(*[32]byte)(manager.State.LastBlockID.Hash))

//TODO: fix this test
//hacky way to validate the block was indeed contain txs
assert.NotEqual(t, manager.State.LastResultsHash, testutil.GetEmptyLastResultsHash())
}

// Test that in case we fail after the proxy app commit, next time we won't commit again to the proxy app
Expand All @@ -223,6 +224,7 @@ func TestProducePendingBlock(t *testing.T) {
// 5. Produce third block successfully
func TestProduceBlockFailAfterCommit(t *testing.T) {
require := require.New(t)
assert := assert.New(t)
// Setup app
app := testutil.GetAppMock(testutil.Info, testutil.Commit)
// Create proxy app
Expand All @@ -237,64 +239,60 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
require.NoError(err)

cases := []struct {
name string
shouldFailSetSetHeight bool
shouldFailUpdateState bool
LastAppBlockHeight int64
AppCommitHash [32]byte
LastAppCommitHash [32]byte
expectedStoreHeight uint64
expectedStateAppHash [32]byte
name string
shoudFailSaveState bool
LastAppBlockHeight int64
AppCommitHash [32]byte
LastAppCommitHash [32]byte
expectedStoreHeight uint64
expectedStateAppHash [32]byte
}{
{
name: "ProduceFirstBlockSuccessfully",
shouldFailSetSetHeight: false,
shouldFailUpdateState: false,
AppCommitHash: [32]byte{1},
LastAppCommitHash: [32]byte{0},
LastAppBlockHeight: 0,
expectedStoreHeight: 1,
expectedStateAppHash: [32]byte{1},
name: "ProduceFirstBlockSuccessfully",
shoudFailSaveState: false,
AppCommitHash: [32]byte{1},
// LastAppCommitHash: [32]byte{0},
// LastAppBlockHeight: 0,
expectedStoreHeight: 1,
expectedStateAppHash: [32]byte{1},
},
{
name: "ProduceSecondBlockFailOnUpdateState",
shouldFailSetSetHeight: false,
shouldFailUpdateState: true,
AppCommitHash: [32]byte{2},
LastAppCommitHash: [32]byte{},
LastAppBlockHeight: 0,
expectedStoreHeight: 1,
expectedStateAppHash: [32]byte{1},
name: "ProduceSecondBlockFailOnUpdateState",
shoudFailSaveState: true,
AppCommitHash: [32]byte{2},
// LastAppCommitHash: [32]byte{},
// LastAppBlockHeight: 0,
// state not changed on failed save state
expectedStoreHeight: 1,
expectedStateAppHash: [32]byte{1},
},
{
name: "ProduceSecondBlockSuccessfully",
shouldFailSetSetHeight: false,
shouldFailUpdateState: false,
AppCommitHash: [32]byte{},
LastAppCommitHash: [32]byte{2},
LastAppBlockHeight: 2,
expectedStoreHeight: 2,
expectedStateAppHash: [32]byte{2},
name: "ProduceSecondBlockSuccessfullyFromApp",
shoudFailSaveState: false,
// AppCommitHash: [32]byte{},
// expected return from app
LastAppCommitHash: [32]byte{2},
LastAppBlockHeight: 2,
expectedStoreHeight: 2,
expectedStateAppHash: [32]byte{2},
},
{
name: "ProduceThirdBlockFailOnUpdateStoreHeight",
shouldFailSetSetHeight: true,
shouldFailUpdateState: false,
AppCommitHash: [32]byte{3},
LastAppCommitHash: [32]byte{2},
LastAppBlockHeight: 2,
expectedStoreHeight: 2,
expectedStateAppHash: [32]byte{3},
name: "ProduceThirdBlockFailOnUpdateStoreHeight",
shoudFailSaveState: true,
AppCommitHash: [32]byte{3},
// LastAppCommitHash: [32]byte{2},
// LastAppBlockHeight: 2,
expectedStoreHeight: 2,
expectedStateAppHash: [32]byte{2},
},
{
name: "ProduceThirdBlockSuccessfully",
shouldFailSetSetHeight: false,
shouldFailUpdateState: false,
AppCommitHash: [32]byte{},
LastAppCommitHash: [32]byte{3},
LastAppBlockHeight: 3,
expectedStoreHeight: 3,
expectedStateAppHash: [32]byte{3},
name: "ProduceThirdBlockSuccessfully",
shoudFailSaveState: false,
AppCommitHash: [32]byte{},
LastAppCommitHash: [32]byte{3},
LastAppBlockHeight: 3,
expectedStoreHeight: 3,
expectedStateAppHash: [32]byte{3},
mtsitrin marked this conversation as resolved.
Show resolved Hide resolved
},
}
for _, tc := range cases {
Expand All @@ -304,14 +302,12 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
LastBlockHeight: tc.LastAppBlockHeight,
LastBlockAppHash: tc.LastAppCommitHash[:],
})
mockStore.ShouldFailSetHeight = tc.shouldFailSetSetHeight
mockStore.ShoudFailUpdateState = tc.shouldFailUpdateState
mockStore.ShoudFailSaveState = tc.shoudFailSaveState
_, _, _ = manager.ProduceAndGossipBlock(context.Background(), true)
require.Equal(tc.expectedStoreHeight, manager.State.Height(), tc.name)
require.Equal(tc.expectedStateAppHash, manager.State.AppHash, tc.name)
storeState, err := manager.Store.LoadState()
require.NoError(err)
require.Equal(tc.expectedStateAppHash, storeState.AppHash, tc.name)
assert.NoError(err)
assert.Equal(tc.expectedStoreHeight, storeState.Height(), tc.name)
assert.Equal(tc.expectedStateAppHash, storeState.AppHash, tc.name)

app.On("Commit", mock.Anything).Unset()
app.On("Info", mock.Anything).Unset()
Expand Down
9 changes: 9 additions & 0 deletions block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,14 @@ func (m *Manager) pruneBlocks(retainHeight uint64) (uint64, error) {

// TODO: prune state/indexer and state/txindexer??

newState := m.State
newState.BaseHeight = retainHeight
_, err = m.Store.SaveState(newState, nil)
if err != nil {
return 0, fmt.Errorf("final update state: %w", err)
danwt marked this conversation as resolved.
Show resolved Hide resolved
}
m.State = newState

m.logger.Info("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
return pruned, nil
}
72 changes: 31 additions & 41 deletions block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,18 @@ func (m *Manager) UpdateStateFromApp() error {
if err != nil {
return errorsmod.Wrap(err, "load block responses")
}

vals, err := m.Store.LoadValidators(appHeight)
if err != nil {
return errorsmod.Wrap(err, "load block responses")
}

// update the state with the hash, last store height and last validators.
//TODO: DRY with the post commit update
m.State.AppHash = *(*[32]byte)(proxyAppInfo.LastBlockAppHash)
m.State.Validators = m.State.NextValidators.Copy()
m.State.NextValidators = vals
copy(m.State.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())
m.State.SetHeight(appHeight)

//FIXME: load consensus params

_, err = m.Store.SaveState(m.State, nil)
state := m.Executor.UpdateStateAfterCommit(m.State, resp, proxyAppInfo.LastBlockAppHash, appHeight, vals)
_, err = m.Store.SaveState(state, nil)
if err != nil {
return errorsmod.Wrap(err, "update state")
}
m.State = state
return nil
}

Expand Down Expand Up @@ -113,44 +105,42 @@ func (e *Executor) UpdateMempoolAfterInitChain(s types.State) {
e.mempool.SetPostCheckFn(mempool.PostCheckMaxGas(s.ConsensusParams.Block.MaxGas))
}

// UpdateStateFromResponses updates state based on the ABCIResponses.
func (e *Executor) UpdateStateFromResponses(state *types.State, resp *tmstate.ABCIResponses, block *types.Block) error {
// NextValSetFromResponses updates state based on the ABCIResponses.
func (e *Executor) NextValSetFromResponses(state types.State, resp *tmstate.ABCIResponses, block *types.Block) (*tmtypes.ValidatorSet, error) {
// Dymint ignores any setValidator responses from the app, as it is manages the validator set based on the settlement consensus
// TODO: this will be changed when supporting multiple sequencers from the hub
validatorUpdates := []*tmtypes.Validator{}

if state.ConsensusParams.Block.MaxBytes == 0 {
e.logger.Error("maxBytes=0", "state.ConsensusParams.Block", state.ConsensusParams.Block)
}

nValSet := state.NextValidators.Copy()
lastHeightValSetChanged := state.LastHeightValidatorsChanged
// Dymint can work without validators
if len(nValSet.Validators) > 0 {
if len(validatorUpdates) > 0 {
err := nValSet.UpdateWithChangeSet(validatorUpdates)
if err != nil {
return err
return state.NextValidators.Copy(), nil

/*
nValSet := state.NextValidators.Copy()
lastHeightValSetChanged := state.LastHeightValidatorsChanged
// Dymint can work without validators
if len(nValSet.Validators) > 0 {
if len(validatorUpdates) > 0 {
err := nValSet.UpdateWithChangeSet(validatorUpdates)
if err != nil {
return err
}
// Change results from this height but only applies to the next next height.
lastHeightValSetChanged = int64(block.Header.Height + 1 + 1)
}
// Change results from this height but only applies to the next next height.
lastHeightValSetChanged = int64(block.Header.Height + 1 + 1)
}

// TODO(tzdybal): right now, it's for backward compatibility, may need to change this
nValSet.IncrementProposerPriority(1)
}

state.Validators = state.NextValidators.Copy()
state.NextValidators = nValSet
state.LastHeightValidatorsChanged = lastHeightValSetChanged

return nil
// TODO(tzdybal): right now, it's for backward compatibility, may need to change this
nValSet.IncrementProposerPriority(1)
}
*/
}

// Update state from Commit response
func (e *Executor) UpdateStateFromCommitResponse(s *types.State, resp *tmstate.ABCIResponses, appHash []byte, height uint64) {
// validators already set on UpdateStateFromResponses
func (e *Executor) UpdateStateAfterCommit(s types.State, resp *tmstate.ABCIResponses, appHash []byte, height uint64, valSet *tmtypes.ValidatorSet) types.State {
copy(s.AppHash[:], appHash[:])
copy(s.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

//TODO: load consensus params from endblock?

s.Validators = s.NextValidators.Copy()
s.NextValidators = valSet.Copy()

s.SetHeight(height)
return s
}
Loading
Loading