Skip to content

defer txpool reorg until worker fetches txns for the next block #944

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,3 +1576,10 @@ func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus {
func (p *BlobPool) RemoveTx(hash common.Hash, outofbound bool, unreserve bool) int {
return 0
}

func (p *BlobPool) PauseReorgs() {
log.Debug("skip BlobPool `PauseReorgs`")
}
func (p *BlobPool) ResumeReorgs() {
log.Debug("skip BlobPool `ResumeReorgs`")
}
24 changes: 23 additions & 1 deletion core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ type LegacyPool struct {
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
reorgPauseCh chan bool // requests to pause scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

Expand Down Expand Up @@ -258,6 +259,7 @@ func New(config Config, chain BlockChain) *LegacyPool {
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
reorgPauseCh: make(chan bool),
initDoneCh: make(chan struct{}),
}
pool.locals = newAccountSet(pool.signer)
Expand Down Expand Up @@ -1198,13 +1200,14 @@ func (pool *LegacyPool) scheduleReorgLoop() {
curDone chan struct{} // non-nil while runReorg is active
nextDone = make(chan struct{})
launchNextRun bool
reorgsPaused bool
reset *txpoolResetRequest
dirtyAccounts *accountSet
queuedEvents = make(map[common.Address]*sortedMap)
)
for {
// Launch next background reorg if needed
if curDone == nil && launchNextRun {
if curDone == nil && launchNextRun && !reorgsPaused {
// Run the background reorg and announcements
go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)

Expand Down Expand Up @@ -1256,6 +1259,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
}
close(nextDone)
return
case reorgsPaused = <-pool.reorgPauseCh:
}
}
}
Expand Down Expand Up @@ -1705,6 +1709,24 @@ func (pool *LegacyPool) demoteUnexecutables() {
}
}

// PauseReorgs stops any new reorg jobs to be started but doesn't interrupt any existing ones that are in flight
// Keep in mind this function might block, although it is not expected to block for any significant amount of time
func (pool *LegacyPool) PauseReorgs() {
select {
case pool.reorgPauseCh <- true:
case <-pool.reorgShutdownCh:
}
}

// ResumeReorgs allows new reorg jobs to be started.
// Keep in mind this function might block, although it is not expected to block for any significant amount of time
func (pool *LegacyPool) ResumeReorgs() {
select {
case pool.reorgPauseCh <- false:
case <-pool.reorgShutdownCh:
}
}

// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.Address
Expand Down
3 changes: 3 additions & 0 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,7 @@ type SubPool interface {

// RemoveTx removes a transaction from the pool, returning the number of transactions removed.
RemoveTx(hash common.Hash, outofbound bool, unreserve bool) int

PauseReorgs()
ResumeReorgs()
}
12 changes: 12 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,15 @@ func (p *TxPool) RemoveTx(hash common.Hash, outofbound bool, unreserve bool) int
}
return ret
}

func (pool *TxPool) PauseReorgs() {
for _, subpool := range pool.subpools {
subpool.PauseReorgs()
}
}

func (pool *TxPool) ResumeReorgs() {
for _, subpool := range pool.subpools {
subpool.ResumeReorgs()
}
}
7 changes: 7 additions & 0 deletions miner/scroll_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,9 @@ func (w *worker) startNewPipeline(timestamp int64) {
}
collectL2Timer.UpdateSince(tidyPendingStart)

// Allow txpool to be reorged as we build current block
w.eth.TxPool().ResumeReorgs()

var nextL1MsgIndex uint64
if dbIndex := rawdb.ReadFirstQueueIndexNotInL2Block(w.chain.Database(), parent.Hash()); dbIndex != nil {
nextL1MsgIndex = *dbIndex
Expand Down Expand Up @@ -668,6 +671,10 @@ func (w *worker) commit(res *pipeline.Result) error {
"accRows", res.Rows,
)

// A new block event will trigger a reorg in the txpool, pause reorgs to defer this until we fetch txns for next block.
// We may end up trying to process txns that we already included in the previous block, but they will all fail the nonce check
w.eth.TxPool().PauseReorgs()

rawdb.WriteBlockRowConsumption(w.eth.ChainDb(), blockHash, res.Rows)
// Commit block and state to database.
_, err = w.chain.WriteBlockAndSetHead(block, res.FinalBlock.Receipts, logs, res.FinalBlock.State, true)
Expand Down
Loading