Skip to content

Commit 7893288

Browse files
ceyonurDarioush Jalali
andauthored
Tx indexing fix (#1131)
* fix unindexor in state sync * fix lint * fix flaky test * Update core/blockchain_test.go Co-authored-by: Darioush Jalali <darioush.jalali@avalabs.org> Signed-off-by: Ceyhun Onur <ceyhunonur54@gmail.com> * Update core/test_blockchain.go Co-authored-by: Darioush Jalali <darioush.jalali@avalabs.org> Signed-off-by: Ceyhun Onur <ceyhunonur54@gmail.com> * Update core/test_blockchain.go Co-authored-by: Darioush Jalali <darioush.jalali@avalabs.org> Signed-off-by: Ceyhun Onur <ceyhunonur54@gmail.com> * Update core/test_blockchain.go Co-authored-by: Darioush Jalali <darioush.jalali@avalabs.org> Signed-off-by: Ceyhun Onur <ceyhunonur54@gmail.com> * fix reviews * readd delay for index init * Update core/rawdb/accessors_state_sync.go Co-authored-by: Darioush Jalali <darioush.jalali@avalabs.org> Signed-off-by: Ceyhun Onur <ceyhunonur54@gmail.com> * tx indexer fix: avoids using sleep in test (#1151) * avoids using sleep in test * track all goroutines * trying harder * add some debug information in case of fail * Update core/blockchain.go Signed-off-by: Darioush Jalali <darioush.jalali@avalabs.org> * Update core/blockchain.go Signed-off-by: Darioush Jalali <darioush.jalali@avalabs.org> --------- Signed-off-by: Darioush Jalali <darioush.jalali@avalabs.org> --------- Signed-off-by: Ceyhun Onur <ceyhunonur54@gmail.com> Signed-off-by: Darioush Jalali <darioush.jalali@avalabs.org> Co-authored-by: Darioush Jalali <darioush.jalali@avalabs.org>
1 parent c77ba56 commit 7893288

File tree

6 files changed

+217
-148
lines changed

6 files changed

+217
-148
lines changed

core/blockchain.go

Lines changed: 76 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ type CacheConfig struct {
178178
PopulateMissingTries *uint64 // If non-nil, sets the starting height for re-generating historical tries.
179179
PopulateMissingTriesParallelism int // Number of readers to use when trying to populate missing tries.
180180
AllowMissingTries bool // Whether to allow an archive node to run with pruning enabled
181-
SnapshotDelayInit bool // Whether to initialize snapshots on startup or wait for external call
181+
SnapshotDelayInit bool // Whether to initialize snapshots on startup or wait for external call (= StateSyncEnabled)
182182
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
183183
SnapshotVerify bool // Verify generated snapshots
184184
Preimages bool // Whether to store preimage of trie key to the disk
@@ -335,6 +335,9 @@ type BlockChain struct {
335335

336336
// [acceptedLogsCache] stores recently accepted logs to improve the performance of eth_getLogs.
337337
acceptedLogsCache FIFOCache[common.Hash, [][]*types.Log]
338+
339+
// [txIndexTailLock] is used to synchronize the updating of the tx index tail.
340+
txIndexTailLock sync.Mutex
338341
}
339342

340343
// NewBlockChain returns a fully initialised block chain using information
@@ -444,13 +447,32 @@ func NewBlockChain(
444447
// Warm up [hc.acceptedNumberCache] and [acceptedLogsCache]
445448
bc.warmAcceptedCaches()
446449

450+
// if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail.
451+
if bc.cacheConfig.TxLookupLimit != 0 {
452+
latestStateSynced := rawdb.GetLatestSyncPerformed(bc.db)
453+
bc.setTxIndexTail(latestStateSynced)
454+
}
455+
447456
// Start processing accepted blocks effects in the background
448457
go bc.startAcceptor()
449458

450459
// Start tx indexer/unindexer if required.
451460
if bc.cacheConfig.TxLookupLimit != 0 {
452461
bc.wg.Add(1)
453-
go bc.dispatchTxUnindexer()
462+
var (
463+
headCh = make(chan ChainEvent, 1) // Buffered to avoid locking up the event feed
464+
sub = bc.SubscribeChainAcceptedEvent(headCh)
465+
)
466+
go func() {
467+
defer bc.wg.Done()
468+
if sub == nil {
469+
log.Warn("could not create chain accepted subscription to unindex txs")
470+
return
471+
}
472+
defer sub.Unsubscribe()
473+
474+
bc.maintainTxIndex(headCh)
475+
}()
454476
}
455477
return bc, nil
456478
}
@@ -459,9 +481,12 @@ func NewBlockChain(
459481
func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{}) {
460482
start := time.Now()
461483
txLookupLimit := bc.cacheConfig.TxLookupLimit
484+
bc.txIndexTailLock.Lock()
462485
defer func() {
463486
txUnindexTimer.Inc(time.Since(start).Milliseconds())
487+
bc.txIndexTailLock.Unlock()
464488
close(done)
489+
bc.wg.Done()
465490
}()
466491

467492
// If head is 0, it means the chain is just initialized and no blocks are inserted,
@@ -476,12 +501,11 @@ func (bc *BlockChain) unindexBlocks(tail uint64, head uint64, done chan struct{}
476501
}
477502
}
478503

479-
// dispatchTxUnindexer is responsible for the deletion of the
480-
// transaction index.
504+
// maintainTxIndex is responsible for the deletion of the
505+
// transaction index. This does not support reconstruction of removed indexes.
481506
// Invariant: If TxLookupLimit is 0, it means all tx indices will be preserved.
482507
// Meaning that this function should never be called.
483-
func (bc *BlockChain) dispatchTxUnindexer() {
484-
defer bc.wg.Done()
508+
func (bc *BlockChain) maintainTxIndex(headCh <-chan ChainEvent) {
485509
txLookupLimit := bc.cacheConfig.TxLookupLimit
486510

487511
// If the user just upgraded to a new version which supports transaction
@@ -492,26 +516,19 @@ func (bc *BlockChain) dispatchTxUnindexer() {
492516

493517
// Any reindexing done, start listening to chain events and moving the index window
494518
var (
495-
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
496-
headCh = make(chan ChainEvent, 1) // Buffered to avoid locking up the event feed
519+
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
497520
)
498-
sub := bc.SubscribeChainAcceptedEvent(headCh)
499-
if sub == nil {
500-
log.Warn("could not create chain accepted subscription to unindex txs")
501-
return
502-
}
503-
defer sub.Unsubscribe()
504521
log.Info("Initialized transaction unindexer", "limit", txLookupLimit)
505522

506-
// TODO: Uncomment this code when the tx-unindexer fix is ready.
507523
// Launch the initial processing if chain is not empty. This step is
508524
// useful in these scenarios that chain has no progress and indexer
509525
// is never triggered.
510-
// if head := bc.lastAccepted; head != nil && head.NumberU64() > txLookupLimit {
511-
// done = make(chan struct{})
512-
// tail := rawdb.ReadTxIndexTail(bc.db)
513-
// go bc.unindexBlocks(*tail, head.NumberU64(), done)
514-
// }
526+
if head := bc.CurrentBlock(); head != nil && head.Number.Uint64() > txLookupLimit {
527+
done = make(chan struct{})
528+
tail := rawdb.ReadTxIndexTail(bc.db)
529+
bc.wg.Add(1)
530+
go bc.unindexBlocks(*tail, head.Number.Uint64(), done)
531+
}
515532

516533
for {
517534
select {
@@ -525,13 +542,14 @@ func (bc *BlockChain) dispatchTxUnindexer() {
525542
done = make(chan struct{})
526543
// Note: tail will not be nil since it is initialized in this function.
527544
tail := rawdb.ReadTxIndexTail(bc.db)
545+
bc.wg.Add(1)
528546
go bc.unindexBlocks(*tail, headNum, done)
529547
}
530548
case <-done:
531549
done = nil
532550
case <-bc.quit:
533551
if done != nil {
534-
log.Info("Waiting background transaction indexer to exit")
552+
log.Info("Waiting background transaction unindexer to exit")
535553
<-done
536554
}
537555
return
@@ -545,15 +563,22 @@ func (bc *BlockChain) dispatchTxUnindexer() {
545563
// - updating the acceptor tip index
546564
func (bc *BlockChain) writeBlockAcceptedIndices(b *types.Block) error {
547565
batch := bc.db.NewBatch()
566+
if err := bc.batchBlockAcceptedIndices(batch, b); err != nil {
567+
return err
568+
}
569+
if err := batch.Write(); err != nil {
570+
return fmt.Errorf("%w: failed to write accepted indices entries batch", err)
571+
}
572+
return nil
573+
}
574+
575+
func (bc *BlockChain) batchBlockAcceptedIndices(batch ethdb.Batch, b *types.Block) error {
548576
if !bc.cacheConfig.SkipTxIndexing {
549577
rawdb.WriteTxLookupEntriesByBlock(batch, b)
550578
}
551579
if err := rawdb.WriteAcceptorTip(batch, b.Hash()); err != nil {
552580
return fmt.Errorf("%w: failed to write acceptor tip key", err)
553581
}
554-
if err := batch.Write(); err != nil {
555-
return fmt.Errorf("%w: failed to write tx lookup entries batch", err)
556-
}
557582
return nil
558583
}
559584

@@ -2164,6 +2189,8 @@ func (bc *BlockChain) gatherBlockRootsAboveLastAccepted() map[common.Hash]struct
21642189
return blockRoots
21652190
}
21662191

2192+
// TODO: split extras to blockchain_extra.go
2193+
21672194
// ResetToStateSyncedBlock reinitializes the state of the blockchain
21682195
// to the trie represented by [block.Root()] after updating
21692196
// in-memory and on disk current block pointers to [block].
@@ -2174,7 +2201,9 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
21742201

21752202
// Update head block and snapshot pointers on disk
21762203
batch := bc.db.NewBatch()
2177-
rawdb.WriteAcceptorTip(batch, block.Hash())
2204+
if err := bc.batchBlockAcceptedIndices(batch, block); err != nil {
2205+
return err
2206+
}
21782207
rawdb.WriteHeadBlockHash(batch, block.Hash())
21792208
rawdb.WriteHeadHeaderHash(batch, block.Hash())
21802209
rawdb.WriteSnapshotBlockHash(batch, block.Hash())
@@ -2187,6 +2216,11 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
21872216
return err
21882217
}
21892218

2219+
// if txlookup limit is 0 (uindexing disabled), we don't need to repair the tx index tail.
2220+
if bc.cacheConfig.TxLookupLimit != 0 {
2221+
bc.setTxIndexTail(block.NumberU64())
2222+
}
2223+
21902224
// Update all in-memory chain markers
21912225
bc.lastAccepted = block
21922226
bc.acceptorTip = block
@@ -2219,3 +2253,20 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
22192253
func (bc *BlockChain) CacheConfig() *CacheConfig {
22202254
return bc.cacheConfig
22212255
}
2256+
2257+
func (bc *BlockChain) setTxIndexTail(newTail uint64) error {
2258+
bc.txIndexTailLock.Lock()
2259+
defer bc.txIndexTailLock.Unlock()
2260+
2261+
tailP := rawdb.ReadTxIndexTail(bc.db)
2262+
var tailV uint64
2263+
if tailP != nil {
2264+
tailV = *tailP
2265+
}
2266+
2267+
if newTail > tailV {
2268+
log.Info("Repairing tx index tail", "old", tailV, "new", newTail)
2269+
rawdb.WriteTxIndexTail(bc.db, newTail)
2270+
}
2271+
return nil
2272+
}

0 commit comments

Comments
 (0)