diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index f8a4b53f538..200919d6edc 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -266,7 +266,14 @@ func (b *SimulatedBackend) TransactionByHash(ctx context.Context, txHash common. if txn != nil { return txn, true, nil } - txn, _, _, _, err = rawdb.ReadTransaction(tx, txHash) + blockNumber, err := rawdb.ReadTxLookupEntry(tx, txHash) + if err != nil { + return nil, false, err + } + if blockNumber == nil { + return nil, false, ethereum.NotFound + } + txn, _, _, _, err = rawdb.ReadTransaction(tx, txHash, *blockNumber) if err != nil { return nil, false, err } diff --git a/cmd/rpcdaemon/commands/eth_block.go b/cmd/rpcdaemon/commands/eth_block.go index b48398ea56f..d0e58926f11 100644 --- a/cmd/rpcdaemon/commands/eth_block.go +++ b/cmd/rpcdaemon/commands/eth_block.go @@ -42,7 +42,14 @@ func (api *APIImpl) CallBundle(ctx context.Context, txHashes []common.Hash, stat var txs types.Transactions for _, txHash := range txHashes { - txn, _, _, _, err := rawdb.ReadTransaction(tx, txHash) + blockNumber, err := rawdb.ReadTxLookupEntry(tx, txHash) + if err != nil { + return nil, err + } + if blockNumber == nil { + return nil, nil + } + txn, _, _, _, err := rawdb.ReadTransaction(tx, txHash, *blockNumber) if err != nil { return nil, err } diff --git a/cmd/rpcdaemon/commands/eth_txs.go b/cmd/rpcdaemon/commands/eth_txs.go index b0683c3b506..13a3088cf3c 100644 --- a/cmd/rpcdaemon/commands/eth_txs.go +++ b/cmd/rpcdaemon/commands/eth_txs.go @@ -26,7 +26,14 @@ func (api *APIImpl) GetTransactionByHash(ctx context.Context, hash common.Hash) defer tx.Rollback() // https://infura.io/docs/ethereum/json-rpc/eth-getTransactionByHash - txn, blockHash, blockNumber, txIndex, err := rawdb.ReadTransaction(tx, hash) + blockNum, err := rawdb.ReadTxLookupEntry(tx, hash) + if err != nil { + return nil, err + } + if blockNum == nil { + return nil, nil + } + txn, blockHash, blockNumber, txIndex, err := rawdb.ReadTransaction(tx, hash, *blockNum) if err != nil { return nil, err } @@ -81,7 +88,14 @@ func (api *APIImpl) GetRawTransactionByHash(ctx context.Context, hash common.Has defer tx.Rollback() // https://infura.io/docs/ethereum/json-rpc/eth-getTransactionByHash - txn, _, _, _, err := rawdb.ReadTransaction(tx, hash) + blockNum, err := rawdb.ReadTxLookupEntry(tx, hash) + if err != nil { + return nil, err + } + if blockNum == nil { + return nil, nil + } + txn, _, _, _, err := rawdb.ReadTransaction(tx, hash, *blockNum) if err != nil { return nil, err } diff --git a/cmd/rpcdaemon/commands/tracing.go b/cmd/rpcdaemon/commands/tracing.go index c2b5fdac65f..7d8e093d0bc 100644 --- a/cmd/rpcdaemon/commands/tracing.go +++ b/cmd/rpcdaemon/commands/tracing.go @@ -29,7 +29,14 @@ func (api *PrivateDebugAPIImpl) TraceTransaction(ctx context.Context, hash commo defer tx.Rollback() // Retrieve the transaction and assemble its EVM context - txn, blockHash, _, txIndex, err := rawdb.ReadTransaction(tx, hash) + blockNum, err := rawdb.ReadTxLookupEntry(tx, hash) + if err != nil { + return err + } + if blockNum == nil { + return nil + } + txn, blockHash, _, txIndex, err := rawdb.ReadTransaction(tx, hash, *blockNum) if err != nil { return err } diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 25396fc9bef..a9cfdc8b826 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -388,7 +388,7 @@ func ReadBodyWithTransactions(db kv.Getter, hash common.Hash, number uint64) *ty var err error body.Transactions, err = CanonicalTransactions(db, baseTxId, txAmount) if err != nil { - log.Error("failed ReadTransaction", "hash", hash, "block", number, "err", err) + log.Error("failed ReadTransactionByHash", "hash", hash, "block", number, "err", err) return nil } return body @@ -402,7 +402,7 @@ func NonCanonicalBodyWithTransactions(db kv.Getter, hash common.Hash, number uin var err error body.Transactions, err = NonCanonicalTransactions(db, baseTxId, txAmount) if err != nil { - log.Error("failed ReadTransaction", "hash", hash, "block", number, "err", err) + log.Error("failed ReadTransactionByHash", "hash", hash, "block", number, "err", err) return nil } return body diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go index 6fbdc79815a..9108b3df66a 100644 --- a/core/rawdb/accessors_indexes.go +++ b/core/rawdb/accessors_indexes.go @@ -35,7 +35,7 @@ type TxLookupEntry struct { // ReadTxLookupEntry retrieves the positional metadata associated with a transaction // hash to allow retrieving the transaction or receipt by hash. -func ReadTxLookupEntry(db kv.Tx, txnHash common.Hash) (*uint64, error) { +func ReadTxLookupEntry(db kv.Getter, txnHash common.Hash) (*uint64, error) { data, err := db.GetOne(kv.TxLookup, txnHash.Bytes()) if err != nil { return nil, err @@ -63,9 +63,9 @@ func DeleteTxLookupEntry(db kv.Deleter, hash common.Hash) error { return db.Delete(kv.TxLookup, hash.Bytes(), nil) } -// ReadTransaction retrieves a specific transaction from the database, along with +// ReadTransactionByHash retrieves a specific transaction from the database, along with // its added positional metadata. -func ReadTransaction(db kv.Tx, hash common.Hash) (types.Transaction, common.Hash, uint64, uint64, error) { +func ReadTransactionByHash(db kv.Tx, hash common.Hash) (types.Transaction, common.Hash, uint64, uint64, error) { blockNumber, err := ReadTxLookupEntry(db, hash) if err != nil { return nil, common.Hash{}, 0, 0, err @@ -99,6 +99,35 @@ func ReadTransaction(db kv.Tx, hash common.Hash) (types.Transaction, common.Hash return nil, common.Hash{}, 0, 0, nil } +// ReadTransaction retrieves a specific transaction from the database, along with +// its added positional metadata. +func ReadTransaction(db kv.Tx, hash common.Hash, blockNumber uint64) (types.Transaction, common.Hash, uint64, uint64, error) { + blockHash, err := ReadCanonicalHash(db, blockNumber) + if err != nil { + return nil, common.Hash{}, 0, 0, err + } + if blockHash == (common.Hash{}) { + return nil, common.Hash{}, 0, 0, nil + } + body := ReadBodyWithTransactions(db, blockHash, blockNumber) + if body == nil { + log.Error("Transaction referenced missing", "number", blockNumber, "hash", blockHash) + return nil, common.Hash{}, 0, 0, nil + } + senders, err1 := ReadSenders(db, blockHash, blockNumber) + if err1 != nil { + return nil, common.Hash{}, 0, 0, err1 + } + body.SendersToTxs(senders) + for txIndex, tx := range body.Transactions { + if tx.Hash() == hash { + return tx, blockHash, blockNumber, uint64(txIndex), nil + } + } + log.Error("Transaction not found", "number", blockNumber, "hash", blockHash, "txhash", hash) + return nil, common.Hash{}, 0, 0, nil +} + func ReadReceipt(db kv.Tx, txHash common.Hash) (*types.Receipt, common.Hash, uint64, uint64, error) { // Retrieve the context of the receipt based on the transaction hash blockNumber, err := ReadTxLookupEntry(db, txHash) diff --git a/core/rawdb/accessors_indexes_test.go b/core/rawdb/accessors_indexes_test.go index f98aa8600f2..cdf56135511 100644 --- a/core/rawdb/accessors_indexes_test.go +++ b/core/rawdb/accessors_indexes_test.go @@ -56,7 +56,7 @@ func TestLookupStorage(t *testing.T) { // Check that no transactions entries are in a pristine database for i, txn := range txs { - if txn2, _, _, _, _ := ReadTransaction(tx, txn.Hash()); txn2 != nil { + if txn2, _, _, _, _ := ReadTransactionByHash(tx, txn.Hash()); txn2 != nil { t.Fatalf("txn #%d [%x]: non existent transaction returned: %v", i, txn.Hash(), txn2) } } @@ -73,7 +73,7 @@ func TestLookupStorage(t *testing.T) { tc.writeTxLookupEntries(tx, block) for i, txn := range txs { - if txn2, hash, number, index, _ := ReadTransaction(tx, txn.Hash()); txn2 == nil { + if txn2, hash, number, index, _ := ReadTransactionByHash(tx, txn.Hash()); txn2 == nil { t.Fatalf("txn #%d [%x]: transaction not found", i, txn.Hash()) } else { if hash != block.Hash() || number != block.NumberU64() || index != uint64(i) { @@ -89,7 +89,7 @@ func TestLookupStorage(t *testing.T) { if err := DeleteTxLookupEntry(tx, txn.Hash()); err != nil { t.Fatal(err) } - if txn2, _, _, _, _ := ReadTransaction(tx, txn.Hash()); txn2 != nil { + if txn2, _, _, _, _ := ReadTransactionByHash(tx, txn.Hash()); txn2 != nil { t.Fatalf("txn #%d [%x]: deleted transaction returned: %v", i, txn.Hash(), txn2) } } diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index e96969ae8ef..6b8af5b7b63 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -801,7 +801,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R expect := cfg.snapshots.ChainSnapshotConfig().ExpectBlocks if headers < expect || bodies < expect || txs < expect { chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID) - if err := cfg.snapshots.BuildIndices(ctx, *chainID); err != nil { + if err := cfg.snapshots.BuildIndices(ctx, *chainID, cfg.tmpdir); err != nil { return err } } diff --git a/turbo/cli/snapshots.go b/turbo/cli/snapshots.go index f38a4c20ab7..06dff3badd2 100644 --- a/turbo/cli/snapshots.go +++ b/turbo/cli/snapshots.go @@ -11,6 +11,7 @@ import ( "github.com/holiman/uint256" "github.com/ledgerwatch/erigon-lib/compress" + "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon/cmd/hack/tool" @@ -40,6 +41,14 @@ var snapshotCommand = cli.Command{ SnapshotSegmentSizeFlag, }, }, + { + Name: "index", + Action: doIndicesCommand, + Usage: "Create all indices for snapshots", + Flags: []cli.Flag{ + utils.DataDirFlag, + }, + }, }, } @@ -61,6 +70,22 @@ var ( } ) +func doIndicesCommand(cliCtx *cli.Context) error { + ctx, cancel := utils.RootContext() + defer cancel() + + dataDir := cliCtx.String(utils.DataDirFlag.Name) + snapshotDir := path.Join(dataDir, "snapshots") + tmpDir := path.Join(dataDir, etl.TmpDirName) + + chainDB := mdbx.NewMDBX(log.New()).Path(path.Join(dataDir, "chaindata")).Readonly().MustOpen() + defer chainDB.Close() + + if err := rebuildIndices(ctx, chainDB, snapshotDir, tmpDir); err != nil { + log.Error("Error", "err", err) + } + return nil +} func doSnapshotCommand(ctx *cli.Context) error { fromBlock := ctx.Uint64(SnapshotFromFlag.Name) toBlock := ctx.Uint64(SnapshotToFlag.Name) @@ -80,6 +105,34 @@ func doSnapshotCommand(ctx *cli.Context) error { return nil } +func rebuildIndices(ctx context.Context, chainDB kv.RoDB, snapshotDir, tmpDir string) error { + chainConfig := tool.ChainConfigFromDB(chainDB) + chainID, _ := uint256.FromBig(chainConfig.ChainID) + _ = chainID + _ = os.MkdirAll(snapshotDir, fs.ModePerm) + + workers := runtime.NumCPU() - 1 + if workers < 1 { + workers = 1 + } + + allSnapshots := snapshotsync.NewAllSnapshots(snapshotDir, snapshothashes.KnownConfig(chainConfig.ChainName)) + if err := allSnapshots.ReopenSegments(); err != nil { + return err + } + idxFilesList, err := snapshotsync.IdxFilesList(snapshotDir) + if err != nil { + return err + } + for _, f := range idxFilesList { + _ = os.Remove(f) + } + if err := allSnapshots.BuildIndices(ctx, *chainID, tmpDir); err != nil { + return err + } + return nil +} + func snapshotBlocks(chainDB kv.RoDB, fromBlock, toBlock, blocksPerFile uint64, snapshotDir string) error { var last uint64 diff --git a/turbo/snapshotsync/block_reader.go b/turbo/snapshotsync/block_reader.go index 4df88863c4b..57f93dfc3a7 100644 --- a/turbo/snapshotsync/block_reader.go +++ b/turbo/snapshotsync/block_reader.go @@ -74,6 +74,17 @@ func (back *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Tx, hash co return rawdb.NonCanonicalBlockWithSenders(tx, hash, blockHeight) } +func (back *BlockReader) TxnLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error) { + n, err := rawdb.ReadTxLookupEntry(tx, txnHash) + if err != nil { + return 0, false, err + } + if n == nil { + return 0, false, nil + } + return *n, true, nil +} + type RemoteBlockReader struct { client remote.ETHBACKENDClient } @@ -356,7 +367,7 @@ func (back *BlockReaderWithSnapshots) headerFromSnapshotByHash(hash common.Hash, gg := sn.Headers.MakeGetter() gg.Reset(headerOffset) buf, _ = gg.Next(buf[:0]) - if hash[0] != buf[1] { + if hash[0] != buf[0] { return nil, nil } @@ -413,3 +424,33 @@ func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *B body.Uncles = b.Uncles return body, senders, b.BaseTxId, b.TxAmount, nil } + +func (back *BlockReaderWithSnapshots) TxnLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error) { + n, err := rawdb.ReadTxLookupEntry(tx, txnHash) + if err != nil { + return 0, false, err + } + if n != nil { + return *n, true, nil + } + + buf := make([]byte, 16) + for i := len(back.sn.blocks) - 1; i >= 0; i-- { + sn := back.sn.blocks[i] + + localID := sn.TxnHashIdx.Lookup(txnHash[:]) + offset := sn.TxnHashIdx.Lookup2(localID) + gg := sn.Transactions.MakeGetter() + gg.Reset(offset) + buf, _ = gg.Next(buf[:0]) + if txnHash[0] != buf[0] { + continue + } + + localID = sn.TxnHash2BlockNumIdx.Lookup(txnHash[:]) + blockNum := sn.TxnHash2BlockNumIdx.Lookup2(localID) + return blockNum, true, nil + } + + return 0, false, nil +} diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index 951d019fac5..212ea07c196 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -35,12 +35,13 @@ import ( ) type BlocksSnapshot struct { - Bodies *compress.Decompressor // value: rlp(types.BodyForStorage) - Headers *compress.Decompressor // value: first_byte_of_header_hash + header_rlp - Transactions *compress.Decompressor // value: first_byte_of_transaction_hash + transaction_rlp - BodyNumberIdx *recsplit.Index // block_num_u64 -> bodies_segment_offset - HeaderHashIdx *recsplit.Index // header_hash -> headers_segment_offset - TxnHashIdx *recsplit.Index // transaction_hash -> transactions_segment_offset + Bodies *compress.Decompressor // value: rlp(types.BodyForStorage) + Headers *compress.Decompressor // value: first_byte_of_header_hash + header_rlp + Transactions *compress.Decompressor // value: first_byte_of_transaction_hash + transaction_rlp + BodyNumberIdx *recsplit.Index // block_num_u64 -> bodies_segment_offset + HeaderHashIdx *recsplit.Index // header_hash -> headers_segment_offset + TxnHashIdx *recsplit.Index // transaction_hash -> transactions_segment_offset + TxnHash2BlockNumIdx *recsplit.Index // transaction_hash -> block_number From uint64 // included To uint64 // excluded @@ -54,7 +55,12 @@ const ( Transactions SnapshotType = "transactions" ) +var ( + Transactions2Block SnapshotType = "transactions-to-block" +) + var AllSnapshotTypes = []SnapshotType{Headers, Bodies, Transactions} +var AllIdxTypes = []SnapshotType{Headers, Bodies, Transactions, Transactions2Block} var ( ErrInvalidCompressedFileName = fmt.Errorf("invalid compressed file name") @@ -137,7 +143,7 @@ func (s *AllSnapshots) ReopenIndices() error { return s.ReopenSomeIndices(AllSnapshotTypes...) } -func (s *AllSnapshots) ReopenSomeIndices(types ...SnapshotType) error { +func (s *AllSnapshots) ReopenSomeIndices(types ...SnapshotType) (err error) { for _, bs := range s.blocks { for _, snapshotType := range types { switch snapshotType { @@ -146,32 +152,37 @@ func (s *AllSnapshots) ReopenSomeIndices(types ...SnapshotType) error { bs.HeaderHashIdx.Close() bs.HeaderHashIdx = nil } - idx, err := recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Headers))) + bs.HeaderHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Headers))) if err != nil { return err } - bs.HeaderHashIdx = idx case Bodies: if bs.BodyNumberIdx != nil { bs.BodyNumberIdx.Close() bs.BodyNumberIdx = nil } - idx, err := recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Bodies))) + bs.BodyNumberIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Bodies))) if err != nil { return err } - bs.BodyNumberIdx = idx - case Transactions: if bs.TxnHashIdx != nil { bs.TxnHashIdx.Close() bs.TxnHashIdx = nil } - idx, err := recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions))) + bs.TxnHashIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions))) + if err != nil { + return err + } + + if bs.TxnHash2BlockNumIdx != nil { + bs.TxnHash2BlockNumIdx.Close() + bs.TxnHash2BlockNumIdx = nil + } + bs.TxnHash2BlockNumIdx, err = recsplit.OpenIndex(path.Join(s.dir, IdxFileName(bs.From, bs.To, Transactions2Block))) if err != nil { return err } - bs.TxnHashIdx = idx default: panic(fmt.Sprintf("unknown snapshot type: %s", snapshotType)) } @@ -292,7 +303,7 @@ func (s *AllSnapshots) Blocks(blockNumber uint64) (snapshot *BlocksSnapshot, fou return snapshot, false } -func (s *AllSnapshots) BuildIndices(ctx context.Context, chainID uint256.Int) error { +func (s *AllSnapshots) BuildIndices(ctx context.Context, chainID uint256.Int, tmpDir string) error { for _, sn := range s.blocks { f := path.Join(s.dir, SegmentFileName(sn.From, sn.To, Headers)) if err := HeadersHashIdx(f, sn.From); err != nil { @@ -332,7 +343,7 @@ func (s *AllSnapshots) BuildIndices(ctx context.Context, chainID uint256.Int) er expectedTxsAmount = lastBody.BaseTxId + uint64(lastBody.TxAmount) - firstBody.BaseTxId } f := path.Join(s.dir, SegmentFileName(sn.From, sn.To, Transactions)) - if err := TransactionsHashIdx(chainID, firstBody.BaseTxId, f, expectedTxsAmount); err != nil { + if err := TransactionsHashIdx(chainID, sn, firstBody.BaseTxId, sn.From, f, expectedTxsAmount, tmpDir); err != nil { return err } } @@ -421,6 +432,18 @@ func segments(dir string, ofType SnapshotType) ([]string, error) { sort.Strings(res) return res, nil } + +func IdxFilesList(dir string) (res []string, err error) { + for _, t := range AllIdxTypes { + files, err := idxFiles(dir, t) + if err != nil { + return nil, err + } + res = append(res, files...) + } + return res, nil +} + func idxFiles(dir string, ofType SnapshotType) ([]string, error) { files, err := ioutil.ReadDir(dir) if err != nil { @@ -621,8 +644,7 @@ func DumpHeaders(db kv.RoDB, tmpdir string, fromBlock uint64, blocksAmount int) return false, err } if dataRLP == nil { - log.Warn("header missed", "block_num", blockNum, "hash", fmt.Sprintf("%x", v)) - return true, nil + return false, fmt.Errorf("header missed in db: block_num=%d, hash=%x", blockNum, v) } h := types.Header{} if err := rlp.DecodeBytes(dataRLP, &h); err != nil { @@ -713,39 +735,113 @@ func DumpBodies(db kv.RoDB, tmpdir string, fromBlock uint64, blocksAmount int) e return nil } -func TransactionsHashIdx(chainID uint256.Int, firstTxID uint64, segmentFileName string, expectedCount uint64) error { +func TransactionsHashIdx(chainID uint256.Int, sn *BlocksSnapshot, firstTxID, firstBlockNum uint64, segmentFilePath string, expectedCount uint64, tmpDir string) error { logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() + _, fileName := filepath.Split(segmentFilePath) + parseCtx := txpool.NewTxParseContext(chainID) parseCtx.WithSender(false) slot := txpool.TxSlot{} var sender [20]byte var j uint64 - d, err := compress.NewDecompressor(segmentFileName) + + d, err := compress.NewDecompressor(segmentFilePath) if err != nil { return err } defer d.Close() total := uint64(d.Count()) - if err := Idx(d, firstTxID, func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error { + buf := make([]byte, 1024) + + txnHashIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{ + KeyCount: d.Count(), + Enums: true, + BucketSize: 2000, + Salt: 0, + LeafSize: 8, + TmpDir: tmpDir, + IndexFile: IdxFileName(sn.From, sn.To, Transactions), + BaseDataID: firstTxID, + }) + if err != nil { + return err + } + txnHash2BlockNumIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{ + KeyCount: d.Count(), + Enums: true, + BucketSize: 2000, + Salt: 0, + LeafSize: 8, + TmpDir: tmpDir, + IndexFile: IdxFileName(sn.From, sn.To, Transactions2Block), + BaseDataID: firstBlockNum, + }) + if err != nil { + return err + } + +RETRY: + blockNum := firstBlockNum + body := &types.BodyForStorage{} + bodyGetter := sn.Bodies.MakeGetter() + bodyGetter.Reset(0) + buf, _ = bodyGetter.Next(buf[:0]) + if err := rlp.DecodeBytes(buf, body); err != nil { + return err + } + + if err := forEach(d, func(i, offset uint64, word []byte) error { if _, err := parseCtx.ParseTransaction(word[1+20:], 0, &slot, sender[:]); err != nil { return err } - if err := idx.AddKey(slot.IdHash[:], offset); err != nil { + if err := txnHashIdx.AddKey(slot.IdHash[:], offset); err != nil { + return err + } + + for firstTxID+i > body.BaseTxId+uint64(body.TxAmount) { // read next + buf, _ = bodyGetter.Next(buf[:0]) + if err := rlp.DecodeBytes(buf, body); err != nil { + return err + } + blockNum++ + } + + if err := txnHash2BlockNumIdx.AddKey(slot.IdHash[:], blockNum); err != nil { return err } select { default: case <-logEvery.C: - log.Info(fmt.Sprintf("[Snapshots Indexing] TransactionsHashIdx: %s", percent(i, total))) + log.Info(fmt.Sprintf("[Snapshots Indexing] TransactionsHashIdx: %s, %s", percent(i, total), fileName)) } j++ return nil }); err != nil { - return fmt.Errorf("TransactionsHashIdx: %w", err) + return err + } + + if err = txnHashIdx.Build(); err != nil { + if errors.Is(err, recsplit.ErrCollision) { + log.Info("Building recsplit. Collision happened. It's ok. Restarting with another salt...", "err", err) + txnHashIdx.ResetNextSalt() + txnHash2BlockNumIdx.ResetNextSalt() + goto RETRY + } + return err } + if err = txnHash2BlockNumIdx.Build(); err != nil { + if errors.Is(err, recsplit.ErrCollision) { + log.Info("Building recsplit. Collision happened. It's ok. Restarting with another salt...", "err", err) + txnHashIdx.ResetNextSalt() + txnHash2BlockNumIdx.ResetNextSalt() + goto RETRY + } + return err + } + if j != expectedCount { panic(fmt.Errorf("expect: %d, got %d\n", expectedCount, j)) } @@ -753,11 +849,12 @@ func TransactionsHashIdx(chainID uint256.Int, firstTxID uint64, segmentFileName } // HeadersHashIdx - headerHash -> offset (analog of kv.HeaderNumber) -func HeadersHashIdx(segmentFileName string, firstBlockNumInSegment uint64) error { +func HeadersHashIdx(segmentFilePath string, firstBlockNumInSegment uint64) error { logEvery := time.NewTicker(5 * time.Second) defer logEvery.Stop() + _, fileName := filepath.Split(segmentFilePath) - d, err := compress.NewDecompressor(segmentFileName) + d, err := compress.NewDecompressor(segmentFilePath) if err != nil { return err } @@ -777,7 +874,7 @@ func HeadersHashIdx(segmentFileName string, firstBlockNumInSegment uint64) error select { default: case <-logEvery.C: - log.Info(fmt.Sprintf("[Snapshots Indexing] HeadersHashIdx: %s", percent(i, total))) + log.Info(fmt.Sprintf("[Snapshots Indexing] HeadersHashIdx: %s, %s", percent(i, total), fileName)) } return nil }); err != nil { @@ -786,12 +883,13 @@ func HeadersHashIdx(segmentFileName string, firstBlockNumInSegment uint64) error return nil } -func BodiesIdx(segmentFileName string, firstBlockNumInSegment uint64) error { +func BodiesIdx(segmentFilePath string, firstBlockNumInSegment uint64) error { logEvery := time.NewTicker(5 * time.Second) defer logEvery.Stop() + _, fileName := filepath.Split(segmentFilePath) num := make([]byte, 8) - d, err := compress.NewDecompressor(segmentFileName) + d, err := compress.NewDecompressor(segmentFilePath) if err != nil { return err } @@ -807,7 +905,7 @@ func BodiesIdx(segmentFileName string, firstBlockNumInSegment uint64) error { select { default: case <-logEvery.C: - log.Info(fmt.Sprintf("[Snapshots Indexing] BodyNumberIdx: %s", percent(i, total))) + log.Info(fmt.Sprintf("[Snapshots Indexing] BodyNumberIdx: %s, %s", percent(i, total), fileName)) } return nil }); err != nil { @@ -816,6 +914,22 @@ func BodiesIdx(segmentFileName string, firstBlockNumInSegment uint64) error { return nil } +//forEach - only reason why this func exists - is that .Next returns "nextPos" instead of "pos". If fix this in future - then can remove this func +func forEach(d *compress.Decompressor, walker func(i, offset uint64, word []byte) error) error { + g := d.MakeGetter() + var wc, pos, nextPos uint64 + word := make([]byte, 0, 4096) + for g.HasNext() { + word, nextPos = g.Next(word[:0]) + if err := walker(wc, pos, word); err != nil { + return err + } + wc++ + pos = nextPos + } + return nil +} + // Idx - iterate over segment and building .idx file func Idx(d *compress.Decompressor, firstDataID uint64, walker func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error) error { segmentFileName := d.FilePath() diff --git a/turbo/stages/blockchain_test.go b/turbo/stages/blockchain_test.go index 1bc5b36a387..11de0443c37 100644 --- a/turbo/stages/blockchain_test.go +++ b/turbo/stages/blockchain_test.go @@ -479,7 +479,7 @@ func TestChainTxReorgs(t *testing.T) { // removed tx txs := types.Transactions{pastDrop, freshDrop} for i, txn := range txs { - if txn, _, _, _, _ := rawdb.ReadTransaction(tx, txn.Hash()); txn != nil { + if txn, _, _, _, _ := rawdb.ReadTransactionByHash(tx, txn.Hash()); txn != nil { t.Errorf("drop %d: tx %v found while shouldn't have been", i, txn) } if rcpt, _, _, _, _ := rawdb.ReadReceipt(tx, txn.Hash()); rcpt != nil { @@ -489,7 +489,7 @@ func TestChainTxReorgs(t *testing.T) { // added tx txs = types.Transactions{pastAdd, freshAdd, futureAdd} for i, txn := range txs { - if txn, _, _, _, _ := rawdb.ReadTransaction(tx, txn.Hash()); txn == nil { + if txn, _, _, _, _ := rawdb.ReadTransactionByHash(tx, txn.Hash()); txn == nil { t.Errorf("add %d: expected tx to be found", i) } if rcpt, _, _, _, _ := rawdb.ReadReceipt(tx, txn.Hash()); rcpt == nil { @@ -499,7 +499,7 @@ func TestChainTxReorgs(t *testing.T) { // shared tx txs = types.Transactions{postponed, swapped} for i, txn := range txs { - if txn, _, _, _, _ := rawdb.ReadTransaction(tx, txn.Hash()); txn == nil { + if txn, _, _, _, _ := rawdb.ReadTransactionByHash(tx, txn.Hash()); txn == nil { t.Errorf("share %d: expected tx to be found", i) } if rcpt, _, _, _, _ := rawdb.ReadReceipt(tx, txn.Hash()); rcpt == nil {