Skip to content

Commit cdaf53e

Browse files
committed
handle reorgs
1 parent d96a682 commit cdaf53e

File tree

6 files changed

+298
-0
lines changed

6 files changed

+298
-0
lines changed

README.md

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,61 @@ committer:
294294
blocksPerCommit: 1000
295295
```
296296

297+
#### Reorg Handler
298+
Whether to enable the reorg handler. Default is `true`.
299+
300+
cmd: `--reorgHandler-enabled`
301+
env: `REORGHANDLER_ENABLED`
302+
yaml:
303+
```yaml
304+
reorgHandler:
305+
enabled: true
306+
```
307+
308+
#### Reorg Handler Interval
309+
Reorg handler trigger interval in milliseconds. Default is `1000`.
310+
311+
cmd: `--reorgHandler-interval`
312+
env: `REORGHANDLER_INTERVAL`
313+
yaml:
314+
```yaml
315+
reorgHandler:
316+
interval: 3000
317+
```
318+
319+
#### Reorg Handler Blocks Per Scan
320+
How many blocks to scan for reorgs. Default is `100`.
321+
322+
cmd: `--reorgHandler-blocks-per-scan`
323+
env: `REORGHANDLER_BLOCKSPERSCAN`
324+
yaml:
325+
```yaml
326+
reorgHandler:
327+
blocksPerScan: 1000
328+
```
329+
330+
#### Reorg Handler From Block
331+
From which block to start scanning for reorgs. Default is `0`.
332+
333+
cmd: `--reorgHandler-from-block`
334+
env: `REORGHANDLER_FROMBLOCK`
335+
yaml:
336+
```yaml
337+
reorgHandler:
338+
fromBlock: 20000000
339+
```
340+
341+
#### Reorg Handler Force From Block
342+
Whether to force the reorg handler to start from the block specified in `reorgHandler-from-block`. Default is `false`.
343+
344+
cmd: `--reorgHandler-force-from-block`
345+
env: `REORGHANDLER_FORCEFROMBLOCK`
346+
yaml:
347+
```yaml
348+
reorgHandler:
349+
forceFromBlock: true
350+
```
351+
297352
#### Failure Recoverer
298353
Whether to enable the failure recoverer. Default is `true`.
299354

cmd/root.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ func init() {
5555
rootCmd.PersistentFlags().Bool("committer-enabled", true, "Toggle committer")
5656
rootCmd.PersistentFlags().Int("committer-blocks-per-commit", 10, "How many blocks to commit each interval")
5757
rootCmd.PersistentFlags().Int("committer-interval", 1000, "How often to commit blocks in milliseconds")
58+
rootCmd.PersistentFlags().Bool("reorgHandler-enabled", true, "Toggle reorg handler")
59+
rootCmd.PersistentFlags().Int("reorgHandler-interval", 1000, "How often to run reorg handler in milliseconds")
60+
rootCmd.PersistentFlags().Int("reorgHandler-blocks-per-scan", 100, "How many blocks to scan for reorgs")
61+
rootCmd.PersistentFlags().Int("reorgHandler-from-block", 0, "From which block to start scanning for reorgs")
62+
rootCmd.PersistentFlags().Bool("reorgHandler-force-from-block", false, "Force the reorg handler to start from the block specified in `reorgHandler-from-block`")
5863
rootCmd.PersistentFlags().Bool("failure-recoverer-enabled", true, "Toggle failure recoverer")
5964
rootCmd.PersistentFlags().Int("failure-recoverer-blocks-per-run", 10, "How many blocks to run failure recoverer for")
6065
rootCmd.PersistentFlags().Int("failure-recoverer-interval", 1000, "How often to run failure recoverer in milliseconds")
@@ -98,6 +103,11 @@ func init() {
98103
viper.BindPFlag("committer.enabled", rootCmd.PersistentFlags().Lookup("committer-enabled"))
99104
viper.BindPFlag("committer.blocksPerCommit", rootCmd.PersistentFlags().Lookup("committer-blocks-per-commit"))
100105
viper.BindPFlag("committer.interval", rootCmd.PersistentFlags().Lookup("committer-interval"))
106+
viper.BindPFlag("reorgHandler.enabled", rootCmd.PersistentFlags().Lookup("reorgHandler-enabled"))
107+
viper.BindPFlag("reorgHandler.interval", rootCmd.PersistentFlags().Lookup("reorgHandler-interval"))
108+
viper.BindPFlag("reorgHandler.blocksPerScan", rootCmd.PersistentFlags().Lookup("reorgHandler-blocks-per-scan"))
109+
viper.BindPFlag("reorgHandler.fromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-from-block"))
110+
viper.BindPFlag("reorgHandler.forceFromBlock", rootCmd.PersistentFlags().Lookup("reorgHandler-force-from-block"))
101111
viper.BindPFlag("failureRecoverer.enabled", rootCmd.PersistentFlags().Lookup("failure-recoverer-enabled"))
102112
viper.BindPFlag("failureRecoverer.blocksPerRun", rootCmd.PersistentFlags().Lookup("failure-recoverer-blocks-per-run"))
103113
viper.BindPFlag("failureRecoverer.interval", rootCmd.PersistentFlags().Lookup("failure-recoverer-interval"))

configs/config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,14 @@ type CommitterConfig struct {
2828
BlocksPerCommit int `mapstructure:"blocksPerCommit"`
2929
}
3030

31+
type ReorgHandlerConfig struct {
32+
Enabled bool `mapstructure:"enabled"`
33+
Interval int `mapstructure:"interval"`
34+
BlocksPerScan int `mapstructure:"blocksPerScan"`
35+
FromBlock int `mapstructure:"fromBlock"`
36+
ForceFromBlock bool `mapstructure:"forceFromBlock"`
37+
}
38+
3139
type FailureRecovererConfig struct {
3240
Enabled bool `mapstructure:"enabled"`
3341
Interval int `mapstructure:"interval"`
@@ -101,6 +109,7 @@ type Config struct {
101109
Poller PollerConfig `mapstructure:"poller"`
102110
Committer CommitterConfig `mapstructure:"committer"`
103111
FailureRecoverer FailureRecovererConfig `mapstructure:"failureRecoverer"`
112+
ReorgHandler ReorgHandlerConfig `mapstructure:"reorgHandler"`
104113
Storage StorageConfig `mapstructure:"storage"`
105114
API APIConfig `mapstructure:"api"`
106115
}

internal/metrics/metrics.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,16 @@ var (
6969
Help: "The first block number in the failure recoverer batch",
7070
})
7171
)
72+
73+
// Reorg Handler Metrics
74+
var (
75+
ReorgHandlerLastCheckedBlock = promauto.NewGauge(prometheus.GaugeOpts{
76+
Name: "reorg_handler_last_checked_block",
77+
Help: "The last block number that the reorg handler checked",
78+
})
79+
80+
ReorgCounter = promauto.NewCounter(prometheus.CounterOpts{
81+
Name: "reorg_handler_reorg_counter",
82+
Help: "The number of reorgs detected",
83+
})
84+
)

internal/orchestrator/orchestrator.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type Orchestrator struct {
1414
pollerEnabled bool
1515
failureRecovererEnabled bool
1616
committerEnabled bool
17+
reorgHandlerEnabled bool
1718
}
1819

1920
func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
@@ -28,6 +29,7 @@ func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
2829
pollerEnabled: config.Cfg.Poller.Enabled,
2930
failureRecovererEnabled: config.Cfg.FailureRecoverer.Enabled,
3031
committerEnabled: config.Cfg.Committer.Enabled,
32+
reorgHandlerEnabled: config.Cfg.ReorgHandler.Enabled,
3133
}, nil
3234
}
3335

@@ -61,6 +63,15 @@ func (o *Orchestrator) Start() {
6163
}()
6264
}
6365

66+
if o.reorgHandlerEnabled {
67+
wg.Add(1)
68+
go func() {
69+
defer wg.Done()
70+
reorgHandler := NewReorgHandler(o.rpc, o.storage)
71+
reorgHandler.Start()
72+
}()
73+
}
74+
6475
// The chain tracker is always running
6576
wg.Add(1)
6677
go func() {
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
package orchestrator
2+
3+
import (
4+
"fmt"
5+
"math/big"
6+
"time"
7+
8+
"github.com/rs/zerolog/log"
9+
config "github.com/thirdweb-dev/indexer/configs"
10+
"github.com/thirdweb-dev/indexer/internal/common"
11+
"github.com/thirdweb-dev/indexer/internal/metrics"
12+
"github.com/thirdweb-dev/indexer/internal/rpc"
13+
"github.com/thirdweb-dev/indexer/internal/storage"
14+
"github.com/thirdweb-dev/indexer/internal/worker"
15+
)
16+
17+
type ReorgHandler struct {
18+
rpc rpc.Client
19+
storage storage.IStorage
20+
triggerInterval int
21+
blocksPerScan int
22+
lastCheckedBlock *big.Int
23+
worker *worker.Worker
24+
}
25+
26+
const DEFAULT_REORG_HANDLER_INTERVAL = 1000
27+
const DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN = 100
28+
29+
func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler {
30+
triggerInterval := config.Cfg.ReorgHandler.Interval
31+
if triggerInterval == 0 {
32+
triggerInterval = DEFAULT_REORG_HANDLER_INTERVAL
33+
}
34+
blocksPerScan := config.Cfg.ReorgHandler.BlocksPerScan
35+
if blocksPerScan == 0 {
36+
blocksPerScan = DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN
37+
}
38+
return &ReorgHandler{
39+
rpc: rpc,
40+
storage: storage,
41+
worker: worker.NewWorker(rpc),
42+
triggerInterval: triggerInterval,
43+
blocksPerScan: blocksPerScan,
44+
lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.ChainID),
45+
}
46+
}
47+
48+
func getInitialCheckedBlockNumber(storage storage.IStorage, chainId *big.Int) *big.Int {
49+
bn := big.NewInt(int64(config.Cfg.ReorgHandler.FromBlock))
50+
if !config.Cfg.ReorgHandler.ForceFromBlock {
51+
storedFromBlock, err := storage.OrchestratorStorage.GetLastReorgCheckedBlockNumber(chainId)
52+
if err != nil {
53+
log.Debug().Err(err).Msgf("Error getting last reorg checked block number, using configured: %s", bn)
54+
return bn
55+
}
56+
if storedFromBlock.Sign() <= 0 {
57+
log.Debug().Msgf("Last reorg checked block number not found, using configured: %s", bn)
58+
return bn
59+
}
60+
log.Debug().Msgf("Last reorg checked block number found, using: %s", storedFromBlock)
61+
return storedFromBlock
62+
}
63+
log.Debug().Msgf("Force from block reorg check flag set, using configured: %s", bn)
64+
return bn
65+
}
66+
67+
func (rh *ReorgHandler) Start() {
68+
interval := time.Duration(rh.triggerInterval) * time.Millisecond
69+
ticker := time.NewTicker(interval)
70+
71+
log.Debug().Msgf("Reorg handler running")
72+
go func() {
73+
for range ticker.C {
74+
lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan)))
75+
blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
76+
if err != nil {
77+
log.Error().Err(err).Msg("Error getting recent block headers")
78+
continue
79+
}
80+
if len(blockHeaders) == 0 {
81+
log.Warn().Msg("No block headers found")
82+
continue
83+
}
84+
mostRecentBlockHeader := blockHeaders[0]
85+
reorgEndIndex := findReorgEndIndex(blockHeaders)
86+
if reorgEndIndex == -1 {
87+
rh.lastCheckedBlock = mostRecentBlockHeader.Number
88+
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
89+
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
90+
continue
91+
}
92+
metrics.ReorgCounter.Inc()
93+
forkPoint, err := rh.findForkPoint(blockHeaders[reorgEndIndex:])
94+
if err != nil {
95+
log.Error().Err(err).Msg("Error while finding fork point")
96+
continue
97+
}
98+
err = rh.handleReorg(forkPoint, lookbackFrom)
99+
if err != nil {
100+
log.Error().Err(err).Msg("Error while handling reorg")
101+
continue
102+
}
103+
rh.lastCheckedBlock = mostRecentBlockHeader.Number
104+
rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
105+
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
106+
}
107+
}()
108+
109+
// Keep the program running (otherwise it will exit)
110+
select {}
111+
}
112+
113+
func findReorgEndIndex(reversedBlockHeaders []common.BlockHeader) (index int) {
114+
for i := 0; i < len(reversedBlockHeaders)-1; i++ {
115+
currentBlock := reversedBlockHeaders[i]
116+
previousBlock := reversedBlockHeaders[i+1]
117+
118+
if currentBlock.ParentHash != previousBlock.Hash {
119+
log.Debug().
120+
Str("currentBlockNumber", currentBlock.Number.String()).
121+
Str("currentBlockHash", currentBlock.Hash).
122+
Str("currentBlockParentHash", currentBlock.ParentHash).
123+
Str("previousBlockNumber", previousBlock.Number.String()).
124+
Str("previousBlockHash", previousBlock.Hash).
125+
Msg("Reorg detected: parent hash mismatch")
126+
return i
127+
}
128+
}
129+
return -1
130+
}
131+
132+
func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader) (forkPoint *big.Int, err error) {
133+
newBlocksByNumber, err := rh.getNewBlocksByNumber(reversedBlockHeaders)
134+
if err != nil {
135+
return nil, err
136+
}
137+
138+
for i := 0; i < len(reversedBlockHeaders)-1; i++ {
139+
blockHeader := reversedBlockHeaders[i]
140+
block, ok := (*newBlocksByNumber)[blockHeader.Number.String()]
141+
if !ok {
142+
return nil, fmt.Errorf("block not found: %s", blockHeader.Number.String())
143+
}
144+
if block.Hash == blockHeader.Hash {
145+
previousBlock := reversedBlockHeaders[i+1]
146+
return previousBlock.Number, nil
147+
}
148+
}
149+
lookbackFrom := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
150+
nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
151+
if err != nil {
152+
return nil, fmt.Errorf("error getting next headers batch: %w", err)
153+
}
154+
return rh.findForkPoint(nextHeadersBatch)
155+
}
156+
157+
func (rh *ReorgHandler) getNewBlocksByNumber(reversedBlockHeaders []common.BlockHeader) (*map[string]common.Block, error) {
158+
blockNumbers := make([]*big.Int, 0, len(reversedBlockHeaders))
159+
for _, header := range reversedBlockHeaders {
160+
blockNumbers = append(blockNumbers, header.Number)
161+
}
162+
blockResults := rh.rpc.GetBlocks(blockNumbers)
163+
fetchedBlocksByNumber := make(map[string]common.Block)
164+
for _, blockResult := range blockResults {
165+
if blockResult.Error != nil {
166+
return nil, fmt.Errorf("error fetching block %s: %w", blockResult.BlockNumber.String(), blockResult.Error)
167+
}
168+
fetchedBlocksByNumber[blockResult.BlockNumber.String()] = blockResult.Data
169+
}
170+
return &fetchedBlocksByNumber, nil
171+
}
172+
173+
func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) error {
174+
blockRange := make([]*big.Int, 0, new(big.Int).Sub(reorgEnd, reorgStart).Int64())
175+
for i := new(big.Int).Set(reorgStart); i.Cmp(reorgEnd) <= 0; i.Add(i, big.NewInt(1)) {
176+
blockRange = append(blockRange, new(big.Int).Set(i))
177+
}
178+
179+
results := rh.worker.Run(blockRange)
180+
data := make([]common.BlockData, 0, len(results))
181+
for _, result := range results {
182+
if result.Error != nil {
183+
return fmt.Errorf("cannot fix reorg: failed block %s: %w", result.BlockNumber.String(), result.Error)
184+
}
185+
data = append(data, common.BlockData{
186+
Block: result.Data.Block,
187+
Logs: result.Data.Logs,
188+
Transactions: result.Data.Transactions,
189+
Traces: result.Data.Traces,
190+
})
191+
}
192+
// TODO make delete and insert atomic
193+
if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.ChainID, blockRange); err != nil {
194+
return fmt.Errorf("error deleting data for blocks %v: %w", blockRange, err)
195+
}
196+
if err := rh.storage.MainStorage.InsertBlockData(&data); err != nil {
197+
return fmt.Errorf("error saving data to main storage: %w", err)
198+
}
199+
return nil
200+
}

0 commit comments

Comments
 (0)