Skip to content

Commit

Permalink
feat: enable aggregates and group_by
Browse files Browse the repository at this point in the history
  • Loading branch information
catalyst17 committed Oct 18, 2024
1 parent bd55ca1 commit 2969e99
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 19 deletions.
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type QueryParams struct {
// @Description Map of filter parameters
FilterParams map[string]string `schema:"-"`
// @Description Field to group results by
GroupBy string `schema:"group_by"`
GroupBy []string `schema:"group_by"`
// @Description Field to sort results by
SortBy string `schema:"sort_by"`
// @Description Sort order (asc or desc)
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/logs_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func handleLogsRequest(c *gin.Context, contractAddress, signature string) {

logs, err := mainStorage.GetLogs(storage.QueryFilter{
FilterParams: queryParams.FilterParams,
GroupBy: []string{queryParams.GroupBy},
GroupBy: queryParams.GroupBy,
SortBy: queryParams.SortBy,
SortOrder: queryParams.SortOrder,
Page: queryParams.Page,
Expand Down
2 changes: 1 addition & 1 deletion internal/handlers/transactions_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func handleTransactionsRequest(c *gin.Context, contractAddress, signature string

result, err := mainStorage.GetTransactions(storage.QueryFilter{
FilterParams: queryParams.FilterParams,
GroupBy: []string{queryParams.GroupBy},
GroupBy: queryParams.GroupBy,
SortBy: queryParams.SortBy,
SortOrder: queryParams.SortOrder,
Page: queryParams.Page,
Expand Down
45 changes: 41 additions & 4 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,19 @@ func (c *ClickHouseConnector) GetTransactions(qf QueryFilter) (QueryResult[commo
return executeQuery[common.Transaction](c, "transactions", columns, qf, scanTransaction)
}

func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[common.Log], error) {
columns := "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, log_index, address, data, topic_0, topic_1, topic_2, topic_3"
return executeQuery[common.Log](c, "logs", columns, qf, scanLog)
func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult[map[string]interface{}], error) {
var columns string

if len(qf.GroupBy) > 0 || len(qf.Aggregates) > 0 {
// Build columns for SELECT when grouping or aggregating
selectColumns := append(qf.GroupBy, qf.Aggregates...)
columns = strings.Join(selectColumns, ", ")
} else {
// Default columns when not grouping
columns = "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, log_index, address, data, topic_0, topic_1, topic_2, topic_3"
}

return executeQuery[map[string]interface{}](c, "logs", columns, qf, scanRowToMap)
}

func executeQuery[T any](c *ClickHouseConnector, table, columns string, qf QueryFilter, scanFunc func(driver.Rows) (T, error)) (QueryResult[T], error) {
Expand Down Expand Up @@ -346,7 +356,13 @@ func (c *ClickHouseConnector) buildQuery(table, columns string, qf QueryFilter)
query = addFilterParams(key, strings.ToLower(value), query)
}

// Add sort by clause
// Add GROUP BY clause if specified
if len(qf.GroupBy) > 0 {
groupByColumns := strings.Join(qf.GroupBy, ", ")
query += fmt.Sprintf(" GROUP BY %s", groupByColumns)
}

// Add ORDER BY clause
if qf.SortBy != "" {
query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
}
Expand Down Expand Up @@ -505,6 +521,27 @@ func scanLog(rows driver.Rows) (common.Log, error) {
return log, nil
}

func scanRowToMap(rows driver.Rows) (map[string]interface{}, error) {
columns := rows.Columns()
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))

for i := range columns {
valuePtrs[i] = &values[i]
}

if err := rows.Scan(valuePtrs...); err != nil {
return nil, err
}

result := make(map[string]interface{})
for i, col := range columns {
result[col] = values[i]
}

return result, nil
}

func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) {
query := fmt.Sprintf("SELECT number FROM %s.blocks WHERE is_deleted = 0", c.cfg.Database)
if chainId.Sign() > 0 {
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type IMainStorage interface {

GetBlocks(qf QueryFilter) (blocks []common.Block, err error)
GetTransactions(qf QueryFilter) (transactions QueryResult[common.Transaction], err error)
GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error)
GetLogs(qf QueryFilter) (logs QueryResult[map[string]interface{}], err error)
GetTraces(qf QueryFilter) (traces []common.Trace, err error)
GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
/**
Expand Down
16 changes: 8 additions & 8 deletions test/mocks/MockIMainStorage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion test/mocks/MockIOrchestratorStorage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion test/mocks/MockIRPCClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion test/mocks/MockIStagingStorage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 2969e99

Please sign in to comment.