85
85
acceptedBlockGasUsedCounter = metrics .NewRegisteredCounter ("chain/block/gas/used/accepted" , nil )
86
86
badBlockCounter = metrics .NewRegisteredCounter ("chain/block/bad/count" , nil )
87
87
88
+ txUnindexTimer = metrics .NewRegisteredCounter ("chain/txs/unindex" , nil )
88
89
acceptedTxsCounter = metrics .NewRegisteredCounter ("chain/txs/accepted" , nil )
89
90
processedTxsCounter = metrics .NewRegisteredCounter ("chain/txs/processed" , nil )
90
91
92
+ acceptedLogsCounter = metrics .NewRegisteredCounter ("chain/logs/accepted" , nil )
93
+ processedLogsCounter = metrics .NewRegisteredCounter ("chain/logs/processed" , nil )
94
+
91
95
ErrRefuseToCorruptArchiver = errors .New ("node has operated with pruning disabled, shutting down to prevent missing tries" )
92
96
93
97
errFutureBlockUnsupported = errors .New ("future block insertion not supported" )
@@ -102,7 +106,6 @@ const (
102
106
feeConfigCacheLimit = 256
103
107
coinbaseConfigCacheLimit = 256
104
108
badBlockLimit = 10
105
- TriesInMemory = 128
106
109
107
110
// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
108
111
//
@@ -173,6 +176,7 @@ type CacheConfig struct {
173
176
SkipSnapshotRebuild bool // Whether to skip rebuilding the snapshot in favor of returning an error (only set to true for tests)
174
177
Preimages bool // Whether to store preimage of trie key to the disk
175
178
AcceptedCacheSize int // Depth of accepted headers cache and accepted logs cache at the accepted tip
179
+ TxLookupLimit uint64 // Number of recent blocks for which to maintain transaction lookup indices
176
180
}
177
181
178
182
var DefaultCacheConfig = & CacheConfig {
@@ -269,9 +273,8 @@ type BlockChain struct {
269
273
// during shutdown and in tests.
270
274
acceptorWg sync.WaitGroup
271
275
272
- // [rejournalWg] is used to wait for the trie clean rejournaling to complete.
273
- // This is used during shutdown.
274
- rejournalWg sync.WaitGroup
276
+ // [wg] is used to wait for the async blockchain processes to finish on shutdown.
277
+ wg sync.WaitGroup
275
278
276
279
// quit channel is used to listen for when the blockchain is shut down to close
277
280
// async processes.
@@ -354,6 +357,13 @@ func NewBlockChain(
354
357
// Create the state manager
355
358
bc .stateManager = NewTrieWriter (bc .stateCache .TrieDB (), cacheConfig )
356
359
360
+ // loadLastState writes indices, so we should start the tx indexer after that.
361
+ // Start tx indexer/unindexer here.
362
+ if bc .cacheConfig .TxLookupLimit != 0 {
363
+ bc .wg .Add (1 )
364
+ go bc .dispatchTxUnindexer ()
365
+ }
366
+
357
367
// Re-generate current block state if it is missing
358
368
if err := bc .loadLastState (lastAcceptedHash ); err != nil {
359
369
return nil , err
@@ -401,16 +411,82 @@ func NewBlockChain(
401
411
log .Info ("Starting to save trie clean cache periodically" , "journalDir" , bc .cacheConfig .TrieCleanJournal , "freq" , bc .cacheConfig .TrieCleanRejournal )
402
412
403
413
triedb := bc .stateCache .TrieDB ()
404
- bc .rejournalWg .Add (1 )
414
+ bc .wg .Add (1 )
405
415
go func () {
406
- defer bc .rejournalWg .Done ()
416
+ defer bc .wg .Done ()
407
417
triedb .SaveCachePeriodically (bc .cacheConfig .TrieCleanJournal , bc .cacheConfig .TrieCleanRejournal , bc .quit )
408
418
}()
409
419
}
410
420
411
421
return bc , nil
412
422
}
413
423
424
+ // dispatchTxUnindexer is responsible for the deletion of the
425
+ // transaction index.
426
+ // Invariant: If TxLookupLimit is 0, it means all tx indices will be preserved.
427
+ // Meaning that this function should never be called.
428
+ func (bc * BlockChain ) dispatchTxUnindexer () {
429
+ defer bc .wg .Done ()
430
+ txLookupLimit := bc .cacheConfig .TxLookupLimit
431
+
432
+ // If the user just upgraded to a new version which supports transaction
433
+ // index pruning, write the new tail and remove anything older.
434
+ if rawdb .ReadTxIndexTail (bc .db ) == nil {
435
+ rawdb .WriteTxIndexTail (bc .db , 0 )
436
+ }
437
+
438
+ // unindexes transactions depending on user configuration
439
+ unindexBlocks := func (tail uint64 , head uint64 , done chan struct {}) {
440
+ start := time .Now ()
441
+ defer func () {
442
+ txUnindexTimer .Inc (time .Since (start ).Milliseconds ())
443
+ done <- struct {}{}
444
+ }()
445
+
446
+ // Update the transaction index to the new chain state
447
+ if head - txLookupLimit + 1 >= tail {
448
+ // Unindex a part of stale indices and forward index tail to HEAD-limit
449
+ rawdb .UnindexTransactions (bc .db , tail , head - txLookupLimit + 1 , bc .quit )
450
+ }
451
+ }
452
+ // Any reindexing done, start listening to chain events and moving the index window
453
+ var (
454
+ done chan struct {} // Non-nil if background unindexing or reindexing routine is active.
455
+ headCh = make (chan ChainEvent , 1 ) // Buffered to avoid locking up the event feed
456
+ )
457
+ sub := bc .SubscribeChainAcceptedEvent (headCh )
458
+ if sub == nil {
459
+ log .Warn ("could not create chain accepted subscription to unindex txs" )
460
+ return
461
+ }
462
+ defer sub .Unsubscribe ()
463
+
464
+ for {
465
+ select {
466
+ case head := <- headCh :
467
+ headNum := head .Block .NumberU64 ()
468
+ if headNum < txLookupLimit {
469
+ break
470
+ }
471
+
472
+ if done == nil {
473
+ done = make (chan struct {})
474
+ // Note: tail will not be nil since it is initialized in this function.
475
+ tail := rawdb .ReadTxIndexTail (bc .db )
476
+ go unindexBlocks (* tail , headNum , done )
477
+ }
478
+ case <- done :
479
+ done = nil
480
+ case <- bc .quit :
481
+ if done != nil {
482
+ log .Info ("Waiting background transaction indexer to exit" )
483
+ <- done
484
+ }
485
+ return
486
+ }
487
+ }
488
+ }
489
+
414
490
// writeBlockAcceptedIndices writes any indices that must be persisted for accepted block.
415
491
// This includes the following:
416
492
// - transaction lookup indices
@@ -532,6 +608,9 @@ func (bc *BlockChain) startAcceptor() {
532
608
533
609
acceptorWorkTimer .Inc (time .Since (start ).Milliseconds ())
534
610
acceptorWorkCount .Inc (1 )
611
+ // Note: in contrast to most accepted metrics, we increment the accepted log metrics in the acceptor queue because
612
+ // the logs are already processed in the acceptor queue.
613
+ acceptedLogsCounter .Inc (int64 (len (logs )))
535
614
}
536
615
}
537
616
@@ -555,8 +634,8 @@ func (bc *BlockChain) addAcceptorQueue(b *types.Block) {
555
634
// DrainAcceptorQueue blocks until all items in [acceptorQueue] have been
556
635
// processed.
557
636
func (bc * BlockChain ) DrainAcceptorQueue () {
558
- bc .acceptorClosingLock .Lock ()
559
- defer bc .acceptorClosingLock .Unlock ()
637
+ bc .acceptorClosingLock .RLock ()
638
+ defer bc .acceptorClosingLock .RUnlock ()
560
639
561
640
if bc .acceptorClosed {
562
641
return
@@ -782,7 +861,8 @@ func (bc *BlockChain) ValidateCanonicalChain() error {
782
861
// Transactions are only indexed beneath the last accepted block, so we only check
783
862
// that the transactions have been indexed, if we are checking below the last accepted
784
863
// block.
785
- if current .NumberU64 () <= bc .lastAccepted .NumberU64 () {
864
+ shouldIndexTxs := bc .cacheConfig .TxLookupLimit == 0 || bc .lastAccepted .NumberU64 () < current .NumberU64 ()+ bc .cacheConfig .TxLookupLimit
865
+ if current .NumberU64 () <= bc .lastAccepted .NumberU64 () && shouldIndexTxs {
786
866
// Ensure that all of the transactions have been stored correctly in the canonical
787
867
// chain
788
868
for txIndex , tx := range txs {
@@ -840,7 +920,6 @@ func (bc *BlockChain) Stop() {
840
920
return
841
921
}
842
922
843
- // Wait for accepted feed to process all remaining items
844
923
log .Info ("Closing quit channel" )
845
924
close (bc .quit )
846
925
// Wait for accepted feed to process all remaining items
@@ -868,9 +947,9 @@ func (bc *BlockChain) Stop() {
868
947
log .Info ("Closing scope" )
869
948
bc .scope .Close ()
870
949
871
- // Waiting for clean trie re-journal to complete
872
- log .Info ("Waiting for trie re-journal to complete" )
873
- bc .rejournalWg .Wait ()
950
+ // Waiting for background processes to complete
951
+ log .Info ("Waiting for background processes to complete" )
952
+ bc .wg .Wait ()
874
953
875
954
log .Info ("Blockchain stopped" )
876
955
}
@@ -1313,6 +1392,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
1313
1392
1314
1393
processedBlockGasUsedCounter .Inc (int64 (block .GasUsed ()))
1315
1394
processedTxsCounter .Inc (int64 (block .Transactions ().Len ()))
1395
+ processedLogsCounter .Inc (int64 (len (logs )))
1316
1396
blockInsertCount .Inc (1 )
1317
1397
return nil
1318
1398
}
0 commit comments