Skip to content

Commit

Permalink
change main storage interface to handle inserts all at once
Browse files Browse the repository at this point in the history
  • Loading branch information
iuwqyir committed Oct 4, 2024
1 parent 5b416ef commit 4773497
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 79 deletions.
67 changes: 1 addition & 66 deletions internal/orchestrator/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"math/big"
"sort"
"sync"
"time"

"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -151,7 +150,7 @@ func (c *Committer) commit(blockData []common.BlockData) error {
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))

// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
if err := c.saveDataToMainStorage(blockData); err != nil {
if err := c.storage.MainStorage.InsertDataForBlocks(blockData); err != nil {
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
return fmt.Errorf("error saving data to main storage: %v", err)
}
Expand All @@ -167,70 +166,6 @@ func (c *Committer) commit(blockData []common.BlockData) error {
return nil
}

func (c *Committer) saveDataToMainStorage(blockData []common.BlockData) error {
var commitWg sync.WaitGroup
commitWg.Add(4)

var commitErr error
var commitErrMutex sync.Mutex

blocks := make([]common.Block, 0, len(blockData))
logs := make([]common.Log, 0)
transactions := make([]common.Transaction, 0)
traces := make([]common.Trace, 0)

for _, block := range blockData {
blocks = append(blocks, block.Block)
logs = append(logs, block.Logs...)
transactions = append(transactions, block.Transactions...)
traces = append(traces, block.Traces...)
}

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertBlocks(blocks); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting blocks: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertLogs(logs); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting logs: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertTransactions(transactions); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting transactions: %v", err)
commitErrMutex.Unlock()
}
}()

go func() {
defer commitWg.Done()
if err := c.storage.MainStorage.InsertTraces(traces); err != nil {
commitErrMutex.Lock()
commitErr = fmt.Errorf("error inserting traces: %v", err)
commitErrMutex.Unlock()
}
}()

commitWg.Wait()

if commitErr != nil {
return commitErr
}

return nil
}

func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error {
// increment the a gap counter in prometheus
metrics.GapCounter.Inc()
Expand Down
83 changes: 79 additions & 4 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math/big"
"strings"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
Expand Down Expand Up @@ -55,7 +56,7 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) {
return conn, nil
}

func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
func (c *ClickHouseConnector) insertBlocks(blocks []common.Block) error {
query := `
INSERT INTO ` + c.cfg.Database + `.blocks (
chain_id, number, timestamp, hash, parent_hash, sha3_uncles, nonce,
Expand Down Expand Up @@ -100,7 +101,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error {
return batch.Send()
}

func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error {
func (c *ClickHouseConnector) insertTransactions(txs []common.Transaction) error {
query := `
INSERT INTO ` + c.cfg.Database + `.transactions (
chain_id, hash, nonce, block_hash, block_number, block_timestamp, transaction_index,
Expand Down Expand Up @@ -142,7 +143,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error
return batch.Send()
}

func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error {
func (c *ClickHouseConnector) insertLogs(logs []common.Log) error {
query := `
INSERT INTO ` + c.cfg.Database + `.logs (
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
Expand Down Expand Up @@ -663,7 +664,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error {
return batch.Send()
}

func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error {
func (c *ClickHouseConnector) insertTraces(traces []common.Trace) error {
query := `
INSERT INTO ` + c.cfg.Database + `.traces (
chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index,
Expand Down Expand Up @@ -756,3 +757,77 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace,
}
return traces, nil
}

// TODO make this atomic
func (c *ClickHouseConnector) InsertDataForBlocks(data []common.BlockData) error {
blocks := make([]common.Block, 0, len(data))
logs := make([]common.Log, 0)
transactions := make([]common.Transaction, 0)
traces := make([]common.Trace, 0)

for _, blockData := range data {
blocks = append(blocks, blockData.Block)
logs = append(logs, blockData.Logs...)
transactions = append(transactions, blockData.Transactions...)
traces = append(traces, blockData.Traces...)
}

var saveErr error
var saveErrMutex sync.Mutex
var wg sync.WaitGroup

if len(blocks) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertBlocks(blocks); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting blocks: %v", err)
saveErrMutex.Unlock()
}
}()
}

if len(logs) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertLogs(logs); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting logs: %v", err)
saveErrMutex.Unlock()
}
}()
}

if len(transactions) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertTransactions(transactions); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting transactions: %v", err)
saveErrMutex.Unlock()
}
}()
}

if len(traces) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
if err := c.insertTraces(traces); err != nil {
saveErrMutex.Lock()
saveErr = fmt.Errorf("error deleting traces: %v", err)
saveErrMutex.Unlock()
}
}()
}

wg.Wait()

if saveErr != nil {
return saveErr
}
return nil
}
7 changes: 2 additions & 5 deletions internal/storage/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,9 @@ type IStagingStorage interface {
}

type IMainStorage interface {
InsertBlocks(blocks []common.Block) error
InsertTransactions(txs []common.Transaction) error
InsertLogs(logs []common.Log) error
InsertTraces(traces []common.Trace) error
InsertDataForBlocks(data []common.BlockData) error

GetBlocks(qf QueryFilter) (logs []common.Block, err error)
GetBlocks(qf QueryFilter) (blocks []common.Block, err error)
GetTransactions(qf QueryFilter) (transactions QueryResult[common.Transaction], err error)
GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
GetTraces(qf QueryFilter) (traces []common.Trace, err error)
Expand Down
36 changes: 32 additions & 4 deletions internal/storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (m *MemoryConnector) DeleteBlockFailures(failures []common.BlockFailure) er
return nil
}

func (m *MemoryConnector) InsertBlocks(blocks []common.Block) error {
func (m *MemoryConnector) insertBlocks(blocks []common.Block) error {
for _, block := range blocks {
blockJson, err := json.Marshal(block)
if err != nil {
Expand Down Expand Up @@ -109,7 +109,7 @@ func (m *MemoryConnector) GetBlocks(qf QueryFilter) ([]common.Block, error) {
return blocks, nil
}

func (m *MemoryConnector) InsertTransactions(txs []common.Transaction) error {
func (m *MemoryConnector) insertTransactions(txs []common.Transaction) error {
for _, tx := range txs {
txJson, err := json.Marshal(tx)
if err != nil {
Expand Down Expand Up @@ -143,7 +143,7 @@ func (m *MemoryConnector) GetTransactions(qf QueryFilter) ([]common.Transaction,
return txs, nil
}

func (m *MemoryConnector) InsertLogs(logs []common.Log) error {
func (m *MemoryConnector) insertLogs(logs []common.Log) error {
for _, log := range logs {
logJson, err := json.Marshal(log)
if err != nil {
Expand Down Expand Up @@ -294,7 +294,7 @@ func (m *MemoryConnector) DeleteBlockData(data []common.BlockData) error {
return nil
}

func (m *MemoryConnector) InsertTraces(traces []common.Trace) error {
func (m *MemoryConnector) insertTraces(traces []common.Trace) error {
for _, trace := range traces {
traceJson, err := json.Marshal(trace)
if err != nil {
Expand Down Expand Up @@ -331,3 +331,31 @@ func (m *MemoryConnector) GetTraces(qf QueryFilter) ([]common.Trace, error) {
func traceAddressToString(traceAddress []uint64) string {
return strings.Trim(strings.Replace(fmt.Sprint(traceAddress), " ", ",", -1), "[]")
}

func (m *MemoryConnector) InsertDataForBlocks(data []common.BlockData) error {
blocks := make([]common.Block, 0, len(data))
logs := make([]common.Log, 0)
transactions := make([]common.Transaction, 0)
traces := make([]common.Trace, 0)

for _, blockData := range data {
blocks = append(blocks, blockData.Block)
logs = append(logs, blockData.Logs...)
transactions = append(transactions, blockData.Transactions...)
traces = append(traces, blockData.Traces...)
}

if err := m.insertBlocks(blocks); err != nil {
return err
}
if err := m.insertLogs(logs); err != nil {
return err
}
if err := m.insertTransactions(transactions); err != nil {
return err
}
if err := m.insertTraces(traces); err != nil {
return err
}
return nil
}

0 comments on commit 4773497

Please sign in to comment.