Skip to content

Commit e4b9b29

Browse files
authored
Refactor rpc for easier reuse (#88)
### TL;DR Refactored RPC to wrap the needed rpc calls and return already serialized data. This way we don't have to repeat the request creation and response handling whenever we need it. We have fixed queries so maintaining wrappers doesn't seem like a problem ### What changed? - Moved RPC-related functionality from `common` package to a new `rpc` package - Created a new `Client` struct in the `rpc` package to handle RPC operations - Refactored worker logic to use the new RPC client - Updated orchestrator components to use the new RPC client - Added utility functions for handling big.Int slices and RPC configurations ### How to test? 1. Run the indexer with the updated code 2. Verify that block fetching, log retrieval, and trace processing work as expected 3. Check that batch processing is functioning correctly by monitoring RPC calls 4. Ensure that the orchestrator components (poller, committer, failure recoverer) are operating properly with the new RPC client ### Why make this change? This refactoring improves the overall structure and performance of the indexer: 1. Better separation of concerns by moving RPC-related code to its own package 2. Improved maintainability and readability of the codebase 3. Easier extension and modification of RPC-related functionality in the future
2 parents 13398d9 + 49fe94d commit e4b9b29

File tree

17 files changed

+405
-334
lines changed

17 files changed

+405
-334
lines changed

cmd/orchestrator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66
"github.com/prometheus/client_golang/prometheus/promhttp"
77
"github.com/rs/zerolog/log"
88
"github.com/spf13/cobra"
9-
"github.com/thirdweb-dev/indexer/internal/common"
109
"github.com/thirdweb-dev/indexer/internal/orchestrator"
10+
"github.com/thirdweb-dev/indexer/internal/rpc"
1111
)
1212

1313
var (
@@ -23,7 +23,7 @@ var (
2323

2424
func RunOrchestrator(cmd *cobra.Command, args []string) {
2525
log.Info().Msg("Starting indexer")
26-
rpc, err := common.InitializeRPC()
26+
rpc, err := rpc.Initialize()
2727
if err != nil {
2828
log.Fatal().Err(err).Msg("Failed to initialize RPC")
2929
}

internal/common/block.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,5 @@ type BlockData struct {
3535
Logs []Log
3636
Traces []Trace
3737
}
38+
39+
type RawBlock = map[string]interface{}

internal/common/log.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,5 @@ type Log struct {
1616
Data string `json:"data"`
1717
Topics []string `json:"topics"`
1818
}
19+
20+
type RawLogs = []map[string]interface{}

internal/common/rpc.go

Lines changed: 0 additions & 125 deletions
This file was deleted.

internal/common/trace.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,5 @@ type Trace struct {
2727
RewardType string `json:"reward_type"`
2828
RefundAddress string `json:"refund_address"`
2929
}
30+
31+
type RawTraces = []map[string]interface{}

internal/common/utils.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package common
2+
3+
import "math/big"
4+
5+
func BigIntSliceToChunks(values []*big.Int, chunkSize int) [][]*big.Int {
6+
if chunkSize >= len(values) || chunkSize <= 0 {
7+
return [][]*big.Int{values}
8+
}
9+
var chunks [][]*big.Int
10+
for i := 0; i < len(values); i += chunkSize {
11+
end := i + chunkSize
12+
if end > len(values) {
13+
end = len(values)
14+
}
15+
chunks = append(chunks, values[i:end])
16+
}
17+
return chunks
18+
}

internal/orchestrator/chain_tracker.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,18 @@ import (
55
"time"
66

77
"github.com/rs/zerolog/log"
8-
"github.com/thirdweb-dev/indexer/internal/common"
98
"github.com/thirdweb-dev/indexer/internal/metrics"
9+
"github.com/thirdweb-dev/indexer/internal/rpc"
1010
)
1111

1212
const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 300000 // 5 minutes
1313

1414
type ChainTracker struct {
15-
rpc common.RPC
15+
rpc rpc.Client
1616
triggerIntervalMs int
1717
}
1818

19-
func NewChainTracker(rpc common.RPC) *ChainTracker {
20-
19+
func NewChainTracker(rpc rpc.Client) *ChainTracker {
2120
return &ChainTracker{
2221
rpc: rpc,
2322
triggerIntervalMs: DEFAULT_CHAIN_TRACKER_POLL_INTERVAL,

internal/orchestrator/committer.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
config "github.com/thirdweb-dev/indexer/configs"
1212
"github.com/thirdweb-dev/indexer/internal/common"
1313
"github.com/thirdweb-dev/indexer/internal/metrics"
14+
"github.com/thirdweb-dev/indexer/internal/rpc"
1415
"github.com/thirdweb-dev/indexer/internal/storage"
1516
)
1617

@@ -22,10 +23,10 @@ type Committer struct {
2223
blocksPerCommit int
2324
storage storage.IStorage
2425
pollFromBlock *big.Int
25-
rpc common.RPC
26+
rpc rpc.Client
2627
}
2728

28-
func NewCommitter(rpc common.RPC, storage storage.IStorage) *Committer {
29+
func NewCommitter(rpc rpc.Client, storage storage.IStorage) *Committer {
2930
triggerInterval := config.Cfg.Committer.Interval
3031
if triggerInterval == 0 {
3132
triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL

internal/orchestrator/failure_recoverer.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
config "github.com/thirdweb-dev/indexer/configs"
1010
"github.com/thirdweb-dev/indexer/internal/common"
1111
"github.com/thirdweb-dev/indexer/internal/metrics"
12+
"github.com/thirdweb-dev/indexer/internal/rpc"
1213
"github.com/thirdweb-dev/indexer/internal/storage"
1314
"github.com/thirdweb-dev/indexer/internal/worker"
1415
)
@@ -20,10 +21,10 @@ type FailureRecoverer struct {
2021
failuresPerPoll int
2122
triggerIntervalMs int
2223
storage storage.IStorage
23-
rpc common.RPC
24+
rpc rpc.Client
2425
}
2526

26-
func NewFailureRecoverer(rpc common.RPC, storage storage.IStorage) *FailureRecoverer {
27+
func NewFailureRecoverer(rpc rpc.Client, storage storage.IStorage) *FailureRecoverer {
2728
failuresPerPoll := config.Cfg.FailureRecoverer.BlocksPerRun
2829
if failuresPerPoll == 0 {
2930
failuresPerPoll = DEFAULT_FAILURES_PER_POLL
@@ -80,7 +81,7 @@ func (fr *FailureRecoverer) Start() {
8081
select {}
8182
}
8283

83-
func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []worker.WorkerResult) {
84+
func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFailure, results []rpc.GetFullBlockResult) {
8485
log.Debug().Msgf("Failure Recoverer recovered %d blocks", len(results))
8586
blockFailureMap := make(map[*big.Int]common.BlockFailure)
8687
for _, failure := range blockFailures {
@@ -105,10 +106,10 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
105106
})
106107
} else {
107108
successfulResults = append(successfulResults, common.BlockData{
108-
Block: result.Block,
109-
Logs: result.Logs,
110-
Transactions: result.Transactions,
111-
Traces: result.Traces,
109+
Block: result.Data.Block,
110+
Logs: result.Data.Logs,
111+
Transactions: result.Data.Transactions,
112+
Traces: result.Data.Traces,
112113
})
113114
failuresToDelete = append(failuresToDelete, blockFailureForBlock)
114115
}

internal/orchestrator/orchestrator.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@ import (
44
"sync"
55

66
config "github.com/thirdweb-dev/indexer/configs"
7-
"github.com/thirdweb-dev/indexer/internal/common"
7+
"github.com/thirdweb-dev/indexer/internal/rpc"
88
"github.com/thirdweb-dev/indexer/internal/storage"
99
)
1010

1111
type Orchestrator struct {
12-
rpc common.RPC
12+
rpc rpc.Client
1313
storage storage.IStorage
1414
pollerEnabled bool
1515
failureRecovererEnabled bool
1616
committerEnabled bool
1717
}
1818

19-
func NewOrchestrator(rpc common.RPC) (*Orchestrator, error) {
19+
func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
2020
storage, err := storage.NewStorageConnector(&config.Cfg.Storage)
2121
if err != nil {
2222
return nil, err

internal/orchestrator/poller.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
config "github.com/thirdweb-dev/indexer/configs"
1212
"github.com/thirdweb-dev/indexer/internal/common"
1313
"github.com/thirdweb-dev/indexer/internal/metrics"
14+
"github.com/thirdweb-dev/indexer/internal/rpc"
1415
"github.com/thirdweb-dev/indexer/internal/storage"
1516
"github.com/thirdweb-dev/indexer/internal/worker"
1617
)
@@ -19,7 +20,7 @@ const DEFAULT_BLOCKS_PER_POLL = 10
1920
const DEFAULT_TRIGGER_INTERVAL = 1000
2021

2122
type Poller struct {
22-
rpc common.RPC
23+
rpc rpc.Client
2324
blocksPerPoll int64
2425
triggerIntervalMs int64
2526
storage storage.IStorage
@@ -32,7 +33,7 @@ type BlockNumberWithError struct {
3233
Error error
3334
}
3435

35-
func NewPoller(rpc common.RPC, storage storage.IStorage) *Poller {
36+
func NewPoller(rpc rpc.Client, storage storage.IStorage) *Poller {
3637
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
3738
if blocksPerPoll == 0 {
3839
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
@@ -169,9 +170,9 @@ func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int)
169170
return endBlock
170171
}
171172

172-
func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
173-
var successfulResults []worker.WorkerResult
174-
var failedResults []worker.WorkerResult
173+
func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
174+
var successfulResults []rpc.GetFullBlockResult
175+
var failedResults []rpc.GetFullBlockResult
175176

176177
for _, result := range results {
177178
if result.Error != nil {
@@ -185,17 +186,17 @@ func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
185186
blockData := make([]common.BlockData, 0, len(successfulResults))
186187
for _, result := range successfulResults {
187188
blockData = append(blockData, common.BlockData{
188-
Block: result.Block,
189-
Logs: result.Logs,
190-
Transactions: result.Transactions,
191-
Traces: result.Traces,
189+
Block: result.Data.Block,
190+
Logs: result.Data.Logs,
191+
Transactions: result.Data.Transactions,
192+
Traces: result.Data.Traces,
192193
})
193194
}
194195
if err := p.storage.StagingStorage.InsertBlockData(blockData); err != nil {
195196
e := fmt.Errorf("error inserting block data: %v", err)
196197
log.Error().Err(e)
197198
for _, result := range successfulResults {
198-
failedResults = append(failedResults, worker.WorkerResult{
199+
failedResults = append(failedResults, rpc.GetFullBlockResult{
199200
BlockNumber: result.BlockNumber,
200201
Error: e,
201202
})
@@ -208,7 +209,7 @@ func (p *Poller) handleWorkerResults(results []worker.WorkerResult) {
208209
}
209210
}
210211

211-
func (p *Poller) handleBlockFailures(results []worker.WorkerResult) {
212+
func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
212213
var blockFailures []common.BlockFailure
213214
for _, result := range results {
214215
if result.Error != nil {

0 commit comments

Comments
 (0)