82 changes: 27 additions & 55 deletions mutable_tree.go
@@ -4,17 +4,18 @@ import (
"bytes"
"crypto/sha256"
"fmt"
"runtime"
"sort"
"sync"
"time"

"github.com/pkg/errors"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/iavl/internal/logger"
)

// commitGap is the number of FastNodes to upgrade or delete before committing the batch
var commitGap uint64 = 5000000

// ErrVersionDoesNotExist is returned if a requested version does not exist.
var ErrVersionDoesNotExist = errors.New("version does not exist")
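
commitGap drives a batching idiom that recurs in both the deletion and migration loops in this diff: flush the pending batch every commitGap operations, then flush once more for any remainder. A minimal, self-contained Go sketch of that idiom follows; batchStore and deleteAllWithGap are hypothetical stand-ins, not part of the iavl API.

package main

import "fmt"

// batchStore is a hypothetical stand-in for a batched store such as nodeDB;
// it only counts operations so the commit cadence is easy to observe.
type batchStore struct {
	pending int // operations buffered since the last commit
	commits int
}

func (s *batchStore) Delete(key []byte) error { s.pending++; return nil }

func (s *batchStore) Commit() error {
	s.commits++
	s.pending = 0
	return nil
}

// deleteAllWithGap deletes every key, committing after each full commitGap
// batch and once more at the end if a partial batch remains.
func deleteAllWithGap(s *batchStore, keys [][]byte, commitGap uint64) error {
	var deleted uint64
	for _, key := range keys {
		deleted++
		if err := s.Delete(key); err != nil {
			return err
		}
		if deleted%commitGap == 0 {
			if err := s.Commit(); err != nil {
				return err
			}
		}
	}
	if deleted%commitGap != 0 {
		return s.Commit() // flush the remainder
	}
	return nil
}

func main() {
	keys := make([][]byte, 12)
	for i := range keys {
		keys[i] = []byte(fmt.Sprintf("key_%d", i))
	}
	s := &batchStore{}
	if err := deleteAllWithGap(s, keys, 5); err != nil {
		panic(err)
	}
	fmt.Println("commits:", s.commits) // two full batches of 5 plus one remainder commit = 3
}

Bounding the batch size this way is what lets the PR drop the periodic forced garbage collection seen in the removed code further down.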

@@ -627,12 +628,6 @@ func (tree *MutableTree) IsUpgradeable() (bool, error) {
// from latest tree.
// nolint: unparam
func (tree *MutableTree) enableFastStorageAndCommitIfNotEnabled() (bool, error) {
shouldForceUpdate, err := tree.ndb.shouldForceFastStorageUpgrade()
if err != nil {
return false, err
}
isFastStorageEnabled := tree.ndb.hasUpgradedToFastStorage()

isUpgradeable, err := tree.IsUpgradeable()
if err != nil {
return false, err
@@ -642,22 +637,29 @@ func (tree *MutableTree) enableFastStorageAndCommitIfNotEnabled() (bool, error)
return false, nil
}

if isFastStorageEnabled && shouldForceUpdate {
// If there is a mismatch between which fast nodes are on disk and the live state due to temporary
// downgrade and subsequent re-upgrade, we cannot know for sure which fast nodes have been removed while downgraded,
// Therefore, there might exist stale fast nodes on disk. As a result, to avoid persisting the stale state, it might
// be worth to delete the fast nodes from disk.
fastItr := NewFastIterator(nil, nil, true, tree.ndb)
defer fastItr.Close()
for ; fastItr.Valid(); fastItr.Next() {
if err := tree.ndb.DeleteFastNode(fastItr.Key()); err != nil {
// If the fast nodes on disk and the live state have diverged due to a temporary
// downgrade and subsequent re-upgrade, we cannot know which fast nodes were removed while downgraded,
// so stale fast nodes may remain on disk. To avoid persisting that stale state,
// delete all fast nodes from disk before re-enabling fast storage.
fastItr := NewFastIterator(nil, nil, true, tree.ndb)
defer fastItr.Close()
var deletedFastNodes uint64
for ; fastItr.Valid(); fastItr.Next() {
deletedFastNodes++
if err := tree.ndb.DeleteFastNode(fastItr.Key()); err != nil {
return false, err
}
if deletedFastNodes%commitGap == 0 {
if err := tree.ndb.Commit(); err != nil {
return false, err
}
}
}

// Force garbage collection before we proceed to enabling fast storage.
runtime.GC()
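// Commit any deletions left over after the last full commitGap batch.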
if deletedFastNodes%commitGap != 0 {
if err := tree.ndb.Commit(); err != nil {
return false, err
}
}

if err := tree.enableFastStorageAndCommit(); err != nil {
tree.ndb.storageVersion = defaultStorageVersionValue
@@ -675,47 +677,17 @@ func (tree *MutableTree) enableFastStorageAndCommitLocked() error {
func (tree *MutableTree) enableFastStorageAndCommit() error {
var err error

// We start a new thread to keep on checking if we are above 4GB, and if so garbage collect.
// This thread only lasts during the fast node migration.
// This is done to keep RAM usage down.
done := make(chan struct{})
defer func() {
done <- struct{}{}
close(done)
}()

go func() {
timer := time.NewTimer(time.Second)
var m runtime.MemStats

for {
// Sample the current memory usage
runtime.ReadMemStats(&m)

if m.Alloc > 4*1024*1024*1024 {
// If we are using more than 4GB of memory, we should trigger garbage collection
// to free up some memory.
runtime.GC()
}

select {
case <-timer.C:
timer.Reset(time.Second)
case <-done:
if !timer.Stop() {
<-timer.C
}
return
}
}
}()

itr := NewIterator(nil, nil, true, tree.ImmutableTree)
defer itr.Close()
var upgradedFastNodes uint64
for ; itr.Valid(); itr.Next() {
upgradedFastNodes++
if err = tree.ndb.SaveFastNodeNoCache(NewFastNode(itr.Key(), itr.Value(), tree.version)); err != nil {
return err
}
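// Persist the batch every commitGap nodes to keep memory bounded during the migration.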
if upgradedFastNodes%commitGap == 0 {
if err = tree.ndb.Commit(); err != nil {
return err
}
}
}

if err = itr.Error(); err != nil {
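The test changes below exercise this path end-to-end. A condensed sketch of the call sequence they rely on is shown next; sketchFastStorageUpgrade is a hypothetical helper, not part of this PR, and it has to live in the iavl package's own tests because enableFastStorageAndCommitIfNotEnabled and tree.ndb are unexported (dbm is the tm-db alias used in mutable_tree.go).

func sketchFastStorageUpgrade(t *testing.T) {
	tree, err := NewMutableTree(dbm.NewMemDB(), 0)
	require.NoError(t, err)

	// Write a few entries and persist them as the first version.
	for i := 0; i < 10; i++ {
		tree.Set([]byte(fmt.Sprintf("key_%d", i)), []byte(fmt.Sprintf("val_%d", i)))
	}
	_, _, err = tree.SaveVersion()
	require.NoError(t, err)

	// On a database written before fast storage existed this runs the migration,
	// committing every commitGap nodes; on an already-upgraded database it is a no-op.
	_, err = tree.enableFastStorageAndCommitIfNotEnabled()
	require.NoError(t, err)

	// Walk the fast index in key order.
	itr := NewFastIterator(nil, nil, true, tree.ndb)
	defer itr.Close()
	for ; itr.Valid(); itr.Next() {
		t.Logf("%s => %s", itr.Key(), itr.Value())
	}
	require.NoError(t, itr.Error())
}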
116 changes: 109 additions & 7 deletions mutable_tree_test.go
@@ -855,6 +855,12 @@ func TestUpgradeStorageToFast_DbErrorEnableFastStorage_Failure(t *testing.T) {
dbMock.EXPECT().NewBatch().Return(batchMock).Times(1)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1)

iterMock := mock.NewMockIterator(ctrl)
dbMock.EXPECT().Iterator(gomock.Any(), gomock.Any()).Return(iterMock, nil)
iterMock.EXPECT().Error()
iterMock.EXPECT().Valid().Times(2)
iterMock.EXPECT().Close()

batchMock.EXPECT().Set(gomock.Any(), gomock.Any()).Return(expectedError).Times(1)

tree, err := NewMutableTree(dbMock, 0)
@@ -943,7 +949,7 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_

// dbMock represents the underlying database under the hood of nodeDB
dbMock.EXPECT().Get(gomock.Any()).Return(expectedStorageVersion, nil).Times(1)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(2)
dbMock.EXPECT().NewBatch().Return(batchMock).Times(3)
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1) // called to get latest version
startFormat := fastKeyFormat.Key()
endFormat := fastKeyFormat.Key()
@@ -964,8 +970,8 @@
updatedExpectedStorageVersion[len(updatedExpectedStorageVersion)-1]++
batchMock.EXPECT().Delete(fastKeyFormat.Key(fastNodeKeyToDelete)).Return(nil).Times(1)
batchMock.EXPECT().Set(metadataKeyFormat.Key([]byte(storageVersionKey)), updatedExpectedStorageVersion).Return(nil).Times(1)
batchMock.EXPECT().Write().Return(nil).Times(1)
batchMock.EXPECT().Close().Return(nil).Times(1)
batchMock.EXPECT().Write().Return(nil).Times(2)
batchMock.EXPECT().Close().Return(nil).Times(2)

// iterMock is used to mock the underlying db iterator behind the fast iterator
// Here, we want to mock the behavior of deleting fast nodes from disk when
@@ -1022,7 +1028,7 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_

func TestUpgradeStorageToFast_Integration_Upgraded_FastIterator_Success(t *testing.T) {
// Setup
tree, mirror := setupTreeAndMirrorForUpgrade(t)
tree, mirror := setupTreeAndMirrorForUpgrade(t, 100)

isFastCacheEnabled, err := tree.IsFastCacheEnabled()
require.NoError(t, err)
@@ -1089,7 +1095,7 @@ func TestUpgradeStorageToFast_Integration_Upgraded_FastIterator_Success(t *testi

func TestUpgradeStorageToFast_Integration_Upgraded_GetFast_Success(t *testing.T) {
// Setup
tree, mirror := setupTreeAndMirrorForUpgrade(t)
tree, mirror := setupTreeAndMirrorForUpgrade(t, 100)

isFastCacheEnabled, err := tree.IsFastCacheEnabled()
require.NoError(t, err)
@@ -1148,12 +1154,108 @@ func TestUpgradeStorageToFast_Integration_Upgraded_GetFast_Success(t *testing.T)
})
}

func setupTreeAndMirrorForUpgrade(t *testing.T) (*MutableTree, [][]string) {
func TestUpgradeStorageToFast_Success(t *testing.T) {
tmpCommitGap := commitGap
commitGap = 1000
defer func() {
commitGap = tmpCommitGap
}()

type fields struct {
nodeCount int
}
tests := []struct {
name string
fields fields
}{
{"less than commit gap", fields{nodeCount: 100}},
{"equal to commit gap", fields{nodeCount: int(commitGap)}},
{"great than commit gap", fields{nodeCount: int(commitGap) + 100}},
{"two times commit gap", fields{nodeCount: int(commitGap) * 2}},
{"two times plus commit gap", fields{nodeCount: int(commitGap)*2 + 1}},
}

for _, tt := range tests {
tree, mirror := setupTreeAndMirrorForUpgrade(t, tt.fields.nodeCount)
enabled, err := tree.enableFastStorageAndCommitIfNotEnabled()
require.Nil(t, err)
require.True(t, enabled)
t.Run(tt.name, func(t *testing.T) {
i := 0
iter := NewFastIterator(nil, nil, true, tree.ndb)
for ; iter.Valid(); iter.Next() {
require.Equal(t, []byte(mirror[i][0]), iter.Key())
require.Equal(t, []byte(mirror[i][1]), iter.Value())
i++
}
require.Equal(t, len(mirror), i)
})
}
}

func TestUpgradeStorageToFast_Delete_Stale_Success(t *testing.T) {
// We delete fast nodes here; to avoid a deadlock, keep the stale count below chBufferSize (64).
tmpCommitGap := commitGap
commitGap = 5
defer func() {
commitGap = tmpCommitGap
}()

valStale := "val_stale"
addStaleKey := func(ndb *nodeDB, staleCount int) {
var keyPrefix = "key"
for i := 0; i < staleCount; i++ {
key := fmt.Sprintf("%s_%d", keyPrefix, i)

node := NewFastNode([]byte(key), []byte(valStale), 100)
var buf bytes.Buffer
buf.Grow(node.encodedSize())
err := node.writeBytes(&buf)
require.NoError(t, err)
err = ndb.db.Set(ndb.fastNodeKey([]byte(key)), buf.Bytes())
require.NoError(t, err)
}
}
type fields struct {
nodeCount int
staleCount int
}

tests := []struct {
name string
fields fields
}{
{"stale less than commit gap", fields{nodeCount: 100, staleCount: 4}},
{"stale equal to commit gap", fields{nodeCount: int(commitGap), staleCount: int(commitGap)}},
{"stale great than commit gap", fields{nodeCount: int(commitGap) + 100, staleCount: int(commitGap)*2 - 1}},
{"stale twice commit gap", fields{nodeCount: int(commitGap) + 100, staleCount: int(commitGap) * 2}},
{"stale great than twice commit gap", fields{nodeCount: int(commitGap), staleCount: int(commitGap)*2 + 1}},
}

for _, tt := range tests {
tree, mirror := setupTreeAndMirrorForUpgrade(t, tt.fields.nodeCount)
addStaleKey(tree.ndb, tt.fields.staleCount)
enabled, err := tree.enableFastStorageAndCommitIfNotEnabled()
require.Nil(t, err)
require.True(t, enabled)
t.Run(tt.name, func(t *testing.T) {
i := 0
iter := NewFastIterator(nil, nil, true, tree.ndb)
for ; iter.Valid(); iter.Next() {
require.Equal(t, []byte(mirror[i][0]), iter.Key())
require.Equal(t, []byte(mirror[i][1]), iter.Value())
i++
}
require.Equal(t, len(mirror), i)
})
}
}

func setupTreeAndMirrorForUpgrade(t *testing.T, numEntries int) (*MutableTree, [][]string) {
db := db.NewMemDB()

tree, _ := NewMutableTree(db, 0)

const numEntries = 100
var keyPrefix, valPrefix = "key", "val"

mirror := make([][]string, 0, numEntries)