Skip to content

Commit

Permalink
Snapshot: describe idx schema (erigontech#3191)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Dec 31, 2021
1 parent 0658983 commit 6904e4c
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 147 deletions.
19 changes: 3 additions & 16 deletions cmd/downloader/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package downloader
import (
"context"
"fmt"
"path/filepath"
"runtime"
"sync"
"time"
Expand All @@ -17,11 +16,12 @@ import (
"github.com/dustin/go-humanize"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
"github.com/ledgerwatch/log/v3"
"golang.org/x/time/rate"
)

const ASSERT = false

type Client struct {
Cli *torrent.Client
pieceCompletionStore storage.PieceCompletion
Expand Down Expand Up @@ -231,27 +231,14 @@ func calcStats(prevStats aggStats, interval time.Duration, client *torrent.Clien
// added first time - pieces verification process will start (disk IO heavy) - progress
// kept in `piece completion storage` (surviving reboot). Once it done - no disk IO needed again.
// Don't need call torrent.VerifyData manually
func AddTorrentFiles(ctx context.Context, snapshotsDir string, torrentClient *torrent.Client, preverifiedHashes snapshothashes.Preverified) error {

func AddTorrentFiles(ctx context.Context, snapshotsDir string, torrentClient *torrent.Client) error {
if err := ForEachTorrentFile(snapshotsDir, func(torrentFilePath string) error {
mi, err := metainfo.LoadFromFile(torrentFilePath)
if err != nil {
return err
}
mi.AnnounceList = Trackers

// skip non-preverified files
_, torrentFileName := filepath.Split(torrentFilePath)
segmentFileName := segmentFileNameFromTorrentFileName(torrentFileName)
hashString, ok := preverifiedHashes[segmentFileName]
if !ok {
return nil
}
expect := metainfo.NewHashFromHex(hashString)
if mi.HashInfoBytes() != expect {
return fmt.Errorf("file %s has unexpected hash %x, expected %x. May help: git submodule update --init --recursive --force", torrentFileName, mi.HashInfoBytes(), expect)
}

if _, err = torrentClient.AddTorrent(mi); err != nil {
return err
}
Expand Down
6 changes: 2 additions & 4 deletions cmd/downloader/downloader/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
prototypes "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
"google.golang.org/protobuf/types/known/emptypb"
)

Expand All @@ -32,19 +31,18 @@ func NewServer(db kv.RwDB, client *Client, snapshotDir string) (*SNDownloaderSer
return sn, nil
}

func CreateTorrentFilesAndAdd(ctx context.Context, snapshotDir string, torrentClient *torrent.Client, config *snapshothashes.Config) error {
func CreateTorrentFilesAndAdd(ctx context.Context, snapshotDir string, torrentClient *torrent.Client) error {
if err := BuildTorrentFilesIfNeed(ctx, snapshotDir); err != nil {
return err
}
if err := AddTorrentFiles(ctx, snapshotDir, torrentClient, config.Preverified); err != nil {
if err := AddTorrentFiles(ctx, snapshotDir, torrentClient); err != nil {
return err
}
for _, t := range torrentClient.Torrents() {
t.AllowDataDownload()
t.AllowDataUpload()
t.DownloadAll()
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions cmd/downloader/downloader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func CreateTorrentFile(root string, info *metainfo.Info, mi *metainfo.MetaInfo)
return nil
}

// nolint
func segmentFileNameFromTorrentFileName(in string) string {
ext := filepath.Ext(in)
return in[0 : len(in)-len(ext)]
Expand Down
42 changes: 26 additions & 16 deletions cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,26 +153,36 @@ func Downloader(ctx context.Context, cmd *cobra.Command) error {
return fmt.Errorf("new server: %w", err)
}

var cc *params.ChainConfig
{
chaindataDir := path.Join(datadir, "chaindata")
if err := os.MkdirAll(chaindataDir, 0755); err != nil {
return err
}
chaindata, err := mdbx.Open(chaindataDir, log.New(), true)
if err != nil {
return fmt.Errorf("%w, path: %s", err, chaindataDir)
}
cc = tool.ChainConfigFromDB(chaindata)
chaindata.Close()
}

snapshotsCfg := snapshothashes.KnownConfig(cc.ChainName)
err = downloader.CreateTorrentFilesAndAdd(ctx, snapshotsDir, t.Cli, snapshotsCfg)
err = downloader.CreateTorrentFilesAndAdd(ctx, snapshotsDir, t.Cli)
if err != nil {
return fmt.Errorf("start: %w", err)
}

if downloader.ASSERT {
var cc *params.ChainConfig
{
chaindataDir := path.Join(datadir, "chaindata")
if err := os.MkdirAll(chaindataDir, 0755); err != nil {
return err
}
chaindata, err := mdbx.Open(chaindataDir, log.New(), true)
if err != nil {
return fmt.Errorf("%w, path: %s", err, chaindataDir)
}
cc = tool.ChainConfigFromDB(chaindata)
chaindata.Close()
}

snapshotsCfg := snapshothashes.KnownConfig(cc.ChainName)
for _, t := range t.Cli.Torrents() {
expectHashStr := snapshotsCfg.Preverified[t.Info().Name]
expectHash := metainfo.NewHashFromHex(expectHashStr)
if t.InfoHash() != expectHash {
return fmt.Errorf("file %s has unexpected hash %x, expected %x. May help: git submodule update --init --recursive --force", t.Info().Name, t.InfoHash(), expectHash)
}
}
}

go downloader.MainLoop(ctx, t.Cli)

grpcServer, err := StartGrpc(bittorrentServer, downloaderApiAddr, nil)
Expand Down
1 change: 0 additions & 1 deletion cmd/downloader/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,3 @@ rsync server1:<your_datadir>/snapshots/*.torrent server2:<your_datadir>/snapshot
// re-start downloader
```


2 changes: 1 addition & 1 deletion eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
}

// ResetSequence - allow set arbitrary value to sequence (for example to decrement it to exact value)
lastTxnID := sn.Transactions.Idx.BaseDataID() + uint64(sn.Transactions.Segment.Count())
lastTxnID := sn.TxnHashIdx.BaseDataID() + uint64(sn.Transactions.Count())
if err := rawdb.ResetSequence(tx, kv.EthTx, lastTxnID+1); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/json-iterator/go v1.1.12
github.com/julienschmidt/httprouter v1.3.0
github.com/kevinburke/go-bindata v3.21.0+incompatible
github.com/ledgerwatch/erigon-lib v0.0.0-20211228005012-bd5e7706f075
github.com/ledgerwatch/erigon-lib v0.0.0-20211231112434-5f40a789859a
github.com/ledgerwatch/log/v3 v3.4.0
github.com/ledgerwatch/secp256k1 v1.0.0
github.com/logrusorgru/aurora/v3 v3.0.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20211228005012-bd5e7706f075 h1:YlrZJZxE5AaczZ8r5ECgSXvI0tuGJEDPCxCyKCs26jQ=
github.com/ledgerwatch/erigon-lib v0.0.0-20211228005012-bd5e7706f075/go.mod h1:lyGP3i0x4CeabdKZ4beycD5xZfHWZwJsAX+70OfGj4Y=
github.com/ledgerwatch/erigon-lib v0.0.0-20211231112434-5f40a789859a h1:LWNNoeLpGSrnR8N7McTKjPHgfzgt6YToyBfr8fy1Z1w=
github.com/ledgerwatch/erigon-lib v0.0.0-20211231112434-5f40a789859a/go.mod h1:lyGP3i0x4CeabdKZ4beycD5xZfHWZwJsAX+70OfGj4Y=
github.com/ledgerwatch/log/v3 v3.4.0 h1:SEIOcv5a2zkG3PmoT5jeTU9m/0nEUv0BJS5bzsjwKCI=
github.com/ledgerwatch/log/v3 v3.4.0/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
Expand Down
32 changes: 16 additions & 16 deletions turbo/snapshotsync/block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,18 +232,18 @@ func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx k
back.lock.Lock()
defer back.lock.Unlock()

headerOffset := sn.Headers.Idx.Lookup2(blockHeight - sn.Headers.Idx.BaseDataID())
bodyOffset := sn.Bodies.Idx.Lookup2(blockHeight - sn.Bodies.Idx.BaseDataID())
headerOffset := sn.HeaderHashIdx.Lookup2(blockHeight - sn.HeaderHashIdx.BaseDataID())
bodyOffset := sn.BodyNumberIdx.Lookup2(blockHeight - sn.BodyNumberIdx.BaseDataID())

gg := sn.Headers.Segment.MakeGetter()
gg := sn.Headers.MakeGetter()
gg.Reset(headerOffset)
buf, _ = gg.Next(buf[:0])
h := &types.Header{}
if err = rlp.DecodeBytes(buf, h); err != nil {
return nil, nil, err
}

gg = sn.Bodies.Segment.MakeGetter()
gg = sn.Bodies.MakeGetter()
gg.Reset(bodyOffset)
buf, _ = gg.Next(buf[:0])
b := &types.BodyForStorage{}
Expand All @@ -252,15 +252,15 @@ func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx k
return nil, nil, err
}

if b.BaseTxId < sn.Transactions.Idx.BaseDataID() {
return nil, nil, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.Transactions.Idx.BaseDataID(), sn.Transactions.File)
if b.BaseTxId < sn.TxnHashIdx.BaseDataID() {
return nil, nil, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.TxnHashIdx.BaseDataID(), sn.Transactions.FilePath())
}

txs := make([]types.Transaction, b.TxAmount)
senders = make([]common.Address, b.TxAmount)
if b.TxAmount > 0 {
txnOffset := sn.Transactions.Idx.Lookup2(b.BaseTxId - sn.Transactions.Idx.BaseDataID()) // need subtract baseID of indexFile
gg = sn.Transactions.Segment.MakeGetter()
txnOffset := sn.TxnHashIdx.Lookup2(b.BaseTxId - sn.TxnHashIdx.BaseDataID()) // need subtract baseID of indexFile
gg = sn.Transactions.MakeGetter()
gg.Reset(txnOffset)
stream := rlp.NewStream(reader, 0)
for i := uint32(0); i < b.TxAmount; i++ {
Expand All @@ -287,8 +287,8 @@ func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx k
func (back *BlockReaderWithSnapshots) headerFromSnapshot(blockHeight uint64, sn *BlocksSnapshot) (*types.Header, error) {
buf := make([]byte, 16)

headerOffset := sn.Headers.Idx.Lookup2(blockHeight - sn.Headers.Idx.BaseDataID())
gg := sn.Headers.Segment.MakeGetter()
headerOffset := sn.HeaderHashIdx.Lookup2(blockHeight - sn.HeaderHashIdx.BaseDataID())
gg := sn.Headers.MakeGetter()
gg.Reset(headerOffset)
buf, _ = gg.Next(buf[:0])
h := &types.Header{}
Expand All @@ -301,9 +301,9 @@ func (back *BlockReaderWithSnapshots) headerFromSnapshot(blockHeight uint64, sn
func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *BlocksSnapshot) (*types.Body, []common.Address, uint64, uint32, error) {
buf := make([]byte, 16)

bodyOffset := sn.Bodies.Idx.Lookup2(blockHeight - sn.Bodies.Idx.BaseDataID())
bodyOffset := sn.BodyNumberIdx.Lookup2(blockHeight - sn.BodyNumberIdx.BaseDataID())

gg := sn.Bodies.Segment.MakeGetter()
gg := sn.Bodies.MakeGetter()
gg.Reset(bodyOffset)
buf, _ = gg.Next(buf[:0])
b := &types.BodyForStorage{}
Expand All @@ -312,15 +312,15 @@ func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *B
return nil, nil, 0, 0, err
}

if b.BaseTxId < sn.Transactions.Idx.BaseDataID() {
return nil, nil, 0, 0, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.Transactions.Idx.BaseDataID(), sn.Transactions.File)
if b.BaseTxId < sn.TxnHashIdx.BaseDataID() {
return nil, nil, 0, 0, fmt.Errorf(".idx file has wrong baseDataID? %d<%d, %s", b.BaseTxId, sn.TxnHashIdx.BaseDataID(), sn.Transactions.FilePath())
}

txs := make([]types.Transaction, b.TxAmount)
senders := make([]common.Address, b.TxAmount)
if b.TxAmount > 0 {
txnOffset := sn.Transactions.Idx.Lookup2(b.BaseTxId - sn.Transactions.Idx.BaseDataID()) // need subtract baseID of indexFile
gg = sn.Transactions.Segment.MakeGetter()
txnOffset := sn.TxnHashIdx.Lookup2(b.BaseTxId - sn.TxnHashIdx.BaseDataID()) // need subtract baseID of indexFile
gg = sn.Transactions.MakeGetter()
gg.Reset(txnOffset)
stream := rlp.NewStream(reader, 0)
for i := uint32(0); i < b.TxAmount; i++ {
Expand Down
Loading

0 comments on commit 6904e4c

Please sign in to comment.