Skip to content

Commit

Permalink
perf(store): Batch save Block (cometbft#1755)
Browse files Browse the repository at this point in the history
* batching in store

* save only small blocks as 1 batch

* add changelog

* extract const

* Update .changelog/unreleased/improvements/1755-batch-save-block.md

Co-authored-by: Thane Thomson <connect@thanethomson.com>

* simplify code

* no need to panic on closing batch

---------

Co-authored-by: werty144 <anton-paramonov2000@yandex.ru>
Co-authored-by: Thane Thomson <connect@thanethomson.com>
  • Loading branch information
3 people authored Dec 8, 2023
1 parent 0bf3f0a commit 30c9cde
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 20 deletions.
2 changes: 2 additions & 0 deletions .changelog/unreleased/improvements/1755-batch-save-block.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[store]` Save block using a single DB batch if block is less than 640kB, otherwise each block part is saved individually
([\#1755](https://github.com/cometbft/cometbft/pull/1755))
76 changes: 57 additions & 19 deletions internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ import (
"github.com/cometbft/cometbft/types"
)

// Assuming the length of a block part is 64kB (`types.BlockPartSizeBytes`),
// the maximum size of a block, that will be batch saved, is 640kB. The
// benchmarks have shown that `goleveldb` still performs well with blocks of
// this size. However, if the block is larger than 1MB, the performance degrades.
const maxBlockPartsToBatch = 10

/*
BlockStore is a simple low level store for blocks.
Expand Down Expand Up @@ -324,7 +330,7 @@ func (bs *BlockStore) PruneBlocks(height int64, state sm.State) (uint64, int64,
bs.mtx.Lock()
bs.base = base
bs.mtx.Unlock()
bs.saveState()
bs.saveState(batch)

err := batch.WriteSync()
if err != nil {
Expand Down Expand Up @@ -403,12 +409,21 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
if block == nil {
panic("BlockStore can only save a non-nil block")
}
if err := bs.saveBlockToBatch(block, blockParts, seenCommit); err != nil {

batch := bs.db.NewBatch()
defer batch.Close()

if err := bs.saveBlockToBatch(block, blockParts, seenCommit, batch); err != nil {
panic(err)
}

// Save new BlockStoreState descriptor. This also flushes the database.
bs.saveState()
bs.saveState(batch)

err := batch.WriteSync()
if err != nil {
panic(err)
}
}

// SaveBlockWithExtendedCommit persists the given block, blockParts, and
Expand All @@ -423,22 +438,36 @@ func (bs *BlockStore) SaveBlockWithExtendedCommit(block *types.Block, blockParts
if err := seenExtendedCommit.EnsureExtensions(true); err != nil {
panic(fmt.Errorf("problems saving block with extensions: %w", err))
}
if err := bs.saveBlockToBatch(block, blockParts, seenExtendedCommit.ToCommit()); err != nil {

batch := bs.db.NewBatch()
defer batch.Close()

if err := bs.saveBlockToBatch(block, blockParts, seenExtendedCommit.ToCommit(), batch); err != nil {
panic(err)
}
height := block.Height

pbec := seenExtendedCommit.ToProto()
extCommitBytes := mustEncode(pbec)
if err := bs.db.Set(calcExtCommitKey(height), extCommitBytes); err != nil {
if err := batch.Set(calcExtCommitKey(height), extCommitBytes); err != nil {
panic(err)
}

// Save new BlockStoreState descriptor. This also flushes the database.
bs.saveState()
bs.saveState(batch)

err := batch.WriteSync()
if err != nil {
panic(err)
}
}

func (bs *BlockStore) saveBlockToBatch(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) error {
func (bs *BlockStore) saveBlockToBatch(
block *types.Block,
blockParts *types.PartSet,
seenCommit *types.Commit,
batch dbm.Batch) error {

if block == nil {
panic("BlockStore can only save a non-nil block")
}
Expand All @@ -456,13 +485,17 @@ func (bs *BlockStore) saveBlockToBatch(block *types.Block, blockParts *types.Par
return fmt.Errorf("BlockStore cannot save seen commit of a different height (block: %d, commit: %d)", height, seenCommit.Height)
}

// If the block is small, batch save the block parts. Otherwise, save the
// parts individually.
saveBlockPartsToBatch := blockParts.Count() <= maxBlockPartsToBatch

// Save block parts. This must be done before the block meta, since callers
// typically load the block meta first as an indication that the block exists
// and then go on to load block parts - we must make sure the block is
// complete as soon as the block meta is written.
for i := 0; i < int(blockParts.Total()); i++ {
part := blockParts.GetPart(i)
bs.saveBlockPart(height, i, part)
bs.saveBlockPart(height, i, part, batch, saveBlockPartsToBatch)
}

// Save block meta
Expand All @@ -472,25 +505,25 @@ func (bs *BlockStore) saveBlockToBatch(block *types.Block, blockParts *types.Par
return errors.New("nil blockmeta")
}
metaBytes := mustEncode(pbm)
if err := bs.db.Set(calcBlockMetaKey(height), metaBytes); err != nil {
if err := batch.Set(calcBlockMetaKey(height), metaBytes); err != nil {
return err
}
if err := bs.db.Set(calcBlockHashKey(hash), []byte(fmt.Sprintf("%d", height))); err != nil {
if err := batch.Set(calcBlockHashKey(hash), []byte(fmt.Sprintf("%d", height))); err != nil {
return err
}

// Save block commit (duplicate and separate from the Block)
pbc := block.LastCommit.ToProto()
blockCommitBytes := mustEncode(pbc)
if err := bs.db.Set(calcBlockCommitKey(height-1), blockCommitBytes); err != nil {
if err := batch.Set(calcBlockCommitKey(height-1), blockCommitBytes); err != nil {
return err
}

// Save seen commit (seen +2/3 precommits for block)
// NOTE: we can delete this at a later height
pbsc := seenCommit.ToProto()
seenCommitBytes := mustEncode(pbsc)
if err := bs.db.Set(calcSeenCommitKey(height), seenCommitBytes); err != nil {
if err := batch.Set(calcSeenCommitKey(height), seenCommitBytes); err != nil {
return err
}

Expand All @@ -505,25 +538,30 @@ func (bs *BlockStore) saveBlockToBatch(block *types.Block, blockParts *types.Par
return nil
}

func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) {
func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part, batch dbm.Batch, saveBlockPartsToBatch bool) {
pbp, err := part.ToProto()
if err != nil {
panic(cmterrors.ErrMsgToProto{MessageName: "Part", Err: err})
}
partBytes := mustEncode(pbp)
if err := bs.db.Set(calcBlockPartKey(height, index), partBytes); err != nil {
if saveBlockPartsToBatch {
err = batch.Set(calcBlockPartKey(height, index), partBytes)
} else {
err = bs.db.Set(calcBlockPartKey(height, index), partBytes)
}
if err != nil {
panic(err)
}
}

func (bs *BlockStore) saveState() {
func (bs *BlockStore) saveState(batch dbm.Batch) {
bs.mtx.RLock()
bss := cmtstore.BlockStoreState{
Base: bs.base,
Height: bs.height,
}
bs.mtx.RUnlock()
SaveBlockStoreState(&bss, bs.db)
SaveBlockStoreState(&bss, batch)
}

// SaveSeenCommit saves a seen commit, used by e.g. the state sync reactor when bootstrapping node.
Expand Down Expand Up @@ -571,12 +609,12 @@ func calcBlockHashKey(hash []byte) []byte {
var blockStoreKey = []byte("blockStore")

// SaveBlockStoreState persists the blockStore state to the database.
func SaveBlockStoreState(bsj *cmtstore.BlockStoreState, db dbm.DB) {
func SaveBlockStoreState(bsj *cmtstore.BlockStoreState, batch dbm.Batch) {
bytes, err := proto.Marshal(bsj)
if err != nil {
panic(fmt.Sprintf("Could not marshal state bytes: %v", err))
}
if err := db.SetSync(blockStoreKey, bytes); err != nil {
if err := batch.Set(blockStoreKey, bytes); err != nil {
panic(err)
}
}
Expand Down Expand Up @@ -655,7 +693,7 @@ func (bs *BlockStore) DeleteLatestBlock() error {
bs.mtx.Lock()
bs.height = targetHeight - 1
bs.mtx.Unlock()
bs.saveState()
bs.saveState(batch)

err := batch.WriteSync()
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion internal/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,14 @@ func TestLoadBlockStoreState(t *testing.T) {

for _, tc := range testCases {
db := dbm.NewMemDB()
SaveBlockStoreState(tc.bss, db)
batch := db.NewBatch()
SaveBlockStoreState(tc.bss, batch)
err := batch.WriteSync()
require.NoError(t, err)
retrBSJ := LoadBlockStoreState(db)
assert.Equal(t, tc.want, retrBSJ, "expected the retrieved DBs to match: %s", tc.testName)
err = batch.Close()
require.NoError(t, err)
}
}

Expand Down

0 comments on commit 30c9cde

Please sign in to comment.