diff --git a/internal/common/block.go b/internal/common/block.go
index 48a4570..4bcbe04 100644
--- a/internal/common/block.go
+++ b/internal/common/block.go
@@ -36,4 +36,10 @@ type BlockData struct {
 	Traces       []Trace
 }
 
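+// BlockHeader is a minimal view of a stored block, intended for reorg checks:
+// just enough data to walk recent blocks and compare each parent_hash against
+// the hash of the block below it.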
+type BlockHeader struct {
+	Number     *big.Int `json:"number"`
+	Hash       string   `json:"hash"`
+	ParentHash string   `json:"parent_hash"`
+}
+
 type RawBlock = map[string]interface{}
diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go
index 27ef783..b3bccbf 100644
--- a/internal/storage/clickhouse.go
+++ b/internal/storage/clickhouse.go
@@ -758,6 +758,122 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
 	return traces, nil
 }
 
+func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
+	query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database)
+	if chainId.Sign() > 0 {
+		query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
+	}
+	var blockNumberString string
+	err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString)
+	if err != nil {
+		return nil, err
+	}
+	blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
+	if !ok {
+		return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
+	}
+	return blockNumber, nil
+}
+
+func (c *ClickHouseConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
+	query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'reorg', '%s')", c.cfg.Database, chainId, blockNumber.String())
+	err := c.conn.Exec(context.Background(), query)
+	return err
+}
+
+func (c *ClickHouseConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error) {
+	query := fmt.Sprintf("SELECT number, hash, parent_hash FROM %s.blocks WHERE chain_id = %s AND number <= %s AND is_deleted = 0 ORDER BY number DESC", c.cfg.Database, chainId.String(), lookbackStart.String())
+	query += getLimitClause(limit)
+
+	rows, err := c.conn.Query(context.Background(), query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var blockHeader common.BlockHeader
+		err := rows.Scan(&blockHeader.Number, &blockHeader.Hash, &blockHeader.ParentHash)
+		if err != nil {
+			return nil, err
+		}
+		blockHeaders = append(blockHeaders, blockHeader)
+	}
+	return blockHeaders, nil
+}
+
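+// DeleteDataForBlocks issues one soft-delete batch per table, concurrently.
+// Nothing is removed synchronously: deleteBatch inserts tombstone rows with
+// is_deleted = 1, which the tables' ReplacingMergeTree engines are expected
+// to collapse with the original rows at merge time (readers filter on
+// is_deleted = 0 until then).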
+func (c *ClickHouseConnector) DeleteDataForBlocks(chainId *big.Int, blockNumbers []*big.Int) error {
+	var saveErr error
+	var saveErrMutex sync.Mutex
+	var wg sync.WaitGroup
+	wg.Add(4)
+
+	go func() {
+		defer wg.Done()
+		if err := c.deleteBatch(chainId, blockNumbers, "blocks", "chain_id, number, is_deleted"); err != nil {
+			saveErrMutex.Lock()
+			saveErr = fmt.Errorf("error deleting blocks: %v", err)
+			saveErrMutex.Unlock()
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+		if err := c.deleteBatch(chainId, blockNumbers, "logs", "chain_id, block_number, is_deleted"); err != nil {
+			saveErrMutex.Lock()
+			saveErr = fmt.Errorf("error deleting logs: %v", err)
+			saveErrMutex.Unlock()
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+		if err := c.deleteBatch(chainId, blockNumbers, "transactions", "chain_id, block_number, is_deleted"); err != nil {
+			saveErrMutex.Lock()
+			saveErr = fmt.Errorf("error deleting transactions: %v", err)
+			saveErrMutex.Unlock()
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+		if err := c.deleteBatch(chainId, blockNumbers, "traces", "chain_id, block_number, is_deleted"); err != nil {
+			saveErrMutex.Lock()
+			saveErr = fmt.Errorf("error deleting traces: %v", err)
+			saveErrMutex.Unlock()
+		}
+	}()
+
+	wg.Wait()
+
+	if saveErr != nil {
+		return saveErr
+	}
+	return nil
+}
+
+func (c *ClickHouseConnector) deleteBatch(chainId *big.Int, blockNumbers []*big.Int, table string, columns string) error {
+	query := fmt.Sprintf("INSERT INTO %s.%s (%s)", c.cfg.Database, table, columns)
+	batch, err := c.conn.PrepareBatch(context.Background(), query)
+	if err != nil {
+		return err
+	}
+	for _, blockNumber := range blockNumbers {
+		err = batch.Append(
+			chainId,
+			blockNumber,
+			1,
+		)
+		if err != nil {
+			return err
+		}
+	}
+	return batch.Send()
+}
+
 // TODO make this atomic
 func (c *ClickHouseConnector) InsertDataForBlocks(data []common.BlockData) error {
 	blocks := make([]common.Block, 0, len(data))
diff --git a/internal/storage/connector.go b/internal/storage/connector.go
index 27039af..22c613c 100644
--- a/internal/storage/connector.go
+++ b/internal/storage/connector.go
@@ -38,6 +38,8 @@ type IOrchestratorStorage interface {
 	GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
 	StoreBlockFailures(failures []common.BlockFailure) error
 	DeleteBlockFailures(failures []common.BlockFailure) error
+	GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)
+	SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
 }
 
 type IStagingStorage interface {
@@ -55,6 +57,11 @@ type IMainStorage interface {
 	GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
 	GetTraces(qf QueryFilter) (traces []common.Trace, err error)
 	GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
+	// LookbackBlockHeaders returns block headers ordered from latest to
+	// oldest, starting at lookbackStart (inclusive) and returning at most
+	// limit headers.
+	LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) (blockHeaders []common.BlockHeader, err error)
+	DeleteDataForBlocks(chainId *big.Int, blockNumbers []*big.Int) error
 }
 
 func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error) {
diff --git a/internal/storage/memory.go b/internal/storage/memory.go
index 7de9a9b..80550e6 100644
--- a/internal/storage/memory.go
+++ b/internal/storage/memory.go
@@ -332,6 +332,24 @@ func traceAddressToString(traceAddress []uint64) string {
 	return strings.Trim(strings.Replace(fmt.Sprint(traceAddress), " ", ",", -1), "[]")
 }
 
+func (m *MemoryConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
+	key := fmt.Sprintf("reorg_check:%s", chainId.String())
+	value, ok := m.cache.Get(key)
+	if !ok {
+		return nil, fmt.Errorf("no reorg check block number found for chain %s", chainId.String())
+	}
+	blockNumber, ok := new(big.Int).SetString(value, 10)
+	if !ok {
+		return nil, fmt.Errorf("failed to parse block number: %s", value)
+	}
+	return blockNumber, nil
+}
+
+func (m *MemoryConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
+	m.cache.Add(fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String())
+	return nil
+}
+
 func (m *MemoryConnector) InsertDataForBlocks(data []common.BlockData) error {
 	blocks := make([]common.Block, 0, len(data))
 	logs := make([]common.Log, 0)
@@ -359,3 +377,44 @@ func (m *MemoryConnector) InsertDataForBlocks(data []common.BlockData) error {
 	}
 	return nil
 }
+
+func (m *MemoryConnector) DeleteDataForBlocks(chainId *big.Int, blockNumbers []*big.Int) error {
+	blockNumbersToCheck := getBlockNumbersToCheck(QueryFilter{BlockNumbers: blockNumbers})
+	for _, key := range m.cache.Keys() {
+		shouldDelete := isKeyForBlock(key, fmt.Sprintf("block:%s:", chainId), blockNumbersToCheck) ||
+			isKeyForBlock(key, fmt.Sprintf("log:%s:", chainId), blockNumbersToCheck) ||
+			isKeyForBlock(key, fmt.Sprintf("transaction:%s:", chainId), blockNumbersToCheck) ||
+			isKeyForBlock(key, fmt.Sprintf("trace:%s:", chainId), blockNumbersToCheck)
+		if shouldDelete {
+			m.cache.Remove(key)
+		}
+	}
+	return nil
+}
+
+func (m *MemoryConnector) LookbackBlockHeaders(chainId *big.Int, limit int, lookbackStart *big.Int) ([]common.BlockHeader, error) {
+	blockHeaders := []common.BlockHeader{}
+	for _, key := range m.cache.Keys() {
+		if strings.HasPrefix(key, fmt.Sprintf("block:%s:", chainId.String())) {
+			blockNumberStr := strings.Split(key, ":")[2]
+			blockNumber, ok := new(big.Int).SetString(blockNumberStr, 10)
+			if !ok {
+				return nil, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
+			}
+			if blockNumber.Cmp(lookbackStart) <= 0 {
+				value, _ := m.cache.Get(key)
+				block := common.Block{}
+				err := json.Unmarshal([]byte(value), &block)
+				if err != nil {
+					return nil, err
+				}
+				blockHeaders = append(blockHeaders, common.BlockHeader{
+					Number:     blockNumber,
+					Hash:       block.Hash,
+					ParentHash: block.ParentHash,
+				})
+			}
+		}
+	}
+	// Cache key order is arbitrary, so sort latest-to-oldest and apply the
+	// limit to match the interface contract (assumes "sort" is imported).
+	sort.Slice(blockHeaders, func(i, j int) bool {
+		return blockHeaders[i].Number.Cmp(blockHeaders[j].Number) > 0
+	})
+	if limit > 0 && len(blockHeaders) > limit {
+		blockHeaders = blockHeaders[:limit]
+	}
+	return blockHeaders, nil
+}
diff --git a/internal/storage/redis.go b/internal/storage/redis.go
index 49e6e0c..3e11236 100644
--- a/internal/storage/redis.go
+++ b/internal/storage/redis.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"math/big"
 
 	"github.com/go-redis/redis/v8"
 	"github.com/rs/zerolog/log"
@@ -115,3 +116,22 @@ func (r *RedisConnector) DeleteBlockFailures(failures []common.BlockFailure) err
 	}
 	return nil
 }
+
+func (r *RedisConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
+	ctx := context.Background()
+	// A missing key comes back as redis.Nil; callers can treat that as "no
+	// reorg cursor recorded yet" rather than a hard failure.
+	blockNumberString, err := r.client.Get(ctx, fmt.Sprintf("reorg_check:%s", chainId.String())).Result()
+	if err != nil {
+		return nil, err
+	}
+	blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
+	if !ok {
+		return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
+	}
+	return blockNumber, nil
+}
+
+func (r *RedisConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
+	ctx := context.Background()
+	return r.client.Set(ctx, fmt.Sprintf("reorg_check:%s", chainId.String()), blockNumber.String(), 0).Err()
+}
diff --git a/internal/tools/clickhouse_create_cursors_table.sql b/internal/tools/clickhouse_create_cursors_table.sql
new file mode 100644
index 0000000..a33a7d6
--- /dev/null
+++ b/internal/tools/clickhouse_create_cursors_table.sql
@@ -0,0 +1,7 @@
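+-- One logical cursor per (chain_id, cursor_type): ReplacingMergeTree keeps
+-- only the row with the newest insert_timestamp for each sort key after
+-- merges, and readers query with FINAL to collapse duplicates that have not
+-- been merged away yet.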
+CREATE TABLE cursors (
+    `chain_id` UInt256,
+    `cursor_type` String,
+    `cursor_value` String,
+    `insert_timestamp` DateTime DEFAULT now(),
+) ENGINE = ReplacingMergeTree(insert_timestamp)
+ORDER BY (chain_id, cursor_type);
diff --git a/internal/tools/clickhouse_create_staging_table.sql b/internal/tools/clickhouse_create_staging_table.sql
index 1f606b6..2ab6ec2 100644
--- a/internal/tools/clickhouse_create_staging_table.sql
+++ b/internal/tools/clickhouse_create_staging_table.sql
@@ -6,5 +6,5 @@ CREATE TABLE block_data (
     `is_deleted` UInt8 DEFAULT 0,
     INDEX idx_block_number block_number TYPE minmax GRANULARITY 1,
 ) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
-ORDER BY (chain_id, block_number) PRIMARY KEY (chain_id, block_number)
+ORDER BY (chain_id, block_number)
 SETTINGS allow_experimental_replacing_merge_with_cleanup = 1;
\ No newline at end of file