Erigon 3: Purified states representation #13227

Open · wants to merge 35 commits into base: main
305 changes: 303 additions & 2 deletions cmd/integration/commands/state_domains.go
@@ -18,12 +18,19 @@ package commands

import (
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"runtime"
"sort"
"strings"

"github.com/erigontech/erigon-lib/etl"
"github.com/erigontech/erigon-lib/seg"
state3 "github.com/erigontech/erigon-lib/state"

"github.com/spf13/cobra"
@@ -33,7 +40,9 @@ import (
libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/datadir"
"github.com/erigontech/erigon-lib/common/length"
downloadertype "github.com/erigontech/erigon-lib/downloader/snaptype"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/mdbx"
kv2 "github.com/erigontech/erigon-lib/kv/mdbx"
"github.com/erigontech/erigon/cmd/utils"
"github.com/erigontech/erigon/core"
@@ -52,12 +61,19 @@ func init() {
withStartTx(readDomains)

rootCmd.AddCommand(readDomains)

withDataDir(purifyDomains)
purifyDomains.Flags().StringVar(&purifyDir, "purifiedDomain", "purified-output", "directory to write the purified domain files to")
purifyDomains.Flags().BoolVar(&purifyOnlyCommitment, "commitment", false, "purify only commitment domain")
rootCmd.AddCommand(purifyDomains)
}

// if the trie variant is not hex, we cannot have another rootHash to verify it against
var (
stepSize uint64
lastStep uint64
stepSize uint64
lastStep uint64
purifyDir string
purifyOnlyCommitment bool
)

// write command to just seek and query state by addr and domain from state db and files (if any)
@@ -120,6 +136,291 @@ var readDomains = &cobra.Command{
},
}

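// purifyDomains regenerates the domain .kv snapshot files without repeated keys, in two passes:
//  1. makePurifiableIndexDB records, in a temporary MDBX database, the index (layer) of the
//     newest file that contains each key of a domain.
//  2. makePurifiedDomainsIndexDB rewrites every file into the --purifiedDomain directory,
//     keeping a key/value pair only in the file whose layer matches the recorded one, so each
//     key survives in exactly one file.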
var purifyDomains = &cobra.Command{
Use: "purify_domains",
Short: `Regenerate kv files without repeating keys.`,
Example: "go run ./cmd/integration purify_domains --datadir=... --verbosity=3",
Args: cobra.ArbitraryArgs,
Run: func(cmd *cobra.Command, args []string) {
dirs := datadir.New(datadirCli)
// the domain snapshot files live in dirs.SnapDomain
domainDir := dirs.SnapDomain

// make a temporary dir to hold the key->layer index DB
tmpDir, err := os.MkdirTemp(dirs.Tmp, "purifyTemp")
if err != nil {
fmt.Println("Error creating temporary directory: ", err)
return
}
// make a temporary DB to store the keys

purifyDB := mdbx.MustOpen(tmpDir)
defer purifyDB.Close()
var purificationDomains []string
if purifyOnlyCommitment {
purificationDomains = []string{"commitment"}
} else {
purificationDomains = []string{"account", "storage" /*"code",*/, "commitment", "receipt"}
}
//purificationDomains := []string{"commitment"}
for _, domain := range purificationDomains {
if err := makePurifiableIndexDB(purifyDB, dirs, log.New(), domain); err != nil {
fmt.Println("Error making purifiable index DB: ", err)
return
}
}
for _, domain := range purificationDomains {
if err := makePurifiedDomainsIndexDB(purifyDB, dirs, log.New(), domain); err != nil {
fmt.Println("Error making purified domains DB: ", err)
return
}
}
if err != nil {
fmt.Printf("error walking the path %q: %v\n", domainDir, err)
}
},
}

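// makePurifiableIndexDB walks the domain's .kv files in ascending step order and, via an ETL
// collector, stores every key into a scratch table of the temporary DB with the file's layer
// index as the value. Later layers overwrite earlier ones on load, so each key ends up mapped
// to the newest layer that contains it; layer 0 is skipped because missing keys default to 0.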
func makePurifiableIndexDB(db kv.RwDB, dirs datadir.Dirs, logger log.Logger, domain string) error {
var tbl string
switch domain {
case "account":
tbl = kv.MaxTxNum
case "storage":
tbl = kv.HeaderNumber
case "code":
tbl = kv.HeaderCanonical
case "commitment":
tbl = kv.HeaderTD
case "receipt":
tbl = kv.BadHeaderNumber
default:
return fmt.Errorf("invalid domain %s", domain)
}
// Collect the names of this domain's .kv files from dirs.SnapDomain
filesNamesToIndex := []string{}
if err := filepath.Walk(dirs.SnapDomain, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// Skip directories
if info.IsDir() {
return nil
}
if !strings.Contains(info.Name(), domain) {
return nil
}
// Only .kv files hold the domain data; skip other extensions.
if filepath.Ext(path) != ".kv" {
return nil
}

fmt.Printf("Add file to indexing of %s: %s\n", domain, path)

filesNamesToIndex = append(filesNamesToIndex, info.Name())
return nil
}); err != nil {
return fmt.Errorf("failed to walk through the domainDir %s: %w", domain, err)
}

collector := etl.NewCollector("Purification", dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize), logger)
defer collector.Close()
// sort the files by their starting step (From), so the layer index grows with file recency
sort.Slice(filesNamesToIndex, func(i, j int) bool {
res, ok, _ := downloadertype.ParseFileName(dirs.SnapDomain, filesNamesToIndex[i])
if !ok {
panic("invalid file name")
}
res2, ok, _ := downloadertype.ParseFileName(dirs.SnapDomain, filesNamesToIndex[j])
if !ok {
panic("invalid file name")
}
return res.From < res2.From
})
tx, err := db.BeginRw(context.Background())
if err != nil {
return fmt.Errorf("failed to start transaction: %w", err)
}
defer tx.Rollback()

// now start the file indexing
for i, fileName := range filesNamesToIndex {
if i == 0 {
continue // we can skip first layer as all the keys are already mapped to 0.
}
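// encode the layer (file index) as a big-endian uint32; it becomes the value stored for
// every key collected from this file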
layerBytes := make([]byte, 4)
binary.BigEndian.PutUint32(layerBytes, uint32(i))
count := 0

dec, err := seg.NewDecompressor(path.Join(dirs.SnapDomain, fileName))
if err != nil {
return fmt.Errorf("failed to create decompressor: %w", err)
}
defer dec.Close()
getter := dec.MakeGetter()
fmt.Printf("Indexing file %s\n", fileName)
var buf []byte
for getter.HasNext() {
buf = buf[:0]
buf, _ = getter.Next(buf)

collector.Collect(buf, layerBytes)
count++
//fmt.Println("count: ", count, "keyLength: ", len(buf))
if count%100000 == 0 {
fmt.Printf("Indexed %d keys in file %s\n", count, fileName)
}
// skip values
getter.Skip()
}
fmt.Printf("Indexed %d keys in file %s\n", count, fileName)
}
fmt.Println("Loading the keys to DB")
if err := collector.Load(tx, tbl, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
return fmt.Errorf("failed to load: %w", err)
}

return tx.Commit()
}

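// makePurifiedDomainsIndexDB re-reads the domain's .kv files and writes purified copies into
// the output directory (--purifiedDomain). A key/value pair is written only when the current
// file is the newest layer recorded for that key in the temporary DB; otherwise it is skipped,
// so no key is repeated across the regenerated files.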
func makePurifiedDomainsIndexDB(db kv.RwDB, dirs datadir.Dirs, logger log.Logger, domain string) error {
var tbl string
compressionType := seg.CompressNone
switch domain {
case "account":
tbl = kv.MaxTxNum
case "storage":
compressionType = seg.CompressKeys
tbl = kv.HeaderNumber
case "code":
compressionType = seg.CompressVals
tbl = kv.HeaderCanonical
case "commitment":
tbl = kv.HeaderTD
case "receipt":
tbl = kv.BadHeaderNumber
default:
return fmt.Errorf("invalid domain %s", domain)
}
// Collect the names of this domain's .kv files from dirs.SnapDomain
filesNamesToPurify := []string{}
if err := filepath.Walk(dirs.SnapDomain, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// Skip directories
if info.IsDir() {
return nil
}
if !strings.Contains(info.Name(), domain) {
return nil
}
// Only .kv files hold the domain data; skip other extensions.
if filepath.Ext(path) != ".kv" {
return nil
}

fmt.Printf("Add file to purification of %s: %s\n", domain, path)

filesNamesToPurify = append(filesNamesToPurify, info.Name())
return nil
}); err != nil {
return fmt.Errorf("failed to walk through the domainDir %s: %w", domain, err)
}
// sort the files by their starting step (From), so the layer index grows with file recency
sort.Slice(filesNamesToPurify, func(i, j int) bool {
res, ok, _ := downloadertype.ParseFileName(dirs.SnapDomain, filesNamesToPurify[i])
if !ok {
panic("invalid file name")
}
res2, ok, _ := downloadertype.ParseFileName(dirs.SnapDomain, filesNamesToPurify[j])
if !ok {
panic("invalid file name")
}
return res.From < res2.From
})

tx, err := db.BeginRo(context.Background())
if err != nil {
return fmt.Errorf("failed to start transaction: %w", err)
}
defer tx.Rollback()
outD := datadir.New(purifyDir)
compressCfg := seg.DefaultCfg
compressCfg.Workers = runtime.NumCPU()
// now purify each file, writing only each key's newest occurrence
for currentLayer, fileName := range filesNamesToPurify {
count := 0
skipped := 0

dec, err := seg.NewDecompressor(path.Join(dirs.SnapDomain, fileName))
if err != nil {
return fmt.Errorf("failed to create decompressor: %w", err)
}
defer dec.Close()
getter := dec.MakeGetter()

valuesComp, err := seg.NewCompressor(context.Background(), "Purification", path.Join(outD.SnapDomain, fileName), dirs.Tmp, compressCfg, log.LvlTrace, log.New())
if err != nil {
return fmt.Errorf("create %s values compressor: %w", path.Join(outD.SnapDomain, fileName), err)
}

comp := seg.NewWriter(valuesComp, compressionType)
defer comp.Close()

fmt.Printf("Purifying file %s\n", fileName)
var (
bufKey []byte
bufVal []byte
)

var layer uint32
for getter.HasNext() {
// get the key and value for the current entry
bufKey = bufKey[:0]
bufKey, _ = getter.Next(bufKey)
bufVal = bufVal[:0]
bufVal, _ = getter.Next(bufVal)

layerBytes, err := tx.GetOne(tbl, bufKey)
if err != nil {
return fmt.Errorf("failed to get key %x: %w", bufKey, err)
}
// if the key is not found, then the layer is 0
layer = 0
if len(layerBytes) == 4 {
layer = binary.BigEndian.Uint32(layerBytes)
}
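// keep the pair only in the newest file that contains this key; skip it in older layers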
if layer != uint32(currentLayer) {
skipped++
continue
}
if err := comp.AddWord(bufKey); err != nil {
return fmt.Errorf("failed to add key %x: %w", bufKey, err)
}
if err := comp.AddWord(bufVal); err != nil {
return fmt.Errorf("failed to add val %x: %w", bufVal, err)
}
count++
if count%100000 == 0 {
fmt.Printf("Purified %d keys, skipped %d, in file %s\n", count, skipped, fileName)
}
}

fmt.Printf("Added %d keys to file %s. Now compressing...\n", count, fileName)
if err := comp.Compress(); err != nil {
return fmt.Errorf("failed to compress: %w", err)
}
fmt.Printf("Compressed %d keys in file %s\n", count, fileName)
comp.Close()
}
return nil
}

func requestDomains(chainDb, stateDb kv.RwDB, ctx context.Context, readDomain string, addrs [][]byte, logger log.Logger) error {
sn, bsn, agg, _, _, _ := allSnapshots(ctx, chainDb, logger)
defer sn.Close()