Skip to content

Commit

Permalink
[Grafana] geohash_grid: Support the only new aggregration from Graf…
Browse files Browse the repository at this point in the history
…ana (#1327)

Doesn't support `bounds` parameter, but will warn to logs if someone
uses it. Only `precision` is visible in Grafana, so doesn't really
matter there. Let's add that in another PR if needed.
  • Loading branch information
trzysiek authored Mar 3, 2025
1 parent d76ef28 commit 36380f1
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 22 deletions.
4 changes: 2 additions & 2 deletions docs/public/docs/limitations.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ Currently supported:
- most popular [Query DSL](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html),
including: `boolean`, `match`, `match phrase`, `multi-match`, `query string`, `nested`, `match all`, `exists`, `prefix`, `range`, `term`, `terms`, `wildcard`
- most popular [Aggregations](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html),
including: `avg`, `cardinality`, `max`, `min`, `percentile ranks`, `percentiles`, `stats`, `sum`, `top hits`, `top metrics`, `value counts`,
`date histogram`, `date range`, `filter`, `filters`, `histogram`, `range`, `singificant terms`, `terms`, `ip prefix`, `ip range`
including: `avg`, `cardinality`, `max`, `min`, `percentile ranks`, `percentiles`, `stats`, `sum`, `top hits`, `top metrics`, `value count`,
`date histogram`, `date range`, `filter`, `filters`, `histogram`, `range`, `singificant terms`, `terms`, `ip prefix`, `ip range`, `geohash_grid`

Which as a result allows you to run Kibana/OSD queries and dashboards on data residing in ClickHouse/Hydrolix.

Expand Down
2 changes: 1 addition & 1 deletion platform/model/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ More info: https://www.elastic.co/guide/en/elasticsearch/reference/current/searc
Geo-line | :x: | Filters | :white_check_mark: | Derivative | :white_check_mark: |
Cartesian-bounds | :x: | Frequent item sets | :x: | Extended stats bucket | :x: |
Cartesian-centroid | :x: | Geo-distance | :x: | Inference bucket | :x: |
Matrix stats | :x: | Geohash grid | :x: | Max bucket | :white_check_mark: |
Matrix stats | :x: | Geohash grid | :white_check_mark: | Max bucket | :white_check_mark: |
Max | :white_check_mark: | Geotile grid | :x: | Min bucket | :white_check_mark: |
Median absolute deviation | :x: | Global | :x: | Moving function | :wavy_dash: |
Min | :white_check_mark: | Histogram | :white_check_mark: | Moving percentiles | :x: |
Expand Down
103 changes: 103 additions & 0 deletions platform/model/bucket_aggregations/geohash_grid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package bucket_aggregations

import (
"context"
"fmt"
"github.com/QuesmaOrg/quesma/platform/logger"
"github.com/QuesmaOrg/quesma/platform/model"
"reflect"
)

type GeoHashGrid struct {
ctx context.Context
}

func NewGeoHashGrid(ctx context.Context) GeoHashGrid {
return GeoHashGrid{ctx: ctx}
}

func (query GeoHashGrid) AggregationType() model.AggregationType {
return model.BucketAggregation
}

func (query GeoHashGrid) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
buckets := make([]model.JsonMap, 0, len(rows))
for _, row := range rows {
if len(row.Cols) < 2 {
logger.ErrorWithCtx(query.ctx).Msgf(
"unexpected number of columns in geohash_grid aggregation response, len(rows[0].Cols): %d",
len(row.Cols),
)
return model.JsonMap{"buckets": []model.JsonMap{}}
}

buckets = append(buckets, model.JsonMap{
"key": row.SecondLastColValue(),
"doc_count": row.LastColValue(),
})
}

return model.JsonMap{
"buckets": buckets,
}
}

func (query GeoHashGrid) String() string {
return "geohash_grid"
}

// TODO make part of QueryType interface and implement for all aggregations
// TODO add bad requests to tests
// Doing so will ensure we see 100% of what we're interested in in our logs (now we see ~95%)
func CheckParamsGeohashGrid(ctx context.Context, paramsRaw any) error {
requiredParams := map[string]string{
"field": "string",
}
optionalParams := map[string]string{
"bounds": "map",
"precision": "float64", // TODO should be int, low priority for fixing
"shard_size": "float64", // TODO should be int, low priority for fixing
"size": "float64", // TODO should be int, low priority for fixing
}
logIfYouSeeThemParams := []string{"bounds", "shard_size"}

params, ok := paramsRaw.(model.JsonMap)
if !ok {
return fmt.Errorf("params is not a map, but %+v", paramsRaw)
}

// check if required are present
for paramName, paramType := range requiredParams {
paramVal, exists := params[paramName]
if !exists {
return fmt.Errorf("required parameter %s not found in params", paramName)
}
if reflect.TypeOf(paramVal).Name() != paramType { // TODO I'll make a small rewrite to not use reflect here
return fmt.Errorf("required parameter %s is not of type %s, but %T", paramName, paramType, paramVal)
}
}

// check if only required/optional are present
for paramName := range params {
if _, isRequired := requiredParams[paramName]; !isRequired {
wantedType, isOptional := optionalParams[paramName]
if !isOptional {
return fmt.Errorf("unexpected parameter %s found in GeoHash Grid params %v", paramName, params)
}
if reflect.TypeOf(params[paramName]).Name() != wantedType { // TODO I'll make a small rewrite to not use reflect here
return fmt.Errorf("optional parameter %s is not of type %s, but %T", paramName, wantedType, params[paramName])
}
}
}

// log if you see them
for _, warnParam := range logIfYouSeeThemParams {
if _, exists := params[warnParam]; exists {
logger.WarnWithCtxAndThrottling(ctx, "geohash_grid", warnParam, "we didn't expect %s in GeoHash Grid params %v", warnParam, params)
}
}

return nil
}
4 changes: 4 additions & 0 deletions platform/model/query_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func (r *QueryResultRow) LastColValue() any {
return r.Cols[len(r.Cols)-1].Value
}

func (r *QueryResultRow) SecondLastColValue() any {
return r.Cols[len(r.Cols)-2].Value
}

// SameSubsetOfColumns returns if r and other have the same values for columns with names in colNames
// They are results of the same query, so we can assume that the columns are in the same order.
func (r *QueryResultRow) SameSubsetOfColumns(other *QueryResultRow, colNames []string) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
{"range", cw.parseRangeAggregation},
{"auto_date_histogram", cw.parseAutoDateHistogram},
{"geotile_grid", cw.parseGeotileGrid},
{"geohash_grid", cw.parseGeohashGrid},
{"significant_terms", func(node *pancakeAggregationTreeNode, params QueryMap) error {
return cw.parseTermsAggregation(node, params, "significant_terms")
}},
Expand Down Expand Up @@ -379,6 +380,30 @@ func (cw *ClickhouseQueryTranslator) parseGeotileGrid(aggregation *pancakeAggreg
return nil
}

func (cw *ClickhouseQueryTranslator) parseGeohashGrid(aggregation *pancakeAggregationTreeNode, params QueryMap) error {
const (
defaultSize = 10000
defaultPrecision = 5
)

if err := bucket_aggregations.CheckParamsGeohashGrid(cw.Ctx, params); err != nil {
return err
}

fieldName := cw.parseStringField(params, "field", "") // default doesn't matter, we checked it's present in CheckParamsGeohashGrid
lon := model.NewGeoLon(fieldName)
lat := model.NewGeoLat(fieldName)
precision := cw.parseIntField(params, "precision", defaultPrecision)

aggregation.queryType = bucket_aggregations.NewGeoHashGrid(cw.Ctx)
aggregation.selectedColumns = append(aggregation.selectedColumns,
model.NewFunction("geohashEncode", lon, lat, model.NewLiteral(precision)))
aggregation.orderBy = append(aggregation.orderBy, model.NewOrderByExpr(model.NewCountFunc(), model.DescOrder))
aggregation.limit = cw.parseSize(params, defaultSize)

return nil
}

// TODO: In geotile_grid, without order specidfied, Elastic returns sort by key (a/b/c earlier than x/y/z if a<x or (a=x && b<y), etc.)
// Maybe add some ordering, but doesn't seem to be very important.
func (cw *ClickhouseQueryTranslator) parseComposite(aggregation *pancakeAggregationTreeNode, params QueryMap) error {
Expand Down
69 changes: 69 additions & 0 deletions platform/testdata/grafana.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,4 +298,73 @@ var GrafanaAggregationTests = []AggregationTestCase{
WHERE ("aggr__2__order_1_rank"<=5 AND "aggr__2__3__order_1_rank"<=11)
ORDER BY "aggr__2__order_1_rank" ASC, "aggr__2__3__order_1_rank" ASC`,
},
{ // [3]
TestName: "simplest geotile_grid",
QueryRequestJson: `
{
"aggs": {
"2": {
"geohash_grid": {
"field": "geo.coordinates",
"precision": 2
}
}
},
"size": 0
}`,
ExpectedResponse: `
{
"aggregations": {
"2": {
"buckets": [
{
"doc_count": 25,
"key": "dp"
},
{
"doc_count": 21,
"key": "dn"
},
{
"doc_count": 21,
"key": "9z"
}
]
}
},
"hits": {
"hits": [],
"max_score": null,
"total": {
"relation": "eq",
"value": 231
}
},
"status": 200,
"timed_out": false,
"took": 51
}`,
ExpectedPancakeResults: []model.QueryResultRow{
{Cols: []model.QueryResultCol{
model.NewQueryResultCol("aggr__2__key_0", "dp"),
model.NewQueryResultCol("aggr__2__count", int64(25)),
}},
{Cols: []model.QueryResultCol{
model.NewQueryResultCol("aggr__2__key_0", "dn"),
model.NewQueryResultCol("aggr__2__count", int64(21)),
}},
{Cols: []model.QueryResultCol{
model.NewQueryResultCol("aggr__2__key_0", "9z"),
model.NewQueryResultCol("aggr__2__count", int64(21)),
}},
},
ExpectedPancakeSQL: `
SELECT geohashEncode(__quesma_geo_lon("geo.coordinates"), __quesma_geo_lat(
"geo.coordinates"), 2) AS "aggr__2__key_0", count(*) AS "aggr__2__count"
FROM __quesma_table_name
GROUP BY geohashEncode(__quesma_geo_lon("geo.coordinates"), __quesma_geo_lat(
"geo.coordinates"), 2) AS "aggr__2__key_0"
ORDER BY "aggr__2__count" DESC, "aggr__2__key_0" ASC
LIMIT 10000`,
},
}
19 changes: 0 additions & 19 deletions platform/testdata/unsupported_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,25 +143,6 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{
}
}`,
},
{ // [6]
TestName: "bucket aggregation: geohash_grid",
QueryType: "geohash_grid",
QueryRequestJson: `
{
"aggs": {
"zoomed-in": {
"aggs": {
"zoom1": {
"geohash_grid": {
"field": "location",
"precision": 8
}
}
}
}
}
}`,
},
{ // [7]
TestName: "bucket aggregation: geohex_grid",
QueryType: "geohex_grid",
Expand Down

0 comments on commit 36380f1

Please sign in to comment.