Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: private trie cache db with gc #1235

Merged
merged 12 commits into from
Aug 31, 2021
64 changes: 58 additions & 6 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ type BlockChain struct {

// privateStateManager manages private state(s) for this blockchain
privateStateManager mps.PrivateStateManager
privateTrieGC *prque.Prque // Priority queue mapping block numbers to tries to gc

// End Quorum
}

Expand Down Expand Up @@ -262,7 +264,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
vmConfig: vmConfig,
badBlocks: badBlocks,
// Quorum
quorumConfig: quorumChainConfig,
quorumConfig: quorumChainConfig,
privateTrieGC: prque.New(nil),
}
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
Expand Down Expand Up @@ -1165,7 +1168,7 @@ func (bc *BlockChain) Stop() {
// - HEAD-127: So we have a hard limit on the number of blocks reexecuted
if !bc.cacheConfig.TrieDirtyDisabled {
triedb := bc.stateCache.TrieDB()

privateTrieDb := bc.PrivateStateManager().TrieDB()
for _, offset := range []uint64{0, 1, TriesInMemory - 1} {
if number := bc.CurrentBlock().NumberU64(); number > offset {
recent := bc.GetBlockByNumber(number - offset)
Expand All @@ -1174,6 +1177,13 @@ func (bc *BlockChain) Stop() {
if err := triedb.Commit(recent.Root(), true, nil); err != nil {
log.Error("Failed to commit recent state trie", "err", err)
}
// Quorum
privateRoot := rawdb.GetPrivateStateRoot(bc.db, recent.Root())
log.Info("Writing private cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "privateRoot", privateRoot)
if err := privateTrieDb.Commit(privateRoot, true, nil); err != nil {
log.Error("Failed to commit recent private state trie", "err", err)
}
baptiste-b-pegasys marked this conversation as resolved.
Show resolved Hide resolved
// End Quorum
}
}
if snapBase != (common.Hash{}) {
Expand All @@ -1185,9 +1195,15 @@ func (bc *BlockChain) Stop() {
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem().(common.Hash))
}
for !bc.privateTrieGC.Empty() {
baptiste-b-pegasys marked this conversation as resolved.
Show resolved Hide resolved
privateTrieDb.Dereference(bc.privateTrieGC.PopItem().(common.Hash))
}
if size, _ := triedb.Size(); size != 0 {
log.Error("Dangling trie nodes after full cleanup")
}
if size, _ := privateTrieDb.Size(); size != 0 {
baptiste-b-pegasys marked this conversation as resolved.
Show resolved Hide resolved
log.Error("Dangling private trie nodes after full cleanup")
}
}
// Ensure all live cached entries be saved into disk, so that we can skip
// cache warmup when node restarts.
Expand Down Expand Up @@ -1690,7 +1706,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// Make sure no inconsistent state is leaked during insertion
// Quorum
// Write private state changes to database
err = psManager.CommitAndWrite(bc.chainConfig.IsEIP158(block.Number()), block)
privateRoot, err := psManager.CommitAndWrite(bc.chainConfig.IsEIP158(block.Number()), block)
if err != nil {
return NonStatTy, err
}
Expand Down Expand Up @@ -1719,27 +1735,49 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
return NonStatTy, err
}
triedb := bc.stateCache.TrieDB()
// Quorum
privateTrieDB := bc.PrivateStateManager().TrieDB()

// If we're running an archive node, always flush
if bc.cacheConfig.TrieDirtyDisabled {
if err := triedb.Commit(root, false, nil); err != nil {
return NonStatTy, err
}

if len(privateRoot.Bytes()) != 0 {
// Quorum commit private root
if err := privateTrieDB.Commit(privateRoot, false, nil); err != nil {
return NonStatTy, err
}
}
} else {
// Full but not archive node, do proper garbage collection
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -int64(block.NumberU64()))

// Quorum
if len(privateRoot.Bytes()) != 0 {
privateTrieDB.Reference(privateRoot, common.Hash{}) // metadata reference to keep private trie alive
bc.privateTrieGC.Push(privateRoot, -int64(block.NumberU64()))
}
baptiste-b-pegasys marked this conversation as resolved.
Show resolved Hide resolved
// End Quorum

if current := block.NumberU64(); current > TriesInMemory {
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
nodes, imgs = triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
nodes, imgs = triedb.Size()
privateNodes, privateImgs = privateTrieDB.Size()
baptiste-b-pegasys marked this conversation as resolved.
Show resolved Hide resolved
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
triedb.Cap(limit - ethdb.IdealBatchSize)
}

// Quorum
if privateNodes > limit || privateImgs > 4*1024*1024 {
baptiste-b-pegasys marked this conversation as resolved.
Show resolved Hide resolved
privateTrieDB.Cap(limit - ethdb.IdealBatchSize)
}
// End Quorum

// Find the next state trie we need to commit
chosen := current - TriesInMemory

Expand All @@ -1758,6 +1796,12 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// Flush an entire trie and restart the counters
triedb.Commit(header.Root, true, nil)

// Quorum
privateroot := rawdb.GetPrivateStateRoot(bc.db, header.Root)
baptiste-b-pegasys marked this conversation as resolved.
Show resolved Hide resolved
privateTrieDB.Commit(privateroot, true, nil)
// End Quorum

lastWrite = chosen
bc.gcproc = 0
}
Expand All @@ -1771,6 +1815,14 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
triedb.Dereference(root.(common.Hash))
}
for !bc.privateTrieGC.Empty() {
baptiste-b-pegasys marked this conversation as resolved.
Show resolved Hide resolved
baptiste-b-pegasys marked this conversation as resolved.
Show resolved Hide resolved
root, number := bc.privateTrieGC.Pop()
if uint64(-number) > chosen {
bc.privateTrieGC.Push(root, number)
break
}
privateTrieDB.Dereference(root.(common.Hash))
}
baptiste-b-pegasys marked this conversation as resolved.
Show resolved Hide resolved
}
}
// If the total difficulty is higher than our known, add it to the canonical chain
Expand Down
10 changes: 6 additions & 4 deletions core/mps/default_psr.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type DefaultPrivateStateRepository struct {
root common.Hash
}

var _ PrivateStateRepository = (*DefaultPrivateStateRepository)(nil) // DefaultPrivateStateRepository must implement PrivateStateRepository

func NewDefaultPrivateStateRepository(db ethdb.Database, cache state.Database, previousBlockHash common.Hash) (*DefaultPrivateStateRepository, error) {
root := rawdb.GetPrivateStateRoot(db, previousBlockHash)

Expand Down Expand Up @@ -63,17 +65,17 @@ func (dpsr *DefaultPrivateStateRepository) Reset() error {
}

// CommitAndWrite commits the private state and writes to disk
func (dpsr *DefaultPrivateStateRepository) CommitAndWrite(isEIP158 bool, block *types.Block) error {
func (dpsr *DefaultPrivateStateRepository) CommitAndWrite(isEIP158 bool, block *types.Block) (common.Hash, error) {
privateRoot, err := dpsr.stateDB.Commit(isEIP158)
if err != nil {
return err
return privateRoot, err
}

if err := rawdb.WritePrivateStateRoot(dpsr.db, block.Root(), privateRoot); err != nil {
log.Error("Failed writing private state root", "err", err)
return err
return privateRoot, err
}
return dpsr.stateCache.TrieDB().Commit(privateRoot, false, nil)
return privateRoot, nil
}

// Commit commits the private state only
Expand Down
2 changes: 1 addition & 1 deletion core/mps/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type PrivateStateMetadataResolver interface {
// retrieving from and peristing private states to the underlying database
type PrivateStateRepository interface {
StatePSI(psi types.PrivateStateIdentifier) (*state.StateDB, error)
CommitAndWrite(isEIP158 bool, block *types.Block) error
CommitAndWrite(isEIP158 bool, block *types.Block) (common.Hash, error)
Commit(isEIP158 bool, block *types.Block) error
Copy() PrivateStateRepository
Reset() error
Expand Down
17 changes: 7 additions & 10 deletions core/mps/multiple_psr.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type MultiplePrivateStateRepository struct {
managedStates map[types.PrivateStateIdentifier]*managedState
}

var _ PrivateStateRepository = (*MultiplePrivateStateRepository)(nil) // MultiplePrivateStateRepository must implement PrivateStateRepository

func NewMultiplePrivateStateRepository(db ethdb.Database, cache state.Database, privateStatesTrieRoot common.Hash) (*MultiplePrivateStateRepository, error) {
tr, err := cache.OpenTrie(privateStatesTrieRoot)
if err != nil {
Expand Down Expand Up @@ -152,33 +154,28 @@ func (mpsr *MultiplePrivateStateRepository) Reset() error {
}

// CommitAndWrite commits all private states, updates the trie of private states, writes to disk
func (mpsr *MultiplePrivateStateRepository) CommitAndWrite(isEIP158 bool, block *types.Block) error {
func (mpsr *MultiplePrivateStateRepository) CommitAndWrite(isEIP158 bool, block *types.Block) (common.Hash, error) {
mpsr.mux.Lock()
defer mpsr.mux.Unlock()
// commit each managed state
for psi, managedState := range mpsr.managedStates {
// calculate and commit state root if required
privateRoot, err := managedState.stateRootProviderFunc(isEIP158)
if err != nil {
return err
return privateRoot, err
}
// update the managed state root in the trie of state roots
if err := mpsr.trie.TryUpdate([]byte(psi), privateRoot.Bytes()); err != nil {
return err
return privateRoot, err
}
}
// commit the trie of states
mtRoot, err := mpsr.trie.Commit(nil)
if err != nil {
return err
return mtRoot, err
}
err = rawdb.WritePrivateStatesTrieRoot(mpsr.db, block.Root(), mtRoot)
if err != nil {
return err
}
privateTriedb := mpsr.repoCache.TrieDB()
err = privateTriedb.Commit(mtRoot, false, nil)
return err
return mtRoot, err
}

// Commit commits all private states, updates the trie of private states only
Expand Down
6 changes: 5 additions & 1 deletion core/mps/upgrade_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ func UpgradeDB(db ethdb.Database, chain chainReader) error {
privateState.stateRootProviderFunc = func(_ bool) (common.Hash, error) {
return rawdb.GetPrivateStateRoot(db, header.Root), nil
}
err = mpsRepo.CommitAndWrite(chain.Config().IsEIP158(block.Number()), block)
privateRoot, err := mpsRepo.CommitAndWrite(chain.Config().IsEIP158(block.Number()), block)
if err != nil {
return err
}
err = mpsRepo.repoCache.TrieDB().Commit(privateRoot, false, nil)
if err != nil {
return err
}
Expand Down