From 17a595f82b120694dca71cb4e2ad5885ad440fed Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Tue, 13 Aug 2024 09:52:16 +0700 Subject: [PATCH] chain-tip perf: share worker between execs (#11542) --- cmd/state/exec3/state.go | 11 ++++------- eth/stagedsync/exec3.go | 4 +++- eth/stagedsync/stage_execute.go | 3 +++ 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/cmd/state/exec3/state.go b/cmd/state/exec3/state.go index 85606047c8b..791d2802fc4 100644 --- a/cmd/state/exec3/state.go +++ b/cmd/state/exec3/state.go @@ -71,18 +71,14 @@ type Worker struct { dirs datadir.Dirs } -func NewWorker(lock sync.Locker, logger log.Logger, accumulator *shards.Accumulator, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs) *Worker { - +func NewWorker(lock sync.Locker, logger log.Logger, ctx context.Context, background bool, chainDb kv.RoDB, in *state.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *state.ResultsQueue, engine consensus.Engine, dirs datadir.Dirs) *Worker { w := &Worker{ lock: lock, logger: logger, chainDb: chainDb, in: in, - rs: rs, background: background, blockReader: blockReader, - stateWriter: state.NewStateWriterV3(rs, accumulator), - stateReader: state.NewStateReaderV3(rs.Domains()), chainConfig: chainConfig, ctx: ctx, @@ -298,7 +294,8 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo ctx, cancel := context.WithCancel(ctx) g, ctx := errgroup.WithContext(ctx) for i := 0; i < workerCount; i++ { - reconWorkers[i] = NewWorker(lock, logger, accumulator, ctx, background, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs) + reconWorkers[i] = NewWorker(lock, logger, ctx, background, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs) + reconWorkers[i].ResetState(rs, accumulator) } if background { for i := 0; i < workerCount; i++ { @@ -324,7 +321,7 @@ func NewWorkersPool(lock sync.Locker, accumulator *shards.Accumulator, logger lo //applyWorker.ResetTx(nil) } } - applyWorker = NewWorker(lock, logger, accumulator, ctx, false, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine, dirs) + applyWorker = NewWorker(lock, logger, ctx, false, chainDb, in, blockReader, chainConfig, genesis, rws, engine, dirs) return reconWorkers, applyWorker, rws, clear, wait } diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 12ec509d657..0e8876bc5f4 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -356,8 +356,10 @@ func ExecV3(ctx context.Context, rwsConsumed := make(chan struct{}, 1) defer close(rwsConsumed) - execWorkers, applyWorker, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs) + execWorkers, _, rws, stopWorkers, waitWorkers := exec3.NewWorkersPool(lock.RLocker(), accumulator, logger, ctx, parallel, chainDb, rs, in, blockReader, chainConfig, genesis, engine, workerCount+1, cfg.dirs) defer stopWorkers() + applyWorker := cfg.applyWorker + applyWorker.ResetState(rs, accumulator) applyWorker.DiscardReadList() commitThreshold := batchSize.Bytes() diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index d6754bb5673..5081a503532 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -23,6 +23,7 @@ import ( "time" "github.com/c2h5oh/datasize" + "github.com/erigontech/erigon/cmd/state/exec3" "golang.org/x/sync/errgroup" "github.com/erigontech/erigon-lib/chain" @@ -84,6 +85,7 @@ type ExecuteBlockCfg struct { silkworm *silkworm.Silkworm blockProduction bool + applyWorker *exec3.Worker } func StageExecuteBlocksCfg( @@ -125,6 +127,7 @@ func StageExecuteBlocksCfg( historyV3: true, syncCfg: syncCfg, silkworm: silkworm, + applyWorker: exec3.NewWorker(nil, log.Root(), context.Background(), false, db, nil, blockReader, chainConfig, genesis, nil, engine, dirs), } }