Skip to content

Commit 1244f6b

Browse files
tac0turtlegiskook
andauthored
add batches (#564)
Co-authored-by: zhangkai <zhangkai.gis@163.com>
1 parent e25a061 commit 1244f6b

File tree

2 files changed

+136
-62
lines changed

2 files changed

+136
-62
lines changed

mutable_tree.go

Lines changed: 27 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,18 @@ import (
44
"bytes"
55
"crypto/sha256"
66
"fmt"
7-
"runtime"
87
"sort"
98
"sync"
10-
"time"
119

1210
"github.com/pkg/errors"
1311
dbm "github.com/tendermint/tm-db"
1412

1513
"github.com/cosmos/iavl/internal/logger"
1614
)
1715

16+
// commitGap after upgrade/delete commitGap FastNodes when commit the batch
17+
var commitGap uint64 = 5000000
18+
1819
// ErrVersionDoesNotExist is returned if a requested version does not exist.
1920
var ErrVersionDoesNotExist = errors.New("version does not exist")
2021

@@ -627,12 +628,6 @@ func (tree *MutableTree) IsUpgradeable() (bool, error) {
627628
// from latest tree.
628629
// nolint: unparam
629630
func (tree *MutableTree) enableFastStorageAndCommitIfNotEnabled() (bool, error) {
630-
shouldForceUpdate, err := tree.ndb.shouldForceFastStorageUpgrade()
631-
if err != nil {
632-
return false, err
633-
}
634-
isFastStorageEnabled := tree.ndb.hasUpgradedToFastStorage()
635-
636631
isUpgradeable, err := tree.IsUpgradeable()
637632
if err != nil {
638633
return false, err
@@ -642,22 +637,29 @@ func (tree *MutableTree) enableFastStorageAndCommitIfNotEnabled() (bool, error)
642637
return false, nil
643638
}
644639

645-
if isFastStorageEnabled && shouldForceUpdate {
646-
// If there is a mismatch between which fast nodes are on disk and the live state due to temporary
647-
// downgrade and subsequent re-upgrade, we cannot know for sure which fast nodes have been removed while downgraded,
648-
// Therefore, there might exist stale fast nodes on disk. As a result, to avoid persisting the stale state, it might
649-
// be worth to delete the fast nodes from disk.
650-
fastItr := NewFastIterator(nil, nil, true, tree.ndb)
651-
defer fastItr.Close()
652-
for ; fastItr.Valid(); fastItr.Next() {
653-
if err := tree.ndb.DeleteFastNode(fastItr.Key()); err != nil {
640+
// If there is a mismatch between which fast nodes are on disk and the live state due to temporary
641+
// downgrade and subsequent re-upgrade, we cannot know for sure which fast nodes have been removed while downgraded,
642+
// Therefore, there might exist stale fast nodes on disk. As a result, to avoid persisting the stale state, it might
643+
// be worth to delete the fast nodes from disk.
644+
fastItr := NewFastIterator(nil, nil, true, tree.ndb)
645+
defer fastItr.Close()
646+
var deletedFastNodes uint64
647+
for ; fastItr.Valid(); fastItr.Next() {
648+
deletedFastNodes++
649+
if err := tree.ndb.DeleteFastNode(fastItr.Key()); err != nil {
650+
return false, err
651+
}
652+
if deletedFastNodes%commitGap == 0 {
653+
if err := tree.ndb.Commit(); err != nil {
654654
return false, err
655655
}
656656
}
657657
}
658-
659-
// Force garbage collection before we proceed to enabling fast storage.
660-
runtime.GC()
658+
if deletedFastNodes%commitGap != 0 {
659+
if err := tree.ndb.Commit(); err != nil {
660+
return false, err
661+
}
662+
}
661663

662664
if err := tree.enableFastStorageAndCommit(); err != nil {
663665
tree.ndb.storageVersion = defaultStorageVersionValue
@@ -675,47 +677,17 @@ func (tree *MutableTree) enableFastStorageAndCommitLocked() error {
675677
func (tree *MutableTree) enableFastStorageAndCommit() error {
676678
var err error
677679

678-
// We start a new thread to keep on checking if we are above 4GB, and if so garbage collect.
679-
// This thread only lasts during the fast node migration.
680-
// This is done to keep RAM usage down.
681-
done := make(chan struct{})
682-
defer func() {
683-
done <- struct{}{}
684-
close(done)
685-
}()
686-
687-
go func() {
688-
timer := time.NewTimer(time.Second)
689-
var m runtime.MemStats
690-
691-
for {
692-
// Sample the current memory usage
693-
runtime.ReadMemStats(&m)
694-
695-
if m.Alloc > 4*1024*1024*1024 {
696-
// If we are using more than 4GB of memory, we should trigger garbage collection
697-
// to free up some memory.
698-
runtime.GC()
699-
}
700-
701-
select {
702-
case <-timer.C:
703-
timer.Reset(time.Second)
704-
case <-done:
705-
if !timer.Stop() {
706-
<-timer.C
707-
}
708-
return
709-
}
710-
}
711-
}()
712-
713680
itr := NewIterator(nil, nil, true, tree.ImmutableTree)
714681
defer itr.Close()
682+
var upgradedFastNodes uint64
715683
for ; itr.Valid(); itr.Next() {
684+
upgradedFastNodes++
716685
if err = tree.ndb.SaveFastNodeNoCache(NewFastNode(itr.Key(), itr.Value(), tree.version)); err != nil {
717686
return err
718687
}
688+
if upgradedFastNodes%commitGap == 0 {
689+
tree.ndb.Commit()
690+
}
719691
}
720692

721693
if err = itr.Error(); err != nil {

mutable_tree_test.go

Lines changed: 109 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -855,6 +855,12 @@ func TestUpgradeStorageToFast_DbErrorEnableFastStorage_Failure(t *testing.T) {
855855
dbMock.EXPECT().NewBatch().Return(batchMock).Times(1)
856856
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1)
857857

858+
iterMock := mock.NewMockIterator(ctrl)
859+
dbMock.EXPECT().Iterator(gomock.Any(), gomock.Any()).Return(iterMock, nil)
860+
iterMock.EXPECT().Error()
861+
iterMock.EXPECT().Valid().Times(2)
862+
iterMock.EXPECT().Close()
863+
858864
batchMock.EXPECT().Set(gomock.Any(), gomock.Any()).Return(expectedError).Times(1)
859865

860866
tree, err := NewMutableTree(dbMock, 0)
@@ -943,7 +949,7 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_
943949

944950
// dbMock represents the underlying database under the hood of nodeDB
945951
dbMock.EXPECT().Get(gomock.Any()).Return(expectedStorageVersion, nil).Times(1)
946-
dbMock.EXPECT().NewBatch().Return(batchMock).Times(2)
952+
dbMock.EXPECT().NewBatch().Return(batchMock).Times(3)
947953
dbMock.EXPECT().ReverseIterator(gomock.Any(), gomock.Any()).Return(rIterMock, nil).Times(1) // called to get latest version
948954
startFormat := fastKeyFormat.Key()
949955
endFormat := fastKeyFormat.Key()
@@ -964,8 +970,8 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_
964970
updatedExpectedStorageVersion[len(updatedExpectedStorageVersion)-1]++
965971
batchMock.EXPECT().Delete(fastKeyFormat.Key(fastNodeKeyToDelete)).Return(nil).Times(1)
966972
batchMock.EXPECT().Set(metadataKeyFormat.Key([]byte(storageVersionKey)), updatedExpectedStorageVersion).Return(nil).Times(1)
967-
batchMock.EXPECT().Write().Return(nil).Times(1)
968-
batchMock.EXPECT().Close().Return(nil).Times(1)
973+
batchMock.EXPECT().Write().Return(nil).Times(2)
974+
batchMock.EXPECT().Close().Return(nil).Times(2)
969975

970976
// iterMock is used to mock the underlying db iterator behing fast iterator
971977
// Here, we want to mock the behavior of deleting fast nodes from disk when
@@ -1022,7 +1028,7 @@ func TestFastStorageReUpgradeProtection_ForceUpgradeFirstTime_NoForceSecondTime_
10221028

10231029
func TestUpgradeStorageToFast_Integration_Upgraded_FastIterator_Success(t *testing.T) {
10241030
// Setup
1025-
tree, mirror := setupTreeAndMirrorForUpgrade(t)
1031+
tree, mirror := setupTreeAndMirrorForUpgrade(t, 100)
10261032

10271033
isFastCacheEnabled, err := tree.IsFastCacheEnabled()
10281034
require.NoError(t, err)
@@ -1089,7 +1095,7 @@ func TestUpgradeStorageToFast_Integration_Upgraded_FastIterator_Success(t *testi
10891095

10901096
func TestUpgradeStorageToFast_Integration_Upgraded_GetFast_Success(t *testing.T) {
10911097
// Setup
1092-
tree, mirror := setupTreeAndMirrorForUpgrade(t)
1098+
tree, mirror := setupTreeAndMirrorForUpgrade(t, 100)
10931099

10941100
isFastCacheEnabled, err := tree.IsFastCacheEnabled()
10951101
require.NoError(t, err)
@@ -1148,12 +1154,108 @@ func TestUpgradeStorageToFast_Integration_Upgraded_GetFast_Success(t *testing.T)
11481154
})
11491155
}
11501156

1151-
func setupTreeAndMirrorForUpgrade(t *testing.T) (*MutableTree, [][]string) {
1157+
func TestUpgradeStorageToFast_Success(t *testing.T) {
1158+
tmpCommitGap := commitGap
1159+
commitGap = 1000
1160+
defer func() {
1161+
commitGap = tmpCommitGap
1162+
}()
1163+
1164+
type fields struct {
1165+
nodeCount int
1166+
}
1167+
tests := []struct {
1168+
name string
1169+
fields fields
1170+
}{
1171+
{"less than commit gap", fields{nodeCount: 100}},
1172+
{"equal to commit gap", fields{nodeCount: int(commitGap)}},
1173+
{"great than commit gap", fields{nodeCount: int(commitGap) + 100}},
1174+
{"two times commit gap", fields{nodeCount: int(commitGap) * 2}},
1175+
{"two times plus commit gap", fields{nodeCount: int(commitGap)*2 + 1}},
1176+
}
1177+
1178+
for _, tt := range tests {
1179+
tree, mirror := setupTreeAndMirrorForUpgrade(t, tt.fields.nodeCount)
1180+
enabled, err := tree.enableFastStorageAndCommitIfNotEnabled()
1181+
require.Nil(t, err)
1182+
require.True(t, enabled)
1183+
t.Run(tt.name, func(t *testing.T) {
1184+
i := 0
1185+
iter := NewFastIterator(nil, nil, true, tree.ndb)
1186+
for ; iter.Valid(); iter.Next() {
1187+
require.Equal(t, []byte(mirror[i][0]), iter.Key())
1188+
require.Equal(t, []byte(mirror[i][1]), iter.Value())
1189+
i++
1190+
}
1191+
require.Equal(t, len(mirror), i)
1192+
})
1193+
}
1194+
}
1195+
1196+
func TestUpgradeStorageToFast_Delete_Stale_Success(t *testing.T) {
1197+
// we delete fast node, in case of deadlock. we should limit the stale count lower than chBufferSize(64)
1198+
tmpCommitGap := commitGap
1199+
commitGap = 5
1200+
defer func() {
1201+
commitGap = tmpCommitGap
1202+
}()
1203+
1204+
valStale := "val_stale"
1205+
addStaleKey := func(ndb *nodeDB, staleCount int) {
1206+
var keyPrefix = "key"
1207+
for i := 0; i < staleCount; i++ {
1208+
key := fmt.Sprintf("%s_%d", keyPrefix, i)
1209+
1210+
node := NewFastNode([]byte(key), []byte(valStale), 100)
1211+
var buf bytes.Buffer
1212+
buf.Grow(node.encodedSize())
1213+
err := node.writeBytes(&buf)
1214+
require.NoError(t, err)
1215+
err = ndb.db.Set(ndb.fastNodeKey([]byte(key)), buf.Bytes())
1216+
require.NoError(t, err)
1217+
}
1218+
}
1219+
type fields struct {
1220+
nodeCount int
1221+
staleCount int
1222+
}
1223+
1224+
tests := []struct {
1225+
name string
1226+
fields fields
1227+
}{
1228+
{"stale less than commit gap", fields{nodeCount: 100, staleCount: 4}},
1229+
{"stale equal to commit gap", fields{nodeCount: int(commitGap), staleCount: int(commitGap)}},
1230+
{"stale great than commit gap", fields{nodeCount: int(commitGap) + 100, staleCount: int(commitGap)*2 - 1}},
1231+
{"stale twice commit gap", fields{nodeCount: int(commitGap) + 100, staleCount: int(commitGap) * 2}},
1232+
{"stale great than twice commit gap", fields{nodeCount: int(commitGap), staleCount: int(commitGap)*2 + 1}},
1233+
}
1234+
1235+
for _, tt := range tests {
1236+
tree, mirror := setupTreeAndMirrorForUpgrade(t, tt.fields.nodeCount)
1237+
addStaleKey(tree.ndb, tt.fields.staleCount)
1238+
enabled, err := tree.enableFastStorageAndCommitIfNotEnabled()
1239+
require.Nil(t, err)
1240+
require.True(t, enabled)
1241+
t.Run(tt.name, func(t *testing.T) {
1242+
i := 0
1243+
iter := NewFastIterator(nil, nil, true, tree.ndb)
1244+
for ; iter.Valid(); iter.Next() {
1245+
require.Equal(t, []byte(mirror[i][0]), iter.Key())
1246+
require.Equal(t, []byte(mirror[i][1]), iter.Value())
1247+
i++
1248+
}
1249+
require.Equal(t, len(mirror), i)
1250+
})
1251+
}
1252+
}
1253+
1254+
func setupTreeAndMirrorForUpgrade(t *testing.T, numEntries int) (*MutableTree, [][]string) {
11521255
db := db.NewMemDB()
11531256

11541257
tree, _ := NewMutableTree(db, 0)
11551258

1156-
const numEntries = 100
11571259
var keyPrefix, valPrefix = "key", "val"
11581260

11591261
mirror := make([][]string, 0, numEntries)

0 commit comments

Comments
 (0)