Refactor RPC client to use interface #94

Merged 1 commit on Oct 10, 2024
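The diffs below replace the concrete `rpc.Client` type with an `rpc.IRPCClient` interface throughout the orchestrator package. The interface definition itself is not part of this diff; judging only from the call sites that appear below (`GetLatestBlockNumber()` and `GetChainID()`), a minimal sketch of what it presumably looks like is:

```go
// Hypothetical sketch inferred from the call sites in this PR; the actual
// IRPCClient in the rpc package is not shown here and likely declares more
// methods (e.g. whatever the worker uses to fetch full blocks).
package rpc

import "math/big"

type IRPCClient interface {
	// Replaces the direct EthClient.BlockNumber(ctx) calls; callers now
	// receive a *big.Int instead of a uint64.
	GetLatestBlockNumber() (*big.Int, error)

	// Replaces reads of the exported ChainID field on the concrete client.
	GetChainID() *big.Int
}
```

Passing an interface value instead of a struct is presumably also why cmd/orchestrator.go drops the `*rpc` dereference: the initialized client is handed to `NewOrchestrator` directly rather than being copied.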
2 changes: 1 addition & 1 deletion cmd/orchestrator.go
@@ -28,7 +28,7 @@ func RunOrchestrator(cmd *cobra.Command, args []string) {
log.Fatal().Err(err).Msg("Failed to initialize RPC")
}

- orchestrator, err := orchestrator.NewOrchestrator(*rpc)
+ orchestrator, err := orchestrator.NewOrchestrator(rpc)
if err != nil {
log.Fatal().Err(err).Msg("Failed to create orchestrator")
}
18 changes: 5 additions & 13 deletions internal/orchestrator/chain_tracker.go
@@ -1,7 +1,6 @@
package orchestrator

import (
"context"
"time"

"github.com/rs/zerolog/log"
@@ -12,11 +11,11 @@ import (
const DEFAULT_CHAIN_TRACKER_POLL_INTERVAL = 300000 // 5 minutes

type ChainTracker struct {
- rpc rpc.Client
+ rpc rpc.IRPCClient
triggerIntervalMs int
}

- func NewChainTracker(rpc rpc.Client) *ChainTracker {
+ func NewChainTracker(rpc rpc.IRPCClient) *ChainTracker {
return &ChainTracker{
rpc: rpc,
triggerIntervalMs: DEFAULT_CHAIN_TRACKER_POLL_INTERVAL,
@@ -30,23 +29,16 @@ func (ct *ChainTracker) Start() {
log.Debug().Msgf("Chain tracker running")
go func() {
for range ticker.C {
- latestBlockNumber, err := ct.getLatestBlockNumber()
+ latestBlockNumber, err := ct.rpc.GetLatestBlockNumber()
if err != nil {
log.Error().Err(err).Msg("Error getting latest block number")
continue
}
- metrics.ChainHead.Set(float64(latestBlockNumber) / 100)
+ latestBlockNumberFloat, _ := latestBlockNumber.Float64()
+ metrics.ChainHead.Set(latestBlockNumberFloat)
}
}()

// Keep the program running (otherwise it will exit)
select {}
}

- func (ct *ChainTracker) getLatestBlockNumber() (uint64, error) {
- blockNumber, err := ct.rpc.EthClient.BlockNumber(context.Background())
- if err != nil {
- return 0, err
- }
- return blockNumber, nil
- }
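A side benefit of depending on the interface: components like ChainTracker can be exercised in unit tests with a hand-rolled stub instead of a live EthClient connection. A rough sketch, assuming the interface holds only the two methods inferred above (if the real IRPCClient declares more, the stub would need those too, or a generated mock):

```go
// chain_tracker_test.go (hypothetical) in package orchestrator.
package orchestrator

import (
	"math/big"
	"testing"
)

// stubRPC is a minimal fake; its method set mirrors the assumed interface above.
type stubRPC struct{ head *big.Int }

func (s *stubRPC) GetLatestBlockNumber() (*big.Int, error) { return s.head, nil }
func (s *stubRPC) GetChainID() *big.Int                    { return big.NewInt(1) }

func TestNewChainTrackerDefaults(t *testing.T) {
	// NewChainTracker accepts any rpc.IRPCClient, so the stub is enough here.
	ct := NewChainTracker(&stubRPC{head: big.NewInt(12345)})
	if ct.triggerIntervalMs != DEFAULT_CHAIN_TRACKER_POLL_INTERVAL {
		t.Fatalf("expected default poll interval, got %d", ct.triggerIntervalMs)
	}
}
```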
12 changes: 6 additions & 6 deletions internal/orchestrator/committer.go
@@ -22,10 +22,10 @@ type Committer struct {
blocksPerCommit int
storage storage.IStorage
pollFromBlock *big.Int
- rpc rpc.Client
+ rpc rpc.IRPCClient
}

- func NewCommitter(rpc rpc.Client, storage storage.IStorage) *Committer {
+ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
triggerInterval := config.Cfg.Committer.Interval
if triggerInterval == 0 {
triggerInterval = DEFAULT_COMMITTER_TRIGGER_INTERVAL
@@ -71,7 +71,7 @@ func (c *Committer) Start() {
}

func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
- latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.ChainID)
+ latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
if err != nil {
return nil, err
@@ -103,7 +103,7 @@ func (c *Committer) getSequentialBlockDataToCommit() (*[]common.BlockData, error
return nil, nil
}

- blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.ChainID})
+ blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.GetChainID()})
if err != nil {
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
}
@@ -180,7 +180,7 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
}
log.Debug().Msgf("Detected %d missing blocks between blocks %s and %s", missingBlockCount, expectedStartBlockNumber.String(), actualFirstBlock.Number.String())

- existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.ChainID})
+ existingBlockFailures, err := c.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{BlockNumbers: missingBlockNumbers, ChainId: c.rpc.GetChainID()})
if err != nil {
return fmt.Errorf("error getting block failures while handling gap: %v", err)
}
@@ -197,7 +197,7 @@ func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBloc
if _, ok := existingBlockFailuresMap[blockNumberStr]; !ok {
blockFailures = append(blockFailures, common.BlockFailure{
BlockNumber: blockNumber,
- ChainId: c.rpc.ChainID,
+ ChainId: c.rpc.GetChainID(),
FailureTime: time.Now(),
FailureCount: 1,
FailureReason: "Gap detected for this block",
8 changes: 4 additions & 4 deletions internal/orchestrator/failure_recoverer.go
@@ -21,10 +21,10 @@ type FailureRecoverer struct {
failuresPerPoll int
triggerIntervalMs int
storage storage.IStorage
- rpc rpc.Client
+ rpc rpc.IRPCClient
}

- func NewFailureRecoverer(rpc rpc.Client, storage storage.IStorage) *FailureRecoverer {
+ func NewFailureRecoverer(rpc rpc.IRPCClient, storage storage.IStorage) *FailureRecoverer {
failuresPerPoll := config.Cfg.FailureRecoverer.BlocksPerRun
if failuresPerPoll == 0 {
failuresPerPoll = DEFAULT_FAILURES_PER_POLL
@@ -49,7 +49,7 @@ func (fr *FailureRecoverer) Start() {
go func() {
for range ticker.C {
blockFailures, err := fr.storage.OrchestratorStorage.GetBlockFailures(storage.QueryFilter{
- ChainId: fr.rpc.ChainID,
+ ChainId: fr.rpc.GetChainID(),
Limit: fr.failuresPerPoll,
})
if err != nil {
@@ -101,7 +101,7 @@ func (fr *FailureRecoverer) handleWorkerResults(blockFailures []common.BlockFail
BlockNumber: result.BlockNumber,
FailureReason: result.Error.Error(),
FailureTime: time.Now(),
- ChainId: fr.rpc.ChainID,
+ ChainId: fr.rpc.GetChainID(),
FailureCount: failureCount,
})
} else {
4 changes: 2 additions & 2 deletions internal/orchestrator/orchestrator.go
@@ -9,15 +9,15 @@ import (
)

type Orchestrator struct {
- rpc rpc.Client
+ rpc rpc.IRPCClient
storage storage.IStorage
pollerEnabled bool
failureRecovererEnabled bool
committerEnabled bool
reorgHandlerEnabled bool
}

- func NewOrchestrator(rpc rpc.Client) (*Orchestrator, error) {
+ func NewOrchestrator(rpc rpc.IRPCClient) (*Orchestrator, error) {
storage, err := storage.NewStorageConnector(&config.Cfg.Storage)
if err != nil {
return nil, err
19 changes: 5 additions & 14 deletions internal/orchestrator/poller.go
@@ -1,7 +1,6 @@
package orchestrator

import (
"context"
"fmt"
"math/big"
"sync"
@@ -20,7 +19,7 @@ const DEFAULT_BLOCKS_PER_POLL = 10
const DEFAULT_TRIGGER_INTERVAL = 1000

type Poller struct {
- rpc rpc.Client
+ rpc rpc.IRPCClient
blocksPerPoll int64
triggerIntervalMs int64
storage storage.IStorage
@@ -33,7 +32,7 @@ type BlockNumberWithError struct {
Error error
}

- func NewPoller(rpc rpc.Client, storage storage.IStorage) *Poller {
+ func NewPoller(rpc rpc.IRPCClient, storage storage.IStorage) *Poller {
blocksPerPoll := config.Cfg.Poller.BlocksPerPoll
if blocksPerPoll == 0 {
blocksPerPoll = DEFAULT_BLOCKS_PER_POLL
@@ -44,7 +43,7 @@ func NewPoller(rpc rpc.Client, storage storage.IStorage) *Poller {
}
untilBlock := big.NewInt(int64(config.Cfg.Poller.UntilBlock))
pollFromBlock := big.NewInt(int64(config.Cfg.Poller.FromBlock))
- lastPolledBlock, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.ChainID, untilBlock)
+ lastPolledBlock, err := storage.StagingStorage.GetLastStagedBlockNumber(rpc.GetChainID(), untilBlock)
if err != nil || lastPolledBlock == nil || lastPolledBlock.Sign() <= 0 {
lastPolledBlock = new(big.Int).Sub(pollFromBlock, big.NewInt(1)) // needs to include the first block
log.Warn().Err(err).Msgf("No last polled block found, setting to %s", lastPolledBlock.String())
@@ -124,7 +123,7 @@ func (p *Poller) reachedPollLimit(blockNumber *big.Int) bool {
}

func (p *Poller) getBlockRange() ([]*big.Int, error) {
- latestBlock, err := p.getLatestBlockNumber()
+ latestBlock, err := p.rpc.GetLatestBlockNumber()
if err != nil {
return nil, err
}
@@ -150,14 +149,6 @@ func (p *Poller) getBlockRange() ([]*big.Int, error) {
return blockNumbers, nil
}

- func (p *Poller) getLatestBlockNumber() (*big.Int, error) {
- latestBlockUint64, err := p.rpc.EthClient.BlockNumber(context.Background())
- if err != nil {
- return nil, fmt.Errorf("failed to get latest block number: %v", err)
- }
- return new(big.Int).SetUint64(latestBlockUint64), nil
- }

func (p *Poller) getEndBlockForRange(startBlock *big.Int, latestBlock *big.Int) *big.Int {
endBlock := new(big.Int).Add(startBlock, big.NewInt(p.blocksPerPoll-1))
if endBlock.Cmp(latestBlock) > 0 {
@@ -217,7 +208,7 @@ func (p *Poller) handleBlockFailures(results []rpc.GetFullBlockResult) {
BlockNumber: result.BlockNumber,
FailureReason: result.Error.Error(),
FailureTime: time.Now(),
- ChainId: p.rpc.ChainID,
+ ChainId: p.rpc.GetChainID(),
FailureCount: 1,
})
}
16 changes: 8 additions & 8 deletions internal/orchestrator/reorg_handler.go
@@ -15,7 +15,7 @@ import (
)

type ReorgHandler struct {
- rpc rpc.Client
+ rpc rpc.IRPCClient
storage storage.IStorage
triggerInterval int
blocksPerScan int
@@ -26,7 +26,7 @@ type ReorgHandler struct {
const DEFAULT_REORG_HANDLER_INTERVAL = 1000
const DEFAULT_REORG_HANDLER_BLOCKS_PER_SCAN = 100

- func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler {
+ func NewReorgHandler(rpc rpc.IRPCClient, storage storage.IStorage) *ReorgHandler {
triggerInterval := config.Cfg.ReorgHandler.Interval
if triggerInterval == 0 {
triggerInterval = DEFAULT_REORG_HANDLER_INTERVAL
@@ -41,7 +41,7 @@ func NewReorgHandler(rpc rpc.Client, storage storage.IStorage) *ReorgHandler {
worker: worker.NewWorker(rpc),
triggerInterval: triggerInterval,
blocksPerScan: blocksPerScan,
- lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.ChainID),
+ lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.GetChainID()),
}
}

@@ -72,7 +72,7 @@ func (rh *ReorgHandler) Start() {
go func() {
for range ticker.C {
lookbackFrom := new(big.Int).Add(rh.lastCheckedBlock, big.NewInt(int64(rh.blocksPerScan)))
- blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
+ blockHeaders, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
if err != nil {
log.Error().Err(err).Msg("Error getting recent block headers")
continue
@@ -85,7 +85,7 @@
reorgEndIndex := findReorgEndIndex(blockHeaders)
if reorgEndIndex == -1 {
rh.lastCheckedBlock = mostRecentBlockHeader.Number
- rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
+ rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockHeader.Number)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
continue
}
@@ -101,7 +101,7 @@
continue
}
rh.lastCheckedBlock = mostRecentBlockHeader.Number
- rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.ChainID, mostRecentBlockHeader.Number)
+ rh.storage.OrchestratorStorage.SetLastReorgCheckedBlockNumber(rh.rpc.GetChainID(), mostRecentBlockHeader.Number)
metrics.ReorgHandlerLastCheckedBlock.Set(float64(mostRecentBlockHeader.Number.Int64()))
}
}()
@@ -147,7 +147,7 @@ func (rh *ReorgHandler) findForkPoint(reversedBlockHeaders []common.BlockHeader)
}
}
lookbackFrom := reversedBlockHeaders[len(reversedBlockHeaders)-1].Number
- nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.ChainID, rh.blocksPerScan, lookbackFrom)
+ nextHeadersBatch, err := rh.storage.MainStorage.LookbackBlockHeaders(rh.rpc.GetChainID(), rh.blocksPerScan, lookbackFrom)
if err != nil {
return nil, fmt.Errorf("error getting next headers batch: %w", err)
}
@@ -190,7 +190,7 @@ func (rh *ReorgHandler) handleReorg(reorgStart *big.Int, reorgEnd *big.Int) erro
})
}
// TODO make delete and insert atomic
- if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.ChainID, blockRange); err != nil {
+ if err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blockRange); err != nil {
return fmt.Errorf("error deleting data for blocks %v: %w", blockRange, err)
}
if err := rh.storage.MainStorage.InsertBlockData(&data); err != nil {