Commit

WIP: cmd, turbo, core, eth: TransactionsV3 flag and persist in new ta…

nanevardanyan authored Feb 24, 2023
1 parent 4525db6 commit ab6239b

Showing 44 changed files with 270 additions and 343 deletions.
2 changes: 1 addition & 1 deletion accounts/abi/bind/backends/simulated.go

@@ -134,7 +134,7 @@ func NewTestSimulatedBackendWithConfig(t *testing.T, alloc core.GenesisAlloc, co
 func (b *SimulatedBackend) DB() kv.RwDB { return b.m.DB }
 func (b *SimulatedBackend) Agg() *state2.AggregatorV3 { return b.m.HistoryV3Components() }
 func (b *SimulatedBackend) BlockReader() *snapshotsync.BlockReaderWithSnapshots {
-	return snapshotsync.NewBlockReaderWithSnapshots(b.m.BlockSnapshots)
+	return snapshotsync.NewBlockReaderWithSnapshots(b.m.BlockSnapshots, b.m.TransactionsV3)
 }
 func (b *SimulatedBackend) HistoryV3() bool { return b.m.HistoryV3 }
 func (b *SimulatedBackend) Engine() consensus.Engine { return b.m.Engine }

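The remaining files repeat this pattern at every construction site: NewBlockReaderWithSnapshots now takes the flag as a second argument. A minimal sketch of the likely shape (an assumption for illustration — the reader's internals are not part of this diff):

package main

import "fmt"

type RoSnapshots struct{} // stand-in for snapshotsync.RoSnapshots

// Sketch: the reader is built with the flag so that, at read time, it can
// pick the transaction table layout without consulting the DB again.
type BlockReaderWithSnapshots struct {
	sn             *RoSnapshots
	transactionsV3 bool // true: new transaction tables; false: legacy layout
}

func NewBlockReaderWithSnapshots(sn *RoSnapshots, txsV3 bool) *BlockReaderWithSnapshots {
	return &BlockReaderWithSnapshots{sn: sn, transactionsV3: txsV3}
}

func main() {
	br := NewBlockReaderWithSnapshots(&RoSnapshots{}, true)
	fmt.Println(br.transactionsV3) // true
}
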
22 changes: 11 additions & 11 deletions cmd/erigon-el/backend/backend.go
@@ -231,6 +231,11 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
 		return err
 	}
 
+	config.TransactionsV3, err = kvcfg.TransactionsV3.WriteOnce(tx, config.TransactionsV3)
+	if err != nil {
+		return err
+	}
+
 	// if we are in the incorrect syncmode then we change it to the appropriate one
 	if !isCorrectSync {
 		log.Warn("Incorrect snapshot enablement", "got", config.Sync.UseSnapshots, "change_to", useSnapshots)

@@ -268,7 +273,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
 		allSnapshots *snapshotsync.RoSnapshots
 		agg          *libstate.AggregatorV3
 	)
-	backend.blockReader, allSnapshots, agg, err = backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader)
+	backend.blockReader, allSnapshots, agg, err = backend.setUpBlockReader(ctx, config.Dirs, config.Snapshot, config.Downloader, config.TransactionsV3)
 	if err != nil {
 		return nil, err
 	}
@@ -477,7 +482,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
 	mining := stagedsync.New(
 		stagedsync.MiningStages(backend.sentryCtx,
 			stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miner, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, nil, tmpdir),
-			stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool2, backend.txPool2DB),
+			stagedsync.StageMiningExecCfg(backend.chainDB, miner, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, nil, 0, backend.txPool2, backend.txPool2DB, allSnapshots, config.TransactionsV3),
 			stagedsync.StageHashStateCfg(backend.chainDB, dirs, config.HistoryV3, backend.agg),
 			stagedsync.StageTrieCfg(backend.chainDB, false, true, true, tmpdir, backend.blockReader, nil, config.HistoryV3, backend.agg),
 			stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miner, backend.miningSealingQuit),
@@ -495,7 +500,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
 	proposingSync := stagedsync.New(
 		stagedsync.MiningStages(backend.sentryCtx,
 			stagedsync.StageMiningCreateBlockCfg(backend.chainDB, miningStatePos, *backend.chainConfig, backend.engine, backend.txPool2, backend.txPool2DB, param, tmpdir),
-			stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool2, backend.txPool2DB),
+			stagedsync.StageMiningExecCfg(backend.chainDB, miningStatePos, backend.notifications.Events, *backend.chainConfig, backend.engine, &vm.Config{}, tmpdir, interrupt, param.PayloadId, backend.txPool2, backend.txPool2DB, allSnapshots, config.TransactionsV3),
 			stagedsync.StageHashStateCfg(backend.chainDB, dirs, config.HistoryV3, backend.agg),
 			stagedsync.StageTrieCfg(backend.chainDB, false, true, true, tmpdir, backend.blockReader, nil, config.HistoryV3, backend.agg),
 			stagedsync.StageMiningFinishCfg(backend.chainDB, *backend.chainConfig, backend.engine, miningStatePos, backend.miningSealingQuit),
@@ -612,7 +617,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
 		return nil, err
 	}
 
-	backend.stagedSync, err = stages3.NewStagedSync(backend.sentryCtx, backend.chainDB, stack.Config().P2P, config, backend.sentriesClient, backend.notifications, backend.downloaderClient, allSnapshots, backend.agg, backend.forkValidator, backend.engine)
+	backend.stagedSync, err = stages3.NewStagedSync(backend.sentryCtx, backend.chainDB, stack.Config().P2P, config, backend.sentriesClient, backend.notifications, backend.downloaderClient, allSnapshots, backend.agg, backend.forkValidator, backend.engine, config.TransactionsV3)
 	if err != nil {
 		return nil, err
 	}
@@ -857,18 +862,13 @@ func (s *Ethereum) NodesInfo(limit int) (*remote.NodesInfoReply, error) {
 }
 
 // sets up blockReader and client downloader
-func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg) (services.FullBlockReader, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) {
-	if !snConfig.Enabled {
-		blockReader := snapshotsync.NewBlockReader()
-		return blockReader, nil, nil, nil
-	}
-
+func (s *Ethereum) setUpBlockReader(ctx context.Context, dirs datadir.Dirs, snConfig ethconfig.Snapshot, downloaderCfg *downloadercfg.Cfg, transactionsV3 bool) (services.FullBlockReader, *snapshotsync.RoSnapshots, *libstate.AggregatorV3, error) {
 	allSnapshots := snapshotsync.NewRoSnapshots(snConfig, dirs.Snap)
 	var err error
 	if !snConfig.NoDownloader {
 		allSnapshots.OptimisticalyReopenWithDB(s.chainDB)
 	}
-	blockReader := snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
+	blockReader := snapshotsync.NewBlockReaderWithSnapshots(allSnapshots, transactionsV3)
 
 	if !snConfig.NoDownloader {
 		if snConfig.DownloaderAddr != "" {

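A side effect of the last hunk: with the `!snConfig.Enabled` early return gone, setUpBlockReader appears to always construct the RoSnapshots set and aggregator, so callers receive non-nil handles whether or not snapshot sync is enabled (the RoSnapshots object itself still carries the enablement state via snConfig).
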
10 changes: 3 additions & 7 deletions cmd/erigon-el/stages/stages.go
@@ -15,7 +15,6 @@ import (
 	"github.com/ledgerwatch/erigon/ethdb/prune"
 	"github.com/ledgerwatch/erigon/p2p"
 	"github.com/ledgerwatch/erigon/turbo/engineapi"
-	"github.com/ledgerwatch/erigon/turbo/services"
 	"github.com/ledgerwatch/erigon/turbo/shards"
 	"github.com/ledgerwatch/erigon/turbo/snapshotsync"
 )
@@ -42,14 +41,10 @@ func NewStagedSync(ctx context.Context,
 	agg *state.AggregatorV3,
 	forkValidator *engineapi.ForkValidator,
 	engine consensus.Engine,
+	transactionsV3 bool,
 ) (*stagedsync.Sync, error) {
 	dirs := cfg.Dirs
-	var blockReader services.FullBlockReader
-	if cfg.Snapshot.Enabled {
-		blockReader = snapshotsync.NewBlockReaderWithSnapshots(snapshots)
-	} else {
-		blockReader = snapshotsync.NewBlockReader()
-	}
+	blockReader := snapshotsync.NewBlockReaderWithSnapshots(snapshots, transactionsV3)
 	blockRetire := snapshotsync.NewBlockRetire(1, dirs.Tmp, snapshots, db, snapDownloader, notifications.Events)
 
 	// During Import we don't want other services like header requests, body requests etc. to be running.
@@ -100,6 +95,7 @@ func NewStagedSync(ctx context.Context,
 			snapshots,
 			blockReader,
 			cfg.HistoryV3,
+			cfg.TransactionsV3,
 		),
 		stagedsync.StageSendersCfg(db, controlServer.ChainConfig, false, dirs.Tmp, cfg.Prune, blockRetire, controlServer.Hd),
 		stagedsync.StageExecuteBlocksCfg(

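The dropped turbo/services import in the first hunk follows from the second: blockReader no longer needs an explicit services.FullBlockReader interface declaration to hold one of two implementations, since a single constructor is used and := infers the concrete type.
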
14 changes: 6 additions & 8 deletions cmd/integration/commands/stages.go
@@ -585,7 +585,7 @@ func stageBodies(db kv.RwDB, ctx context.Context) error {
 	sn, agg := allSnapshots(ctx, db)
 	defer sn.Close()
 	defer agg.Close()
-	chainConfig, historyV3 := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db)
+	chainConfig, historyV3, transactionsV3 := fromdb.ChainConfig(db), kvcfg.HistoryV3.FromDB(db), kvcfg.TransactionsV3.FromDB(db)
 	_, _, sync, _, _ := newSync(ctx, db, nil)
 
 	if err := db.Update(ctx, func(tx kv.RwTx) error {
@@ -597,7 +597,7 @@ func stageBodies(db kv.RwDB, ctx context.Context) error {
 		}
 
 		u := sync.NewUnwindState(stages.Bodies, s.BlockNumber-unwind, s.BlockNumber)
-		if err := stagedsync.UnwindBodiesStage(u, tx, stagedsync.StageBodiesCfg(db, nil, nil, nil, nil, 0, *chainConfig, sn, getBlockReader(db), historyV3), ctx); err != nil {
+		if err := stagedsync.UnwindBodiesStage(u, tx, stagedsync.StageBodiesCfg(db, nil, nil, nil, nil, 0, *chainConfig, sn, getBlockReader(db), historyV3, transactionsV3), ctx); err != nil {
 			return err
 		}
@@ -1216,11 +1216,9 @@ var _blockReaderSingleton services.FullBlockReader
 
 func getBlockReader(db kv.RoDB) (blockReader services.FullBlockReader) {
 	openBlockReaderOnce.Do(func() {
-		_blockReaderSingleton = snapshotsync.NewBlockReader()
-		if sn, _ := allSnapshots(context.Background(), db); sn.Cfg().Enabled {
-			x := snapshotsync.NewBlockReaderWithSnapshots(sn)
-			_blockReaderSingleton = x
-		}
+		sn, _ := allSnapshots(context.Background(), db)
+		transactionsV3 := kvcfg.TransactionsV3.FromDB(db)
+		_blockReaderSingleton = snapshotsync.NewBlockReaderWithSnapshots(sn, transactionsV3)
 	})
 	return _blockReaderSingleton
 }

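getBlockReader is a process-wide singleton, so the kvcfg.TransactionsV3.FromDB read happens at most once behind sync.Once, no matter how many callers race to the first use. A runnable sketch of the same pattern (illustrative types, not the Erigon code):

package main

import (
	"fmt"
	"sync"
)

type blockReader struct{ transactionsV3 bool }

var (
	openOnce  sync.Once
	singleton *blockReader
)

// getBlockReader builds the singleton on first call; later calls, from any
// goroutine, reuse the same instance and never re-read the flag.
func getBlockReader(flagFromDB func() bool) *blockReader {
	openOnce.Do(func() {
		singleton = &blockReader{transactionsV3: flagFromDB()}
	})
	return singleton
}

func main() {
	r := getBlockReader(func() bool { return true })
	fmt.Println(r.transactionsV3) // true; initialized exactly once
}
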
@@ -1292,7 +1290,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
 	miningSync := stagedsync.New(
 		stagedsync.MiningStages(ctx,
 			stagedsync.StageMiningCreateBlockCfg(db, miner, *chainConfig, engine, nil, nil, nil, dirs.Tmp),
-			stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, dirs.Tmp, nil, 0, nil, nil),
+			stagedsync.StageMiningExecCfg(db, miner, events, *chainConfig, engine, &vm.Config{}, dirs.Tmp, nil, 0, nil, nil, allSn, cfg.TransactionsV3),
 			stagedsync.StageHashStateCfg(db, dirs, historyV3, agg),
 			stagedsync.StageTrieCfg(db, false, true, false, dirs.Tmp, br, nil, historyV3, agg),
 			stagedsync.StageMiningFinishCfg(db, *chainConfig, engine, miner, miningCancel),

117 changes: 56 additions & 61 deletions cmd/rpcdaemon/cli/config.go
@@ -344,73 +344,68 @@ func RemoteServices(ctx context.Context, cfg httpcfg.HttpCfg, logger log.Logger,
 	}
 
 	// Configure sapshots
-	if cfg.Snap.Enabled {
-		allSnapshots = snapshotsync.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap)
-		// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
-		// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection
-		allSnapshots.OptimisticReopenWithDB(db)
-		allSnapshots.LogStat()
-
-		if agg, err = libstate.NewAggregatorV3(ctx, cfg.Dirs.SnapHistory, cfg.Dirs.Tmp, ethconfig.HistoryV3AggregationStep, db); err != nil {
-			return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
-		}
-		_ = agg.OpenFolder()
-
-		db.View(context.Background(), func(tx kv.Tx) error {
-			agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
-				_, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax)
-				return histBlockNumProgress
-			})
-			return nil
-		})
-		onNewSnapshot = func() {
-			go func() { // don't block events processing by network communication
-				reply, err := remoteKvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true))
-				if err != nil {
-					log.Warn("[snapshots] reopen", "err", err)
-					return
-				}
-				if err := allSnapshots.ReopenList(reply.BlocksFiles, true); err != nil {
-					log.Error("[snapshots] reopen", "err", err)
-				} else {
-					allSnapshots.LogStat()
-				}
-
-				_ = reply.HistoryFiles
-
-				if err = agg.OpenFolder(); err != nil {
-					log.Error("[snapshots] reopen", "err", err)
-				} else {
-					db.View(context.Background(), func(tx kv.Tx) error {
-						agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
-							_, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax)
-							return histBlockNumProgress
-						})
-						return nil
-					})
-				}
-			}()
-		}
-		onNewSnapshot()
-		blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
-
-		var histV3Enabled bool
-		_ = db.View(ctx, func(tx kv.Tx) error {
-			histV3Enabled, _ = kvcfg.HistoryV3.Enabled(tx)
-			return nil
-		})
-		if histV3Enabled {
-			log.Info("HistoryV3", "enable", histV3Enabled)
-			db, err = temporal.New(rwKv, agg, accounts.ConvertV3toV2, historyv2read.RestoreCodeHash, accounts.DecodeIncarnationFromStorage, systemcontracts.SystemContractCodeLookup[cc.ChainName])
-			if err != nil {
-				return nil, nil, nil, nil, nil, nil, nil, nil, nil, err
-			}
-		}
-		stateCache = kvcache.NewDummy()
-	} else {
-		blockReader = snapshotsync.NewBlockReader()
-		stateCache = kvcache.NewDummy()
-	}
+	allSnapshots = snapshotsync.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap)
+	// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
+	// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection
+	allSnapshots.OptimisticReopenWithDB(db)
+	allSnapshots.LogStat()
+
+	if agg, err = libstate.NewAggregatorV3(ctx, cfg.Dirs.SnapHistory, cfg.Dirs.Tmp, ethconfig.HistoryV3AggregationStep, db); err != nil {
+		return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
+	}
+	_ = agg.OpenFolder()
+
+	db.View(context.Background(), func(tx kv.Tx) error {
+		agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
+			_, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax)
+			return histBlockNumProgress
+		})
+		return nil
+	})
+	onNewSnapshot = func() {
+		go func() { // don't block events processing by network communication
+			reply, err := remoteKvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true))
+			if err != nil {
+				log.Warn("[snapshots] reopen", "err", err)
+				return
+			}
+			if err := allSnapshots.ReopenList(reply.BlocksFiles, true); err != nil {
+				log.Error("[snapshots] reopen", "err", err)
+			} else {
+				allSnapshots.LogStat()
+			}
+
+			_ = reply.HistoryFiles
+
+			if err = agg.OpenFolder(); err != nil {
+				log.Error("[snapshots] reopen", "err", err)
+			} else {
+				db.View(context.Background(), func(tx kv.Tx) error {
+					agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
+						_, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax)
+						return histBlockNumProgress
+					})
+					return nil
+				})
+			}
+		}()
+	}
+	onNewSnapshot()
+	blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots, ethconfig.Defaults.TransactionsV3)
+
+	var histV3Enabled bool
+	_ = db.View(ctx, func(tx kv.Tx) error {
+		histV3Enabled, _ = kvcfg.HistoryV3.Enabled(tx)
+		return nil
+	})
+	if histV3Enabled {
+		log.Info("HistoryV3", "enable", histV3Enabled)
+		db, err = temporal.New(rwKv, agg, accounts.ConvertV3toV2, historyv2read.RestoreCodeHash, accounts.DecodeIncarnationFromStorage, systemcontracts.SystemContractCodeLookup[cc.ChainName])
+		if err != nil {
+			return nil, nil, nil, nil, nil, nil, nil, nil, nil, err
+		}
+	}
+	stateCache = kvcache.NewDummy()
 	// If DB can't be configured - used PrivateApiAddr as remote DB
 	if db == nil {

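Note that this hunk wires the daemon's reader with the compile-time default, ethconfig.Defaults.TransactionsV3, rather than the value WriteOnce persisted in the datadir — plausibly a leftover given the WIP title. A sketch of reading the persisted flag instead, mirroring the histV3Enabled lookup above (assumption: kvcfg.TransactionsV3 exposes the same Enabled(tx) helper as kvcfg.HistoryV3; only FromDB and WriteOnce appear in this diff):

package cli

import (
	"context"

	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
)

// transactionsV3Enabled resolves the persisted flag inside a read-only view,
// the same way the hunk above resolves histV3Enabled.
func transactionsV3Enabled(ctx context.Context, db kv.RoDB) bool {
	var enabled bool
	_ = db.View(ctx, func(tx kv.Tx) error {
		enabled, _ = kvcfg.TransactionsV3.Enabled(tx)
		return nil
	})
	return enabled
}
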
8 changes: 4 additions & 4 deletions cmd/rpcdaemon/commands/call_traces_test.go
@@ -54,7 +54,7 @@ func TestCallTraceOneByOne(t *testing.T) {
 	}
 
 	agg := m.HistoryV3Components()
-	br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots)
+	br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3)
 	api := NewTraceAPI(
 		NewBaseApi(nil, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine),
 		m.DB, &httpcfg.HttpCfg{})
@@ -103,7 +103,7 @@ func TestCallTraceUnwind(t *testing.T) {
 	}
 
 	agg := m.HistoryV3Components()
-	br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots)
+	br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3)
 	api := NewTraceAPI(NewBaseApi(nil, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, &httpcfg.HttpCfg{})
 	if err = m.InsertChain(chainA); err != nil {
 		t.Fatalf("inserting chainA: %v", err)
@@ -165,7 +165,7 @@ func TestFilterNoAddresses(t *testing.T) {
 		t.Fatalf("generate chain: %v", err)
 	}
 	agg := m.HistoryV3Components()
-	br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots)
+	br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3)
 	api := NewTraceAPI(NewBaseApi(nil, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, &httpcfg.HttpCfg{})
 	// Insert blocks 1 by 1, to tirgget possible "off by one" errors
 	for i := 0; i < chain.Length(); i++ {
@@ -191,7 +191,7 @@ func TestFilterNoAddresses(t *testing.T) {
 func TestFilterAddressIntersection(t *testing.T) {
 	m := stages.Mock(t)
 	agg := m.HistoryV3Components()
-	br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots)
+	br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3)
 	api := NewTraceAPI(NewBaseApi(nil, kvcache.New(kvcache.DefaultCoherentConfig), br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine), m.DB, &httpcfg.HttpCfg{})
 
 	toAddress1, toAddress2, other := common.Address{1}, common.Address{2}, common.Address{3}

2 changes: 1 addition & 1 deletion cmd/rpcdaemon/commands/corner_cases_support_test.go
@@ -20,7 +20,7 @@ func TestNotFoundMustReturnNil(t *testing.T) {
 	require := require.New(t)
 	m, _, _ := rpcdaemontest.CreateTestSentry(t)
 	agg := m.HistoryV3Components()
-	br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots)
+	br := snapshotsync.NewBlockReaderWithSnapshots(m.BlockSnapshots, m.TransactionsV3)
 	stateCache := kvcache.New(kvcache.DefaultCoherentConfig)
 	api := NewEthAPI(
 		NewBaseApi(nil, stateCache, br, agg, false, rpccfg.DefaultEvmCallTimeout, m.Engine),

0 comments on commit ab6239b