Hack: dump bodies and headers (#2994)
AskAlexSharov authored Nov 20, 2021
1 parent 2a08dbd commit d4850b6
Showing 5 changed files with 470 additions and 124 deletions.
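
The commit routes headers and bodies through the same per-chunk pipeline already used for transactions: dump raw records into a .dat file, build a compression dictionary, re-encode the data into a .seg segment, build a recsplit .idx index, then delete the intermediate .dat. Below is a condensed sketch of one transactions chunk from the new recsplitWholeChain loop. The calls and their arguments are copied from the diff; the import paths, the empty temp-dir argument, and the helper name are assumptions, and compress1 refers to the function defined in cmd/hack/hack.go further down. Headers and bodies follow the same pattern through snapshotsync.DumpHeaders, snapshotsync.DumpBodies, and snapshotsync.BodiesIdx.

package main

import (
	"os"
	"path"

	"github.com/holiman/uint256"
	"github.com/ledgerwatch/erigon-lib/kv/mdbx" // import paths assumed from the erigon layout of this era
	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
)

// dumpTxChunk is a hypothetical condensation of one loop iteration in recsplitWholeChain.
func dumpTxChunk(chaindata, snapshotDir string, chainID uint256.Int, from, blocksPerFile uint64) error {
	fileName := snapshotsync.FileName(from, from+blocksPerFile, snapshotsync.Transactions)
	segmentFile := path.Join(snapshotDir, fileName) + ".seg"

	db := mdbx.MustOpen(chaindata)
	if err := snapshotsync.DumpTxs(db, "", from, int(blocksPerFile)); err != nil { // writes fileName.dat
		return err
	}
	db.Close()
	if err := compress1(chaindata, fileName, segmentFile); err != nil { // .dat -> .seg, defined below
		return err
	}
	if err := snapshotsync.TransactionsIdx(chainID, snapshotDir, fileName); err != nil { // .seg -> .idx
		return err
	}
	return os.Remove(fileName + ".dat") // the intermediate dump is discarded
}
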
197 changes: 81 additions & 116 deletions cmd/hack/hack.go
@@ -10,12 +10,14 @@ import (
"flag"
"fmt"
"io"
"io/fs"
"io/ioutil"
"math/big"
"net/http"
_ "net/http/pprof" //nolint:gosec
"os"
"os/signal"
"path"
"runtime"
"runtime/pprof"
"sort"
@@ -1586,31 +1588,30 @@ const maxPatternLen = 64
// minPatternScore is the minimum score (per superstring) required to consider including a pattern in the dictionary
const minPatternScore = 1024

func compress1(chaindata string, name string) error {
func compress1(chaindata string, fileName, segmentFileName string) error {
database := mdbx.MustOpen(chaindata)
defer database.Close()
chainConfig := tool.ChainConfigFromDB(database)
chainID, _ := uint256.FromBig(chainConfig.ChainID)
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()

// Read keys from the file and generate superstring (with an extra byte 0x1 prepended to each character, and with a 0x0 0x0 pair inserted between keys and values); see the encoding sketch after this hunk
// We only consider values with length > 2, because smaller values are not compressible without going into bits
var superstring []byte

workers := runtime.NumCPU() / 2
// Collector for dictionary words (sorted by their score)
tmpDir := ""
ch := make(chan []byte, runtime.NumCPU())
ch := make(chan []byte, workers)
var wg sync.WaitGroup
wg.Add(runtime.NumCPU())
collectors := make([]*etl.Collector, runtime.NumCPU())
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(workers)
collectors := make([]*etl.Collector, workers)
for i := 0; i < workers; i++ {
collector := etl.NewCollector(CompressLogPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize))
collectors[i] = collector
go processSuperstring(ch, collector, &wg)
}
i := 0
if err := compress.ReadDatFile(name+".dat", func(v []byte) error {
if err := snapshotsync.ReadSimpleFile(fileName+".dat", func(v []byte) error {
if len(superstring)+2*len(v)+2 > superstringLimit {
ch <- superstring
superstring = nil
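
A minimal illustration of the superstring encoding described in the comment above, and of why the size check reserves 2*len(v)+2 bytes per word (the helper name is hypothetical):

// appendWord encodes one word into the superstring form used by compress1.
func appendWord(superstring, word []byte) []byte {
	// Each data byte b is emitted as the pair {0x01, b}...
	for _, b := range word {
		superstring = append(superstring, 1, b)
	}
	// ...and the word is terminated by a {0x00, 0x00} pair.
	return append(superstring, 0, 0)
}
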
@@ -1629,7 +1630,6 @@ func compress1(chaindata string, name string) error {
}); err != nil {
return err
}
itemsCount := i
if len(superstring) > 0 {
ch <- superstring
}
@@ -1638,16 +1638,13 @@ func compress1(chaindata string, name string) error {

db, err := compress.DictionaryBuilderFromCollectors(context.Background(), CompressLogPrefix, tmpDir, collectors)
if err != nil {
return err
panic(err)
}
if err := compress.PersistDictrionary(name+".dictionary.txt", db); err != nil {
if err := compress.PersistDictrionary(fileName+".dictionary.txt", db); err != nil {
return err
}

if err := reducedict(name); err != nil {
return err
}
if err := _createIdx(*chainID, name, itemsCount); err != nil {
if err := reducedict(fileName, segmentFileName); err != nil {
return err
}
return nil
@@ -2142,7 +2139,7 @@ func (hf *HuffmanCoder) flush() error {
}

// reducedict reduces the dictionary by trying the substitutions and counting the frequency of each word
func reducedict(name string) error {
func reducedict(name string, segmentFileName string) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()

@@ -2168,7 +2165,7 @@ func reducedict(name string) error {
ch := make(chan []byte, 10000)
var inputSize, outputSize atomic2.Uint64
var wg sync.WaitGroup
workers := runtime.NumCPU()
workers := runtime.NumCPU() / 2
var collectors []*etl.Collector
var posMaps []map[uint64]uint64
for i := 0; i < workers; i++ {
@@ -2180,7 +2177,7 @@ func reducedict(name string) error {
go reduceDictWorker(ch, &wg, &pt, collector, inputSize, outputSize, posMap)
}
i := 0
if err := compress.ReadDatFile(name+".dat", func(v []byte) error {
if err := snapshotsync.ReadSimpleFile(name+".dat", func(v []byte) error {
input := make([]byte, 8+int(len(v)))
binary.BigEndian.PutUint64(input, uint64(i))
copy(input[8:], v)
@@ -2216,6 +2213,12 @@ func reducedict(name string) error {
}
}
sort.Sort(&patternList)
if len(patternList) == 0 {
log.Warn("dictionary is empty")
//err := fmt.Errorf("dictionary is empty")
//panic(err)
//return err
}
// Calculate offsets of the dictionary patterns and total size
var offset uint64
numBuf := make([]byte, binary.MaxVarintLen64)
@@ -2276,10 +2279,13 @@ func reducedict(name string) error {
heap.Push(&codeHeap, h)
huffs = append(huffs, h)
}
root := heap.Pop(&codeHeap).(*PatternHuff)
root := &PatternHuff{}
if len(patternList) > 0 {
root = heap.Pop(&codeHeap).(*PatternHuff)
}
var cf *os.File
var err error
if cf, err = os.Create(name + ".compressed.dat"); err != nil {
if cf, err = os.Create(segmentFileName); err != nil {
return err
}
cw := bufio.NewWriterSize(cf, etl.BufIOSize)
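
The zero-value root above guards against an empty codeHeap: heap.Pop panics on an empty heap, and with an empty pattern list (now merely a warning, per the earlier hunk) nothing is ever pushed. Below is a self-contained sketch of the Huffman construction the codeHeap/PatternHuff code performs; every name here is an illustrative stand-in, not erigon's.

package main

import (
	"container/heap"
	"fmt"
)

// node stands in for PatternHuff: a Huffman tree node ordered by use count.
type node struct {
	uses   uint64
	h0, h1 *node // children; nil for leaves
}

type nodeHeap []*node

func (h nodeHeap) Len() int            { return len(h) }
func (h nodeHeap) Less(i, j int) bool  { return h[i].uses < h[j].uses }
func (h nodeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *nodeHeap) Push(x interface{}) { *h = append(*h, x.(*node)) }
func (h *nodeHeap) Pop() interface{} {
	old := *h
	n := old[len(old)-1]
	*h = old[:len(old)-1]
	return n
}

// buildTree repeatedly merges the two cheapest nodes; the survivor is the root.
func buildTree(counts []uint64) *node {
	q := nodeHeap{}
	for _, c := range counts {
		heap.Push(&q, &node{uses: c})
	}
	for q.Len() > 1 {
		a := heap.Pop(&q).(*node)
		b := heap.Pop(&q).(*node)
		heap.Push(&q, &node{uses: a.uses + b.uses, h0: a, h1: b})
	}
	if q.Len() == 0 {
		return &node{} // mirrors the diff's empty-dictionary fallback
	}
	return heap.Pop(&q).(*node)
}

func main() {
	fmt.Println(buildTree([]uint64{5, 9, 12, 13, 16, 45}).uses) // prints 100
}
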
@@ -2547,7 +2553,7 @@ func reducedict(name string) error {
return nil
}
func recsplitWholeChain(chaindata string) error {
blocksPerFile := uint64(500_000)
blocksPerFile := uint64(*blockTotal) //uint64(500_000)
lastChunk := func(tx kv.Tx, blocksPerFile uint64) (uint64, error) {
c, err := tx.Cursor(kv.BlockBody)
if err != nil {
@@ -2571,22 +2577,66 @@ func recsplitWholeChain(chaindata string) error {

database := mdbx.MustOpen(chaindata)
defer database.Close()
chainConfig := tool.ChainConfigFromDB(database)
chainID, _ := uint256.FromBig(chainConfig.ChainID)
_ = chainID
if err := database.View(context.Background(), func(tx kv.Tx) (err error) {
last, err = lastChunk(tx, blocksPerFile)
return err
}); err != nil {
return err
}
database.Close()
dataDir := path.Dir(chaindata)
snapshotDir := path.Join(dataDir, "snapshots")
_ = os.MkdirAll(snapshotDir, fs.ModePerm)

log.Info("Last body number", "last", last)
for i := uint64(*block); i < last; i += blocksPerFile {
fileName := snapshotsync.FileName(i, i+blocksPerFile, snapshotsync.Transactions)
segmentFile := path.Join(snapshotDir, fileName) + ".seg"
log.Info("Creating", "file", fileName+".seg")
if err := dumpTxs(chaindata, i, *blockTotal, fileName); err != nil {
return err
db := mdbx.MustOpen(chaindata)
if err := snapshotsync.DumpTxs(db, "", i, int(blocksPerFile)); err != nil {
panic(err)
}
db.Close()
if err := compress1(chaindata, fileName, segmentFile); err != nil {
panic(err)
}
if err := snapshotsync.TransactionsIdx(*chainID, snapshotDir, fileName); err != nil {
panic(err)
}
_ = os.Remove(fileName + ".dat")

fileName = snapshotsync.FileName(i, i+blocksPerFile, snapshotsync.Headers)
segmentFile = path.Join(snapshotDir, fileName) + ".seg"
log.Info("Creating", "file", fileName+".seg")
db = mdbx.MustOpen(chaindata)
if err := snapshotsync.DumpHeaders(db, "", i, int(blocksPerFile)); err != nil {
panic(err)
}
db.Close()
if err := compress1(chaindata, fileName, segmentFile); err != nil {
panic(err)
}
if err := compress1(chaindata, fileName); err != nil {
if err := snapshotsync.BodiesIdx(snapshotDir, fileName); err != nil {
panic(err)
}
_ = os.Remove(fileName + ".dat")

fileName = snapshotsync.FileName(i, i+blocksPerFile, snapshotsync.Bodies)
segmentFile = path.Join(snapshotDir, fileName) + ".seg"
log.Info("Creating", "file", fileName+".seg")
db = mdbx.MustOpen(chaindata)
if err := snapshotsync.DumpBodies(db, "", i, int(blocksPerFile)); err != nil {
panic(err)
}
db.Close()
if err := compress1(chaindata, fileName, segmentFile); err != nil {
panic(err)
}
if err := snapshotsync.BodiesIdx(snapshotDir, fileName); err != nil {
return err
}
_ = os.Remove(fileName + ".dat")
@@ -2602,7 +2652,7 @@ func recsplitLookup(chaindata, name string) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()

d, err := compress.NewDecompressor(name + ".compressed.dat")
d, err := compress.NewDecompressor(name + ".seg")
if err != nil {
return err
}
@@ -2674,29 +2724,15 @@ func createIdx(chaindata string, name string) error {
defer database.Close()
chainConfig := tool.ChainConfigFromDB(database)
chainID, _ := uint256.FromBig(chainConfig.ChainID)
d, err := compress.NewDecompressor(name + ".compressed.dat")
d, err := compress.NewDecompressor(name + ".seg")
if err != nil {
return err
}
defer d.Close()
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
g := d.MakeGetter()
var word = make([]byte, 0, 4*1024)
wc := 0
for g.HasNext() {
word, _ = g.Next(word[:0])
wc++
select {
default:
case <-logEvery.C:
log.Info("[Filling recsplit] Processed", "millions", wc/1_000_000)
}
}
return _createIdx(*chainID, name, wc)
return _createIdx(*chainID, name, d.Count())
}
func _createIdx(chainID uint256.Int, name string, count int) error {
d, err := compress.NewDecompressor(name + ".compressed.dat")
d, err := compress.NewDecompressor(name + ".seg")
if err != nil {
return err
}
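
createIdx previously counted words by decompressing the whole segment before indexing; d.Count() now supplies that number directly. For comparison, the removed loop reconstructed as a standalone helper (the erigon-lib import path is assumed):

package main

import (
	"fmt"

	"github.com/ledgerwatch/erigon-lib/compress" // assumed import path
)

// countWords walks a .seg file the way the removed code did;
// the commit replaces this loop with the equivalent d.Count().
func countWords(d *compress.Decompressor) int {
	wc := 0
	g := d.MakeGetter()
	word := make([]byte, 0, 4*1024)
	for g.HasNext() {
		word, _ = g.Next(word[:0])
		wc++
	}
	return wc
}

func main() {
	d, err := compress.NewDecompressor("headers.seg") // hypothetical segment file
	if err != nil {
		panic(err)
	}
	defer d.Close()
	fmt.Println(countWords(d), d.Count()) // the two counts agree
}
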
@@ -2710,10 +2746,7 @@ func _createIdx(chainID uint256.Int, name string, count int) error {
Salt: 0,
LeafSize: 8,
TmpDir: "",
StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73,
0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d,
0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a},
IndexFile: name + ".idx",
IndexFile: name + ".idx",
})
if err != nil {
return err
@@ -2759,7 +2792,7 @@ RETRY:
return nil
}
func decompress(name string) error {
d, err := compress.NewDecompressor(name + ".compressed.dat")
d, err := compress.NewDecompressor(name + ".seg")
if err != nil {
return err
}
@@ -3466,72 +3499,6 @@ func fixState(chaindata string) error {
return tx.Commit()
}

func dumpTxs(chaindata string, block uint64, blockTotal int, name string) error {
db := mdbx.MustOpen(chaindata)
defer db.Close()
chainConfig := tool.ChainConfigFromDB(db)
chainID, _ := uint256.FromBig(chainConfig.ChainID)

f, err := os.Create(name + ".dat")
if err != nil {
return err
}
defer f.Close()
w := bufio.NewWriterSize(f, etl.BufIOSize)
defer w.Flush()
i := 0
numBuf := make([]byte, binary.MaxVarintLen64)
parseCtx := txpool.NewTxParseContext(*chainID)
parseCtx.WithSender(false)
slot := txpool.TxSlot{}
valueBuf := make([]byte, 16*4096)
from := dbutils.EncodeBlockNumber(block)
if err := kv.BigChunks(db, kv.BlockBody, from, func(tx kv.Tx, k, v []byte) (bool, error) {
bodyNum := binary.BigEndian.Uint64(k)
if bodyNum >= block+uint64(blockTotal) {
return false, nil
}

var body types.BodyForStorage
if e := rlp.DecodeBytes(v, &body); e != nil {
return false, e
}
if body.TxAmount == 0 {
return true, nil
}

binary.BigEndian.PutUint64(numBuf, body.BaseTxId)
if err := tx.ForAmount(kv.EthTx, numBuf[:8], body.TxAmount, func(tk, tv []byte) error {
if _, err := parseCtx.ParseTransaction(tv, 0, &slot, nil); err != nil {
return err
}
valueBuf = valueBuf[:0]
valueBuf = append(append(valueBuf, slot.IdHash[:1]...), tv...)
n := binary.PutUvarint(numBuf, uint64(len(valueBuf)))
if _, e := w.Write(numBuf[:n]); e != nil {
return e
}
if len(valueBuf) > 0 {
if _, e := w.Write(valueBuf); e != nil {
return e
}
}
i++
if i%1_000_000 == 0 {
log.Info("Wrote into file", "million txs", i/1_000_000, "block num", bodyNum)
}
return nil
}); err != nil {
return false, err
}
return true, nil
}); err != nil {
return err
}

return nil
}

func trimTxs(chaindata string) error {
db := mdbx.MustOpen(chaindata)
defer db.Close()
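
The dumpTxs helper removed above (superseded by snapshotsync.DumpTxs) also documents the .dat framing that ReadSimpleFile presumably consumes: each record is a uvarint length followed by the payload bytes. A self-contained, standard-library illustration of that framing (both helper names are hypothetical):

package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeRecord frames one payload the way the removed dumpTxs did:
// uvarint(len(payload)) followed by the payload itself.
func writeRecord(w *bufio.Writer, payload []byte) error {
	var numBuf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(numBuf[:], uint64(len(payload)))
	if _, err := w.Write(numBuf[:n]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readRecords walks the framing back, invoking f once per record.
func readRecords(r *bufio.Reader, f func(v []byte) error) error {
	for {
		l, err := binary.ReadUvarint(r)
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		buf := make([]byte, l)
		if _, err := io.ReadFull(r, buf); err != nil {
			return err
		}
		if err := f(buf); err != nil {
			return err
		}
	}
}

func main() {
	var b bytes.Buffer
	w := bufio.NewWriter(&b)
	_ = writeRecord(w, []byte("hello"))
	_ = writeRecord(w, []byte("world"))
	_ = w.Flush()
	_ = readRecords(bufio.NewReader(&b), func(v []byte) error {
		fmt.Printf("%s\n", v)
		return nil
	})
}
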
@@ -4104,7 +4071,7 @@ func main() {
case "dumpState":
err = dumpState(*chaindata, int(*block), *name)
case "compress":
err = compress1(*chaindata, *name)
err = compress1(*chaindata, *name, *name)
case "createIdx":
err = createIdx(*chaindata, *name)
case "recsplitWholeChain":
@@ -4115,8 +4082,6 @@ func main() {
err = decompress(*name)
case "genstate":
err = genstate()
case "dumpTxs":
err = dumpTxs(*chaindata, uint64(*block), int(*blockTotal), *name)
}

if err != nil {
2 changes: 1 addition & 1 deletion go.mod
@@ -35,7 +35,7 @@ require (
github.com/json-iterator/go v1.1.12
github.com/julienschmidt/httprouter v1.3.0
github.com/kevinburke/go-bindata v3.21.0+incompatible
github.com/ledgerwatch/erigon-lib v0.0.0-20211119034502-bef3d1e3bd42
github.com/ledgerwatch/erigon-lib v0.0.0-20211119092045-3e3b93617644
github.com/ledgerwatch/log/v3 v3.4.0
github.com/ledgerwatch/secp256k1 v1.0.0
github.com/logrusorgru/aurora/v3 v3.0.0
4 changes: 2 additions & 2 deletions go.sum
@@ -596,8 +596,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20211119034502-bef3d1e3bd42 h1:u6JmlvVVy285hy3H21PUOqMk6W3FWwDvcM/FVQtGwC0=
github.com/ledgerwatch/erigon-lib v0.0.0-20211119034502-bef3d1e3bd42/go.mod h1:CuEZROm43MykZT5CjCj02jw0FOwaDl8Nh+PZkTEGopg=
github.com/ledgerwatch/erigon-lib v0.0.0-20211119092045-3e3b93617644 h1:KHalrpnX2i7DiLjTowUxh2d68pq5wrJi7hWs3T1T0vg=
github.com/ledgerwatch/erigon-lib v0.0.0-20211119092045-3e3b93617644/go.mod h1:CuEZROm43MykZT5CjCj02jw0FOwaDl8Nh+PZkTEGopg=
github.com/ledgerwatch/log/v3 v3.4.0 h1:SEIOcv5a2zkG3PmoT5jeTU9m/0nEUv0BJS5bzsjwKCI=
github.com/ledgerwatch/log/v3 v3.4.0/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
(The remaining two of the five changed files did not load in this capture.)
