Skip to content

Commit

Permalink
fix: private trie cache db with gc (#1235)
Browse files Browse the repository at this point in the history
* fix: private trie cache db with gc

* fix: commit

* fix: use the right private root

* contract implementation enforcement

* fix merge

* GoQ comments

* Apply suggestions from code review

GoQuorum Comments
  • Loading branch information
baptiste-b-pegasys authored Aug 31, 2021
1 parent ccfc426 commit 614b212
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 22 deletions.
66 changes: 60 additions & 6 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ type BlockChain struct {

// privateStateManager manages private state(s) for this blockchain
privateStateManager mps.PrivateStateManager
privateTrieGC *prque.Prque // Priority queue mapping block numbers to tries to gc

// End Quorum
}

Expand Down Expand Up @@ -262,7 +264,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
vmConfig: vmConfig,
badBlocks: badBlocks,
// Quorum
quorumConfig: quorumChainConfig,
quorumConfig: quorumChainConfig,
privateTrieGC: prque.New(nil),
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
Expand Down Expand Up @@ -1165,7 +1168,7 @@ func (bc *BlockChain) Stop() {
// - HEAD-127: So we have a hard limit on the number of blocks reexecuted
if !bc.cacheConfig.TrieDirtyDisabled {
triedb := bc.stateCache.TrieDB()

privateTrieDb := bc.PrivateStateManager().TrieDB()
for _, offset := range []uint64{0, 1, TriesInMemory - 1} {
if number := bc.CurrentBlock().NumberU64(); number > offset {
recent := bc.GetBlockByNumber(number - offset)
Expand All @@ -1174,6 +1177,13 @@ func (bc *BlockChain) Stop() {
if err := triedb.Commit(recent.Root(), true, nil); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
}
// Quorum
privateRoot := rawdb.GetPrivateStateRoot(bc.db, recent.Root())
log.Info("Writing private cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "privateRoot", privateRoot)
if err := privateTrieDb.Commit(privateRoot, true, nil); err != nil {
log.Error("Failed to commit recent private state trie", "err", err)
}
// End Quorum
}
}
if snapBase != (common.Hash{}) {
Expand All @@ -1185,9 +1195,15 @@ func (bc *BlockChain) Stop() {
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem().(common.Hash))
}
for !bc.privateTrieGC.Empty() { // Quorum
privateTrieDb.Dereference(bc.privateTrieGC.PopItem().(common.Hash))
}
if size, _ := triedb.Size(); size != 0 {
log.Error("Dangling trie nodes after full cleanup")
}
if size, _ := privateTrieDb.Size(); size != 0 { // Quorum
log.Error("Dangling private trie nodes after full cleanup")
}
}
// Ensure all live cached entries be saved into disk, so that we can skip
// cache warmup when node restarts.
Expand Down Expand Up @@ -1690,7 +1706,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// Make sure no inconsistent state is leaked during insertion
// Quorum
// Write private state changes to database
err = psManager.CommitAndWrite(bc.chainConfig.IsEIP158(block.Number()), block)
privateRoot, err := psManager.CommitAndWrite(bc.chainConfig.IsEIP158(block.Number()), block)
if err != nil {
return NonStatTy, err
}
Expand Down Expand Up @@ -1719,27 +1735,49 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
return NonStatTy, err
}
triedb := bc.stateCache.TrieDB()
// Quorum
privateTrieDB := bc.PrivateStateManager().TrieDB()

// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
if err := triedb.Commit(root, false, nil); err != nil {
return NonStatTy, err
}

if len(privateRoot.Bytes()) != 0 {
// Quorum commit private root
if err := privateTrieDB.Commit(privateRoot, false, nil); err != nil {
return NonStatTy, err
}
}
} else {
// Full but not archive node, do proper garbage collection
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -int64(block.NumberU64()))

// Quorum
if len(privateRoot.Bytes()) != 0 {
privateTrieDB.Reference(privateRoot, common.Hash{}) // metadata reference to keep private trie alive
bc.privateTrieGC.Push(privateRoot, -int64(block.NumberU64()))
}
// End Quorum

if current := block.NumberU64(); current > TriesInMemory {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
nodes, imgs = triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
nodes, imgs = triedb.Size()
privateNodes, privateImgs = privateTrieDB.Size() // Quorum
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
triedb.Cap(limit - ethdb.IdealBatchSize)
}

// Quorum
if privateNodes > limit || privateImgs > 4*1024*1024 {
privateTrieDB.Cap(limit - ethdb.IdealBatchSize)
}
// End Quorum

// Find the next state trie we need to commit
chosen := current - TriesInMemory

Expand All @@ -1758,6 +1796,12 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// Flush an entire trie and restart the counters
triedb.Commit(header.Root, true, nil)

// Quorum
privateroot := rawdb.GetPrivateStateRoot(bc.db, header.Root)
privateTrieDB.Commit(privateroot, true, nil)
// End Quorum

lastWrite = chosen
bc.gcproc = 0
}
Expand All @@ -1771,6 +1815,16 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
triedb.Dereference(root.(common.Hash))
}
// Quorum
for !bc.privateTrieGC.Empty() {
root, number := bc.privateTrieGC.Pop()
if uint64(-number) > chosen {
bc.privateTrieGC.Push(root, number)
break
}
privateTrieDB.Dereference(root.(common.Hash))
}
// End Quorum
}
}
// If the total difficulty is higher than our known, add it to the canonical chain
Expand Down
10 changes: 6 additions & 4 deletions core/mps/default_psr.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type DefaultPrivateStateRepository struct {
root common.Hash
}

var _ PrivateStateRepository = (*DefaultPrivateStateRepository)(nil) // DefaultPrivateStateRepository must implement PrivateStateRepository

func NewDefaultPrivateStateRepository(db ethdb.Database, cache state.Database, previousBlockHash common.Hash) (*DefaultPrivateStateRepository, error) {
root := rawdb.GetPrivateStateRoot(db, previousBlockHash)

Expand Down Expand Up @@ -63,17 +65,17 @@ func (dpsr *DefaultPrivateStateRepository) Reset() error {
}

// CommitAndWrite commits the private state and writes to disk
func (dpsr *DefaultPrivateStateRepository) CommitAndWrite(isEIP158 bool, block *types.Block) error {
func (dpsr *DefaultPrivateStateRepository) CommitAndWrite(isEIP158 bool, block *types.Block) (common.Hash, error) {
privateRoot, err := dpsr.stateDB.Commit(isEIP158)
if err != nil {
return err
return privateRoot, err
}

if err := rawdb.WritePrivateStateRoot(dpsr.db, block.Root(), privateRoot); err != nil {
log.Error("Failed writing private state root", "err", err)
return err
return privateRoot, err
}
return dpsr.stateCache.TrieDB().Commit(privateRoot, false, nil)
return privateRoot, nil
}

// Commit commits the private state only
Expand Down
2 changes: 1 addition & 1 deletion core/mps/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type PrivateStateMetadataResolver interface {
// retrieving from and peristing private states to the underlying database
type PrivateStateRepository interface {
StatePSI(psi types.PrivateStateIdentifier) (*state.StateDB, error)
CommitAndWrite(isEIP158 bool, block *types.Block) error
CommitAndWrite(isEIP158 bool, block *types.Block) (common.Hash, error)
Commit(isEIP158 bool, block *types.Block) error
Copy() PrivateStateRepository
Reset() error
Expand Down
17 changes: 7 additions & 10 deletions core/mps/multiple_psr.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type MultiplePrivateStateRepository struct {
managedStates map[types.PrivateStateIdentifier]*managedState
}

var _ PrivateStateRepository = (*MultiplePrivateStateRepository)(nil) // MultiplePrivateStateRepository must implement PrivateStateRepository

func NewMultiplePrivateStateRepository(db ethdb.Database, cache state.Database, privateStatesTrieRoot common.Hash) (*MultiplePrivateStateRepository, error) {
tr, err := cache.OpenTrie(privateStatesTrieRoot)
if err != nil {
Expand Down Expand Up @@ -152,33 +154,28 @@ func (mpsr *MultiplePrivateStateRepository) Reset() error {
}

// CommitAndWrite commits all private states, updates the trie of private states, writes to disk
func (mpsr *MultiplePrivateStateRepository) CommitAndWrite(isEIP158 bool, block *types.Block) error {
func (mpsr *MultiplePrivateStateRepository) CommitAndWrite(isEIP158 bool, block *types.Block) (common.Hash, error) {
mpsr.mux.Lock()
defer mpsr.mux.Unlock()
// commit each managed state
for psi, managedState := range mpsr.managedStates {
// calculate and commit state root if required
privateRoot, err := managedState.stateRootProviderFunc(isEIP158)
if err != nil {
return err
return privateRoot, err
}
// update the managed state root in the trie of state roots
if err := mpsr.trie.TryUpdate([]byte(psi), privateRoot.Bytes()); err != nil {
return err
return privateRoot, err
}
}
// commit the trie of states
mtRoot, err := mpsr.trie.Commit(nil)
if err != nil {
return err
return mtRoot, err
}
err = rawdb.WritePrivateStatesTrieRoot(mpsr.db, block.Root(), mtRoot)
if err != nil {
return err
}
privateTriedb := mpsr.repoCache.TrieDB()
err = privateTriedb.Commit(mtRoot, false, nil)
return err
return mtRoot, err
}

// Commit commits all private states, updates the trie of private states only
Expand Down
6 changes: 5 additions & 1 deletion core/mps/upgrade_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ func UpgradeDB(db ethdb.Database, chain chainReader) error {
privateState.stateRootProviderFunc = func(_ bool) (common.Hash, error) {
return rawdb.GetPrivateStateRoot(db, header.Root), nil
}
err = mpsRepo.CommitAndWrite(chain.Config().IsEIP158(block.Number()), block)
privateRoot, err := mpsRepo.CommitAndWrite(chain.Config().IsEIP158(block.Number()), block)
if err != nil {
return err
}
err = mpsRepo.repoCache.TrieDB().Commit(privateRoot, false, nil)
if err != nil {
return err
}
Expand Down

0 comments on commit 614b212

Please sign in to comment.