Skip to content

Commit

Permalink
e3: kv/temporal prototype (#6367)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Dec 19, 2022
1 parent 27de232 commit dfa6505
Show file tree
Hide file tree
Showing 67 changed files with 697 additions and 1,082 deletions.
4 changes: 2 additions & 2 deletions cmd/erigon-cl/core/rawdb/accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"fmt"
"math/big"

common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/utils"
"github.com/ledgerwatch/erigon/cmd/erigon-cl/core/state"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/consensus/serenity"
rawdb2 "github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
Expand Down Expand Up @@ -365,7 +365,7 @@ func ReadBeaconBlock(tx kv.RwTx, slot uint64) (*cltypes.SignedBeaconBlockBellatr
if err != nil {
return nil, err
}
if err := tx.ForAmount(kv.EthTx, dbutils.EncodeBlockNumber(body.BaseTxId+1), body.TxAmount-2, func(k, v []byte) error {
if err := tx.ForAmount(kv.EthTx, common2.EncodeTs(body.BaseTxId+1), body.TxAmount-2, func(k, v []byte) error {
payload.Transactions = append(payload.Transactions, v)
return nil
}); err != nil {
Expand Down
11 changes: 6 additions & 5 deletions cmd/hack/hack.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (

"github.com/RoaringBitmap/roaring/roaring64"
"github.com/holiman/uint256"
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/kv/temporal/historyv2"
"github.com/ledgerwatch/erigon-lib/recsplit"
"github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32"
"github.com/ledgerwatch/erigon/turbo/debug"
Expand All @@ -34,7 +36,6 @@ import (
"github.com/ledgerwatch/erigon/cmd/hack/flow"
"github.com/ledgerwatch/erigon/cmd/hack/tool"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/changeset"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/common/paths"
"github.com/ledgerwatch/erigon/core"
Expand Down Expand Up @@ -317,7 +318,7 @@ func searchChangeSet(chaindata string, key []byte, block uint64) error {
}
defer tx.Rollback()

if err := changeset.ForEach(tx, kv.AccountChangeSet, dbutils.EncodeBlockNumber(block), func(blockN uint64, k, v []byte) error {
if err := historyv2.ForEach(tx, kv.AccountChangeSet, common2.EncodeTs(block), func(blockN uint64, k, v []byte) error {
if bytes.Equal(k, key) {
fmt.Printf("Found in block %d with value %x\n", blockN, v)
}
Expand All @@ -337,7 +338,7 @@ func searchStorageChangeSet(chaindata string, key []byte, block uint64) error {
return err1
}
defer tx.Rollback()
if err := changeset.ForEach(tx, kv.StorageChangeSet, dbutils.EncodeBlockNumber(block), func(blockN uint64, k, v []byte) error {
if err := historyv2.ForEach(tx, kv.StorageChangeSet, common2.EncodeTs(block), func(blockN uint64, k, v []byte) error {
if bytes.Equal(k, key) {
fmt.Printf("Found in block %d with value %x\n", blockN, v)
}
Expand Down Expand Up @@ -476,7 +477,7 @@ func extractHeaders(chaindata string, block uint64, blockTotalOrOffset int64) er
return err
}
defer c.Close()
blockEncoded := dbutils.EncodeBlockNumber(block)
blockEncoded := common2.EncodeTs(block)
blockTotal := getBlockTotal(tx, block, blockTotalOrOffset)
for k, v, err := c.Seek(blockEncoded); k != nil && blockTotal > 0; k, v, err = c.Next() {
if err != nil {
Expand Down Expand Up @@ -1001,7 +1002,7 @@ func scanReceipts2(chaindata string) error {
return err
}
defer tx.Rollback()
blockNum, err := changeset.AvailableFrom(tx)
blockNum, err := historyv2.AvailableFrom(tx)
if err != nil {
return err
}
Expand Down
15 changes: 0 additions & 15 deletions cmd/hack/tool/fromdb/tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,10 @@ import (

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cmd/hack/tool"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/params"
)

func HistoryV3(db kv.RoDB) (enabled bool) {
if err := db.View(context.Background(), func(tx kv.Tx) error {
var err error
enabled, err = rawdb.HistoryV3.Enabled(tx)
if err != nil {
return err
}
return nil
}); err != nil {
panic(err)
}
return
}

func ChainConfig(db kv.RoDB) (cc *params.ChainConfig) {
err := db.View(context.Background(), func(tx kv.Tx) error {
cc = tool.ChainConfig(tx)
Expand Down
3 changes: 2 additions & 1 deletion cmd/integration/commands/reset_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/rawdb/rawdbhelpers"
reset2 "github.com/ledgerwatch/erigon/core/rawdb/rawdbreset"
Expand Down Expand Up @@ -78,7 +79,7 @@ func printStages(tx kv.Tx, snapshots *snapshotsync.RoSnapshots) error {
}
fmt.Fprintf(w, "--\n")
fmt.Fprintf(w, "prune distance: %s\n\n", pm.String())
h3, err := rawdb.HistoryV3.Enabled(tx)
h3, err := kvcfg.HistoryV3.Enabled(tx)
if err != nil {
return err
}
Expand Down
19 changes: 10 additions & 9 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/secp256k1"
Expand Down Expand Up @@ -297,7 +298,7 @@ var cmdForceSetHistoryV3 = &cobra.Command{
db := openDB(dbCfg(kv.ChainDB, chaindata), true)
defer db.Close()
if err := db.Update(context.Background(), func(tx kv.RwTx) error {
return rawdb.HistoryV3.ForceWrite(tx, _forceSetHistoryV3)
return kvcfg.HistoryV3.ForceWrite(tx, _forceSetHistoryV3)
}); err != nil {
log.Error("Error", "err", err)
return
Expand Down Expand Up @@ -539,7 +540,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context) error {

func stageBodies(db kv.RwDB, ctx context.Context) error {
_, _, sync, _, _ := newSync(ctx, db, nil)
chainConfig, historyV3 := fromdb.ChainConfig(db), fromdb.HistoryV3(db)
chainConfig, historyV3 := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db)
sn, _ := allSnapshots(db)

if err := db.Update(ctx, func(tx kv.RwTx) error {
Expand Down Expand Up @@ -665,7 +666,7 @@ func stageSenders(db kv.RwDB, ctx context.Context) error {
}

func stageExec(db kv.RwDB, ctx context.Context) error {
chainConfig, historyV3, pm := fromdb.ChainConfig(db), fromdb.HistoryV3(db), fromdb.PruneMode(db)
chainConfig, historyV3, pm := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db), fromdb.PruneMode(db)
dirs := datadir.New(datadirCli)
engine, vmConfig, sync, _, _ := newSync(ctx, db, nil)
must(sync.SetCurrentStage(stages.Execution))
Expand Down Expand Up @@ -731,7 +732,7 @@ func stageExec(db kv.RwDB, ctx context.Context) error {
}

func stageTrie(db kv.RwDB, ctx context.Context) error {
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), fromdb.HistoryV3(db)
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), kvcfg.HistoryV3.FromDB(db)
_, _, sync, _, _ := newSync(ctx, db, nil)
must(sync.SetCurrentStage(stages.IntermediateHashes))
_, agg := allSnapshots(db)
Expand Down Expand Up @@ -782,7 +783,7 @@ func stageTrie(db kv.RwDB, ctx context.Context) error {
}

func stageHashState(db kv.RwDB, ctx context.Context) error {
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), fromdb.HistoryV3(db)
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), kvcfg.HistoryV3.FromDB(db)
_, _, sync, _, _ := newSync(ctx, db, nil)
must(sync.SetCurrentStage(stages.HashState))
_, agg := allSnapshots(db)
Expand Down Expand Up @@ -833,7 +834,7 @@ func stageHashState(db kv.RwDB, ctx context.Context) error {
}

func stageLogIndex(db kv.RwDB, ctx context.Context) error {
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), fromdb.HistoryV3(db)
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), kvcfg.HistoryV3.FromDB(db)
if historyV3 {
return fmt.Errorf("this stage is disable in --history.v3=true")
}
Expand Down Expand Up @@ -885,7 +886,7 @@ func stageLogIndex(db kv.RwDB, ctx context.Context) error {
}

func stageCallTraces(db kv.RwDB, ctx context.Context) error {
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), fromdb.HistoryV3(db)
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), kvcfg.HistoryV3.FromDB(db)
if historyV3 {
return fmt.Errorf("this stage is disable in --history.v3=true")
}
Expand Down Expand Up @@ -945,7 +946,7 @@ func stageCallTraces(db kv.RwDB, ctx context.Context) error {
}

func stageHistory(db kv.RwDB, ctx context.Context) error {
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), fromdb.HistoryV3(db)
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), kvcfg.HistoryV3.FromDB(db)
if historyV3 {
return fmt.Errorf("this stage is disable in --history.v3=true")
}
Expand Down Expand Up @@ -1153,7 +1154,7 @@ func getBlockReader(db kv.RoDB) (blockReader services.FullBlockReader) {

func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig) (consensus.Engine, *vm.Config, *stagedsync.Sync, *stagedsync.Sync, stagedsync.MiningState) {
logger := log.New()
dirs, historyV3, pm := datadir.New(datadirCli), fromdb.HistoryV3(db), fromdb.PruneMode(db)
dirs, historyV3, pm := datadir.New(datadirCli), kvcfg.HistoryV3.FromDB(db), fromdb.PruneMode(db)

vmConfig := &vm.Config{}

Expand Down
29 changes: 15 additions & 14 deletions cmd/integration/commands/state_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon-lib/kv/temporal/historyv2"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"

"github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/changeset"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/common/debugprint"
"github.com/ledgerwatch/erigon/core"
Expand Down Expand Up @@ -148,7 +149,7 @@ func init() {

func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.Context) error {
engine, vmConfig, stateStages, miningStages, miner := newSync(ctx, db, &miningConfig)
chainConfig, historyV3, pm := fromdb.ChainConfig(db), fromdb.HistoryV3(db), fromdb.PruneMode(db)
chainConfig, historyV3, pm := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db), fromdb.PruneMode(db)
dirs := datadir.New(datadirCli)
_, agg := allSnapshots(db)

Expand All @@ -163,8 +164,8 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
var batchSize datasize.ByteSize
must(batchSize.UnmarshalText([]byte(batchSizeStr)))

expectedAccountChanges := make(map[uint64]*changeset.ChangeSet)
expectedStorageChanges := make(map[uint64]*changeset.ChangeSet)
expectedAccountChanges := make(map[uint64]*historyv2.ChangeSet)
expectedStorageChanges := make(map[uint64]*historyv2.ChangeSet)
changeSetHook := func(blockNum uint64, csw *state.ChangeSetWriter) {
if csw == nil {
return
Expand Down Expand Up @@ -380,7 +381,7 @@ func syncBySmallSteps(db kv.RwDB, miningConfig params.MiningConfig, ctx context.
return nil
}

func checkChanges(expectedAccountChanges map[uint64]*changeset.ChangeSet, tx kv.Tx, expectedStorageChanges map[uint64]*changeset.ChangeSet, execAtBlock, prunedTo uint64) error {
func checkChanges(expectedAccountChanges map[uint64]*historyv2.ChangeSet, tx kv.Tx, expectedStorageChanges map[uint64]*historyv2.ChangeSet, execAtBlock, prunedTo uint64) error {
checkHistoryFrom := execAtBlock
if prunedTo > checkHistoryFrom {
checkHistoryFrom = prunedTo
Expand Down Expand Up @@ -422,7 +423,7 @@ func checkMinedBlock(b1, b2 *types.Block, chainConfig *params.ChainConfig) {
func loopIh(db kv.RwDB, ctx context.Context, unwind uint64) error {
_, _, sync, _, _ := newSync(ctx, db, nil)
dirs := datadir.New(datadirCli)
historyV3 := fromdb.HistoryV3(db)
historyV3 := kvcfg.HistoryV3.FromDB(db)
_, agg := allSnapshots(db)

tx, err := db.BeginRw(ctx)
Expand Down Expand Up @@ -506,7 +507,7 @@ func loopExec(db kv.RwDB, ctx context.Context, unwind uint64) error {
sync.EnableStages(stages.Execution)
var batchSize datasize.ByteSize
must(batchSize.UnmarshalText([]byte(batchSizeStr)))
historyV3, err := rawdb.HistoryV3.Enabled(tx)
historyV3, err := kvcfg.HistoryV3.Enabled(tx)
if err != nil {
return err
}
Expand Down Expand Up @@ -553,10 +554,10 @@ func loopExec(db kv.RwDB, ctx context.Context, unwind uint64) error {
}
}

func checkChangeSet(db kv.Tx, blockNum uint64, expectedAccountChanges *changeset.ChangeSet, expectedStorageChanges *changeset.ChangeSet) error {
func checkChangeSet(db kv.Tx, blockNum uint64, expectedAccountChanges *historyv2.ChangeSet, expectedStorageChanges *historyv2.ChangeSet) error {
i := 0
sort.Sort(expectedAccountChanges)
err := changeset.ForPrefix(db, kv.AccountChangeSet, dbutils.EncodeBlockNumber(blockNum), func(blockN uint64, k, v []byte) error {
err := historyv2.ForPrefix(db, kv.AccountChangeSet, common2.EncodeTs(blockNum), func(blockN uint64, k, v []byte) error {
c := expectedAccountChanges.Changes[i]
i++
if bytes.Equal(c.Key, k) && bytes.Equal(c.Value, v) {
Expand All @@ -577,12 +578,12 @@ func checkChangeSet(db kv.Tx, blockNum uint64, expectedAccountChanges *changeset
return fmt.Errorf("db has less changesets")
}
if expectedStorageChanges == nil {
expectedStorageChanges = changeset.NewChangeSet()
expectedStorageChanges = historyv2.NewChangeSet()
}

i = 0
sort.Sort(expectedStorageChanges)
err = changeset.ForPrefix(db, kv.StorageChangeSet, dbutils.EncodeBlockNumber(blockNum), func(blockN uint64, k, v []byte) error {
err = historyv2.ForPrefix(db, kv.StorageChangeSet, common2.EncodeTs(blockNum), func(blockN uint64, k, v []byte) error {
c := expectedStorageChanges.Changes[i]
i++
if bytes.Equal(c.Key, k) && bytes.Equal(c.Value, v) {
Expand All @@ -607,9 +608,9 @@ func checkChangeSet(db kv.Tx, blockNum uint64, expectedAccountChanges *changeset
}

func checkHistory(tx kv.Tx, changeSetBucket string, blockNum uint64) error {
indexBucket := changeset.Mapper[changeSetBucket].IndexBucket
blockNumBytes := dbutils.EncodeBlockNumber(blockNum)
if err := changeset.ForEach(tx, changeSetBucket, blockNumBytes, func(blockN uint64, address, v []byte) error {
indexBucket := historyv2.Mapper[changeSetBucket].IndexBucket
blockNumBytes := common2.EncodeTs(blockNum)
if err := historyv2.ForEach(tx, changeSetBucket, blockNumBytes, func(blockN uint64, address, v []byte) error {
k := dbutils.CompositeKeyWithoutIncarnation(address)
from := blockN
if from > 0 {
Expand Down
8 changes: 4 additions & 4 deletions cmd/rpcdaemon/commands/erigon_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"sort"

"github.com/holiman/uint256"
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/temporal/historyv2"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/changeset"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/common/hexutil"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
Expand Down Expand Up @@ -202,9 +202,9 @@ func (api *ErigonImpl) GetBalanceChangesInBlock(ctx context.Context, blockNrOrHa
}
defer c.Close()

startkey := dbutils.EncodeBlockNumber(blockNumber)
startkey := common2.EncodeTs(blockNumber)

decodeFn := changeset.Mapper[kv.AccountChangeSet].Decode
decodeFn := historyv2.Mapper[kv.AccountChangeSet].Decode

balancesMapping := make(map[common.Address]*hexutil.Big)

Expand Down
6 changes: 3 additions & 3 deletions cmd/rpcdaemon/commands/erigon_receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"fmt"

"github.com/RoaringBitmap/roaring"
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/bitmapdb"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/filters"
Expand Down Expand Up @@ -143,7 +143,7 @@ func (api *ErigonImpl) GetLogs(ctx context.Context, crit filters.FilterCriteria)
var logIndex uint
var txIndex uint
var blockLogs []*types.Log
err := tx.ForPrefix(kv.Log, dbutils.EncodeBlockNumber(blockNumber), func(k, v []byte) error {
err := tx.ForPrefix(kv.Log, common2.EncodeTs(blockNumber), func(k, v []byte) error {
var logs types.Logs
if err := cbor.Unmarshal(&logs, bytes.NewReader(v)); err != nil {
return fmt.Errorf("receipt unmarshal failed: %w", err)
Expand Down Expand Up @@ -304,7 +304,7 @@ func (api *ErigonImpl) GetLatestLogs(ctx context.Context, crit filters.FilterCri
var logIndex uint
var txIndex uint
var blockLogs []*types.Log
err := tx.ForPrefix(kv.Log, dbutils.EncodeBlockNumber(blockNumber), func(k, v []byte) error {
err := tx.ForPrefix(kv.Log, common2.EncodeTs(blockNumber), func(k, v []byte) error {
var logs types.Logs
if err := cbor.Unmarshal(&logs, bytes.NewReader(v)); err != nil {
return fmt.Errorf("receipt unmarshal failed: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion cmd/rpcdaemon/commands/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/log/v3"

Expand Down Expand Up @@ -207,7 +208,7 @@ func (api *BaseAPI) historyV3(tx kv.Tx) bool {
if historyV3 != nil {
return *historyV3
}
enabled, err := rawdb.HistoryV3.Enabled(tx)
enabled, err := kvcfg.HistoryV3.Enabled(tx)
if err != nil {
log.Warn("HisoryV2Enabled: read", "err", err)
return false
Expand Down
Loading

0 comments on commit dfa6505

Please sign in to comment.