Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blockManager): refactor and use state as single source of truth for height #847

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
e36a9a5
removed pending block as submit to SL retries forever
mtsitrin May 5, 2024
0d4c641
removed BlockBatchSize and fix UT
mtsitrin May 5, 2024
5803cda
moved accumulated count to produce to be mutex protected
mtsitrin May 6, 2024
aef3bc4
refactored error handling
mtsitrin May 6, 2024
1e766ab
removed healthEvents from layers. set by manager on submission skew
mtsitrin May 6, 2024
8c4df90
cleanup
mtsitrin May 6, 2024
f3592e5
fixed defaults
mtsitrin May 6, 2024
6973282
fix UT
mtsitrin May 6, 2024
81baf80
changed accumaletd counter to be atomic
mtsitrin May 6, 2024
24d8b85
fixed UT
mtsitrin May 6, 2024
6c1741a
spelling, typo, format
danwt May 7, 2024
b1c0131
spelling
danwt May 7, 2024
9364c7c
feat: block progress to support ibc should be managed by produceloop …
mtsitrin May 8, 2024
9f76e0b
refactored the signaling
mtsitrin May 8, 2024
c7ffbf1
added ctx support for blocking signals
mtsitrin May 8, 2024
a501fa2
cleanup. comments
mtsitrin May 8, 2024
72a8893
moved indexers to own package
mtsitrin May 9, 2024
9815da3
renamed state and some struct fields
mtsitrin May 9, 2024
5c7c454
moved store back to store package
mtsitrin May 9, 2024
f53ddc4
moving height related managment to be based on State
mtsitrin May 9, 2024
8402076
fixed store pruning
mtsitrin May 9, 2024
90031e7
updated block manager to use state. rpc needs fix
mtsitrin May 9, 2024
d2be2ce
cleanup
mtsitrin May 9, 2024
a095024
Merge branch 'main' into mtsitrin/634-refactor-use-state-as-single-so…
mtsitrin May 12, 2024
d9698f8
simplified commit
mtsitrin May 12, 2024
051a4e7
moved gossip methods to gossip.go
mtsitrin May 12, 2024
cecf106
reverted executer key
mtsitrin May 12, 2024
b896eaa
fixed publishEvents
mtsitrin May 12, 2024
bb66c2c
removed unused fields. saving state post commit
mtsitrin May 12, 2024
efcf193
removed unused params. cleaned the state flow
mtsitrin May 12, 2024
3992f2a
fixed LastBlockHeight to be atomic
mtsitrin May 13, 2024
5b3cda0
avoid copying state and pass by reference
mtsitrin May 13, 2024
953a9b8
fixed PR comments
mtsitrin May 15, 2024
576325e
simplified genesis check on produce block
mtsitrin May 15, 2024
403ec9e
Merge branch 'main' into mtsitrin/634-refactor-use-state-as-single-so…
mtsitrin May 15, 2024
31c8906
pr comments
mtsitrin May 15, 2024
5ab2f15
Merge branch 'main' into mtsitrin/634-refactor-use-state-as-single-so…
mtsitrin May 15, 2024
74bfba0
linter
mtsitrin May 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
reverted executer key
  • Loading branch information
mtsitrin committed May 12, 2024
commit cecf10615d0f4e15110b26dce525749fc33d487a
17 changes: 9 additions & 8 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
}

m.State = newState
dbBatch, err = m.Store.UpdateState(m.State, dbBatch)
dbBatch, err = m.Store.SaveState(m.State, dbBatch)
if err != nil {
dbBatch.Discard()
return fmt.Errorf("update state: %w", err)
Expand All @@ -79,15 +79,15 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
}

// Commit block to app
appHash, retainHeight, err := m.Executor.Commit(&newState, block, responses)
appHash, retainHeight, err := m.Executor.Commit(newState, block, responses)
if err != nil {
return fmt.Errorf("commit block: %w", err)
}

// Update the state with the new app hash, last validators and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
newState.SetABCICommitResult(responses, appHash, block.Header.Height)
_, err = m.Store.UpdateState(newState, nil)
m.Executor.UpdateStateFromCommitResponse(&newState, responses, appHash, block.Header.Height)
_, err = m.Store.SaveState(newState, nil)
if err != nil {
return fmt.Errorf("final update state: %w", err)
}
Expand All @@ -100,12 +100,13 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
} else {
m.logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
}
newState.BaseHeight = m.State.Base()
//TODO: update state
newState.BaseHeight = m.State.BaseHeight
_, err = m.Store.SaveState(newState, nil)
if err != nil {
return fmt.Errorf("final update state: %w", err)
}
}

m.State = newState

return nil
}

Expand Down
16 changes: 5 additions & 11 deletions block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
tmtypes "github.com/tendermint/tendermint/types"
"go.uber.org/multierr"

libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto"

"github.com/dymensionxyz/dymint/mempool"
"github.com/dymensionxyz/dymint/types"
)
Expand All @@ -35,12 +33,7 @@ type Executor struct {

// NewExecutor creates new instance of BlockExecutor.
// Proposer address and namespace ID will be used in all newly created blocks.
func NewExecutor(proposerKey libp2pcrypto.PrivKey, namespaceID string, chainID string, mempool mempool.Mempool, proxyApp proxy.AppConns, eventBus *tmtypes.EventBus, logger types.Logger) (*Executor, error) {
proposerAddress, err := getAddress(proposerKey)
if err != nil {
return nil, err
}

func NewExecutor(proposerAddress []byte, namespaceID string, chainID string, mempool mempool.Mempool, proxyApp proxy.AppConns, eventBus *tmtypes.EventBus, logger types.Logger) (*Executor, error) {
bytes, err := hex.DecodeString(namespaceID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -141,13 +134,14 @@ func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHead
}

// Commit commits the block
func (e *Executor) Commit(state *types.State, block *types.Block, resp *tmstate.ABCIResponses) ([]byte, int64, error) {
func (e *Executor) Commit(state types.State, block *types.Block, resp *tmstate.ABCIResponses) ([]byte, int64, error) {
appHash, retainHeight, err := e.commit(state, block, resp.DeliverTxs)
if err != nil {
return nil, 0, err
}

err = e.publishEvents(resp, block, *state)
//FIXME: state is wrong here
err = e.publishEvents(resp, block, state)
if err != nil {
e.logger.Error("fire block events", "error", err)
return nil, 0, err
Expand All @@ -160,7 +154,7 @@ func (e *Executor) GetAppInfo() (*abci.ResponseInfo, error) {
return e.proxyAppQueryConn.InfoSync(abci.RequestInfo{})
}

func (e *Executor) commit(state *types.State, block *types.Block, deliverTxs []*abci.ResponseDeliverTx) ([]byte, int64, error) {
func (e *Executor) commit(state types.State, block *types.Block, deliverTxs []*abci.ResponseDeliverTx) ([]byte, int64, error) {
e.mempool.Lock()
defer e.mempool.Unlock()

Expand Down
8 changes: 4 additions & 4 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ func TestApplyBlock(t *testing.T) {
newState, err := executor.UpdateStateFromResponses(resp, state, block)
require.NoError(err)
require.NotNil(newState)
assert.Equal(1, newState.LastBlockHeight)
_, err = executor.Commit(&newState, block, resp)
assert.Equal(uint64(1), newState.LastBlockHeight)
appHash, _, err := executor.Commit(newState, block, resp)
require.NoError(err)
assert.Equal(mockAppHash, newState.AppHash)
assert.Equal(mockAppHash, appHash)
newState.LastStoreHeight = uint64(newState.LastBlockHeight)

// Create another block with multiple Tx from mempool
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestApplyBlock(t *testing.T) {
require.NoError(err)
require.NotNil(newState)
assert.Equal(uint64(2), newState.LastBlockHeight)
_, err = executor.Commit(&newState, block, resp)
_, _, err = executor.Commit(newState, block, resp)
require.NoError(err)

// wait for at least 4 Tx events, for up to 3 second.
Expand Down
76 changes: 76 additions & 0 deletions block/gossip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package block

import (
"context"
"fmt"

"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/types"
"github.com/tendermint/tendermint/libs/pubsub"
)

// onNewGossippedBlock will take a block and apply it
func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
m.retrieverMutex.Lock() // needed to protect blockCache access
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit
m.logger.Debug("Received new block via gossip", "height", block.Header.Height, "n cachedBlocks", len(m.blockCache))

nextHeight := m.State.NextHeight()
if block.Header.Height >= nextHeight {
m.blockCache[block.Header.Height] = CachedBlock{
Block: &block,
Commit: &commit,
}
m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.State.Height())
}
m.retrieverMutex.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant
err := m.attemptApplyCachedBlocks()
if err != nil {
m.logger.Error("applying cached blocks", "err", err)
}
}

func (m *Manager) attemptApplyCachedBlocks() error {
m.retrieverMutex.Lock()
defer m.retrieverMutex.Unlock()

for {
expectedHeight := m.State.NextHeight()

cachedBlock, blockExists := m.blockCache[expectedHeight]
if !blockExists {
break
}
if err := m.validateBlock(cachedBlock.Block, cachedBlock.Commit); err != nil {
delete(m.blockCache, cachedBlock.Block.Header.Height)
/// TODO: can we take an action here such as dropping the peer / reducing their reputation?
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", cachedBlock.Block.Header.Height, err)
}

err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, blockMetaData{source: gossipedBlock})
if err != nil {
return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err)
}
m.logger.Debug("applied cached block", "height", expectedHeight)

delete(m.blockCache, cachedBlock.Block.Header.Height)
}

return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep this in manager.go because it's used not only in gossip

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where else it is used?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in syncUntilTarget
you can check usages in IDE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved back


func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit types.Commit) error {
gossipedBlock := p2p.GossipedBlock{Block: block, Commit: commit}
gossipedBlockBytes, err := gossipedBlock.MarshalBinary()
if err != nil {
return fmt.Errorf("marshal binary: %w: %w", err, ErrNonRecoverable)
}
if err := m.p2pClient.GossipBlock(ctx, gossipedBlockBytes); err != nil {
// Although this boils down to publishing on a topic, we don't want to speculate too much on what
// could cause that to fail, so we assume recoverable.
return fmt.Errorf("p2p gossip block: %w: %w", err, ErrRecoverable)
}
return nil
}
4 changes: 2 additions & 2 deletions block/initchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func (m *Manager) RunInitChain(ctx context.Context) error {

// update the state with only the consensus pubkey
m.Executor.UpdateStateAfterInitChain(&m.State, res, gensisValSet)
m.Executor.UpdateMempoolAfterInitChain(&m.State)
if _, err := m.Store.UpdateState(m.State, nil); err != nil {
m.Executor.UpdateMempoolAfterInitChain(m.State)
if _, err := m.Store.SaveState(m.State, nil); err != nil {
return err
}

Expand Down
25 changes: 7 additions & 18 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ func NewManager(
p2pClient *p2p.Client,
logger types.Logger,
) (*Manager, error) {
exec, err := NewExecutor(proposerKey, conf.NamespaceID, genesis.ChainID, mempool, proxyApp, eventBus, logger)
proposerAddress, err := getAddress(proposerKey)
if err != nil {
return nil, err
}

exec, err := NewExecutor(proposerAddress, conf.NamespaceID, genesis.ChainID, mempool, proxyApp, eventBus, logger)
if err != nil {
return nil, fmt.Errorf("create block executor: %w", err)
}
Expand Down Expand Up @@ -170,8 +175,7 @@ func (m *Manager) syncBlockManager() error {
res, err := m.SLClient.RetrieveBatch()
if errors.Is(err, gerr.ErrNotFound) {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL. Start writing first batch.")
//FIXME: set correct syncTarget
m.logger.Info("No batches for chain found in SL.")
m.SyncTarget.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}
Expand All @@ -196,18 +200,3 @@ func (m *Manager) UpdateSyncParams(endHeight uint64) {
m.logger.Info("Received new syncTarget", "syncTarget", endHeight)
m.SyncTarget.Store(endHeight)
}

// getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc.
func getInitialState(store store.Store, genesis *tmtypes.GenesisDoc, logger types.Logger) (s types.State, err error) {
s, err = store.LoadState()
if errors.Is(err, types.ErrNoStateFound) {
logger.Info("failed to find state in the store, creating new state from genesis")
s, err = types.NewFromGenesisDoc(genesis)
}

if err != nil {
return types.State{}, fmt.Errorf("get initial state: %w", err)
}

return s, nil
}
2 changes: 1 addition & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestInitialState(t *testing.T) {
// Init empty store and full store
emptyStore := store.New(store.NewDefaultInMemoryKVStore())
fullStore := store.New(store.NewDefaultInMemoryKVStore())
_, err = fullStore.UpdateState(sampleState, nil)
_, err = fullStore.SaveState(sampleState, nil)
require.NoError(t, err)

// Init p2p client
Expand Down
17 changes: 6 additions & 11 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
func (m *Manager) ProduceBlockLoop(ctx context.Context) {
m.logger.Debug("Started produce loop")

// Main ticker for block production
ticker := time.NewTicker(m.Conf.BlockTime)
defer ticker.Stop()

Expand Down Expand Up @@ -90,19 +89,14 @@ func (m *Manager) ProduceAndGossipBlock(ctx context.Context, allowEmpty bool) (*

func (m *Manager) produceBlock(allowEmpty bool) (*types.Block, *types.Commit, error) {
var (
lastCommit *types.Commit
lastHeaderHash [32]byte
newHeight uint64
err error
lastHeaderHash [32]byte
lastCommit = &types.Commit{}
newHeight = m.State.NextHeight()
)

if m.State.IsGenesis() {
newHeight = uint64(m.State.InitialHeight)
lastCommit = &types.Commit{}
m.State.BaseHeight = newHeight
} else {
height := m.State.Height()
newHeight = height + 1
if !m.State.IsGenesis() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as this if will be entered basically 100% of the times thinking if it would be cleaner to have a produceGenesisBlock and have part of the produceBlock code reusable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored it a bit, take a look

height := newHeight - 1
danwt marked this conversation as resolved.
Show resolved Hide resolved
lastCommit, err = m.Store.LoadCommit(height)
if err != nil {
return nil, nil, fmt.Errorf("load commit: height: %d: %w: %w", height, err, ErrNonRecoverable)
Expand Down Expand Up @@ -190,6 +184,7 @@ func (m *Manager) createTMSignature(block *types.Block, proposerAddress []byte,
}
v := vote.ToProto()
// convert libp2p key to tm key
//TODO: move to types
raw_key, _ := m.ProposerKey.Raw()
tmprivkey := tmed25519.PrivKey(raw_key)
tmprivkey.PubKey().Bytes()
Expand Down
Loading