Skip to content

Commit

Permalink
Add feature to TXM to detect and purge stuck transactions (#12881)
Browse files Browse the repository at this point in the history
* Added the EVM stuck tx detector component

* Added stuck tx handling in Confirmer

* Fixed linting

* Fixed toml config decoding

* Fixed config tests

* Fixed web resolver config tests

* Fixed config docs test

* Added zkEVM overflow detection and added unit tests for the detector

* Fixed broken tests after merge

* Reverted AutoPurgeConfig validation changes and fixed config tests

* Added changeset and fixed config validation logic

* Fixed linting

* Fixed linting

* Fixed purge attempt builder and added tests

* Updated evm.txes contraint to allow non-null nonce if fatal_error

* Added confirmer test

* Adjusted confirmer test to better reflect actual process

* Fixed linting

* Added purge block num loading on Confirmer startup

* Updated EVM tx store mock

* Fixed linting and testdata

* Updated stuck tx fatal error messages

* Updated sql migration file sequence

* Skipped loading purge block num if auto-purge feature disabled and fixed test

* Fixed linting

* Renamed function and moved log

* Added stricter config validation

* Fixed linting

* Updated auto-purge feature configs to adhere to config naming standards

* Fixed config doc test and updated changeset

* Updated Scroll and zkEVM config defaults and linted common config

* Updated config doc

* Generated config doc and fixed linting

* Updated config description for AutoPurge.MinAttempts

* Fixed sql migration conflict

* Fixed linting

* Updated stuck tx detector to use PriceMax config and added comments

* Fixed linting

* Updated DetectionApiUrl config example

* Fixed issues from latest merge

* Renumbered sql migration file

* Fixed testdata
  • Loading branch information
amit-momin authored May 27, 2024
1 parent 33dfd57 commit d675d86
Show file tree
Hide file tree
Showing 47 changed files with 1,761 additions and 53 deletions.
5 changes: 5 additions & 0 deletions .changeset/red-eagles-cry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added Added an auto-purge feature to the EVM TXM that identifies terminally stuck transactions either through a chain specific method or heurisitic then purges them to unblock the nonce. Included 4 new toml configs under Transactions.AutoPurge to configure this new feature: Enabled, Threshold, MinAttempts, and DetectionApiUrl.
6 changes: 5 additions & 1 deletion common/config/chaintype.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
ChainScroll ChainType = "scroll"
ChainWeMix ChainType = "wemix"
ChainXLayer ChainType = "xlayer"
ChainZkEvm ChainType = "zkevm"
ChainZkSync ChainType = "zksync"
)

Expand All @@ -34,7 +35,7 @@ func (c ChainType) IsL2() bool {

func (c ChainType) IsValid() bool {
switch c {
case "", ChainArbitrum, ChainCelo, ChainGnosis, ChainKroma, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkSync:
case "", ChainArbitrum, ChainCelo, ChainGnosis, ChainKroma, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync:
return true
}
return false
Expand All @@ -60,6 +61,8 @@ func ChainTypeFromSlug(slug string) ChainType {
return ChainWeMix
case "xlayer":
return ChainXLayer
case "zkevm":
return ChainZkEvm
case "zksync":
return ChainZkSync
default:
Expand Down Expand Up @@ -123,5 +126,6 @@ var ErrInvalidChainType = fmt.Errorf("must be one of %s or omitted", strings.Joi
string(ChainScroll),
string(ChainWeMix),
string(ChainXLayer),
string(ChainZkEvm),
string(ChainZkSync),
}, ", "))
106 changes: 97 additions & 9 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,13 @@ type Confirmer[
lggr logger.SugaredLogger
client txmgrtypes.TxmClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
resumeCallback ResumeCallback
chainConfig txmgrtypes.ConfirmerChainConfig
feeConfig txmgrtypes.ConfirmerFeeConfig
txConfig txmgrtypes.ConfirmerTransactionsConfig
dbConfig txmgrtypes.ConfirmerDatabaseConfig
chainID CHAIN_ID
stuckTxDetector txmgrtypes.StuckTxDetector[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
resumeCallback ResumeCallback
chainConfig txmgrtypes.ConfirmerChainConfig
feeConfig txmgrtypes.ConfirmerFeeConfig
txConfig txmgrtypes.ConfirmerTransactionsConfig
dbConfig txmgrtypes.ConfirmerDatabaseConfig
chainID CHAIN_ID

ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
enabledAddresses []ADDR
Expand Down Expand Up @@ -162,6 +163,7 @@ func NewConfirmer[
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
lggr logger.Logger,
isReceiptNil func(R) bool,
stuckTxDetector txmgrtypes.StuckTxDetector[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
lggr = logger.Named(lggr, "Confirmer")
return &Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
Expand All @@ -178,6 +180,7 @@ func NewConfirmer[
ks: keystore,
mb: mailbox.NewSingle[HEAD](),
isReceiptNil: isReceiptNil,
stuckTxDetector: stuckTxDetector,
}
}

Expand Down Expand Up @@ -205,6 +208,9 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sta
if err != nil {
return fmt.Errorf("Confirmer: failed to load EnabledAddressesForChain: %w", err)
}
if err = ec.stuckTxDetector.LoadPurgeBlockNumMap(ctx, ec.enabledAddresses); err != nil {
ec.lggr.Debugf("Confirmer: failed to load the last purged block num for enabled addresses. Process can continue as normal but purge rate limiting may be affected.")
}

ec.stopCh = make(chan struct{})
ec.wg = sync.WaitGroup{}
Expand Down Expand Up @@ -298,6 +304,13 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro
ec.lggr.Debugw("Finished CheckForReceipts", "headNum", head.BlockNumber(), "time", time.Since(mark), "id", "confirmer")
mark = time.Now()

if err := ec.ProcessStuckTransactions(ctx, head.BlockNumber()); err != nil {
return fmt.Errorf("ProcessStuckTransactions failed: %w", err)
}

ec.lggr.Debugw("Finished ProcessStuckTransactions", "headNum", head.BlockNumber(), "time", time.Since(mark), "id", "confirmer")
mark = time.Now()

if err := ec.RebroadcastWhereNecessary(ctx, head.BlockNumber()); err != nil {
return fmt.Errorf("RebroadcastWhereNecessary failed: %w", err)
}
Expand Down Expand Up @@ -436,6 +449,57 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Che
return nil
}

// Determines if any of the unconfirmed transactions are terminally stuck for each enabled address
// If any transaction is found to be terminally stuck, this method sends an empty attempt with bumped gas in an attempt to purge the stuck transaction
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ProcessStuckTransactions(ctx context.Context, blockNum int64) error {
// Use the detector to find a stuck tx for each enabled address
stuckTxs, err := ec.stuckTxDetector.DetectStuckTransactions(ctx, ec.enabledAddresses, blockNum)
if err != nil {
return fmt.Errorf("failed to detect stuck transactions: %w", err)
}
if len(stuckTxs) == 0 {
return nil
}

var wg sync.WaitGroup
wg.Add(len(stuckTxs))
errorList := []error{}
var errMu sync.Mutex
for _, tx := range stuckTxs {
// All stuck transactions will have unique from addresses. It is safe to process separate keys concurrently
// NOTE: This design will block one key if another takes a really long time to execute
go func(tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
defer wg.Done()
lggr := tx.GetLogger(ec.lggr)
// Create an purge attempt for tx
purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, tx, lggr)
if err != nil {
errMu.Lock()
errorList = append(errorList, fmt.Errorf("failed to create a purge attempt: %w", err))
errMu.Unlock()
return
}
// Save purge attempt
if err := ec.txStore.SaveInProgressAttempt(ctx, &purgeAttempt); err != nil {
errMu.Lock()
errorList = append(errorList, fmt.Errorf("failed to save purge attempt: %w", err))
errMu.Unlock()
return
}
lggr.Warnw("marked transaction as terminally stuck", "etx", tx)
// Send purge attempt
if err := ec.handleInProgressAttempt(ctx, lggr, tx, purgeAttempt, blockNum); err != nil {
errMu.Lock()
errorList = append(errorList, fmt.Errorf("failed to send purge attempt: %w", err))
errMu.Unlock()
return
}
}(tx)
}
wg.Wait()
return errors.Join(errorList...)
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) separateLikelyConfirmedAttempts(from ADDR, attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], minedSequence SEQ) []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
if len(attempts) == 0 {
return attempts
Expand Down Expand Up @@ -486,15 +550,21 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fet
j = len(attempts)
}

ec.lggr.Debugw(fmt.Sprintf("Batch fetching receipts at indexes %v until (excluded) %v", i, j), "blockNum", blockNum)
ec.lggr.Debugw(fmt.Sprintf("Batch fetching receipts at indexes %d until (excluded) %d", i, j), "blockNum", blockNum)

batch := attempts[i:j]

receipts, err := ec.batchFetchReceipts(ctx, batch, blockNum)
if err != nil {
return fmt.Errorf("batchFetchReceipts failed: %w", err)
}
if err := ec.txStore.SaveFetchedReceipts(ctx, receipts, ec.chainID); err != nil {
validReceipts, purgeReceipts := ec.separateValidAndPurgeAttemptReceipts(receipts, batch)
// Saves the receipts and mark the associated transactions as Confirmed
if err := ec.txStore.SaveFetchedReceipts(ctx, validReceipts, TxConfirmed, nil, ec.chainID); err != nil {
return fmt.Errorf("saveFetchedReceipts failed: %w", err)
}
// Save the receipts but mark the associated transactions as Fatal Error since the original transaction was purged
if err := ec.txStore.SaveFetchedReceipts(ctx, purgeReceipts, TxFatalError, ec.stuckTxDetector.StuckTxFatalError(), ec.chainID); err != nil {
return fmt.Errorf("saveFetchedReceipts failed: %w", err)
}
promNumConfirmedTxs.WithLabelValues(ec.chainID.String()).Add(float64(len(receipts)))
Expand All @@ -507,6 +577,25 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) fet
return nil
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) separateValidAndPurgeAttemptReceipts(receipts []R, attempts []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (valid []R, purge []R) {
receiptMap := make(map[TX_HASH]R)
for _, receipt := range receipts {
receiptMap[receipt.GetTxHash()] = receipt
}
for _, attempt := range attempts {
if receipt, ok := receiptMap[attempt.Hash]; ok {
if attempt.IsPurgeAttempt {
// Setting the purged block num here is ok since we have confirmation the tx has been purged with the receipt
ec.stuckTxDetector.SetPurgeBlockNum(attempt.Tx.FromAddress, receipt.GetBlockNumber().Int64())
purge = append(purge, receipt)
} else {
valid = append(valid, receipt)
}
}
}
return
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) getMinedSequenceForAddress(ctx context.Context, from ADDR) (SEQ, error) {
return ec.client.SequenceAt(ctx, from, nil)
}
Expand Down Expand Up @@ -700,7 +789,6 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Fin
lggr.Infow(fmt.Sprintf("Found %d transactions to be re-sent that were previously rejected due to insufficient native token balance", len(etxInsufficientFunds)), "blockNum", blockNum, "address", address)
}

// TODO: Just pass the Q through everything
etxBumps, err := ec.txStore.FindTxsRequiringGasBump(ctx, address, blockNum, gasBumpThreshold, bumpDepth, chainID)
if ctx.Err() != nil {
return nil, nil
Expand Down
28 changes: 28 additions & 0 deletions common/txmgr/types/mocks/tx_attempt_builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions common/txmgr/types/stuck_tx_detector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package types

import (
"context"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

// StuckTxDetector is used by the Confirmer to determine if any unconfirmed transactions are terminally stuck
type StuckTxDetector[
CHAIN_ID types.ID, // CHAIN_ID - chain id type
ADDR types.Hashable, // ADDR - chain address type
TX_HASH, BLOCK_HASH types.Hashable, // various chain hash types
SEQ types.Sequence, // SEQ - chain sequence type (nonce, utxo, etc)
FEE feetypes.Fee, // FEE - chain fee type
] interface {
// Uses either a chain specific API or heuristic to determine if any unconfirmed transactions are terminally stuck. Returns only one transaction per enabled address.
DetectStuckTransactions(ctx context.Context, enabledAddresses []ADDR, blockNum int64) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)
// Loads the internal map that tracks the last block num a transaction was purged at using the DB state
LoadPurgeBlockNumMap(ctx context.Context, addresses []ADDR) error
// Sets the last purged block num after a transaction has been successfully purged with receipt
SetPurgeBlockNum(fromAddress ADDR, blockNum int64)
// Returns the error message to set in the transaction error field to mark it as terminally stuck
StuckTxFatalError() *string
}
1 change: 1 addition & 0 deletions common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ type TxAttempt[
State TxAttemptState
Receipts []ChainReceipt[TX_HASH, BLOCK_HASH] `json:"-"`
TxType int
IsPurgeAttempt bool
}

func (a *TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) String() string {
Expand Down
3 changes: 3 additions & 0 deletions common/txmgr/types/tx_attempt_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ type TxAttemptBuilder[

// NewEmptyTxAttempt is used in ForceRebroadcast to create a signed tx with zero value sent to the zero address
NewEmptyTxAttempt(ctx context.Context, seq SEQ, feeLimit uint64, fee FEE, fromAddress ADDR) (attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)

// NewPurgeTxAttempt is used to create empty transaction attempts with higher gas than the previous attempt to purge stuck transactions
NewPurgeTxAttempt(ctx context.Context, etx Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], lggr logger.Logger) (attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error)
}
2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type TxStore[
FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
// Update tx to mark that its callback has been signaled
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error
SaveFetchedReceipts(ctx context.Context, receipts []R, chainID CHAIN_ID) (err error)
SaveFetchedReceipts(ctx context.Context, r []R, state TxState, errorMsg *string, chainID CHAIN_ID) error

// additional methods for tx store management
CheckTxQueueCapacity(ctx context.Context, fromAddress ADDR, maxQueuedTransactions uint64, chainID CHAIN_ID) (err error)
Expand Down
25 changes: 25 additions & 0 deletions core/chains/evm/config/chain_scoped_transactions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"net/url"
"time"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml"
Expand Down Expand Up @@ -33,3 +34,27 @@ func (t *transactionsConfig) MaxInFlight() uint32 {
func (t *transactionsConfig) MaxQueued() uint64 {
return uint64(*t.c.MaxQueued)
}

func (t *transactionsConfig) AutoPurge() AutoPurgeConfig {
return &autoPurgeConfig{c: t.c.AutoPurge}
}

type autoPurgeConfig struct {
c toml.AutoPurgeConfig
}

func (a *autoPurgeConfig) Enabled() bool {
return *a.c.Enabled
}

func (a *autoPurgeConfig) Threshold() uint32 {
return *a.c.Threshold
}

func (a *autoPurgeConfig) MinAttempts() uint32 {
return *a.c.MinAttempts
}

func (a *autoPurgeConfig) DetectionApiUrl() *url.URL {
return a.c.DetectionApiUrl.URL()
}
9 changes: 9 additions & 0 deletions core/chains/evm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"math/big"
"net/url"
"time"

gethcommon "github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -100,6 +101,14 @@ type Transactions interface {
ReaperThreshold() time.Duration
MaxInFlight() uint32
MaxQueued() uint64
AutoPurge() AutoPurgeConfig
}

type AutoPurgeConfig interface {
Enabled() bool
Threshold() uint32
MinAttempts() uint32
DetectionApiUrl() *url.URL
}

//go:generate mockery --quiet --name GasEstimator --output ./mocks/ --case=underscore
Expand Down
Loading

0 comments on commit d675d86

Please sign in to comment.