Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/state): change map of tries implementation to have working garbage collection #2206

Merged
merged 2 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dot/state/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ func TestService_PruneStorage(t *testing.T) {
time.Sleep(1 * time.Second)

for _, v := range prunedArr {
_, has := serv.Storage.tries.Load(v.hash)
require.Equal(t, false, has)
tr := serv.Storage.tries.get(v.hash)
require.Nil(t, tr)
}
}

Expand Down
50 changes: 16 additions & 34 deletions dot/state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func errTrieDoesNotExist(hash common.Hash) error {
// StorageState is the struct that holds the trie, db and lock
type StorageState struct {
blockState *BlockState
tries *sync.Map // map[common.Hash]*trie.Trie // map of root -> trie
tries *tries

db chaindb.Database
sync.RWMutex
Expand All @@ -39,7 +39,6 @@ type StorageState struct {
changedLock sync.RWMutex
observerList []Observer
pruner pruner.Pruner
syncing bool
}

// NewStorageState creates a new StorageState backed by the given trie and database located at basePath.
Expand All @@ -53,8 +52,7 @@ func NewStorageState(db chaindb.Database, blockState *BlockState,
return nil, fmt.Errorf("cannot have nil trie")
}

tries := new(sync.Map)
tries.Store(t.MustHash(), t)
tries := newTries(t)

storageTable := chaindb.NewTable(db, storagePrefix)

Expand All @@ -78,30 +76,16 @@ func NewStorageState(db chaindb.Database, blockState *BlockState,
}, nil
}

// SetSyncing sets whether the node is currently syncing or not
func (s *StorageState) SetSyncing(syncing bool) {
s.syncing = syncing
}

func (s *StorageState) pruneKey(keyHeader *types.Header) {
logger.Tracef("pruning trie, number=%d hash=%s", keyHeader.Number, keyHeader.Hash())
s.tries.Delete(keyHeader.StateRoot)
s.tries.delete(keyHeader.StateRoot)
}

// StoreTrie stores the given trie in the StorageState and writes it to the database
func (s *StorageState) StoreTrie(ts *rtstorage.TrieState, header *types.Header) error {
root := ts.MustRoot()

if s.syncing {
// keep only the trie at the head of the chain when syncing
// TODO: probably remove this when memory usage improves (#1494)
s.tries.Range(func(k, _ interface{}) bool {
s.tries.Delete(k)
return true
})
}
qdm12 marked this conversation as resolved.
Show resolved Hide resolved

_, _ = s.tries.LoadOrStore(root, ts.Trie())
s.tries.softSet(root, ts.Trie())

if _, ok := s.pruner.(*pruner.FullNode); header == nil && ok {
return fmt.Errorf("block cannot be empty for Full node pruner")
Expand Down Expand Up @@ -142,20 +126,16 @@ func (s *StorageState) TrieState(root *common.Hash) (*rtstorage.TrieState, error
root = &sr
}

st, has := s.tries.Load(*root)
if !has {
t := s.tries.get(*root)
if t == nil {
var err error
st, err = s.LoadFromDB(*root)
t, err = s.LoadFromDB(*root)
if err != nil {
return nil, err
}

_, _ = s.tries.LoadOrStore(*root, st)
}

t := st.(*trie.Trie)

if has && t.MustHash() != *root {
s.tries.softSet(*root, t)
} else if t.MustHash() != *root {
panic("trie does not have expected root")
}

Expand All @@ -177,7 +157,7 @@ func (s *StorageState) LoadFromDB(root common.Hash) (*trie.Trie, error) {
return nil, err
}

_, _ = s.tries.LoadOrStore(t.MustHash(), t)
s.tries.softSet(t.MustHash(), t)
return t, nil
}

Expand All @@ -190,8 +170,9 @@ func (s *StorageState) loadTrie(root *common.Hash) (*trie.Trie, error) {
root = &sr
}

if t, has := s.tries.Load(*root); has && t != nil {
return t.(*trie.Trie), nil
t := s.tries.get(*root)
if t != nil {
return t, nil
}

tr, err := s.LoadFromDB(*root)
Expand Down Expand Up @@ -220,8 +201,9 @@ func (s *StorageState) GetStorage(root *common.Hash, key []byte) ([]byte, error)
root = &sr
}

if t, has := s.tries.Load(*root); has {
val := t.(*trie.Trie).Get(key)
t := s.tries.get(*root)
if t != nil {
val := t.Get(key)
return val, nil
}

Expand Down
38 changes: 6 additions & 32 deletions dot/state/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package state

import (
"math/big"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -99,7 +98,7 @@ func TestStorage_TrieState(t *testing.T) {
time.Sleep(time.Millisecond * 100)

// get trie from db
storage.tries.Delete(root)
storage.tries.delete(root)
ts3, err := storage.TrieState(&root)
require.NoError(t, err)
require.Equal(t, ts.Trie().MustHash(), ts3.Trie().MustHash())
Expand Down Expand Up @@ -131,49 +130,25 @@ func TestStorage_LoadFromDB(t *testing.T) {
require.NoError(t, err)

// Clear trie from cache and fetch data from disk.
storage.tries.Delete(root)
storage.tries.delete(root)

data, err := storage.GetStorage(&root, trieKV[0].key)
require.NoError(t, err)
require.Equal(t, trieKV[0].value, data)

storage.tries.Delete(root)
storage.tries.delete(root)

prefixKeys, err := storage.GetKeysWithPrefix(&root, []byte("ke"))
require.NoError(t, err)
require.Equal(t, 2, len(prefixKeys))

storage.tries.Delete(root)
storage.tries.delete(root)

entries, err := storage.Entries(&root)
require.NoError(t, err)
require.Equal(t, 3, len(entries))
}

func syncMapLen(m *sync.Map) int {
l := 0
m.Range(func(_, _ interface{}) bool {
l++
return true
})
return l
}

func TestStorage_StoreTrie_Syncing(t *testing.T) {
storage := newTestStorageState(t)
ts, err := storage.TrieState(&trie.EmptyHash)
require.NoError(t, err)

key := []byte("testkey")
value := []byte("testvalue")
ts.Set(key, value)

storage.SetSyncing(true)
err = storage.StoreTrie(ts, nil)
require.NoError(t, err)
require.Equal(t, 1, syncMapLen(storage.tries))
}

func TestStorage_StoreTrie_NotSyncing(t *testing.T) {
storage := newTestStorageState(t)
ts, err := storage.TrieState(&trie.EmptyHash)
Expand All @@ -183,10 +158,9 @@ func TestStorage_StoreTrie_NotSyncing(t *testing.T) {
value := []byte("testvalue")
ts.Set(key, value)

storage.SetSyncing(false)
err = storage.StoreTrie(ts, nil)
require.NoError(t, err)
require.Equal(t, 2, syncMapLen(storage.tries))
require.Equal(t, 2, storage.tries.len())
}

func TestGetStorageChildAndGetStorageFromChild(t *testing.T) {
Expand Down Expand Up @@ -233,7 +207,7 @@ func TestGetStorageChildAndGetStorageFromChild(t *testing.T) {
require.NoError(t, err)

// Clear trie from cache and fetch data from disk.
storage.tries.Delete(rootHash)
storage.tries.delete(rootHash)

_, err = storage.GetStorageChild(&rootHash, []byte("keyToChild"))
require.NoError(t, err)
Expand Down
60 changes: 60 additions & 0 deletions dot/state/tries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2022 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package state

import (
"sync"

"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/trie"
)

type tries struct {
rootToTrie map[common.Hash]*trie.Trie
mapMutex sync.RWMutex
}

func newTries(t *trie.Trie) *tries {
return &tries{
rootToTrie: map[common.Hash]*trie.Trie{
t.MustHash(): t,
},
}
}

// softSet sets the given trie at the given root hash
// in the memory map only if it is not already set.
func (t *tries) softSet(root common.Hash, trie *trie.Trie) {
t.mapMutex.Lock()
defer t.mapMutex.Unlock()

_, has := t.rootToTrie[root]
if has {
return
}

t.rootToTrie[root] = trie
}

func (t *tries) delete(root common.Hash) {
t.mapMutex.Lock()
defer t.mapMutex.Unlock()
delete(t.rootToTrie, root)
}

// get retrieves the trie corresponding to the root hash given
// from the in-memory thread safe map.
func (t *tries) get(root common.Hash) (tr *trie.Trie) {
t.mapMutex.RLock()
defer t.mapMutex.RUnlock()
return t.rootToTrie[root]
}

// len returns the current numbers of tries
// stored in the in-memory map.
func (t *tries) len() int {
t.mapMutex.RLock()
defer t.mapMutex.RUnlock()
return len(t.rootToTrie)
}
Loading