Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data race in TestLogPoller_Replay #14431

Merged
merged 2 commits into from
Sep 19, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add RWMutex around global head var
  • Loading branch information
reductionista committed Sep 17, 2024
commit be4325acf336d9c8c1f4a66247f1cedab7b265c0
20 changes: 20 additions & 0 deletions core/chains/evm/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ func TestLogPoller_Replay(t *testing.T) {
db := pgtest.NewSqlxDB(t)
orm := NewORM(chainID, db, lggr)

headMutex := sync.RWMutex{}
head := evmtypes.Head{Number: 4}
jmank88 marked this conversation as resolved.
Show resolved Hide resolved

events := []common.Hash{EmitterABI.Events["Log1"].ID}
log1 := types.Log{
Index: 0,
Expand All @@ -301,7 +303,9 @@ func TestLogPoller_Replay(t *testing.T) {

ec := evmclimocks.NewClient(t)
ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(context.Context, *big.Int) (*evmtypes.Head, error) {
headMutex.RLock()
headCopy := head
headMutex.RUnlock()
return &headCopy, nil
})
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Once()
Expand All @@ -318,7 +322,9 @@ func TestLogPoller_Replay(t *testing.T) {
headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)

headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(func(ctx context.Context) (*evmtypes.Head, *evmtypes.Head, error) {
headMutex.RLock()
headCopy := head
headMutex.RUnlock()
finalized := &evmtypes.Head{Number: headCopy.Number - lpOpts.FinalityDepth}
return &headCopy, finalized, nil
})
Expand Down Expand Up @@ -394,7 +400,9 @@ func TestLogPoller_Replay(t *testing.T) {
var wg sync.WaitGroup
defer func() { wg.Wait() }()
ec.On("FilterLogs", mock.Anything, mock.Anything).Once().Return([]types.Log{log1}, nil).Run(func(args mock.Arguments) {
headMutex.Lock()
head = evmtypes.Head{Number: 4}
headMutex.Unlock()
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -421,7 +429,9 @@ func TestLogPoller_Replay(t *testing.T) {

ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Maybe() // in case task gets delayed by >= 100ms

headMutex.Lock()
head = evmtypes.Head{Number: 5}
headMutex.Unlock()
t.Cleanup(lp.reset)
servicetest.Run(t, lp)

Expand All @@ -448,7 +458,9 @@ func TestLogPoller_Replay(t *testing.T) {
go func() {
defer close(done)

headMutex.Lock()
head = evmtypes.Head{Number: 4} // Restore latest block to 4, so this matches the fromBlock requested
headMutex.Unlock()
select {
case lp.replayStart <- 4:
case <-ctx.Done():
Expand All @@ -469,7 +481,9 @@ func TestLogPoller_Replay(t *testing.T) {
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil)

t.Cleanup(lp.reset)
headMutex.Lock()
head = evmtypes.Head{Number: 5} // Latest block must be > lastProcessed in order for SaveAndPollLogs() to call FilterLogs()
headMutex.Unlock()
servicetest.Run(t, lp)

select {
Expand All @@ -482,7 +496,9 @@ func TestLogPoller_Replay(t *testing.T) {
// ReplayAsync should return as soon as replayStart is received
t.Run("ReplayAsync success", func(t *testing.T) {
t.Cleanup(lp.reset)
headMutex.Lock()
head = evmtypes.Head{Number: 5}
headMutex.Unlock()
ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil)
mockBatchCallContext(t, ec)
servicetest.Run(t, lp)
Expand All @@ -496,7 +512,9 @@ func TestLogPoller_Replay(t *testing.T) {
ctx := testutils.Context(t)
t.Cleanup(lp.reset)
servicetest.Run(t, lp)
headMutex.Lock()
head = evmtypes.Head{Number: 4}
headMutex.Unlock()

anyErr := pkgerrors.New("async error")
observedLogs.TakeAll()
Expand Down Expand Up @@ -528,7 +546,9 @@ func TestLogPoller_Replay(t *testing.T) {
err := lp.orm.DeleteLogsAndBlocksAfter(ctx, 0)
require.NoError(t, err)

headMutex.RLock()
err = lp.orm.InsertBlock(ctx, head.Hash, head.Number, head.Timestamp, head.Number)
headMutex.RUnlock()
require.NoError(t, err)

ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil)
Expand Down
Loading