17
17
package core
18
18
19
19
import (
20
- "errors"
21
20
"fmt"
21
+ "sync/atomic"
22
22
23
23
"github.com/ethereum/go-ethereum/common"
24
24
"github.com/ethereum/go-ethereum/core/rawdb"
@@ -47,26 +47,38 @@ type txIndexer struct {
47
47
// and all others shouldn't.
48
48
limit uint64
49
49
50
+ // The current head of blockchain for transaction indexing. This field
51
+ // is accessed by both the indexer and the indexing progress queries.
52
+ head atomic.Uint64
53
+
54
+ // The current tail of the indexed transactions, null indicates
55
+ // that no transactions have been indexed yet.
56
+ //
57
+ // This field is accessed by both the indexer and the indexing
58
+ // progress queries.
59
+ tail atomic.Pointer [uint64 ]
60
+
50
61
// cutoff denotes the block number before which the chain segment should
51
62
// be pruned and not available locally.
52
- cutoff uint64
53
- db ethdb.Database
54
- progress chan chan TxIndexProgress
55
- term chan chan struct {}
56
- closed chan struct {}
63
+ cutoff uint64
64
+ db ethdb.Database
65
+ term chan chan struct {}
66
+ closed chan struct {}
57
67
}
58
68
59
69
// newTxIndexer initializes the transaction indexer.
60
70
func newTxIndexer (limit uint64 , chain * BlockChain ) * txIndexer {
61
71
cutoff , _ := chain .HistoryPruningCutoff ()
62
72
indexer := & txIndexer {
63
- limit : limit ,
64
- cutoff : cutoff ,
65
- db : chain .db ,
66
- progress : make (chan chan TxIndexProgress ),
67
- term : make (chan chan struct {}),
68
- closed : make (chan struct {}),
73
+ limit : limit ,
74
+ cutoff : cutoff ,
75
+ db : chain .db ,
76
+ term : make (chan chan struct {}),
77
+ closed : make (chan struct {}),
69
78
}
79
+ indexer .head .Store (indexer .resolveHead ())
80
+ indexer .tail .Store (rawdb .ReadTxIndexTail (chain .db ))
81
+
70
82
go indexer .loop (chain )
71
83
72
84
var msg string
@@ -154,6 +166,7 @@ func (indexer *txIndexer) repair(head uint64) {
154
166
// A crash may occur between the two delete operations,
155
167
// potentially leaving dangling indexes in the database.
156
168
// However, this is considered acceptable.
169
+ indexer .tail .Store (nil )
157
170
rawdb .DeleteTxIndexTail (indexer .db )
158
171
rawdb .DeleteAllTxLookupEntries (indexer .db , nil )
159
172
log .Warn ("Purge transaction indexes" , "head" , head , "tail" , * tail )
@@ -174,6 +187,7 @@ func (indexer *txIndexer) repair(head uint64) {
174
187
// Traversing the database directly within the transaction
175
188
// index namespace might be slow and expensive, but we
176
189
// have no choice.
190
+ indexer .tail .Store (nil )
177
191
rawdb .DeleteTxIndexTail (indexer .db )
178
192
rawdb .DeleteAllTxLookupEntries (indexer .db , nil )
179
193
log .Warn ("Purge transaction indexes" , "head" , head , "cutoff" , indexer .cutoff )
@@ -187,6 +201,7 @@ func (indexer *txIndexer) repair(head uint64) {
187
201
// A crash may occur between the two delete operations,
188
202
// potentially leaving dangling indexes in the database.
189
203
// However, this is considered acceptable.
204
+ indexer .tail .Store (& indexer .cutoff )
190
205
rawdb .WriteTxIndexTail (indexer .db , indexer .cutoff )
191
206
rawdb .DeleteAllTxLookupEntries (indexer .db , func (txhash common.Hash , blob []byte ) bool {
192
207
n := rawdb .DecodeTxLookupEntry (blob , indexer .db )
@@ -216,16 +231,15 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
216
231
217
232
// Listening to chain events and manipulate the transaction indexes.
218
233
var (
219
- stop chan struct {} // Non-nil if background routine is active
220
- done chan struct {} // Non-nil if background routine is active
221
- head = indexer .resolveHead () // The latest announced chain head
222
-
234
+ stop chan struct {} // Non-nil if background routine is active
235
+ done chan struct {} // Non-nil if background routine is active
223
236
headCh = make (chan ChainHeadEvent )
224
237
sub = chain .SubscribeChainHeadEvent (headCh )
225
238
)
226
239
defer sub .Unsubscribe ()
227
240
228
241
// Validate the transaction indexes and repair if necessary
242
+ head := indexer .head .Load ()
229
243
indexer .repair (head )
230
244
231
245
// Launch the initial processing if chain is not empty (head != genesis).
@@ -238,17 +252,18 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
238
252
for {
239
253
select {
240
254
case h := <- headCh :
255
+ indexer .head .Store (h .Header .Number .Uint64 ())
241
256
if done == nil {
242
257
stop = make (chan struct {})
243
258
done = make (chan struct {})
244
259
go indexer .run (h .Header .Number .Uint64 (), stop , done )
245
260
}
246
- head = h . Header . Number . Uint64 ()
261
+
247
262
case <- done :
248
263
stop = nil
249
264
done = nil
250
- case ch := <- indexer .progress :
251
- ch <- indexer . report ( head )
265
+ indexer .tail . Store ( rawdb . ReadTxIndexTail ( indexer . db ))
266
+
252
267
case ch := <- indexer .term :
253
268
if stop != nil {
254
269
close (stop )
@@ -264,7 +279,7 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
264
279
}
265
280
266
281
// report returns the tx indexing progress.
267
- func (indexer * txIndexer ) report (head uint64 ) TxIndexProgress {
282
+ func (indexer * txIndexer ) report (head uint64 , tail * uint64 ) TxIndexProgress {
268
283
// Special case if the head is even below the cutoff,
269
284
// nothing to index.
270
285
if head < indexer .cutoff {
@@ -284,7 +299,6 @@ func (indexer *txIndexer) report(head uint64) TxIndexProgress {
284
299
}
285
300
// Compute how many blocks have been indexed
286
301
var indexed uint64
287
- tail := rawdb .ReadTxIndexTail (indexer .db )
288
302
if tail != nil {
289
303
indexed = head - * tail + 1
290
304
}
@@ -300,16 +314,12 @@ func (indexer *txIndexer) report(head uint64) TxIndexProgress {
300
314
}
301
315
}
302
316
303
- // txIndexProgress retrieves the tx indexing progress, or an error if the
304
- // background tx indexer is already stopped.
305
- func (indexer * txIndexer ) txIndexProgress () (TxIndexProgress , error ) {
306
- ch := make (chan TxIndexProgress , 1 )
307
- select {
308
- case indexer .progress <- ch :
309
- return <- ch , nil
310
- case <- indexer .closed :
311
- return TxIndexProgress {}, errors .New ("indexer is closed" )
312
- }
317
+ // txIndexProgress retrieves the transaction indexing progress. The reported
318
+ // progress may slightly lag behind the actual indexing state, as the tail is
319
+ // only updated at the end of each indexing operation. However, this delay is
320
+ // considered acceptable.
321
+ func (indexer * txIndexer ) txIndexProgress () TxIndexProgress {
322
+ return indexer .report (indexer .head .Load (), indexer .tail .Load ())
313
323
}
314
324
315
325
// close shutdown the indexer. Safe to be called for multiple times.
0 commit comments