Skip to content

Commit

Permalink
Snapshots: txnHash2BlockNum idx (erigontech#3213)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Jan 6, 2022
1 parent eed7475 commit 13d9e71
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 48 deletions.
9 changes: 8 additions & 1 deletion accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,14 @@ func (b *SimulatedBackend) TransactionByHash(ctx context.Context, txHash common.
if txn != nil {
return txn, true, nil
}
txn, _, _, _, err = rawdb.ReadTransaction(tx, txHash)
blockNumber, err := rawdb.ReadTxLookupEntry(tx, txHash)
if err != nil {
return nil, false, err
}
if blockNumber == nil {
return nil, false, ethereum.NotFound
}
txn, _, _, _, err = rawdb.ReadTransaction(tx, txHash, *blockNumber)
if err != nil {
return nil, false, err
}
Expand Down
9 changes: 8 additions & 1 deletion cmd/rpcdaemon/commands/eth_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ func (api *APIImpl) CallBundle(ctx context.Context, txHashes []common.Hash, stat
var txs types.Transactions

for _, txHash := range txHashes {
txn, _, _, _, err := rawdb.ReadTransaction(tx, txHash)
blockNumber, err := rawdb.ReadTxLookupEntry(tx, txHash)
if err != nil {
return nil, err
}
if blockNumber == nil {
return nil, nil
}
txn, _, _, _, err := rawdb.ReadTransaction(tx, txHash, *blockNumber)
if err != nil {
return nil, err
}
Expand Down
18 changes: 16 additions & 2 deletions cmd/rpcdaemon/commands/eth_txs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@ func (api *APIImpl) GetTransactionByHash(ctx context.Context, hash common.Hash)
defer tx.Rollback()

// https://infura.io/docs/ethereum/json-rpc/eth-getTransactionByHash
txn, blockHash, blockNumber, txIndex, err := rawdb.ReadTransaction(tx, hash)
blockNum, err := rawdb.ReadTxLookupEntry(tx, hash)
if err != nil {
return nil, err
}
if blockNum == nil {
return nil, nil
}
txn, blockHash, blockNumber, txIndex, err := rawdb.ReadTransaction(tx, hash, *blockNum)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -81,7 +88,14 @@ func (api *APIImpl) GetRawTransactionByHash(ctx context.Context, hash common.Has
defer tx.Rollback()

// https://infura.io/docs/ethereum/json-rpc/eth-getTransactionByHash
txn, _, _, _, err := rawdb.ReadTransaction(tx, hash)
blockNum, err := rawdb.ReadTxLookupEntry(tx, hash)
if err != nil {
return nil, err
}
if blockNum == nil {
return nil, nil
}
txn, _, _, _, err := rawdb.ReadTransaction(tx, hash, *blockNum)
if err != nil {
return nil, err
}
Expand Down
9 changes: 8 additions & 1 deletion cmd/rpcdaemon/commands/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,14 @@ func (api *PrivateDebugAPIImpl) TraceTransaction(ctx context.Context, hash commo
defer tx.Rollback()

// Retrieve the transaction and assemble its EVM context
txn, blockHash, _, txIndex, err := rawdb.ReadTransaction(tx, hash)
blockNum, err := rawdb.ReadTxLookupEntry(tx, hash)
if err != nil {
return err
}
if blockNum == nil {
return nil
}
txn, blockHash, _, txIndex, err := rawdb.ReadTransaction(tx, hash, *blockNum)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func ReadBodyWithTransactions(db kv.Getter, hash common.Hash, number uint64) *ty
var err error
body.Transactions, err = CanonicalTransactions(db, baseTxId, txAmount)
if err != nil {
log.Error("failed ReadTransaction", "hash", hash, "block", number, "err", err)
log.Error("failed ReadTransactionByHash", "hash", hash, "block", number, "err", err)
return nil
}
return body
Expand All @@ -402,7 +402,7 @@ func NonCanonicalBodyWithTransactions(db kv.Getter, hash common.Hash, number uin
var err error
body.Transactions, err = NonCanonicalTransactions(db, baseTxId, txAmount)
if err != nil {
log.Error("failed ReadTransaction", "hash", hash, "block", number, "err", err)
log.Error("failed ReadTransactionByHash", "hash", hash, "block", number, "err", err)
return nil
}
return body
Expand Down
35 changes: 32 additions & 3 deletions core/rawdb/accessors_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type TxLookupEntry struct {

// ReadTxLookupEntry retrieves the positional metadata associated with a transaction
// hash to allow retrieving the transaction or receipt by hash.
func ReadTxLookupEntry(db kv.Tx, txnHash common.Hash) (*uint64, error) {
func ReadTxLookupEntry(db kv.Getter, txnHash common.Hash) (*uint64, error) {
data, err := db.GetOne(kv.TxLookup, txnHash.Bytes())
if err != nil {
return nil, err
Expand Down Expand Up @@ -63,9 +63,9 @@ func DeleteTxLookupEntry(db kv.Deleter, hash common.Hash) error {
return db.Delete(kv.TxLookup, hash.Bytes(), nil)
}

// ReadTransaction retrieves a specific transaction from the database, along with
// ReadTransactionByHash retrieves a specific transaction from the database, along with
// its added positional metadata.
func ReadTransaction(db kv.Tx, hash common.Hash) (types.Transaction, common.Hash, uint64, uint64, error) {
func ReadTransactionByHash(db kv.Tx, hash common.Hash) (types.Transaction, common.Hash, uint64, uint64, error) {
blockNumber, err := ReadTxLookupEntry(db, hash)
if err != nil {
return nil, common.Hash{}, 0, 0, err
Expand Down Expand Up @@ -99,6 +99,35 @@ func ReadTransaction(db kv.Tx, hash common.Hash) (types.Transaction, common.Hash
return nil, common.Hash{}, 0, 0, nil
}

// ReadTransaction retrieves a specific transaction from the database, along with
// its added positional metadata.
func ReadTransaction(db kv.Tx, hash common.Hash, blockNumber uint64) (types.Transaction, common.Hash, uint64, uint64, error) {
blockHash, err := ReadCanonicalHash(db, blockNumber)
if err != nil {
return nil, common.Hash{}, 0, 0, err
}
if blockHash == (common.Hash{}) {
return nil, common.Hash{}, 0, 0, nil
}
body := ReadBodyWithTransactions(db, blockHash, blockNumber)
if body == nil {
log.Error("Transaction referenced missing", "number", blockNumber, "hash", blockHash)
return nil, common.Hash{}, 0, 0, nil
}
senders, err1 := ReadSenders(db, blockHash, blockNumber)
if err1 != nil {
return nil, common.Hash{}, 0, 0, err1
}
body.SendersToTxs(senders)
for txIndex, tx := range body.Transactions {
if tx.Hash() == hash {
return tx, blockHash, blockNumber, uint64(txIndex), nil
}
}
log.Error("Transaction not found", "number", blockNumber, "hash", blockHash, "txhash", hash)
return nil, common.Hash{}, 0, 0, nil
}

func ReadReceipt(db kv.Tx, txHash common.Hash) (*types.Receipt, common.Hash, uint64, uint64, error) {
// Retrieve the context of the receipt based on the transaction hash
blockNumber, err := ReadTxLookupEntry(db, txHash)
Expand Down
6 changes: 3 additions & 3 deletions core/rawdb/accessors_indexes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestLookupStorage(t *testing.T) {

// Check that no transactions entries are in a pristine database
for i, txn := range txs {
if txn2, _, _, _, _ := ReadTransaction(tx, txn.Hash()); txn2 != nil {
if txn2, _, _, _, _ := ReadTransactionByHash(tx, txn.Hash()); txn2 != nil {
t.Fatalf("txn #%d [%x]: non existent transaction returned: %v", i, txn.Hash(), txn2)
}
}
Expand All @@ -73,7 +73,7 @@ func TestLookupStorage(t *testing.T) {
tc.writeTxLookupEntries(tx, block)

for i, txn := range txs {
if txn2, hash, number, index, _ := ReadTransaction(tx, txn.Hash()); txn2 == nil {
if txn2, hash, number, index, _ := ReadTransactionByHash(tx, txn.Hash()); txn2 == nil {
t.Fatalf("txn #%d [%x]: transaction not found", i, txn.Hash())
} else {
if hash != block.Hash() || number != block.NumberU64() || index != uint64(i) {
Expand All @@ -89,7 +89,7 @@ func TestLookupStorage(t *testing.T) {
if err := DeleteTxLookupEntry(tx, txn.Hash()); err != nil {
t.Fatal(err)
}
if txn2, _, _, _, _ := ReadTransaction(tx, txn.Hash()); txn2 != nil {
if txn2, _, _, _, _ := ReadTransactionByHash(tx, txn.Hash()); txn2 != nil {
t.Fatalf("txn #%d [%x]: deleted transaction returned: %v", i, txn.Hash(), txn2)
}
}
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
expect := cfg.snapshots.ChainSnapshotConfig().ExpectBlocks
if headers < expect || bodies < expect || txs < expect {
chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID)
if err := cfg.snapshots.BuildIndices(ctx, *chainID); err != nil {
if err := cfg.snapshots.BuildIndices(ctx, *chainID, cfg.tmpdir); err != nil {
return err
}
}
Expand Down
53 changes: 53 additions & 0 deletions turbo/cli/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon/cmd/hack/tool"
Expand Down Expand Up @@ -40,6 +41,14 @@ var snapshotCommand = cli.Command{
SnapshotSegmentSizeFlag,
},
},
{
Name: "index",
Action: doIndicesCommand,
Usage: "Create all indices for snapshots",
Flags: []cli.Flag{
utils.DataDirFlag,
},
},
},
}

Expand All @@ -61,6 +70,22 @@ var (
}
)

func doIndicesCommand(cliCtx *cli.Context) error {
ctx, cancel := utils.RootContext()
defer cancel()

dataDir := cliCtx.String(utils.DataDirFlag.Name)
snapshotDir := path.Join(dataDir, "snapshots")
tmpDir := path.Join(dataDir, etl.TmpDirName)

chainDB := mdbx.NewMDBX(log.New()).Path(path.Join(dataDir, "chaindata")).Readonly().MustOpen()
defer chainDB.Close()

if err := rebuildIndices(ctx, chainDB, snapshotDir, tmpDir); err != nil {
log.Error("Error", "err", err)
}
return nil
}
func doSnapshotCommand(ctx *cli.Context) error {
fromBlock := ctx.Uint64(SnapshotFromFlag.Name)
toBlock := ctx.Uint64(SnapshotToFlag.Name)
Expand All @@ -80,6 +105,34 @@ func doSnapshotCommand(ctx *cli.Context) error {
return nil
}

func rebuildIndices(ctx context.Context, chainDB kv.RoDB, snapshotDir, tmpDir string) error {
chainConfig := tool.ChainConfigFromDB(chainDB)
chainID, _ := uint256.FromBig(chainConfig.ChainID)
_ = chainID
_ = os.MkdirAll(snapshotDir, fs.ModePerm)

workers := runtime.NumCPU() - 1
if workers < 1 {
workers = 1
}

allSnapshots := snapshotsync.NewAllSnapshots(snapshotDir, snapshothashes.KnownConfig(chainConfig.ChainName))
if err := allSnapshots.ReopenSegments(); err != nil {
return err
}
idxFilesList, err := snapshotsync.IdxFilesList(snapshotDir)
if err != nil {
return err
}
for _, f := range idxFilesList {
_ = os.Remove(f)
}
if err := allSnapshots.BuildIndices(ctx, *chainID, tmpDir); err != nil {
return err
}
return nil
}

func snapshotBlocks(chainDB kv.RoDB, fromBlock, toBlock, blocksPerFile uint64, snapshotDir string) error {
var last uint64

Expand Down
43 changes: 42 additions & 1 deletion turbo/snapshotsync/block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ func (back *BlockReader) BlockWithSenders(ctx context.Context, tx kv.Tx, hash co
return rawdb.NonCanonicalBlockWithSenders(tx, hash, blockHeight)
}

func (back *BlockReader) TxnLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error) {
n, err := rawdb.ReadTxLookupEntry(tx, txnHash)
if err != nil {
return 0, false, err
}
if n == nil {
return 0, false, nil
}
return *n, true, nil
}

type RemoteBlockReader struct {
client remote.ETHBACKENDClient
}
Expand Down Expand Up @@ -356,7 +367,7 @@ func (back *BlockReaderWithSnapshots) headerFromSnapshotByHash(hash common.Hash,
gg := sn.Headers.MakeGetter()
gg.Reset(headerOffset)
buf, _ = gg.Next(buf[:0])
if hash[0] != buf[1] {
if hash[0] != buf[0] {
return nil, nil
}

Expand Down Expand Up @@ -413,3 +424,33 @@ func (back *BlockReaderWithSnapshots) bodyFromSnapshot(blockHeight uint64, sn *B
body.Uncles = b.Uncles
return body, senders, b.BaseTxId, b.TxAmount, nil
}

func (back *BlockReaderWithSnapshots) TxnLookup(ctx context.Context, tx kv.Getter, txnHash common.Hash) (uint64, bool, error) {
n, err := rawdb.ReadTxLookupEntry(tx, txnHash)
if err != nil {
return 0, false, err
}
if n != nil {
return *n, true, nil
}

buf := make([]byte, 16)
for i := len(back.sn.blocks) - 1; i >= 0; i-- {
sn := back.sn.blocks[i]

localID := sn.TxnHashIdx.Lookup(txnHash[:])
offset := sn.TxnHashIdx.Lookup2(localID)
gg := sn.Transactions.MakeGetter()
gg.Reset(offset)
buf, _ = gg.Next(buf[:0])
if txnHash[0] != buf[0] {
continue
}

localID = sn.TxnHash2BlockNumIdx.Lookup(txnHash[:])
blockNum := sn.TxnHash2BlockNumIdx.Lookup2(localID)
return blockNum, true, nil
}

return 0, false, nil
}
Loading

0 comments on commit 13d9e71

Please sign in to comment.