Skip to content

Commit eccdd0f

Browse files
authored
ci: fix panic in indexer (#1402)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. --> ## Overview <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. --> 3 fixes: 1. test not using a ctx with a cancel. 2. t.Parallel() was contributing to flakiness 3. added ctx done check in long running for loop <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Refactor** - Improved transaction processing to allow non-blocking behavior and better handle context cancellation. - Enhanced thread safety with the addition of mutex locks in various methods to prevent race conditions. - **Tests** - Updated test functions to include context management and cancellation capabilities for more robust testing scenarios. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 21e37e9 commit eccdd0f

File tree

3 files changed

+25
-17
lines changed

3 files changed

+25
-17
lines changed

mempool/clist_mempool.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type CListMempool struct {
2929

3030
// notify listeners (ie. consensus) when txs are available
3131
notifiedTxsAvailable bool
32+
txAvailMtx sync.Mutex
3233
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
3334

3435
config *config.MempoolConfig
@@ -501,6 +502,8 @@ func (mem *CListMempool) TxsAvailable() <-chan struct{} {
501502
}
502503

503504
func (mem *CListMempool) notifyTxsAvailable() {
505+
mem.txAvailMtx.Lock()
506+
defer mem.txAvailMtx.Unlock()
504507
if mem.Size() == 0 {
505508
panic("notified txs available but mempool is empty!")
506509
}

node/full_client_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ func TestGenesisChunked(t *testing.T) {
171171
mockApp.On("InitChain", mock.Anything, mock.Anything).Return(&abci.ResponseInitChain{}, nil)
172172
privKey, _, _ := crypto.GenerateEd25519Key(crand.Reader)
173173
signingKey, _, _ := crypto.GenerateEd25519Key(crand.Reader)
174-
n, _ := newFullNode(context.Background(), config.NodeConfig{DAAddress: MockServerAddr}, privKey, signingKey, proxy.NewLocalClientCreator(mockApp), genDoc, test.NewFileLogger(t))
174+
ctx, cancel := context.WithCancel(context.Background())
175+
defer cancel()
176+
n, _ := newFullNode(ctx, config.NodeConfig{DAAddress: MockServerAddr}, privKey, signingKey, proxy.NewLocalClientCreator(mockApp), genDoc, test.NewFileLogger(t))
175177

176178
rpc := NewFullClient(n)
177179

@@ -926,7 +928,6 @@ func TestStatus(t *testing.T) {
926928
}
927929

928930
func TestFutureGenesisTime(t *testing.T) {
929-
t.Parallel()
930931
assert := assert.New(t)
931932
require := require.New(t)
932933

state/txindex/indexer_service.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -74,22 +74,26 @@ func (is *IndexerService) OnStart() error {
7474
batch := NewBatch(numTxs)
7575

7676
for i := int64(0); i < numTxs; i++ {
77-
msg2 := <-txsSub.Out()
78-
txResult := msg2.Data().(types.EventDataTx).TxResult
79-
80-
if err = batch.Add(&txResult); err != nil {
81-
is.Logger.Error(
82-
"failed to add tx to batch",
83-
"height", height,
84-
"index", txResult.Index,
85-
"err", err,
86-
)
87-
88-
if is.terminateOnError {
89-
if err := is.Stop(); err != nil {
90-
is.Logger.Error("failed to stop", "err", err)
77+
select {
78+
case <-is.ctx.Done():
79+
return
80+
case msg2 := <-txsSub.Out():
81+
txResult := msg2.Data().(types.EventDataTx).TxResult
82+
83+
if err = batch.Add(&txResult); err != nil {
84+
is.Logger.Error(
85+
"failed to add tx to batch",
86+
"height", height,
87+
"index", txResult.Index,
88+
"err", err,
89+
)
90+
91+
if is.terminateOnError {
92+
if err := is.Stop(); err != nil {
93+
is.Logger.Error("failed to stop", "err", err)
94+
}
95+
return
9196
}
92-
return
9397
}
9498
}
9599
}

0 commit comments

Comments
 (0)