Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: nodes keep out of sync when missing gossiped block (issue #284) #540

Merged
merged 15 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *types.Commit, blockMetaData blockMetaData) error {
if block.Header.Height != m.store.Height()+1 {
// We crashed after the commit and before updating the store height.
m.logger.Error("Block not applied. Wrong height", "block height", block.Header.Height, "store height", m.store.Height())

Check warning on line 22 in block/block.go

View check run for this annotation

Codecov / codecov/patch

block/block.go#L22

Added line #L22 was not covered by tests
return nil
}

Expand Down Expand Up @@ -112,6 +113,32 @@
return nil
}

func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error {
m.applyCachedBlockMutex.Lock()
defer m.applyCachedBlockMutex.Unlock()

prevCachedBlock, exists := m.prevBlock[m.store.Height()+1]

for exists {
m.logger.Debug("Applying cached block", "height", m.store.Height()+1)

err := m.applyBlock(ctx, prevCachedBlock, m.prevCommit[m.store.Height()+1], blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Debug("Failed to apply previously cached block", "err", err)
return err
}
prevCachedBlock, exists = m.prevBlock[m.store.Height()+1]

Check warning on line 130 in block/block.go

View check run for this annotation

Codecov / codecov/patch

block/block.go#L123-L130

Added lines #L123 - L130 were not covered by tests
}

for k := range m.prevBlock {
if k <= m.store.Height() {
delete(m.prevBlock, k)
delete(m.prevCommit, k)
}

Check warning on line 137 in block/block.go

View check run for this annotation

Codecov / codecov/patch

block/block.go#L134-L137

Added lines #L134 - L137 were not covered by tests
}
return nil
}

// alignStoreWithApp is responsible for aligning the state of the store and the abci app if necessary.
func (m *Manager) alignStoreWithApp(ctx context.Context, block *types.Block) (bool, error) {
isRequired := false
Expand Down
35 changes: 27 additions & 8 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,19 @@
shouldProduceBlocksCh chan bool
produceEmptyBlockCh chan bool

syncTarget uint64
lastSubmissionTime int64
batchInProcess atomic.Value
isSyncedCond sync.Cond
produceBlockMutex sync.Mutex
syncTarget uint64
lastSubmissionTime int64
batchInProcess atomic.Value
isSyncedCond sync.Cond
produceBlockMutex sync.Mutex
applyCachedBlockMutex sync.Mutex

syncCache map[uint64]*types.Block

logger log.Logger

prevBlock map[uint64]*types.Block
prevCommit map[uint64]*types.Commit
}

// getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc.
Expand Down Expand Up @@ -168,6 +172,8 @@
shouldProduceBlocksCh: make(chan bool, 1),
produceEmptyBlockCh: make(chan bool, 1),
logger: logger,
prevBlock: make(map[uint64]*types.Block),
prevCommit: make(map[uint64]*types.Commit),
}

return agg, nil
Expand Down Expand Up @@ -212,13 +218,26 @@
}

func (m *Manager) applyBlockCallback(event pubsub.Message) {
m.logger.Debug("Received new block event", "eventData", event.Data())
m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.prevBlock))

Check warning on line 221 in block/manager.go

View check run for this annotation

Codecov / codecov/patch

block/manager.go#L221

Added line #L221 was not covered by tests
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit
err := m.applyBlock(context.Background(), &block, &commit, blockMetaData{source: gossipedBlock})

if block.Header.Height != m.store.Height()+1 {
if block.Header.Height > m.store.Height() {
m.prevBlock[block.Header.Height] = &block
m.prevCommit[block.Header.Height] = &commit
m.logger.Debug("Caching block", "block height", block.Header.Height, "store height", m.store.Height())
}
} else {
err := m.applyBlock(context.Background(), &block, &commit, blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Debug("Failed to apply block", "err", err)
}

Check warning on line 236 in block/manager.go

View check run for this annotation

Codecov / codecov/patch

block/manager.go#L225-L236

Added lines #L225 - L236 were not covered by tests
}
err := m.attemptApplyCachedBlocks(context.Background())

Check warning on line 238 in block/manager.go

View check run for this annotation

Codecov / codecov/patch

block/manager.go#L238

Added line #L238 was not covered by tests
if err != nil {
m.logger.Debug("Failed to apply block", "err", err)
m.logger.Debug("Failed to apply previous cached blocks", "err", err)

Check warning on line 240 in block/manager.go

View check run for this annotation

Codecov / codecov/patch

block/manager.go#L240

Added line #L240 was not covered by tests
}
}

Expand Down
2 changes: 1 addition & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestInitialState(t *testing.T) {

// Init p2p client
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, privKey, "TestChain", logger)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, privKey, "TestChain", 50, logger)
assert.NoError(err)
assert.NotNil(p2pClient)

Expand Down
11 changes: 6 additions & 5 deletions block/production_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,12 @@ func TestBatchSubmissionAfterTimeout(t *testing.T) {

// Init manager with empty blocks feature enabled
managerConfig := config.BlockManagerConfig{
BlockTime: blockTime,
EmptyBlocksMaxTime: 0,
BatchSubmitMaxTime: submitTimeout,
BlockBatchSize: batchSize,
BlockBatchMaxSizeBytes: 1000,
BlockTime: blockTime,
EmptyBlocksMaxTime: 0,
BatchSubmitMaxTime: submitTimeout,
BlockBatchSize: batchSize,
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
}

manager, err := getManager(managerConfig, nil, nil, 1, 1, 0, proxyApp, nil)
Expand Down
11 changes: 4 additions & 7 deletions block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,12 @@
return err
}

if settlementBatch.StartHeight != currentHeight+1 {
return fmt.Errorf("settlement batch start height (%d) on index (%d) is not the expected", settlementBatch.StartHeight, currStateIdx)
}

err = m.processNextDABatch(ctx, settlementBatch.MetaData.DA.Height)
if err != nil {
return err
}

currentHeight = m.store.Height()
if currentHeight != settlementBatch.EndHeight {
return fmt.Errorf("after applying state index (%d), the height (%d) is not as expected (%d)", currStateIdx, currentHeight, settlementBatch.EndHeight)
}

err = m.updateStateIndex(settlementBatch.StateIndex)
if err != nil {
Expand Down Expand Up @@ -99,6 +92,10 @@
}
}
}
err = m.attemptApplyCachedBlocks(ctx)
if err != nil {
m.logger.Debug("Error applying previous cached blocks", "err", err)
}

Check warning on line 98 in block/retriever.go

View check run for this annotation

Codecov / codecov/patch

block/retriever.go#L97-L98

Added lines #L97 - L98 were not covered by tests
return nil
}

Expand Down
13 changes: 7 additions & 6 deletions block/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func getManager(conf config.BlockManagerConfig, settlementlc settlement.LayerI,

// Init p2p client and validator
p2pKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, p2pKey, "TestChain", logger)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, p2pKey, "TestChain", 50, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -143,10 +143,11 @@ func initSettlementLayerMock(settlementlc settlement.LayerI, proposer string, pu

func getManagerConfig() config.BlockManagerConfig {
return config.BlockManagerConfig{
BlockTime: 100 * time.Millisecond,
BlockBatchSize: defaultBatchSize,
BlockBatchMaxSizeBytes: 1000,
BatchSubmitMaxTime: 30 * time.Minute,
NamespaceID: "0102030405060708",
BlockTime: 100 * time.Millisecond,
BlockBatchSize: defaultBatchSize,
BlockBatchMaxSizeBytes: 1000,
BatchSubmitMaxTime: 30 * time.Minute,
NamespaceID: "0102030405060708",
GossipedBlocksCacheSize: 50,
}
}
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
BlockBatchSize uint64 `mapstructure:"block_batch_size"`
// The size of the batch in Bytes. Every batch we'll write to the DA and the settlement layer.
BlockBatchMaxSizeBytes uint64 `mapstructure:"block_batch_max_size_bytes"`
// The number of messages cached by gossipsub protocol
GossipedBlocksCacheSize int `mapstructure:"gossiped_blocks_cache_size"`
}

// GetViperConfig reads configuration parameters from Viper instance.
Expand Down Expand Up @@ -114,6 +116,10 @@
return fmt.Errorf("block_batch_size_bytes must be positive")
}

if c.GossipedBlocksCacheSize <= 0 {
return fmt.Errorf("gossiped_blocks_cache_size must be positive")
}

Check warning on line 121 in config/config.go

View check run for this annotation

Codecov / codecov/patch

config/config.go#L120-L121

Added lines #L120 - L121 were not covered by tests

return nil
}

Expand Down
13 changes: 7 additions & 6 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ func DefaultConfig(home, chainId string) *NodeConfig {
Seeds: ""},
Aggregator: true,
BlockManagerConfig: BlockManagerConfig{
BlockTime: 200 * time.Millisecond,
EmptyBlocksMaxTime: 3 * time.Second,
BatchSubmitMaxTime: 30 * time.Second,
NamespaceID: "000000000000ffff",
BlockBatchSize: 500,
BlockBatchMaxSizeBytes: 500000},
BlockTime: 200 * time.Millisecond,
EmptyBlocksMaxTime: 3 * time.Second,
BatchSubmitMaxTime: 30 * time.Second,
NamespaceID: "000000000000ffff",
BlockBatchSize: 500,
BlockBatchMaxSizeBytes: 500000,
GossipedBlocksCacheSize: 50},
DALayer: "mock",
SettlementLayer: "mock",
Instrumentation: &InstrumentationConfig{
Expand Down
3 changes: 3 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ da_config = "{{ .DAConfig }}"
# max size of batch in bytes that can be accepted by DA
block_batch_max_size_bytes = {{ .BlockManagerConfig.BlockBatchMaxSizeBytes }}

# max number of cached messages by gossipsub protocol
gossiped_blocks_cache_size = {{ .BlockManagerConfig.GossipedBlocksCacheSize }}

#celestia config example:
# da_config = "{\"base_url\": \"http://127.0.0.1:26659\", \"timeout\": 60000000000, \"gas_prices\":0.1, \"gas_adjustment\": 1.3, \"namespace_id\":\"000000000000ffff\"}"
# Avail config example:
Expand Down
Loading
Loading