Skip to content

Commit

Permalink
feat: adaptive for loading journal file or journal kv during loadJour…
Browse files Browse the repository at this point in the history
…nal (#2406)

* core: check journalType before load journal
* fix: when delete trieJournal delete from kv & file
  • Loading branch information
jingjunLi authored Apr 19, 2024
1 parent 4b54601 commit d653cda
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 39 deletions.
2 changes: 2 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ type CacheConfig struct {
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top
PathSyncFlush bool // Whether sync flush the trienodebuffer of pathdb to disk.
JournalFilePath string
JournalFile bool

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
Expand All @@ -190,6 +191,7 @@ func (c *CacheConfig) triedbConfig() *triedb.Config {
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
JournalFilePath: c.JournalFilePath,
JournalFile: c.JournalFile,
}
}
return config
Expand Down
15 changes: 7 additions & 8 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}

// Assemble the Ethereum object
chainDb, err := stack.OpenAndMergeDatabase("chaindata", ChainDBNamespace, false, config)
chainDb, err := stack.OpenAndMergeDatabase(ChainData, ChainDBNamespace, false, config)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -255,14 +255,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
journalFilePath string
path string
)
if config.JournalFileEnabled {
if stack.CheckIfMultiDataBase() {
path = ChainData + "/state"
} else {
path = ChainData
}
journalFilePath = stack.ResolvePath(path) + "/" + JournalFileName
if stack.CheckIfMultiDataBase() {
path = ChainData + "/state"
} else {
path = ChainData
}
journalFilePath = stack.ResolvePath(path) + "/" + JournalFileName
var (
vmConfig = vm.Config{
EnablePreimageRecording: config.EnablePreimageRecording,
Expand All @@ -281,6 +279,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
StateScheme: config.StateScheme,
PathSyncFlush: config.PathSyncFlush,
JournalFilePath: journalFilePath,
JournalFile: config.JournalFileEnabled,
}
)
bcOps := make([]core.BlockChainOption, 0)
Expand Down
53 changes: 39 additions & 14 deletions triedb/pathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ const (
DefaultBatchRedundancyRate = 1.1
)

type JournalType int

const (
JournalKVType JournalType = iota
JournalFileType
)

// layer is the interface implemented by all state layers which includes some
// public methods and some additional methods for internal usage.
type layer interface {
Expand Down Expand Up @@ -92,7 +99,7 @@ type layer interface {
// journal commits an entire diff hierarchy to disk into a single journal entry.
// This is meant to be used during shutdown to persist the layer without
// flattening everything down (bad for reorgs).
journal(w io.Writer, journalFile bool) error
journal(w io.Writer, journalType JournalType) error
}

// Config contains the settings for database.
Expand All @@ -104,6 +111,7 @@ type Config struct {
ReadOnly bool // Flag whether the database is opened in read only mode.
NoTries bool
JournalFilePath string
JournalFile bool
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -527,23 +535,40 @@ func (db *Database) GetAllRooHash() [][]string {
return data
}

func (db *Database) IsEnableJournalFile() bool {
return len(db.config.JournalFilePath) != 0
// DetermineJournalTypeForWriter is used when persisting the journal. It determines JournalType based on the config passed in by the Config.
func (db *Database) DetermineJournalTypeForWriter() JournalType {
if db.config.JournalFile {
return JournalFileType
} else {
return JournalKVType
}
}

// DetermineJournalTypeForReader is used when loading the journal. It loads based on whether JournalKV or JournalFile currently exists.
func (db *Database) DetermineJournalTypeForReader() JournalType {
if journal := rawdb.ReadTrieJournal(db.diskdb); len(journal) != 0 {
return JournalKVType
}

if fileInfo, stateErr := os.Stat(db.config.JournalFilePath); stateErr == nil && !fileInfo.IsDir() {
return JournalFileType
}

return JournalKVType
}

func (db *Database) DeleteTrieJournal(writer ethdb.KeyValueWriter) error {
// To prevent any remnants of old journals after converting from JournalKV to JournalFile or vice versa, all deletions must be completed.
rawdb.DeleteTrieJournal(writer)

// delete from journal file, may not exist
filePath := db.config.JournalFilePath
if len(filePath) == 0 {
rawdb.DeleteTrieJournal(writer)
} else {
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
return err
}
errRemove := os.Remove(filePath)
if errRemove != nil {
log.Crit("Failed to remove tries journal", "journal path", filePath, "err", err)
}
if _, err := os.Stat(filePath); os.IsNotExist(err) {
return nil
}
errRemove := os.Remove(filePath)
if errRemove != nil {
log.Crit("Failed to remove tries journal", "journal path", filePath, "err", errRemove)
}
return nil
}
2 changes: 1 addition & 1 deletion triedb/pathdb/difflayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,6 @@ func BenchmarkJournal(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
layer.journal(new(bytes.Buffer), false)
layer.journal(new(bytes.Buffer), JournalKVType)
}
}
34 changes: 18 additions & 16 deletions triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ func (kr *JournalKVReader) Read(p []byte) (n int, err error) {
func (kr *JournalKVReader) Close() {
}

func newJournalWriter(file string, db ethdb.Database) JournalWriter {
if len(file) == 0 {
func newJournalWriter(file string, db ethdb.Database, journalType JournalType) JournalWriter {
log.Info("New journal writer", "path", file, "journalType", journalType)
if journalType == JournalKVType {
return &JournalKVWriter{
diskdb: db,
}
Expand All @@ -167,8 +168,9 @@ func newJournalWriter(file string, db ethdb.Database) JournalWriter {
}
}

func newJournalReader(file string, db ethdb.Database) (JournalReader, error) {
if len(file) == 0 {
func newJournalReader(file string, db ethdb.Database, journalType JournalType) (JournalReader, error) {
log.Info("New journal reader", "path", file, "journalType", journalType)
if journalType == JournalKVType {
journal := rawdb.ReadTrieJournal(db)
if len(journal) == 0 {
return nil, errMissJournal
Expand All @@ -193,7 +195,7 @@ func newJournalReader(file string, db ethdb.Database) (JournalReader, error) {
// loadJournal tries to parse the layer journal from the disk.
func (db *Database) loadJournal(diskRoot common.Hash) (layer, error) {
start := time.Now()
reader, err := newJournalReader(db.config.JournalFilePath, db.diskdb)
reader, err := newJournalReader(db.config.JournalFilePath, db.diskdb, db.DetermineJournalTypeForReader())

if err != nil {
return nil, err
Expand Down Expand Up @@ -267,7 +269,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
journalBuf *rlp.Stream
journalEncodedBuff []byte
)
if db.IsEnableJournalFile() {
if db.DetermineJournalTypeForReader() == JournalFileType {
if err := r.Decode(&journalEncodedBuff); err != nil {
return nil, fmt.Errorf("load disk journal: %v", err)
}
Expand Down Expand Up @@ -308,7 +310,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
nodes[entry.Owner] = subset
}

if db.IsEnableJournalFile() {
if db.DetermineJournalTypeForReader() == JournalFileType {
var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
Expand All @@ -334,7 +336,7 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
journalBuf *rlp.Stream
journalEncodedBuff []byte
)
if db.IsEnableJournalFile() {
if db.DetermineJournalTypeForReader() == JournalFileType {
if err := r.Decode(&journalEncodedBuff); err != nil {
// The first read may fail with EOF, marking the end of the journal
if err == io.EOF {
Expand Down Expand Up @@ -407,7 +409,7 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {
storages[entry.Account] = set
}

if db.IsEnableJournalFile() {
if db.DetermineJournalTypeForReader() == JournalFileType {
var shaSum [32]byte
if err := r.Decode(&shaSum); err != nil {
return nil, fmt.Errorf("load shasum: %v", err)
Expand All @@ -426,7 +428,7 @@ func (db *Database) loadDiffLayer(parent layer, r *rlp.Stream) (layer, error) {

// journal implements the layer interface, marshaling the un-flushed trie nodes
// along with layer metadata into provided byte buffer.
func (dl *diskLayer) journal(w io.Writer, journalFile bool) error {
func (dl *diskLayer) journal(w io.Writer, journalType JournalType) error {
dl.lock.RLock()
defer dl.lock.RUnlock()

Expand Down Expand Up @@ -460,7 +462,7 @@ func (dl *diskLayer) journal(w io.Writer, journalFile bool) error {
}

// Store the journal buf into w and calculate checksum
if journalFile {
if journalType == JournalFileType {
shasum := sha256.Sum256(journalBuf.Bytes())
if err := rlp.Encode(w, journalBuf.Bytes()); err != nil {
return err
Expand All @@ -480,12 +482,12 @@ func (dl *diskLayer) journal(w io.Writer, journalFile bool) error {

// journal implements the layer interface, writing the memory layer contents
// into a buffer to be stored in the database as the layer journal.
func (dl *diffLayer) journal(w io.Writer, journalFile bool) error {
func (dl *diffLayer) journal(w io.Writer, journalType JournalType) error {
dl.lock.RLock()
defer dl.lock.RUnlock()

// journal the parent first
if err := dl.parent.journal(w, journalFile); err != nil {
if err := dl.parent.journal(w, journalType); err != nil {
return err
}
// Create a buffer to store encoded data
Expand Down Expand Up @@ -535,7 +537,7 @@ func (dl *diffLayer) journal(w io.Writer, journalFile bool) error {
}

// Store the journal buf into w and calculate checksum
if journalFile {
if journalType == JournalFileType {
shasum := sha256.Sum256(journalBuf.Bytes())
if err := rlp.Encode(w, journalBuf.Bytes()); err != nil {
return err
Expand Down Expand Up @@ -583,7 +585,7 @@ func (db *Database) Journal(root common.Hash) error {
}
// Firstly write out the metadata of journal
db.DeleteTrieJournal(db.diskdb)
journal := newJournalWriter(db.config.JournalFilePath, db.diskdb)
journal := newJournalWriter(db.config.JournalFilePath, db.diskdb, db.DetermineJournalTypeForWriter())
defer journal.Close()

if err := rlp.Encode(journal, journalVersion); err != nil {
Expand All @@ -600,7 +602,7 @@ func (db *Database) Journal(root common.Hash) error {
return err
}
// Finally write out the journal of each layer in reverse order.
if err := l.journal(journal, db.IsEnableJournalFile()); err != nil {
if err := l.journal(journal, db.DetermineJournalTypeForWriter()); err != nil {
return err
}
// Store the journal into the database and return
Expand Down

0 comments on commit d653cda

Please sign in to comment.