@@ -19,7 +19,6 @@ import (
19
19
20
20
"github.com/ava-labs/avalanchego/database"
21
21
"github.com/ava-labs/avalanchego/database/prefixdb"
22
- "github.com/ava-labs/avalanchego/database/versiondb"
23
22
"github.com/ava-labs/avalanchego/ids"
24
23
"github.com/ava-labs/avalanchego/trace"
25
24
"github.com/ava-labs/avalanchego/utils"
@@ -29,8 +28,8 @@ import (
29
28
)
30
29
31
30
const (
32
- RootPath = EmptyPath
33
-
31
+ RootPath = EmptyPath
32
+ evictionBatchSize = 100
34
33
// TODO: name better
35
34
rebuildViewSizeFractionOfCacheSize = 50
36
35
minRebuildViewSizePerCommit = 1000
@@ -139,9 +138,7 @@ type merkleDB struct {
139
138
// Should be held before taking [db.lock]
140
139
commitLock sync.RWMutex
141
140
142
- // versiondb that the other dbs are built on.
143
- // Allows the changes made to the snapshot and [nodeDB] to be atomic.
144
- nodeDB * versiondb.Database
141
+ nodeDB database.Database
145
142
146
143
// Stores data about the database's current state.
147
144
metadataDB database.Database
@@ -176,7 +173,7 @@ func newDatabase(
176
173
) (* merkleDB , error ) {
177
174
trieDB := & merkleDB {
178
175
metrics : metrics ,
179
- nodeDB : versiondb . New ( prefixdb .New (nodePrefix , db ) ),
176
+ nodeDB : prefixdb .New (nodePrefix , db ),
180
177
metadataDB : prefixdb .New (metadataPrefix , db ),
181
178
history : newTrieHistory (config .HistoryLength ),
182
179
tracer : config .Tracer ,
@@ -265,8 +262,7 @@ func (db *merkleDB) rebuild(ctx context.Context) error {
265
262
return err
266
263
}
267
264
currentViewSize ++
268
- }
269
- if err := db .nodeDB .Delete (key ); err != nil {
265
+ } else if err := db .nodeDB .Delete (key ); err != nil {
270
266
return err
271
267
}
272
268
}
@@ -354,10 +350,6 @@ func (db *merkleDB) Close() error {
354
350
return err
355
351
}
356
352
357
- if err := db .nodeDB .Commit (); err != nil {
358
- return err
359
- }
360
-
361
353
// Successfully wrote intermediate nodes.
362
354
return db .metadataDB .Put (cleanShutdownKey , hadCleanShutdown )
363
355
}
@@ -749,23 +741,42 @@ func (db *merkleDB) NewIteratorWithStartAndPrefix(start, prefix []byte) database
749
741
// the movement of [node] from [db.nodeCache] to [db.nodeDB] is atomic.
750
742
// As soon as [db.nodeCache] no longer has [node], [db.nodeDB] does.
751
743
// Non-nil error is fatal -- causes [db] to close.
752
- func (db * merkleDB ) onEviction (node * node ) error {
753
- if node == nil || node . hasValue () {
754
- // only persist intermediary nodes
744
+ func (db * merkleDB ) onEviction (n * node ) error {
745
+ // the evicted node isn't an intermediary node, so skip writing.
746
+ if n == nil || n . hasValue () {
755
747
return nil
756
748
}
757
749
758
- nodeBytes , err := node .marshal ()
759
- if err != nil {
760
- db .onEvictionErr .Set (err )
761
- // Prevent reads/writes from/to [db.nodeDB] to avoid inconsistent state.
762
- _ = db .nodeDB .Close ()
763
- // This is a fatal error.
764
- go db .Close ()
750
+ batch := db .nodeDB .NewBatch ()
751
+ if err := writeNodeToBatch (batch , n ); err != nil {
765
752
return err
766
753
}
767
754
768
- if err := db .nodeDB .Put (node .key .Bytes (), nodeBytes ); err != nil {
755
+ // Evict the oldest [evictionBatchSize] nodes from the cache
756
+ // and write them to disk. We write a batch of them, rather than
757
+ // just [n], so that we don't immediately evict and write another
758
+ // node, because each time this method is called we do a disk write.
759
+ var err error
760
+ for removedCount := 0 ; removedCount < evictionBatchSize ; removedCount ++ {
761
+ _ , n , exists := db .nodeCache .removeOldest ()
762
+ if ! exists {
763
+ // The cache is empty.
764
+ break
765
+ }
766
+ if n == nil || n .hasValue () {
767
+ // only persist intermediary nodes
768
+ continue
769
+ }
770
+ // Note this must be = not := since we check
771
+ // [err] outside the loop.
772
+ if err = writeNodeToBatch (batch , n ); err != nil {
773
+ break
774
+ }
775
+ }
776
+ if err == nil {
777
+ err = batch .Write ()
778
+ }
779
+ if err != nil {
769
780
db .onEvictionErr .Set (err )
770
781
_ = db .nodeDB .Close ()
771
782
go db .Close ()
@@ -774,6 +785,16 @@ func (db *merkleDB) onEviction(node *node) error {
774
785
return nil
775
786
}
776
787
788
+ // Writes [n] to [batch]. Assumes [n] is non-nil.
789
+ func writeNodeToBatch (batch database.Batch , n * node ) error {
790
+ nodeBytes , err := n .marshal ()
791
+ if err != nil {
792
+ return err
793
+ }
794
+
795
+ return batch .Put (n .key .Bytes (), nodeBytes )
796
+ }
797
+
777
798
// Put upserts the key/value pair into the db.
778
799
func (db * merkleDB ) Put (k , v []byte ) error {
779
800
return db .Insert (context .Background (), k , v )
@@ -859,19 +880,13 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *trieView) e
859
880
return errNoNewRoot
860
881
}
861
882
862
- // commit any outstanding cache evicted nodes.
863
- // Note that we do this here because below we may Abort
864
- // [db.nodeDB], which would cause us to lose these changes.
865
- if err := db .nodeDB .Commit (); err != nil {
866
- return err
867
- }
883
+ batch := db .nodeDB .NewBatch ()
868
884
869
885
_ , nodesSpan := db .tracer .Start (ctx , "MerkleDB.commitChanges.writeNodes" )
870
886
for key , nodeChange := range changes .nodes {
871
887
if nodeChange .after == nil {
872
888
db .metrics .IOKeyWrite ()
873
- if err := db .nodeDB .Delete (key .Bytes ()); err != nil {
874
- db .nodeDB .Abort ()
889
+ if err := batch .Delete (key .Bytes ()); err != nil {
875
890
nodesSpan .End ()
876
891
return err
877
892
}
@@ -883,15 +898,7 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *trieView) e
883
898
// Otherwise, intermediary nodes are persisted on cache eviction or
884
899
// shutdown.
885
900
db .metrics .IOKeyWrite ()
886
- nodeBytes , err := nodeChange .after .marshal ()
887
- if err != nil {
888
- db .nodeDB .Abort ()
889
- nodesSpan .End ()
890
- return err
891
- }
892
-
893
- if err := db .nodeDB .Put (key .Bytes (), nodeBytes ); err != nil {
894
- db .nodeDB .Abort ()
901
+ if err := writeNodeToBatch (batch , nodeChange .after ); err != nil {
895
902
nodesSpan .End ()
896
903
return err
897
904
}
@@ -900,10 +907,9 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *trieView) e
900
907
nodesSpan .End ()
901
908
902
909
_ , commitSpan := db .tracer .Start (ctx , "MerkleDB.commitChanges.dbCommit" )
903
- err := db . nodeDB . Commit ()
910
+ err := batch . Write ()
904
911
commitSpan .End ()
905
912
if err != nil {
906
- db .nodeDB .Abort ()
907
913
return err
908
914
}
909
915
@@ -1122,11 +1128,13 @@ func (db *merkleDB) initializeRootIfNeeded() (ids.ID, error) {
1122
1128
if err != nil {
1123
1129
return ids .Empty , err
1124
1130
}
1125
- if err := db .nodeDB .Put (rootKey , rootBytes ); err != nil {
1131
+
1132
+ batch := db .nodeDB .NewBatch ()
1133
+ if err := batch .Put (rootKey , rootBytes ); err != nil {
1126
1134
return ids .Empty , err
1127
1135
}
1128
1136
1129
- return db .root .id , db . nodeDB . Commit ()
1137
+ return db .root .id , batch . Write ()
1130
1138
}
1131
1139
1132
1140
// Returns a view of the trie as it was when it had root [rootID] for keys within range [start, end].
0 commit comments