Add feature to TXM to detect and purge stuck transactions #12881

Merged
52 commits
246b04d
Added the EVM stuck tx detector component
amit-momin Apr 18, 2024
d47d88f
Added stuck tx handling in Confirmer
amit-momin Apr 19, 2024
8c713dc
Fixed linting
amit-momin Apr 19, 2024
05c5a11
Fixed toml config decoding
amit-momin Apr 19, 2024
360d9fa
Fixed config tests
amit-momin Apr 19, 2024
0a04fff
Fixed web resolver config tests
amit-momin Apr 19, 2024
d26473c
Fixed config docs test
amit-momin Apr 19, 2024
1541c5a
Added zkEVM overflow detection and added unit tests for the detector
amit-momin Apr 23, 2024
1d4113d
Merge branch 'develop' into BCI-2941-Implement-auto-purge-stuck-trans…
amit-momin Apr 23, 2024
c71ac14
Fixed broken tests after merge
amit-momin Apr 23, 2024
2e46dde
Reverted AutoPurgeConfig validation changes and fixed config tests
amit-momin Apr 23, 2024
8185dc8
Added changeset and fixed config validation logic
amit-momin Apr 23, 2024
20bfb9d
Fixed linting
amit-momin Apr 23, 2024
82e3a5d
Fixed linting
amit-momin Apr 23, 2024
db5264e
Fixed purge attempt builder and added tests
amit-momin Apr 24, 2024
0d5b318
Updated evm.txes constraint to allow non-null nonce if fatal_error
amit-momin Apr 24, 2024
25105a1
Added confirmer test
amit-momin Apr 24, 2024
491f9e3
Adjusted confirmer test to better reflect actual process
amit-momin Apr 24, 2024
338ab26
Fixed linting
amit-momin Apr 24, 2024
dc17b98
Merge branch 'develop' into BCI-2941-Implement-auto-purge-stuck-trans…
amit-momin Apr 25, 2024
468b73b
Added purge block num loading on Confirmer startup
amit-momin Apr 25, 2024
736eb89
Updated EVM tx store mock
amit-momin Apr 25, 2024
b2902c6
Fixed linting and testdata
amit-momin Apr 25, 2024
6eb5dc5
Updated stuck tx fatal error messages
amit-momin Apr 25, 2024
a5663ee
Merge branch 'develop' into BCI-2941-Implement-auto-purge-stuck-trans…
amit-momin Apr 25, 2024
0cea37b
Updated sql migration file sequence
amit-momin Apr 25, 2024
a3cd9b7
Skipped loading purge block num if auto-purge feature disabled and fi…
amit-momin Apr 26, 2024
02306fe
Fixed linting
amit-momin Apr 26, 2024
a13605b
Renamed function and moved log
amit-momin Apr 26, 2024
c2deae5
Merge branch 'develop' into BCI-2941-Implement-auto-purge-stuck-trans…
amit-momin Apr 26, 2024
a5ea4aa
Added stricter config validation
amit-momin Apr 26, 2024
1cfb5cc
Fixed linting
amit-momin Apr 27, 2024
335033a
Merge branch 'develop' into BCI-2941-Implement-auto-purge-stuck-trans…
amit-momin Apr 29, 2024
a5ba3a0
Updated auto-purge feature configs to adhere to config naming standards
amit-momin Apr 29, 2024
7513d36
Fixed config doc test and updated changeset
amit-momin Apr 29, 2024
4b41b4b
Updated Scroll and zkEVM config defaults and linted common config
amit-momin Apr 29, 2024
cc5764d
Updated config doc
amit-momin Apr 29, 2024
b1faa07
Merge branch 'develop' into BCI-2941-Implement-auto-purge-stuck-trans…
amit-momin Apr 30, 2024
d67d79b
Generated config doc and fixed linting
amit-momin Apr 30, 2024
c4062f5
Updated config description for AutoPurge.MinAttempts
amit-momin May 9, 2024
54b9c7c
Merge branch 'develop' into BCI-2941-Implement-auto-purge-stuck-trans…
amit-momin May 9, 2024
d244012
Fixed sql migration conflict
amit-momin May 9, 2024
0915400
Fixed linting
amit-momin May 9, 2024
c95d261
Updated stuck tx detector to use PriceMax config and added comments
amit-momin May 13, 2024
db5e5d9
Merge branch 'develop' into BCI-2941-Implement-auto-purge-stuck-trans…
amit-momin May 13, 2024
31d69ba
Fixed linting
amit-momin May 13, 2024
90ec4a3
Merge branch 'develop' into BCI-2941-Implement-auto-purge-stuck-trans…
amit-momin May 22, 2024
d81da52
Updated DetectionApiUrl config example
amit-momin May 22, 2024
a33460b
Merge branch 'develop' into BCI-2941-Implement-auto-purge-stuck-trans…
amit-momin May 23, 2024
c08504b
Fixed issues from latest merge
amit-momin May 23, 2024
88826a7
Renumbered sql migration file
amit-momin May 23, 2024
e400a36
Fixed testdata
amit-momin May 24, 2024
Added purge block num loading on Confirmer startup
amit-momin committed Apr 25, 2024
commit 468b73b0eaf5e632a7f4f928468a4f988b36e86e
4 changes: 4 additions & 0 deletions common/txmgr/confirmer.go
prashantkumar1982 marked this conversation as resolved.
@@ -209,6 +209,10 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sta
     if err != nil {
         return fmt.Errorf("Confirmer: failed to load EnabledAddressesForChain: %w", err)
     }
+    err = ec.stuckTxDetector.LoadPurgeBlockNumMap(ctx, ec.enabledAddresses)
+    if err != nil {
+        ec.lggr.Debugf("Confirmer: failed to load the last purged block num for enabled addresses. Process can continue as normal but purge rate limiting may be affected.")
+    }

     ec.ctx, ec.ctxCancel = context.WithCancel(context.Background())
     ec.wg = sync.WaitGroup{}
1 change: 1 addition & 0 deletions common/txmgr/types/stuck_tx_detector.go
@@ -15,6 +15,7 @@ type StuckTxDetector[
     FEE feetypes.Fee, // FEE - chain fee type
 ] interface {
     DetectStuckTransactions(ctx context.Context, enabledAddresses []ADDR, blockNum int64) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)
+    LoadPurgeBlockNumMap(ctx context.Context, addresses []ADDR) error
prashantkumar1982 marked this conversation as resolved.
     SetPurgeBlockNum(fromAddress ADDR, blockNum int64)
     StuckTxFatalError() *string
 }
2 changes: 1 addition & 1 deletion core/chains/evm/txmgr/confirmer_test.go
@@ -3171,7 +3171,7 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) {
     // Create attempts so that the oldest broadcast attempt's block num is what meets the threshold check
     // Create autoPurgeMinAttempts number of attempts to ensure the broadcast attempt count check is not being triggered
     // Create attempts broadcasted autoPurgeThreshold block ago to ensure broadcast block num check is not being triggered
-    tx := mustInsertUnconfirmedTxWithXBroadcastAttempts(t, txStore, nonce, fromAddress, autoPurgeMinAttempts, blockNum-int64(autoPurgeThreshold), marketGasPrice.Add(oneGwei))
+    tx := mustInsertUnconfirmedTxWithBroadcastAttempts(t, txStore, nonce, fromAddress, autoPurgeMinAttempts, blockNum-int64(autoPurgeThreshold), marketGasPrice.Add(oneGwei))

     head := evmtypes.Head{
         Hash: utils.NewHash(),
32 changes: 18 additions & 14 deletions core/chains/evm/txmgr/evm_tx_store.go
@@ -56,7 +56,7 @@ type TxStoreWebApi interface {
     TransactionsWithAttempts(ctx context.Context, offset, limit int) ([]Tx, int, error)
     FindTxAttempt(ctx context.Context, hash common.Hash) (*TxAttempt, error)
     FindTxWithAttempts(ctx context.Context, etxID int64) (etx Tx, err error)
-    FindUnconfirmedTxsByFromAddresses(ctx context.Context, addresses []common.Address, chainID *big.Int) (txs []Tx, err error)
+    FindTxsByStateAndFromAddresses(ctx context.Context, addresses []common.Address, state txmgrtypes.TxState, chainID *big.Int) (txs []*Tx, err error)
 }

 type TestEvmTxStore interface {
@@ -828,30 +828,34 @@ ORDER BY evm.txes.nonce ASC, evm.tx_attempts.gas_price DESC, evm.tx_attempts.gas
     return
 }

-// Returns the latest attempt for the lowest nonce unconfirmed transaction in the DB if it is not marked as insufficient eth or for purge already
-func (o *evmTxStore) FindUnconfirmedTxsByFromAddresses(ctx context.Context, addresses []common.Address, chainID *big.Int) (txs []Tx, err error) {
+// Returns the transaction by state and from addresses
+// Loads attempt and receipts in the transactions
+func (o *evmTxStore) FindTxsByStateAndFromAddresses(ctx context.Context, addresses []common.Address, state txmgrtypes.TxState, chainID *big.Int) (txs []*Tx, err error) {
Contributor

@poopoothegorilla FYI, this will need to be implemented in the in-memory store as well.
Hopefully there, it will be easier to fetch by state.

Contributor

👍 thanks for the ping will update when this gets merged in :)

Contributor Author

Tried to keep it as general as possible with in-memory in mind. But if there's any issue with its compatibility with your work, let me know and I can adjust it and the logic accordingly to help.

Contributor

Don't worry, this is higher priority.
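
For illustration only, here is a minimal Go sketch of how a lookup by state and from address might look in an in-memory store, as mentioned in this thread. Everything in it (`TxState` as a string, the trimmed-down `Tx`, `inMemoryStore`, and its `byState` index) is a hypothetical stand-in and not the actual in-memory store design; string addresses replace `common.Address` to keep the block self-contained.

```go
package main

import "fmt"

// TxState and Tx are simplified, hypothetical stand-ins for the txmgr types.
type TxState string

const (
	TxUnconfirmed TxState = "unconfirmed"
	TxFatalError  TxState = "fatal_error"
)

type Tx struct {
	ID          int64
	FromAddress string
	State       TxState
}

// inMemoryStore is a hypothetical store that indexes transactions by state.
type inMemoryStore struct {
	byState map[TxState][]*Tx
}

func newInMemoryStore(txs ...*Tx) *inMemoryStore {
	s := &inMemoryStore{byState: make(map[TxState][]*Tx)}
	for _, tx := range txs {
		s.byState[tx.State] = append(s.byState[tx.State], tx)
	}
	return s
}

// findTxsByStateAndFromAddresses returns the transactions in the given state
// whose sender is one of the given addresses.
func (s *inMemoryStore) findTxsByStateAndFromAddresses(addresses []string, state TxState) []*Tx {
	wanted := make(map[string]struct{}, len(addresses))
	for _, a := range addresses {
		wanted[a] = struct{}{}
	}
	var out []*Tx
	for _, tx := range s.byState[state] {
		if _, ok := wanted[tx.FromAddress]; ok {
			out = append(out, tx)
		}
	}
	return out
}

func main() {
	store := newInMemoryStore(
		&Tx{ID: 1, FromAddress: "0xA", State: TxUnconfirmed},
		&Tx{ID: 2, FromAddress: "0xB", State: TxFatalError},
		&Tx{ID: 3, FromAddress: "0xA", State: TxFatalError},
	)
	for _, tx := range store.findTxsByStateAndFromAddresses([]string{"0xA"}, TxFatalError) {
		fmt.Println(tx.ID)
	}
}
```

With a per-state index like this, the state filter becomes a map lookup and only the address check needs a scan, which is one way the in-memory version could be simpler than the SQL query.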

     var cancel context.CancelFunc
     ctx, cancel = o.mergeContexts(ctx)
     defer cancel()
     enabledAddrsBytea := make([][]byte, len(addresses))
     for i, addr := range addresses {
         enabledAddrsBytea[i] = addr.Bytes()
     }
-    err = o.Transaction(ctx, true, func(orm *evmTxStore) error {
-        var dbTxs []DbEthTx
-        err = orm.q.SelectContext(ctx, &dbTxs, `
-SELECT evm.txes.* FROM evm.txes
-WHERE evm.txes.state = 'unconfirmed' AND evm.txes.from_address = ANY($1) AND evm.txes.evm_chain_id = $2
-`, enabledAddrsBytea, chainID.String())
+    err = o.Transact(ctx, true, func(orm *evmTxStore) error {
+        var dbEtxs []DbEthTx
+        err = orm.q.SelectContext(ctx, &dbEtxs, `SELECT * FROM evm.txes WHERE state = $1 AND from_address = ANY($2) AND evm_chain_id = $3`, state, enabledAddrsBytea, chainID.String())
         if err != nil {
-            return fmt.Errorf("FindTxAttemptsForPurge failed to load evm.tx_attempts: %w", err)
+            return fmt.Errorf("FindTxsByStateAndFromAddresses failed to load evm.txes: %w", err)
         }
-        if len(dbTxs) == 0 {
+        if len(dbEtxs) == 0 {
             return nil
         }
-        txs = dbEthTxsToEvmEthTxs(dbTxs)
-        err = orm.preloadTxAttempts(ctx, txs)
-        return pkgerrors.Wrap(err, "FindTxAttemptsForPurge failed to load evm.txes")
+        txs = make([]*Tx, len(dbEtxs))
+        dbEthTxsToEvmEthTxPtrs(dbEtxs, txs)
+        if err = orm.LoadTxesAttempts(ctx, txs); err != nil {
+            return fmt.Errorf("FindTxsByStateAndFromAddresses failed to load evm.tx_attempts: %w", err)
+        }
+        if err = orm.loadEthTxesAttemptsReceipts(ctx, txs); err != nil {
+            return fmt.Errorf("FindTxsByStateAndFromAddresses failed to load evm.receipts: %w", err)
+        }
+        return nil
     })
     return
 }
62 changes: 31 additions & 31 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 35 additions & 6 deletions core/chains/evm/txmgr/stuck_tx_detector.go
@@ -16,6 +16,7 @@ import (
     "github.com/smartcontractkit/chainlink-common/pkg/logger"
     "github.com/smartcontractkit/chainlink/v2/common/config"
     feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
+    "github.com/smartcontractkit/chainlink/v2/common/txmgr"
     "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
     "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
     "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
@@ -30,7 +31,7 @@ type stuckTxDetectorClient interface {
 }

 type stuckTxDetectorTxStore interface {
-    FindUnconfirmedTxsByFromAddresses(ctx context.Context, addresses []common.Address, chainID *big.Int) (txs []Tx, err error)
+    FindTxsByStateAndFromAddresses(ctx context.Context, addresses []common.Address, state types.TxState, chainID *big.Int) (txs []*Tx, err error)
 }

 type stuckTxDetectorConfig interface {
@@ -59,7 +60,6 @@ func NewStuckTxDetector(lggr logger.Logger, chainID *big.Int, chainType config.C
     t := http.DefaultTransport.(*http.Transport).Clone()
     t.DisableCompression = true
     httpClient := &http.Client{Transport: t}
-    // TODO: Load purgeBlockNumMap with some DB state or confirm rate limit is not needed on first purge after restart
     return &stuckTxDetector{
         lggr: lggr,
         chainID: chainID,
@@ -73,6 +73,35 @@ func NewStuckTxDetector(lggr logger.Logger, chainID *big.Int, chainType config.C
     }
 }

+func (d *stuckTxDetector) LoadPurgeBlockNumMap(ctx context.Context, addresses []common.Address) error {
+    d.purgeBlockNumLock.Lock()
+    defer d.purgeBlockNumLock.Unlock()
+    // Ok to reset the map here since this method could be reloaded with a new list of from addresses
+    d.purgeBlockNumMap = make(map[common.Address]int64)
+    for _, address := range addresses {
+        d.purgeBlockNumMap[address] = 0
+    }
+
+    // Find all fatal error transactions to see if any were from previous purges to properly set the map
Contributor

I don't quite understand why you need to do all this.
At startup, just leave the map empty.
If the map has no value for a given address, you can purge txs on that address immediately.
When you purge a tx, set the map value.

Contributor Author

I originally had it that way but was worried that we'd lose the expected rate limiting on restart. If we restart right after a tx got purged, we'd immediately purge the next one if it was stuck behind the original. If txs get cleared or reaped, there isn't much more we can do, but I thought I'd at least make a best effort.
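
The restart concern in this thread can be sketched in Go as follows. This is a simplified model under stated assumptions, not the detector's actual logic: `purgeLimiter`, `canPurge`, the `threshold` field, and the "current block minus last purge block" comparison are all hypothetical, and string addresses stand in for `common.Address`.

```go
package main

import "fmt"

// purgeLimiter is a hypothetical, simplified model of per-address purge rate limiting.
type purgeLimiter struct {
	lastPurgeBlock map[string]int64 // last block at which a purge landed, per from address
	threshold      int64            // assumed minimum block gap between purges
}

// canPurge reports whether enough blocks have passed since the last recorded purge.
// A missing entry means no purge is known for the address.
func (l *purgeLimiter) canPurge(addr string, currentBlock int64) bool {
	last, ok := l.lastPurgeBlock[addr]
	if !ok {
		return true
	}
	return currentBlock-last >= l.threshold
}

func main() {
	const addr = "0xA"
	// Scenario: a purge for addr landed at block 100, then the node restarted.

	// Restart with an empty map: the purge history is lost.
	empty := &purgeLimiter{lastPurgeBlock: map[string]int64{}, threshold: 10}
	// Restart with the map reloaded from stored purge receipts.
	reloaded := &purgeLimiter{lastPurgeBlock: map[string]int64{addr: 100}, threshold: 10}

	fmt.Println(empty.canPurge(addr, 103))    // true: the next stuck tx is purged immediately
	fmt.Println(reloaded.canPurge(addr, 103)) // false: still inside the rate-limit window
	fmt.Println(reloaded.canPurge(addr, 110)) // true: the window has elapsed
}
```

The trade-off is the one discussed above: an empty map is simpler, while reloading from receipts keeps the limit in force across restarts for as long as the purged transactions remain in the store.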

+    txs, err := d.txStore.FindTxsByStateAndFromAddresses(ctx, addresses, txmgr.TxFatalError, d.chainID)
+    if err != nil {
+        return fmt.Errorf("failed to query fatal error transactions from the txstore: %w", err)
+    }
+
+    // Set the purgeBlockNumMap with the receipt block num of purge attempts
+    for _, tx := range txs {
+        for _, attempt := range tx.TxAttempts {
+            if attempt.IsPurgeAttempt && len(attempt.Receipts) > 0 {
+                // There should only be 1 receipt in an attempt for a transaction
+                d.purgeBlockNumMap[tx.FromAddress] = attempt.Receipts[0].GetBlockNumber().Int64()
+                break
+            }
+        }
+    }
+
+    return nil
+}
+
 // If the AutoPurgeStuckTxs feature is enabled, finds terminally stuck transactions
 // Uses a chain specific method for detection, or if one does not exist, applies a general heuristic
 func (d *stuckTxDetector) DetectStuckTransactions(ctx context.Context, enabledAddresses []common.Address, blockNum int64) ([]Tx, error) {
@@ -100,7 +129,7 @@ func (d *stuckTxDetector) DetectStuckTransactions(ctx context.Context, enabledAd

     for _, stuckTx := range stuckTxs {
         lggr := stuckTx.GetLogger(d.lggr)
-        lggr.Errorw("marking transaction as terminally stuck", "etx", stuckTx)
+        lggr.Debugw("marking transaction as terminally stuck", "etx", stuckTx)
amit-momin marked this conversation as resolved.
     }

     return stuckTxs, err
@@ -110,17 +139,17 @@ func (d *stuckTxDetector) DetectStuckTransactions(ctx context.Context, enabledAd
 // Only the earliest transaction can be considered terminally stuck. All others may be valid and just stuck behind the nonce
 func (d *stuckTxDetector) FindPotentialStuckTxs(ctx context.Context, enabledAddresses []common.Address) ([]Tx, error) {
Contributor

rename this to:
findUnconfirmedTxWithLowestNonce()

Because this logic has nothing to do with stuck txs.

Also, for this struct, don't export any methods that are used internally only.

Contributor Author

I was exporting this one primarily for testing
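
As a rough Go sketch of the reviewer's suggestion, the lowest-nonce selection can live in an unexported helper. The `tx` type here is a hypothetical, trimmed-down stand-in for the EVM `Tx`, the function takes a slice instead of querying the tx store, and skipping transactions without a nonce is an assumption of this sketch rather than something the PR does.

```go
package main

import "fmt"

// tx is a simplified, hypothetical stand-in for the EVM Tx type.
type tx struct {
	FromAddress string
	Sequence    *int64 // nonce; nil when not yet assigned
}

// findUnconfirmedTxWithLowestNonce keeps, for each from address, the tx with the lowest nonce.
// Txs without a nonce are skipped (an assumption of this sketch).
func findUnconfirmedTxWithLowestNonce(txs []*tx) map[string]tx {
	lowest := make(map[string]tx)
	for _, t := range txs {
		if t.Sequence == nil {
			continue
		}
		cur, ok := lowest[t.FromAddress]
		if !ok || *cur.Sequence > *t.Sequence {
			lowest[t.FromAddress] = *t
		}
	}
	return lowest
}

// nonce is a small helper that returns a pointer to n.
func nonce(n int64) *int64 { return &n }

func main() {
	txs := []*tx{
		{FromAddress: "0xA", Sequence: nonce(7)},
		{FromAddress: "0xA", Sequence: nonce(5)},
		{FromAddress: "0xB", Sequence: nonce(2)},
	}
	for addr, t := range findUnconfirmedTxWithLowestNonce(txs) {
		fmt.Println(addr, *t.Sequence)
	}
}
```

On the testing point: a Go test file that declares the same package as the code under test (rather than the `_test` external package) can call an unexported function like this directly, so exporting is not required for unit tests.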

     // Loads attempts within tx
-    txs, err := d.txStore.FindUnconfirmedTxsByFromAddresses(ctx, enabledAddresses, d.chainID)
+    txs, err := d.txStore.FindTxsByStateAndFromAddresses(ctx, enabledAddresses, txmgr.TxUnconfirmed, d.chainID)
     if err != nil {
         return nil, fmt.Errorf("failed to retrieve unconfirmed transactions for enabled addresses: %w", err)
     }
     // Stores the lowest nonce tx found in the query results for each from address
     lowestNonceTxMap := make(map[common.Address]Tx)
     for _, tx := range txs {
         if _, ok := lowestNonceTxMap[tx.FromAddress]; !ok {
-            lowestNonceTxMap[tx.FromAddress] = tx
+            lowestNonceTxMap[tx.FromAddress] = *tx
         } else if lowestNonceTx := lowestNonceTxMap[tx.FromAddress]; *lowestNonceTx.Sequence > *tx.Sequence {
-            lowestNonceTxMap[tx.FromAddress] = tx
+            lowestNonceTxMap[tx.FromAddress] = *tx
         }
     }
