
Commit

merge main
danwt committed Apr 15, 2024
2 parents a5d42d1 + b4921cb commit ce1a88a
Showing 83 changed files with 369 additions and 405 deletions.
25 changes: 12 additions & 13 deletions block/block.go
@@ -37,13 +37,13 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty
// Start applying the block assuming no inconsistency was found.
_, err = m.store.SaveBlock(block, commit, nil)
if err != nil {
m.logger.Error("Failed to save block", "error", err)
m.logger.Error("save block", "error", err)
return err
}

responses, err := m.executeBlock(ctx, block, commit)
if err != nil {
m.logger.Error("Failed to execute block", "error", err)
m.logger.Error("execute block", "error", err)
return err
}

@@ -74,22 +74,22 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty

err = batch.Commit()
if err != nil {
m.logger.Error("Failed to persist batch to disk", "error", err)
m.logger.Error("persist batch to disk", "error", err)
return err
}

// Commit block to app
retainHeight, err := m.executor.Commit(ctx, &newState, block, responses)
if err != nil {
m.logger.Error("Failed to commit to the block", "error", err)
m.logger.Error("commit to the block", "error", err)
return err
}

// Prune old heights, if requested by ABCI app.
if retainHeight > 0 {
pruned, err := m.pruneBlocks(retainHeight)
if err != nil {
m.logger.Error("failed to prune blocks", "retain_height", retainHeight, "err", err)
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
} else {
m.logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
}
@@ -103,7 +103,7 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty

_, err = m.store.UpdateState(newState, nil)
if err != nil {
m.logger.Error("Failed to update state", "error", err)
m.logger.Error("update state", "error", err)
return err
}
m.lastState = newState
@@ -124,7 +124,7 @@ func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error {

err := m.applyBlock(ctx, prevCachedBlock, m.prevCommit[m.store.Height()+1], blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Debug("Failed to apply previously cached block", "err", err)
m.logger.Debug("apply previously cached block", "err", err)
return err
}
prevCachedBlock, exists = m.prevBlock[m.store.Height()+1]
@@ -145,7 +145,7 @@ func (m *Manager) alignStoreWithApp(ctx context.Context, block *types.Block) (bo
// Validate incosistency in height wasn't caused by a crash and if so handle it.
proxyAppInfo, err := m.executor.GetAppInfo()
if err != nil {
return isRequired, errors.Wrap(err, "failed to get app info")
return isRequired, errors.Wrap(err, "get app info")
}
if uint64(proxyAppInfo.LastBlockHeight) != block.Header.Height {
return isRequired, nil
@@ -160,13 +160,13 @@ func (m *Manager) alignStoreWithApp(ctx context.Context, block *types.Block) (bo

resp, err := m.store.LoadBlockResponses(block.Header.Height)
if err != nil {
return isRequired, errors.Wrap(err, "failed to load block responses")
return isRequired, errors.Wrap(err, "load block responses")
}
copy(m.lastState.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

_, err = m.store.UpdateState(m.lastState, nil)
if err != nil {
return isRequired, errors.Wrap(err, "failed to update state")
return isRequired, errors.Wrap(err, "update state")
}
m.store.SetHeight(block.Header.Height)
return isRequired, nil
@@ -193,13 +193,12 @@ func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit typ
gossipedBlock := p2p.GossipedBlock{Block: block, Commit: commit}
gossipedBlockBytes, err := gossipedBlock.MarshalBinary()
if err != nil {
m.logger.Error("Failed to marshal block", "error", err)
m.logger.Error("marshal block", "error", err)
return err
}
if err := m.p2pClient.GossipBlock(ctx, gossipedBlockBytes); err != nil {
m.logger.Error("Failed to gossip block", "error", err)
m.logger.Error("gossip block", "error", err)
return err
}
return nil
-
}
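
A note on the pattern in this file: the commit strips the redundant "Failed to"/"failed to" prefixes from log calls, since the Error level already conveys failure and the message only needs to name the operation. A minimal before/after illustration, using the standard library's log/slog in place of the manager's own logger:

```go
package main

import (
	"errors"
	"log/slog"
)

func main() {
	err := errors.New("disk full")

	// Before: "Failed to" restates what the ERROR level already says.
	slog.Error("Failed to save block", "error", err)

	// After: the message names just the operation; the line still
	// reads unambiguously as a failure, without the stutter.
	slog.Error("save block", "error", err)
}
```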
6 changes: 3 additions & 3 deletions block/initchain.go
@@ -8,21 +8,21 @@ import (
)

func (m *Manager) RunInitChain(ctx context.Context) error {
-//get the proposer's consensus pubkey
+// get the proposer's consensus pubkey
proposer := m.settlementClient.GetProposer()
tmPubKey, err := cryptocodec.ToTmPubKeyInterface(proposer.PublicKey)
if err != nil {
return err
}
gensisValSet := []*tmtypes.Validator{tmtypes.NewValidator(tmPubKey, 1)}

-//call initChain with both addresses
+// call initChain with both addresses
res, err := m.executor.InitChain(m.genesis, gensisValSet)
if err != nil {
return err
}

-//update the state with only the consensus pubkey
+// update the state with only the consensus pubkey
m.executor.UpdateStateAfterInitChain(&m.lastState, res, gensisValSet)
if _, err := m.store.UpdateState(m.lastState, nil); err != nil {
return err
17 changes: 9 additions & 8 deletions block/manager.go
@@ -3,6 +3,7 @@ package block
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
@@ -93,11 +94,11 @@ func NewManager(

exec, err := state.NewBlockExecutor(proposerAddress, conf.NamespaceID, genesis.ChainID, mempool, proxyApp, eventBus, logger)
if err != nil {
return nil, fmt.Errorf("failed to create block executor: %w", err)
return nil, fmt.Errorf("create block executor: %w", err)
}
s, err := getInitialState(store, genesis, logger)
if err != nil {
return nil, fmt.Errorf("failed to get initial state: %w", err)
return nil, fmt.Errorf("get initial state: %w", err)
}

agg := &Manager{
@@ -149,7 +150,7 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {

err := m.syncBlockManager(ctx)
if err != nil {
err = fmt.Errorf("failed to sync block manager: %w", err)
err = fmt.Errorf("sync block manager: %w", err)
return err
}

@@ -171,8 +172,8 @@ func (m *Manager) syncBlockManager(ctx context.Context) error {
resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
// Set the syncTarget according to the result
if err != nil {
-// TODO: separate between fresh rollapp and non-registred rollapp
-if err == settlement.ErrBatchNotFound {
+// TODO: separate between fresh rollapp and non-registered rollapp
+if errors.Is(err, settlement.ErrBatchNotFound) {
// Since we requested the latest batch and got batch not found it means
// the SL still hasn't got any batches for this chain.
m.logger.Info("No batches for chain found in SL. Start writing first batch")
@@ -236,12 +237,12 @@ func (m *Manager) applyBlockCallback(event pubsub.Message) {
} else {
err := m.applyBlock(context.Background(), &block, &commit, blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Debug("Failed to apply block", "err", err)
m.logger.Debug("apply block", "err", err)
}
}
err := m.attemptApplyCachedBlocks(context.Background())
if err != nil {
m.logger.Debug("Failed to apply previous cached blocks", "err", err)
m.logger.Debug("apply previous cached blocks", "err", err)
}
}

@@ -253,7 +254,7 @@ func (m *Manager) getLatestBatchFromSL(ctx context.Context) (*settlement.ResultR
// getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc.
func getInitialState(store store.Store, genesis *tmtypes.GenesisDoc, logger types.Logger) (types.State, error) {
s, err := store.LoadState()
-if err == types.ErrNoStateFound {
+if errors.Is(err, types.ErrNoStateFound) {
logger.Info("failed to find state in the store, creating new state from genesis")
return types.NewFromGenesisDoc(genesis)
}
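
The switch from `==` to `errors.Is` (and the new `"errors"` import above) matters once sentinel errors are wrapped: a direct comparison fails as soon as any layer adds context with `%w`, while `errors.Is` walks the wrap chain. A self-contained sketch, with a local `ErrBatchNotFound` standing in for `settlement.ErrBatchNotFound`:

```go
package main

import (
	"errors"
	"fmt"
)

var ErrBatchNotFound = errors.New("batch not found")

func getLatestBatch() error {
	// A lower layer adds context while preserving the sentinel.
	return fmt.Errorf("query settlement layer: %w", ErrBatchNotFound)
}

func main() {
	err := getLatestBatch()
	fmt.Println(err == ErrBatchNotFound)          // false: wrapping broke ==
	fmt.Println(errors.Is(err, ErrBatchNotFound)) // true: Is unwraps the chain
}
```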
22 changes: 12 additions & 10 deletions block/manager_test.go
@@ -56,7 +56,8 @@ func TestInitialState(t *testing.T) {
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{
GossipCacheSize: 50,
-BoostrapTime: 30 * time.Second}, privKey, "TestChain", logger)
+BoostrapTime: 30 * time.Second,
+}, privKey, "TestChain", logger)
assert.NoError(err)
assert.NotNil(p2pClient)

@@ -94,7 +95,6 @@

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
-
dalc := getMockDALC(logger)
agg, err := NewManager(key, conf, c.genesis, c.store, nil, proxyApp, dalc, settlementlc,
nil, pubsubServer, p2pClient, logger)
@@ -134,11 +134,11 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
time.Sleep(time.Millisecond * 500)
}

-//Initially sync target is 0
+// Initially sync target is 0
assert.True(t, manager.syncTarget == 0)
assert.True(t, manager.store.Height() == 0)

-//enough time to sync and produce blocks
+// enough time to sync and produce blocks
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
defer cancel()
// Capture the error returned by manager.Start.
@@ -151,7 +151,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
}()
<-ctx.Done()
assert.True(t, manager.syncTarget == batch.EndHeight)
-//validate that we produced blocks
+// validate that we produced blocks
assert.Greater(t, manager.store.Height(), batch.EndHeight)
}

@@ -376,8 +376,10 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
app.On("Commit", mock.Anything).Return(abci.ResponseCommit{Data: tc.AppCommitHash[:]}).Once()
app.On("Info", mock.Anything).Return(abci.ResponseInfo{LastBlockHeight: tc.LastAppBlockHeight,
LastBlockAppHash: tc.LastAppCommitHash[:]}).Once()
app.On("Info", mock.Anything).Return(abci.ResponseInfo{
LastBlockHeight: tc.LastAppBlockHeight,
LastBlockAppHash: tc.LastAppCommitHash[:],
}).Once()
mockStore.ShouldFailSetHeight = tc.shouldFailSetSetHeight
mockStore.ShoudFailUpdateState = tc.shouldFailUpdateState
_ = manager.produceBlock(context.Background(), true)
@@ -405,7 +407,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
// Init manager
managerConfig := getManagerConfig()
managerConfig.BlockBatchSize = 1000
-managerConfig.BlockBatchMaxSizeBytes = batchLimitBytes //enough for 2 block, not enough for 10 blocks
+managerConfig.BlockBatchMaxSizeBytes = batchLimitBytes // enough for 2 block, not enough for 10 blocks
manager, err := getManager(managerConfig, nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)

@@ -452,8 +454,8 @@
assert.Equal(batch.EndHeight, batch.StartHeight+uint64(len(batch.Blocks))-1)
assert.Less(batch.EndHeight, endHeight)

-//validate next added block to batch would have been actually too big
-//First relax the byte limit so we could proudce larger batch
+// validate next added block to batch would have been actually too big
+// First relax the byte limit so we could proudce larger batch
manager.conf.BlockBatchMaxSizeBytes = 10 * manager.conf.BlockBatchMaxSizeBytes
newBatch, err := manager.createNextDABatch(startHeight, batch.EndHeight+1)
assert.Greater(newBatch.ToProto().Size(), batchLimitBytes)
17 changes: 8 additions & 9 deletions block/produce.go
@@ -2,6 +2,7 @@ package block

import (
"context"
"errors"
"fmt"
"time"

@@ -29,39 +30,38 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) {
defer tickerEmptyBlocksMaxTime.Stop()
}

-//Allow the initial block to be empty
+// Allow the initial block to be empty
produceEmptyBlock := true
for {
select {
-//Context canceled
+// Context canceled
case <-ctx.Done():
return
// If we got a request for an empty block produce it and don't wait for the ticker
case <-m.produceEmptyBlockCh:
produceEmptyBlock = true
-//Empty blocks timeout
+// Empty blocks timeout
case <-tickerEmptyBlocksMaxTimeCh:
m.logger.Debug(fmt.Sprintf("No transactions for %.2f seconds, producing empty block", m.conf.EmptyBlocksMaxTime.Seconds()))
produceEmptyBlock = true
-//Produce block
+// Produce block
case <-ticker.C:
err := m.produceBlock(ctx, produceEmptyBlock)
-if err == types.ErrSkippedEmptyBlock {
-// m.logger.Debug("Skipped empty block")
+if errors.Is(err, types.ErrSkippedEmptyBlock) {
continue
}
if err != nil {
m.logger.Error("error while producing block", "error", err)
m.shouldProduceBlocksCh <- false
continue
}
-//If empty blocks enabled, after block produced, reset the timeout timer
+// If empty blocks enabled, after block produced, reset the timeout timer
if tickerEmptyBlocksMaxTime != nil {
produceEmptyBlock = false
tickerEmptyBlocksMaxTime.Reset(m.conf.EmptyBlocksMaxTime)
}

-//Node's health check channel
+// Node's health check channel
case shouldProduceBlocks := <-m.shouldProduceBlocksCh:
for !shouldProduceBlocks {
m.logger.Info("Stopped block production")
Expand Down Expand Up @@ -193,5 +193,4 @@ func (m *Manager) createTMSignature(block *types.Block, proposerAddress []byte,
return nil, fmt.Errorf("wrong signature")
}
return vote.Signature, nil
-
}
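
The loop above combines two timers: a production ticker and an empty-block timeout that is reset after every produced block. A stripped-down sketch of that select pattern (intervals and the print statement are illustrative, not the manager's actual configuration):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func produceLoop(ctx context.Context, interval, emptyBlocksMaxTime time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// Fires only when no block has been produced for emptyBlocksMaxTime.
	emptyTimer := time.NewTicker(emptyBlocksMaxTime)
	defer emptyTimer.Stop()

	produceEmptyBlock := true // allow the initial block to be empty
	for {
		select {
		case <-ctx.Done():
			return
		case <-emptyTimer.C:
			produceEmptyBlock = true
		case <-ticker.C:
			fmt.Println("produce block, allow empty =", produceEmptyBlock)
			// After a successful block, require a fresh timeout
			// before the next empty block is allowed.
			produceEmptyBlock = false
			emptyTimer.Reset(emptyBlocksMaxTime)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	produceLoop(ctx, 500*time.Millisecond, time.Second)
}
```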
4 changes: 2 additions & 2 deletions block/production_test.go
@@ -47,7 +47,7 @@ func TestCreateEmptyBlocksEnableDisable(t *testing.T) {
manager, err := getManager(managerConfig, nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)

-//Check initial height
+// Check initial height
initialHeight := uint64(0)
require.Equal(initialHeight, manager.store.Height())

@@ -127,7 +127,7 @@ func TestCreateEmptyBlocksNew(t *testing.T) {

mpool := mempoolv1.NewTxMempool(log.TestingLogger(), tmcfg.DefaultMempoolConfig(), proxy.NewAppConnMempool(abciClient), 0)

-//Check initial height
+// Check initial height
expectedHeight := uint64(0)
assert.Equal(expectedHeight, manager.store.Height())

4 changes: 2 additions & 2 deletions block/pruning.go
@@ -14,10 +14,10 @@ func (m *Manager) pruneBlocks(retainHeight int64) (uint64, error) {

pruned, err := m.store.PruneBlocks(retainHeight)
if err != nil {
return 0, fmt.Errorf("failed to prune block store: %w", err)
return 0, fmt.Errorf("prune block store: %w", err)
}

-//TODO: prune state/indexer and state/txindexer??
+// TODO: prune state/indexer and state/txindexer??

return pruned, nil
}
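
Dropping "failed to" from the fmt.Errorf wrap here follows the same reasoning as the log changes: each caller prepends its own operation name, so per-layer prefixes would stutter ("failed to apply block: failed to prune block store: ..."). A short sketch of how the trimmed messages compose up a hypothetical call chain:

```go
package main

import (
	"errors"
	"fmt"
)

func pruneStore() error { return errors.New("db closed") }

func pruneBlocks() error {
	if err := pruneStore(); err != nil {
		return fmt.Errorf("prune block store: %w", err)
	}
	return nil
}

func applyBlock() error {
	if err := pruneBlocks(); err != nil {
		return fmt.Errorf("apply block: %w", err)
	}
	return nil
}

func main() {
	// Prints: apply block: prune block store: db closed
	// Each layer contributes one operation name, with no repeated prefix.
	fmt.Println(applyBlock())
}
```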
8 changes: 4 additions & 4 deletions block/retriever.go
@@ -70,7 +70,7 @@ func (m *Manager) updateStateIndex(stateIndex uint64) error {
atomic.StoreUint64(&m.lastState.SLStateIndex, stateIndex)
_, err := m.store.UpdateState(m.lastState, nil)
if err != nil {
m.logger.Error("Failed to update state", "error", err)
m.logger.Error("update state", "error", err)
return err
}
return nil
@@ -115,9 +115,9 @@ func (m *Manager) fetchBatch(daMetaData *da.DASubmitMetaData) da.ResultRetrieveB
},
}
}
-//batchRes.MetaData includes proofs necessary to open disputes with the Hub
+// batchRes.MetaData includes proofs necessary to open disputes with the Hub
batchRes := m.retriever.RetrieveBatches(daMetaData)
-//TODO(srene) : for invalid transactions there is no specific error code since it will need to be validated somewhere else for fraud proving.
-//NMT proofs (availRes.MetaData.Proofs) are included in the result batchRes, necessary to be included in the dispute
+// TODO(srene) : for invalid transactions there is no specific error code since it will need to be validated somewhere else for fraud proving.
+// NMT proofs (availRes.MetaData.Proofs) are included in the result batchRes, necessary to be included in the dispute
return batchRes
}
