Skip to content

Commit

Permalink
make txmgr aware of the txpool.ErrAlreadyReserved condition (ethereum…
Browse files Browse the repository at this point in the history
  • Loading branch information
roberto-bayardo authored Mar 1, 2024
1 parent 3b5c2fc commit ee21678
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 47 deletions.
2 changes: 1 addition & 1 deletion op-service/txmgr/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestQueue_Send(t *testing.T) {
{},
},
nonces: []uint64{0, 1},
total: 3 * time.Second,
total: 1 * time.Second,
},
}
for _, test := range testCases {
Expand Down
56 changes: 37 additions & 19 deletions op-service/txmgr/send_state.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
package txmgr

import (
"strings"
"errors"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
)

var (
// Returned by CriticalError when there is an incompatible tx type already in the mempool.
// geth defines this error as txpool.ErrAlreadyReserved in v1.13.14 so we can remove this
// declaration once op-geth is updated to this version.
ErrAlreadyReserved = errors.New("address already reserved")

// Returned by CriticalError when the system is unable to get the tx into the mempool in the
// alloted time
ErrMempoolDeadlineExpired = errors.New("failed to get tx into the mempool")
)

// SendState tracks information about the publication state of a given txn. In
// this context, a txn may correspond to multiple different txn hashes due to
// varying gas prices, though we treat them all as the same logical txn. This
Expand All @@ -27,6 +38,9 @@ type SendState struct {
successFullPublishCount uint64 // nil error => tx made it to the mempool
safeAbortNonceTooLowCount uint64 // nonce too low error

// Whether any attempt to send the tx resulted in ErrAlreadyReserved
alreadyReserved bool

// Miscellaneous tracking
bumpCount int // number of times we have bumped the gas price
}
Expand Down Expand Up @@ -60,8 +74,10 @@ func (s *SendState) ProcessSendError(err error) {
switch {
case err == nil:
s.successFullPublishCount++
case strings.Contains(err.Error(), core.ErrNonceTooLow.Error()):
case errStringMatch(err, core.ErrNonceTooLow):
s.nonceTooLowCount++
case errStringMatch(err, ErrAlreadyReserved):
s.alreadyReserved = true
}
}

Expand Down Expand Up @@ -93,27 +109,29 @@ func (s *SendState) TxNotMined(txHash common.Hash) {
}
}

// ShouldAbortImmediately returns true if the txmgr should give up on trying a
// given txn with the target nonce.
// This occurs when the set of errors recorded indicates that no further progress can be made
// on this transaction.
func (s *SendState) ShouldAbortImmediately() bool {
// CriticalError returns a non-nil error if the txmgr should give up on trying a given txn with the
// target nonce. This occurs when the set of errors recorded indicates that no further progress
// can be made on this transaction, or if there is an incompatible tx type currently in the
// mempool.
func (s *SendState) CriticalError() error {
s.mu.RLock()
defer s.mu.RUnlock()

// Never abort if our latest sample reports having at least one mined txn.
if len(s.minedTxs) > 0 {
return false
}

// If we have exceeded the nonce too low count, abort
if s.nonceTooLowCount >= s.safeAbortNonceTooLowCount ||
// If we have not published a transaction in the allotted time, abort
(s.successFullPublishCount == 0 && s.now().After(s.txInMempoolDeadline)) {
return true
switch {
case len(s.minedTxs) > 0:
// Never abort if our latest sample reports having at least one mined txn.
return nil
case s.nonceTooLowCount >= s.safeAbortNonceTooLowCount:
// we have exceeded the nonce too low count
return core.ErrNonceTooLow
case s.successFullPublishCount == 0 && s.now().After(s.txInMempoolDeadline):
// unable to get the tx into the mempool in the alloted time
return ErrMempoolDeadlineExpired
case s.alreadyReserved:
// incompatible tx type in mempool
return ErrAlreadyReserved
}

return false
return nil
}

// IsWaitingForConfirmation returns true if we have at least one confirmation on
Expand Down
41 changes: 20 additions & 21 deletions op-service/txmgr/send_state_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package txmgr_test
package txmgr

import (
"errors"
Expand All @@ -7,7 +7,6 @@ import (

"github.com/stretchr/testify/require"

"github.com/ethereum-optimism/optimism/op-service/txmgr"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
)
Expand All @@ -18,15 +17,15 @@ var (

const testSafeAbortNonceTooLowCount = 3

func newSendState() *txmgr.SendState {
func newSendState() *SendState {
return newSendStateWithTimeout(time.Hour, time.Now)
}

func newSendStateWithTimeout(t time.Duration, now func() time.Time) *txmgr.SendState {
return txmgr.NewSendStateWithNow(testSafeAbortNonceTooLowCount, t, now)
func newSendStateWithTimeout(t time.Duration, now func() time.Time) *SendState {
return NewSendStateWithNow(testSafeAbortNonceTooLowCount, t, now)
}

func processNSendErrors(sendState *txmgr.SendState, err error, n int) {
func processNSendErrors(sendState *SendState, err error, n int) {
for i := 0; i < n; i++ {
sendState.ProcessSendError(err)
}
Expand All @@ -36,7 +35,7 @@ func processNSendErrors(sendState *txmgr.SendState, err error, n int) {
// trigger an abort even after the safe abort interval has elapsed.
func TestSendStateNoAbortAfterInit(t *testing.T) {
sendState := newSendState()
require.False(t, sendState.ShouldAbortImmediately())
require.Nil(t, sendState.CriticalError())
require.False(t, sendState.IsWaitingForConfirmation())
}

Expand All @@ -46,7 +45,7 @@ func TestSendStateNoAbortAfterProcessNilError(t *testing.T) {
sendState := newSendState()

processNSendErrors(sendState, nil, testSafeAbortNonceTooLowCount)
require.False(t, sendState.ShouldAbortImmediately())
require.Nil(t, sendState.CriticalError())
}

// TestSendStateNoAbortAfterProcessOtherError asserts that non-nil errors other
Expand All @@ -56,7 +55,7 @@ func TestSendStateNoAbortAfterProcessOtherError(t *testing.T) {

otherError := errors.New("other error")
processNSendErrors(sendState, otherError, testSafeAbortNonceTooLowCount)
require.False(t, sendState.ShouldAbortImmediately())
require.Nil(t, sendState.CriticalError())
}

// TestSendStateAbortSafelyAfterNonceTooLowButNoTxMined asserts that we will
Expand All @@ -65,11 +64,11 @@ func TestSendStateAbortSafelyAfterNonceTooLowButNoTxMined(t *testing.T) {
sendState := newSendState()

sendState.ProcessSendError(core.ErrNonceTooLow)
require.False(t, sendState.ShouldAbortImmediately())
require.Nil(t, sendState.CriticalError())
sendState.ProcessSendError(core.ErrNonceTooLow)
require.False(t, sendState.ShouldAbortImmediately())
require.Nil(t, sendState.CriticalError())
sendState.ProcessSendError(core.ErrNonceTooLow)
require.True(t, sendState.ShouldAbortImmediately())
require.ErrorIs(t, sendState.CriticalError(), core.ErrNonceTooLow)
}

// TestSendStateMiningTxCancelsAbort asserts that a tx getting mined after
Expand All @@ -80,9 +79,9 @@ func TestSendStateMiningTxCancelsAbort(t *testing.T) {
sendState.ProcessSendError(core.ErrNonceTooLow)
sendState.ProcessSendError(core.ErrNonceTooLow)
sendState.TxMined(testHash)
require.False(t, sendState.ShouldAbortImmediately())
require.Nil(t, sendState.CriticalError())
sendState.ProcessSendError(core.ErrNonceTooLow)
require.False(t, sendState.ShouldAbortImmediately())
require.Nil(t, sendState.CriticalError())
}

// TestSendStateReorgingTxResetsAbort asserts that unmining a tx does not
Expand All @@ -96,7 +95,7 @@ func TestSendStateReorgingTxResetsAbort(t *testing.T) {
sendState.TxMined(testHash)
sendState.TxNotMined(testHash)
sendState.ProcessSendError(core.ErrNonceTooLow)
require.False(t, sendState.ShouldAbortImmediately())
require.Nil(t, sendState.CriticalError())
}

// TestSendStateNoAbortEvenIfNonceTooLowAfterTxMined asserts that we will not
Expand All @@ -112,7 +111,7 @@ func TestSendStateNoAbortEvenIfNonceTooLowAfterTxMined(t *testing.T) {
processNSendErrors(
sendState, core.ErrNonceTooLow, testSafeAbortNonceTooLowCount,
)
require.False(t, sendState.ShouldAbortImmediately())
require.Nil(t, sendState.CriticalError())
}

// TestSendStateSafeAbortIfNonceTooLowPersistsAfterUnmine asserts that we will
Expand All @@ -125,9 +124,9 @@ func TestSendStateSafeAbortIfNonceTooLowPersistsAfterUnmine(t *testing.T) {
sendState.TxNotMined(testHash)
sendState.ProcessSendError(core.ErrNonceTooLow)
sendState.ProcessSendError(core.ErrNonceTooLow)
require.False(t, sendState.ShouldAbortImmediately())
require.Nil(t, sendState.CriticalError())
sendState.ProcessSendError(core.ErrNonceTooLow)
require.True(t, sendState.ShouldAbortImmediately())
require.ErrorIs(t, sendState.CriticalError(), core.ErrNonceTooLow)
}

// TestSendStateSafeAbortWhileCallingNotMinedOnUnminedTx asserts that we will
Expand All @@ -140,7 +139,7 @@ func TestSendStateSafeAbortWhileCallingNotMinedOnUnminedTx(t *testing.T) {
sendState, core.ErrNonceTooLow, testSafeAbortNonceTooLowCount,
)
sendState.TxNotMined(testHash)
require.True(t, sendState.ShouldAbortImmediately())
require.ErrorIs(t, sendState.CriticalError(), core.ErrNonceTooLow)
}

// TestSendStateIsWaitingForConfirmationAfterTxMined asserts that we are waiting
Expand Down Expand Up @@ -179,13 +178,13 @@ func stepClock(step time.Duration) func() time.Time {
// when no successful transactions have been recorded
func TestSendStateTimeoutAbort(t *testing.T) {
sendState := newSendStateWithTimeout(10*time.Millisecond, stepClock(20*time.Millisecond))
require.True(t, sendState.ShouldAbortImmediately(), "Should abort after timing out")
require.ErrorIs(t, sendState.CriticalError(), ErrMempoolDeadlineExpired, "Should abort after timing out")
}

// TestSendStateNoTimeoutAbortIfPublishedTx ensure that this will not abort if there is
// a successful transaction send.
func TestSendStateNoTimeoutAbortIfPublishedTx(t *testing.T) {
sendState := newSendStateWithTimeout(10*time.Millisecond, stepClock(20*time.Millisecond))
sendState.ProcessSendError(nil)
require.False(t, sendState.ShouldAbortImmediately(), "Should not abort if published transaction successfully")
require.Nil(t, sendState.CriticalError(), "Should not abort if published transaction successfully")
}
20 changes: 14 additions & 6 deletions op-service/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type TxManager interface {
// may be included on L1 even if the context is cancelled.
//
// NOTE: Send can be called concurrently, the nonce will be managed internally.
//
// Callers using both Blob and non-Blob transactions should check to see if the returned error
// is ErrAlreadyReserved, which indicates an incompatible transaction may be stuck in the
// mempool and is in need of replacement or cancellation.
Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error)

// From returns the sending address associated with the instance of the transaction manager.
Expand Down Expand Up @@ -421,17 +425,16 @@ func (m *SimpleTxManager) sendTx(ctx context.Context, tx *types.Transaction) (*t
defer ticker.Stop()

for {
if err := sendState.CriticalError(); err != nil {
m.txLogger(tx, false).Warn("Aborting transaction submission", "err", err)
return nil, fmt.Errorf("aborted tx send due to critical error: %w", err)
}
select {
case <-ticker.C:
// Don't resubmit a transaction if it has been mined, but we are waiting for the conf depth.
if sendState.IsWaitingForConfirmation() {
continue
}
// If we see lots of unrecoverable errors (and no pending transactions) abort sending the transaction.
if sendState.ShouldAbortImmediately() {
m.txLogger(tx, false).Warn("Aborting transaction submission")
return nil, errors.New("aborted transaction sending")
}
// if the tx manager closed while we were waiting for the tx, give up
if m.closed.Load() {
m.txLogger(tx, false).Warn("TxManager closed, aborting transaction submission")
Expand Down Expand Up @@ -495,9 +498,14 @@ func (m *SimpleTxManager) publishTx(ctx context.Context, tx *types.Transaction,
}

switch {
case errStringMatch(err, ErrAlreadyReserved):
// this can happen if, say, a blob transaction is stuck in the mempool and we try to
// send a non-blob transaction (and vice-versa).
l.Warn("txpool contains pending tx of incompatible type", "err", err)
m.metr.TxPublished("pending_tx_of_incompatible_type")
case errStringMatch(err, core.ErrNonceTooLow):
l.Warn("nonce too low", "err", err)
m.metr.TxPublished("nonce_to_low")
m.metr.TxPublished("nonce_too_low")
case errStringMatch(err, context.Canceled):
m.metr.RPCError()
l.Warn("transaction send cancelled", "err", err)
Expand Down
17 changes: 17 additions & 0 deletions op-service/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,23 @@ func TestTxMgrNeverConfirmCancel(t *testing.T) {
require.Nil(t, receipt)
}

// TestAlreadyReserved tests that AlreadyReserved error results in immediate abort of transaction
// sending.
func TestAlreadyReserved(t *testing.T) {
conf := configWithNumConfs(1)
h := newTestHarnessWithConfig(t, conf)

sendTx := func(ctx context.Context, tx *types.Transaction) error {
return ErrAlreadyReserved
}
h.backend.setTxSender(sendTx)

_, err := h.mgr.Send(context.Background(), TxCandidate{
To: &common.Address{},
})
require.ErrorIs(t, err, ErrAlreadyReserved)
}

// TestTxMgrConfirmsAtMaxGasPrice asserts that Send properly returns the max gas
// price receipt if none of the lower gas price txs were mined.
func TestTxMgrConfirmsAtHigherGasPrice(t *testing.T) {
Expand Down

0 comments on commit ee21678

Please sign in to comment.