Merge remote-tracking branch 'origin/devel' into erigon-cl
Giulio2002 committed Apr 10, 2023
2 parents 58c1978 + db52bba commit 4550053
Showing 14 changed files with 405 additions and 254 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -136,6 +136,7 @@ db-tools:

go mod vendor
cd vendor/github.com/torquem-ch/mdbx-go && MDBX_BUILD_TIMESTAMP=unknown make tools
mkdir -p $(GOBIN)
cd vendor/github.com/torquem-ch/mdbx-go/mdbxdist && cp mdbx_chk $(GOBIN) && cp mdbx_copy $(GOBIN) && cp mdbx_dump $(GOBIN) && cp mdbx_drop $(GOBIN) && cp mdbx_load $(GOBIN) && cp mdbx_stat $(GOBIN)
rm -rf vendor
@echo "Run \"$(GOBIN)/mdbx_stat -h\" to get info about mdbx db file."
8 changes: 8 additions & 0 deletions cmd/rpcdaemon/commands/tracing.go
@@ -79,6 +79,10 @@ func (api *PrivateDebugAPIImpl) traceBlock(ctx context.Context, blockNrOrHash rp
return err
}

if config == nil {
config = &tracers.TraceConfig{}
}

if config.BorTraceEnabled == nil {
config.BorTraceEnabled = newBoolPtr(false)
}
@@ -323,6 +327,10 @@ func (api *PrivateDebugAPIImpl) TraceCallMany(ctx context.Context, bundles []Bun
baseFee uint256.Int
)

if config == nil {
config = &tracers.TraceConfig{}
}

overrideBlockHash = make(map[uint64]common.Hash)
tx, err := api.db.BeginRo(ctx)
if err != nil {
174 changes: 173 additions & 1 deletion cmd/state/e3types/txtask.go → cmd/state/exec22/txtask.go
@@ -1,4 +1,4 @@
package e3types
package exec22

import (
"container/heap"
@@ -50,6 +50,7 @@ type TxTask struct {
UsedGas uint64
}

// TxTaskQueue non-thread-safe priority-queue
type TxTaskQueue []*TxTask

func (h TxTaskQueue) Len() int {
@@ -145,6 +146,9 @@ func (q *QueueWithRetry) ReTry(t *TxTask) {
q.retiresLock.Lock()
heap.Push(&q.retires, t)
q.retiresLock.Unlock()
if q.closed {
return
}
select {
case q.newTasks <- nil:
default:
@@ -206,6 +210,7 @@ func (q *QueueWithRetry) popNoWait() (task *TxTask, ok bool) {
select {
case task, ok = <-q.newTasks:
if !ok {

return nil, false
}
default:
@@ -223,3 +228,170 @@ func (q *QueueWithRetry) Close() {
q.closed = true
close(q.newTasks)
}

// ResultsQueue thread-safe priority-queue of execution results
type ResultsQueue struct {
limit int
closed bool

resultCh chan *TxTask
iter *ResultsQueueIter

sync.Mutex
results *TxTaskQueue
}

func NewResultsQueue(newTasksLimit, queueLimit int) *ResultsQueue {
r := &ResultsQueue{
results: &TxTaskQueue{},
limit: queueLimit,
resultCh: make(chan *TxTask, newTasksLimit),
}
heap.Init(r.results)
r.iter = &ResultsQueueIter{q: r, results: r.results}
return r
}

// Add result of execution. May block when internal channel is full
func (q *ResultsQueue) Add(ctx context.Context, task *TxTask) error {
select {
case q.resultCh <- task: // needs to happen outside of the lock
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (q *ResultsQueue) drainNoBlock(task *TxTask) {
q.Lock()
defer q.Unlock()
if task != nil {
heap.Push(q.results, task)
}

for {
select {
case txTask, ok := <-q.resultCh:
if !ok {
return
}
if txTask != nil {
heap.Push(q.results, txTask)
}
default: // we are inside mutex section, can't block here
return
}
}
}

func (q *ResultsQueue) Iter() *ResultsQueueIter {
q.Lock()
q.iter.needUnlock = true
return q.iter
}
func (q *ResultsQueue) IterLocked() *ResultsQueueIter {
q.iter.needUnlock = false
return q.iter
}

type ResultsQueueIter struct {
q *ResultsQueue
results *TxTaskQueue //pointer to `q.results` - just to reduce amount of dereferences
needUnlock bool
}

func (q *ResultsQueueIter) Close() {
if q.needUnlock {
q.q.Unlock()
}
}
func (q *ResultsQueueIter) HasNext(outputTxNum uint64) bool {
return len(*q.results) > 0 && (*q.results)[0].TxNum == outputTxNum
}
func (q *ResultsQueueIter) PopNext() *TxTask {
return heap.Pop(q.results).(*TxTask)
}

func (q *ResultsQueue) Drain(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case txTask, ok := <-q.resultCh:
if !ok {
return nil
}
q.drainNoBlock(txTask)
}
return nil
}
func (q *ResultsQueue) DrainNonBlocking() { q.drainNoBlock(nil) }

func (q *ResultsQueue) DrainLocked() {
var drained bool
for !drained {
select {
case txTask, ok := <-q.resultCh:
if !ok {
return
}
heap.Push(q.results, txTask)
default:
drained = true
}
}
}
func (q *ResultsQueue) DropResults(f func(t *TxTask)) {
q.Lock()
defer q.Unlock()
Loop:
for {
select {
case txTask, ok := <-q.resultCh:
if !ok {
break Loop
}
f(txTask)
default:
break Loop
}
}

// Drain results queue as well
for q.results.Len() > 0 {
f(heap.Pop(q.results).(*TxTask))
}
}

func (q *ResultsQueue) Close() {
if q.closed {
return
}
q.closed = true
close(q.resultCh)
}
func (q *ResultsQueue) ResultChLen() int { return len(q.resultCh) }
func (q *ResultsQueue) ResultChCap() int { return cap(q.resultCh) }
func (q *ResultsQueue) Limit() int { return q.limit }
func (q *ResultsQueue) Len() (l int) {
q.Lock()
l = q.results.Len()
q.Unlock()
return l
}
func (q *ResultsQueue) FirstTxNumLocked() uint64 { return (*q.results)[0].TxNum }
func (q *ResultsQueue) LenLocked() (l int) { return q.results.Len() }
func (q *ResultsQueue) HasLocked() bool { return len(*q.results) > 0 }
func (q *ResultsQueue) PushLocked(t *TxTask) { heap.Push(q.results, t) }
func (q *ResultsQueue) Push(t *TxTask) {
q.Lock()
heap.Push(q.results, t)
q.Unlock()
}
func (q *ResultsQueue) PopLocked() (t *TxTask) {
return heap.Pop(q.results).(*TxTask)
}
func (q *ResultsQueue) Dbg() (t *TxTask) {
if len(*q.results) > 0 {
return (*q.results)[0]
}
return nil
}
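
For orientation, a minimal usage sketch of the new ResultsQueue API (not part of this commit): a single collector alternates between Drain and an Iter pass, handing results out strictly in TxNum order, while workers add results concurrently. The collect function, the example package name and the lastTxNum parameter are illustrative assumptions, and the loop assumes a result for every TxNum up to lastTxNum eventually arrives.

package example

import (
	"context"

	"github.com/ledgerwatch/erigon/cmd/state/exec22"
)

// collect drains execution results and hands them out strictly in TxNum order.
func collect(ctx context.Context, rws *exec22.ResultsQueue, lastTxNum uint64) error {
	for outputTxNum := uint64(0); outputTxNum <= lastTxNum; {
		// Block until at least one result arrives, then move everything
		// buffered in the channel into the internal heap.
		if err := rws.Drain(ctx); err != nil {
			return err
		}
		// Pop the results that are next in line; out-of-order results stay
		// in the heap until their TxNum comes up.
		it := rws.Iter()
		for it.HasNext(outputTxNum) {
			txTask := it.PopNext()
			_ = txTask // apply txTask's state changes here
			outputTxNum++
		}
		it.Close()
	}
	return nil
}

Producers simply call rws.Add(ctx, txTask) from their own goroutines, exactly as Worker.Run does in the next file.
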
35 changes: 16 additions & 19 deletions cmd/state/exec3/state.go
@@ -11,7 +11,7 @@ import (
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/errgroup"

"github.com/ledgerwatch/erigon/cmd/state/e3types"
"github.com/ledgerwatch/erigon/cmd/state/exec22"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/misc"
"github.com/ledgerwatch/erigon/core"
@@ -30,7 +30,7 @@ type Worker struct {
chainTx kv.Tx
background bool // if true - worker does manage RoTx (begin/rollback) in .ResetTx()
blockReader services.FullBlockReader
in *e3types.QueueWithRetry
in *exec22.QueueWithRetry
rs *state.StateV3
stateWriter *state.StateWriterV3
stateReader *state.StateReaderV3
@@ -39,9 +39,8 @@ type Worker struct {

ctx context.Context
engine consensus.Engine
logger log.Logger
genesis *types.Genesis
resultCh chan *e3types.TxTask
resultCh *exec22.ResultsQueue
chain ChainReader
isPoSA bool
posa consensus.PoSA
@@ -53,7 +52,7 @@ type Worker struct {
ibs *state.IntraBlockState
}

func NewWorker(lock sync.Locker, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *e3types.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, logger log.Logger, genesis *types.Genesis, resultCh chan *e3types.TxTask, engine consensus.Engine) *Worker {
func NewWorker(lock sync.Locker, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *exec22.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, results *exec22.ResultsQueue, engine consensus.Engine) *Worker {
w := &Worker{
lock: lock,
chainDb: chainDb,
@@ -66,9 +65,8 @@ func NewWorker(lock sync.Locker, ctx context.Context, background bool, chainDb k
chainConfig: chainConfig,

ctx: ctx,
logger: logger,
genesis: genesis,
resultCh: resultCh,
resultCh: results,
engine: engine,

evm: vm.NewEVM(evmtypes.BlockContext{}, evmtypes.TxContext{}, nil, chainConfig, vm.Config{}),
@@ -106,22 +104,20 @@ func (rw *Worker) ResetTx(chainTx kv.Tx) {
func (rw *Worker) Run() error {
for txTask, ok := rw.in.Next(rw.ctx); ok; txTask, ok = rw.in.Next(rw.ctx) {
rw.RunTxTask(txTask)
select {
case rw.resultCh <- txTask: // Needs to have outside of the lock
case <-rw.ctx.Done():
return rw.ctx.Err()
if err := rw.resultCh.Add(rw.ctx, txTask); err != nil {
return err
}
}
return nil
}

func (rw *Worker) RunTxTask(txTask *e3types.TxTask) {
func (rw *Worker) RunTxTask(txTask *exec22.TxTask) {
rw.lock.Lock()
defer rw.lock.Unlock()
rw.RunTxTaskNoLock(txTask)
}

func (rw *Worker) RunTxTaskNoLock(txTask *e3types.TxTask) {
func (rw *Worker) RunTxTaskNoLock(txTask *exec22.TxTask) {
if rw.background && rw.chainTx == nil {
var err error
if rw.chainTx, err = rw.chainDb.BeginRo(rw.ctx); err != nil {
@@ -286,18 +282,18 @@ func (cr ChainReader) GetTd(hash libcommon.Hash, number uint64) *big.Int {
return td
}

func NewWorkersPool(lock sync.Locker, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *e3types.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, logger log.Logger, genesis *types.Genesis, engine consensus.Engine, workerCount int) (reconWorkers []*Worker, applyWorker *Worker, resultCh chan *e3types.TxTask, clear func(), wait func()) {
func NewWorkersPool(lock sync.Locker, ctx context.Context, background bool, chainDb kv.RoDB, rs *state.StateV3, in *exec22.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config, genesis *types.Genesis, engine consensus.Engine, workerCount int) (reconWorkers []*Worker, applyWorker *Worker, rws *exec22.ResultsQueue, clear func(), wait func()) {
reconWorkers = make([]*Worker, workerCount)

resultChSize := workerCount * 8
resultCh = make(chan *e3types.TxTask, resultChSize)
rws = exec22.NewResultsQueue(resultChSize, workerCount) // workerCount * 4
{
// we ignore all errors in background workers (except context cancellation), because applyLoop will detect them anyway,
// and in applyLoop all errors are critical
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < workerCount; i++ {
reconWorkers[i] = NewWorker(lock, ctx, background, chainDb, rs, in, blockReader, chainConfig, logger, genesis, resultCh, engine)
reconWorkers[i] = NewWorker(lock, ctx, background, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine)
}
if background {
for i := 0; i < workerCount; i++ {
@@ -321,10 +317,11 @@ func NewWorkersPool(lock sync.Locker, ctx context.Context, background bool, chai
w.ResetTx(nil)
}
//applyWorker.ResetTx(nil)
close(resultCh)
log.Warn("before rws.Close()")
rws.Close()
}
}
applyWorker = NewWorker(lock, ctx, false, chainDb, rs, in, blockReader, chainConfig, logger, genesis, resultCh, engine)
applyWorker = NewWorker(lock, ctx, false, chainDb, rs, in, blockReader, chainConfig, genesis, rws, engine)

return reconWorkers, applyWorker, resultCh, clear, wait
return reconWorkers, applyWorker, rws, clear, wait
}
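
To show how the new return value is consumed, here is a hypothetical caller-side wiring sketch (not part of this commit): the pool hands back a *exec22.ResultsQueue instead of the old result channel, workers feed it from Worker.Run, and the apply loop drains it with the collect sketch above before waiting for the workers to exit. The runPool function, the lastTxNum parameter and the exact import list are assumptions for illustration.

package example

import (
	"context"
	"sync"

	"github.com/ledgerwatch/erigon-lib/chain"
	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/cmd/state/exec22"
	"github.com/ledgerwatch/erigon/cmd/state/exec3"
	"github.com/ledgerwatch/erigon/consensus"
	"github.com/ledgerwatch/erigon/core/state"
	"github.com/ledgerwatch/erigon/core/types"
	"github.com/ledgerwatch/erigon/turbo/services"
)

func runPool(ctx context.Context, lock sync.Locker, chainDb kv.RoDB, rs *state.StateV3,
	in *exec22.QueueWithRetry, blockReader services.FullBlockReader, chainConfig *chain.Config,
	genesis *types.Genesis, engine consensus.Engine, workerCount int, lastTxNum uint64) error {
	// The pool now returns a *exec22.ResultsQueue ("rws") in place of the old
	// resultCh channel; the clear callback tears the workers down and closes rws.
	_, _, rws, clearWorkers, wait := exec3.NewWorkersPool(
		lock, ctx, true /*background*/, chainDb, rs, in,
		blockReader, chainConfig, genesis, engine, workerCount)
	defer clearWorkers()

	// Workers push finished tasks via rws.Add (see Worker.Run above); the
	// apply loop consumes them in TxNum order with the collect sketch, then
	// waits for the background workers to exit.
	if err := collect(ctx, rws, lastTxNum); err != nil {
		return err
	}
	wait()
	return nil
}
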
4 changes: 2 additions & 2 deletions cmd/state/exec3/state_recon.go
@@ -15,7 +15,7 @@ import (
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon/cmd/state/e3types"
"github.com/ledgerwatch/erigon/cmd/state/exec22"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/misc"
@@ -286,7 +286,7 @@ func (rw *ReconWorker) Run() error {

var noop = state.NewNoopWriter()

func (rw *ReconWorker) runTxTask(txTask *e3types.TxTask) error {
func (rw *ReconWorker) runTxTask(txTask *exec22.TxTask) error {
rw.lock.Lock()
defer rw.lock.Unlock()
rw.stateReader.SetTxNum(txTask.TxNum)