Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and re-enable abandoned tx tracker #12533

Merged
merged 35 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
bdea570
Re-enable tracker
DylanTinianov Mar 21, 2024
dc448e8
generate
DylanTinianov Mar 21, 2024
58de60a
update tracker
DylanTinianov Mar 22, 2024
f7e0d44
Merge branch 'develop' into BCI-2638-re-enable-tracker
DylanTinianov Mar 24, 2024
a81653b
fix race conditions
DylanTinianov Mar 24, 2024
4cad710
Update tracker.go
DylanTinianov Mar 24, 2024
5d2da7f
Update tracker.go
DylanTinianov Mar 24, 2024
4875df4
Ensure thread safety
DylanTinianov Mar 24, 2024
f9dbe8c
Update tracker.go
DylanTinianov Mar 24, 2024
ab3a4ee
concurrently track txes
DylanTinianov Mar 24, 2024
e6cfea4
Optimizations
DylanTinianov Mar 24, 2024
99a5ba9
Merge branch 'develop' into BCI-2638-re-enable-tracker
DylanTinianov Mar 25, 2024
41c12c8
update logging
DylanTinianov Mar 25, 2024
cc7e7d1
Merge branch 'develop' into BCI-2638-re-enable-tracker
DylanTinianov Mar 25, 2024
9268f9d
Update tracker.go
DylanTinianov Mar 26, 2024
58c2bb6
Update CHANGELOG.md
DylanTinianov Mar 26, 2024
280e12a
Update common/txmgr/tracker.go
DylanTinianov Mar 26, 2024
bd51284
Merge branch 'develop' into BCI-2638-re-enable-tracker
DylanTinianov Apr 1, 2024
554b11f
GetAbandonedTransactions
DylanTinianov Apr 3, 2024
ce6ed98
lint
DylanTinianov Apr 3, 2024
76cecf0
enabled address check
DylanTinianov Apr 3, 2024
7564bd2
Update tracker_test.go
DylanTinianov Apr 3, 2024
a40c5e6
Update tracker.go
DylanTinianov Apr 3, 2024
37c8844
Ignore confirmed txes
DylanTinianov Apr 10, 2024
68b0211
Don't resend abandoned txes
DylanTinianov Apr 10, 2024
c6b3476
Remove comment
DylanTinianov Apr 10, 2024
50d7594
Update tracker_test.go
DylanTinianov Apr 11, 2024
1cdd390
Remove unused block height
DylanTinianov Apr 16, 2024
a06b64c
changeset
DylanTinianov Apr 16, 2024
fed565a
Merge branch 'develop' into BCI-2638-re-enable-tracker
DylanTinianov Apr 16, 2024
be94745
Update tracker_test.go
DylanTinianov Apr 16, 2024
986556c
Merge branch 'BCI-2638-re-enable-tracker' of https://github.com/smart…
DylanTinianov Apr 16, 2024
f91896c
Merge branch 'develop' into BCI-2638-re-enable-tracker
DylanTinianov Apr 16, 2024
98b705f
Merge branch 'develop' into BCI-2638-re-enable-tracker
DylanTinianov Apr 17, 2024
fed4cf5
Merge branch 'develop' into BCI-2638-re-enable-tracker
prashantkumar1982 Apr 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update tracker
  • Loading branch information
DylanTinianov committed Mar 22, 2024
commit 58de60addbd9ddaf06428285ec655ad18fb025af
34 changes: 24 additions & 10 deletions common/txmgr/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,21 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) startIntern
if err := tr.setEnabledAddresses(ctx); err != nil {
return fmt.Errorf("failed to set enabled addresses: %w", err)
}
tr.lggr.Infof("enabled addresses set for chainID %v", tr.chainID)

if err := tr.trackAbandonedTxes(ctx); err != nil {
return fmt.Errorf("failed to track abandoned txes: %w", err)
}

if len(tr.txCache) > 0 {
tr.lggr.Infof("%d abandoned txes found, starting runLoop", len(tr.txCache))
tr.wg.Add(1)
go tr.runLoop()
if len(tr.txCache) == 0 {
tr.lggr.Info("no abandoned txes found, skipping runLoop")
tr.isStarted = true
return nil
}

tr.lggr.Infof("%d abandoned txes found, starting runLoop", len(tr.txCache))
tr.wg.Add(1)
go tr.runLoop()
tr.isStarted = true
return nil
}
Expand All @@ -137,7 +141,7 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) closeIntern

tr.lggr.Info("stopping tracker")
if !tr.isStarted {
return fmt.Errorf("tracker not started")
return fmt.Errorf("tracker is not started: %w", services.ErrAlreadyStopped)
}

close(tr.chStop)
Expand All @@ -160,9 +164,14 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() {
if !exists {
break
}
tr.lggr.Infof("received blockHeight %v", blockHeight)
if err := tr.handleTxesByState(ctx, blockHeight); err != nil {
tr.lggr.Errorw(fmt.Errorf("failed to handle txes by state: %w", err).Error())
}
if len(tr.txCache) == 0 {
tr.lggr.Info("all abandoned txes handled, stopping runLoop")
return
}
}
case <-ttlExceeded.C:
tr.lggr.Info("ttl exceeded")
Expand All @@ -175,13 +184,13 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() {
}

func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetAbandonedAddresses() []ADDR {
tr.lock.Lock()
defer tr.lock.Unlock()

if !tr.isStarted {
return []ADDR{}
}

tr.lock.Lock()
defer tr.lock.Unlock()

abandonedAddrs := make([]ADDR, len(tr.txCache))
for _, atx := range tr.txCache {
abandonedAddrs = append(abandonedAddrs, atx.fromAddress)
Expand Down Expand Up @@ -209,10 +218,10 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) setEnabledA
return nil
}

// trackAbandonedTxes called once to find and insert all abandoned txes into the tracker.
// trackAbandonedTxes called once on stratup to find and insert all abandoned txes into the tracker.
func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) trackAbandonedTxes(ctx context.Context) (err error) {
if tr.isStarted {
return fmt.Errorf("tracker already started")
return fmt.Errorf("trackAbandonedTxes must only be called once on startup")
}

return sqlutil.Batch(func(offset, limit uint) (count uint, err error) {
Expand Down Expand Up @@ -282,6 +291,11 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) handleConfi
tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
blockHeight int64,
) error {
if blockHeight == 0 {
// Can't be sure if tx is finalized or not during initialization
return nil
}

finalized, err := tr.txStore.IsTxFinalized(ctx, blockHeight, tx.ID, tr.chainID)
if err != nil {
return fmt.Errorf("failed to check if tx is finalized: %w", err)
Expand Down
20 changes: 13 additions & 7 deletions core/chains/evm/txmgr/tracker_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package txmgr_test

import (
"context"
"math/big"
"testing"
"time"
Expand Down Expand Up @@ -48,8 +47,9 @@ func TestEvmTracker_Initialization(t *testing.T) {
t.Parallel()

tracker, _, _, _ := newTestEvmTrackerSetup(t)
ctx := testutils.Context(t)

require.NoError(t, tracker.Start(testutils.Context(t)))
require.NoError(t, tracker.Start(ctx))
require.True(t, tracker.IsStarted())

t.Run("stop tracker", func(t *testing.T) {
Expand All @@ -60,6 +60,7 @@ func TestEvmTracker_Initialization(t *testing.T) {

func TestEvmTracker_AddressTracking(t *testing.T) {
t.Parallel()
ctx := testutils.Context(t)

t.Run("track abandoned addresses", func(t *testing.T) {
ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
Expand All @@ -73,7 +74,7 @@ func TestEvmTracker_AddressTracking(t *testing.T) {
_ = mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1)
_ = mustCreateUnstartedTx(t, txStore, unstartedAddr, cltest.MustGenerateRandomKey(t).Address, []byte{}, 0, big.Int{}, ethClient.ConfiguredChainID())

err := tracker.Start(context.Background())
err := tracker.Start(ctx)
require.NoError(t, err)
defer func(tracker *txmgr.Tracker) {
err = tracker.Close()
Expand All @@ -92,13 +93,17 @@ func TestEvmTracker_AddressTracking(t *testing.T) {
confirmedAddr := cltest.MustGenerateRandomKey(t).Address
_ = mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1)

err := tracker.Start(context.Background())
err := tracker.Start(ctx)
require.NoError(t, err)
defer func(tracker *txmgr.Tracker) {
err = tracker.Close()
require.NoError(t, err)
}(tracker)

// deliver block before minConfirmations
tracker.XXXDeliverBlock(1)
time.Sleep(waitTime)

addrs := tracker.GetAbandonedAddresses()
require.Contains(t, addrs, confirmedAddr)

Expand All @@ -113,13 +118,14 @@ func TestEvmTracker_AddressTracking(t *testing.T) {

func TestEvmTracker_ExceedingTTL(t *testing.T) {
t.Parallel()
ctx := testutils.Context(t)

t.Run("confirmed but unfinalized transaction still tracked", func(t *testing.T) {
tracker, txStore, _, _ := newTestEvmTrackerSetup(t)
addr1 := cltest.MustGenerateRandomKey(t).Address
_ = mustInsertConfirmedEthTxWithReceipt(t, txStore, addr1, 123, 1)

err := tracker.Start(context.Background())
err := tracker.Start(ctx)
require.NoError(t, err)
defer func(tracker *txmgr.Tracker) {
err = tracker.Close()
Expand All @@ -137,7 +143,7 @@ func TestEvmTracker_ExceedingTTL(t *testing.T) {
tx2 := cltest.MustInsertUnconfirmedEthTx(t, txStore, 123, addr2)

tracker.XXXTestSetTTL(time.Nanosecond)
err := tracker.Start(context.Background())
err := tracker.Start(ctx)
require.NoError(t, err)
defer func(tracker *txmgr.Tracker) {
err = tracker.Close()
Expand All @@ -147,7 +153,7 @@ func TestEvmTracker_ExceedingTTL(t *testing.T) {
time.Sleep(waitTime)
require.NotContains(t, tracker.GetAbandonedAddresses(), addr1, addr2)

fatalTxes, err := txStore.GetFatalTransactions(context.Background())
fatalTxes, err := txStore.GetFatalTransactions(ctx)
require.NoError(t, err)
require.True(t, containsID(fatalTxes, tx1.ID))
require.True(t, containsID(fatalTxes, tx2.ID))
Expand Down
Loading