Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Draft: PoC metric rollups #1700

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions migration-tool/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,6 @@ github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3I
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-kit/log v0.2.0 h1:7i2K3eKTos3Vc0enKCfnVcgHh2olr/MyfboYq7cAcFw=
github.com/go-kit/log v0.2.0/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
Expand Down Expand Up @@ -998,8 +997,6 @@ github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9
github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.30.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/common v0.34.0 h1:RBmGO9d/FVjqHT0yUGQwBJhkwKV+wPCn7KGpvfab0uE=
github.com/prometheus/common v0.34.0/go.mod h1:gB3sOl7P0TvJabZpLY5uQMpUqRCPPCyRLCZYc7JZTNE=
github.com/prometheus/common v0.35.0 h1:Eyr+Pw2VymWejHqCugNaQXkAi6KayVNxaHeu6khmFBE=
github.com/prometheus/common v0.35.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA=
github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4=
Expand Down Expand Up @@ -1096,7 +1093,6 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.3 h1:dAm0YRdRQlWojc3CrCRgPBzG5f941d0zvAKu7qY4e+I=
github.com/stretchr/testify v1.7.3/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
Expand Down Expand Up @@ -1781,7 +1777,6 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func series(queryable promql.Queryable) http.HandlerFunc {
var sets []storage.SeriesSet
var warnings storage.Warnings
for _, mset := range matcherSets {
s, _ := q.Select(false, nil, nil, nil, mset...)
s, _, _ := q.Select(false, nil, nil, nil, mset...)
warnings = append(warnings, s.Warnings()...)
if s.Err() != nil {
respondError(w, http.StatusUnprocessableEntity, s.Err(), "execution")
Expand Down
5 changes: 3 additions & 2 deletions pkg/pgmodel/querier/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ func fromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro

// QueryHints contain additional metadata which promscale requires
type QueryHints struct {
CurrentNode parser.Node
Lookback time.Duration
CurrentNode parser.Node
Lookback time.Duration
IsInstantQuery bool
}

func GetMetricNameSeriesIds(conn pgxconn.PgxConn, metadata *evalMetadata) (metrics, schemas []string, correspondingSeriesIDs [][]model.SeriesID, err error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/pgmodel/querier/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/timescale/promscale/pkg/pgmodel/model"
"github.com/timescale/promscale/pkg/pgmodel/querier/rollup"
"github.com/timescale/promscale/pkg/prompb"
)

Expand Down Expand Up @@ -44,7 +45,7 @@ type RemoteReadQuerier interface {
// matching samples.
type SamplesQuerier interface {
// Select returns a series set containing the exemplar that matches the supplied query parameters.
Select(mint, maxt int64, sortSeries bool, hints *storage.SelectHints, queryHints *QueryHints, path []parser.Node, ms ...*labels.Matcher) (SeriesSet, parser.Node)
Select(mint, maxt int64, sortSeries bool, hints *storage.SelectHints, queryHints *QueryHints, path []parser.Node, ms ...*labels.Matcher) (SeriesSet, parser.Node, *rollup.Config)
}

// ExemplarQuerier queries data using the provided query data and returns the
Expand Down
2 changes: 2 additions & 0 deletions pkg/pgmodel/querier/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/timescale/promscale/pkg/pgmodel/querier/rollup"
)

// promqlMetadata is metadata received directly from our native PromQL engine.
Expand Down Expand Up @@ -39,6 +40,7 @@ type evalMetadata struct {
timeFilter timeFilter
clauses []string
values []interface{}
rollupConfig *rollup.Config
*promqlMetadata
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/pgmodel/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (
)

type pgxQuerier struct {
tools *queryTools
tools *queryTools
samplesQuerier *querySamples
}

var _ Querier = (*pgxQuerier)(nil)
Expand All @@ -39,6 +40,7 @@ func NewQuerier(
rAuth: rAuth,
},
}
querier.samplesQuerier = newQuerySamples(querier)
return querier
}

Expand All @@ -47,7 +49,7 @@ func (q *pgxQuerier) RemoteReadQuerier() RemoteReadQuerier {
}

func (q *pgxQuerier) SamplesQuerier() SamplesQuerier {
return newQuerySamples(q)
return q.samplesQuerier
}

func (q *pgxQuerier) ExemplarsQuerier(ctx context.Context) ExemplarQuerier {
Expand Down
6 changes: 5 additions & 1 deletion pkg/pgmodel/querier/querier_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,11 @@ func TestPGXQuerierQuery(t *testing.T) {
if err != nil {
t.Fatalf("error setting up mock cache: %s", err.Error())
}
querier := pgxQuerier{&queryTools{conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer())}}
querier := pgxQuerier{
tools: &queryTools{
conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer()),
},
}

result, err := querier.RemoteReadQuerier().Query(c.query)

Expand Down
21 changes: 14 additions & 7 deletions pkg/pgmodel/querier/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"

"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/common/errors"
"github.com/timescale/promscale/pkg/pgmodel/common/extension"
Expand Down Expand Up @@ -247,15 +248,21 @@ type aggregators struct {

// getAggregators returns the aggregator which should be used to fetch data for
// a single metric. It may apply pushdowns to functions.
func getAggregators(metadata *promqlMetadata) (*aggregators, parser.Node) {

agg, node, err := tryPushDown(metadata)
if err != nil {
log.Info("msg", "error while trying to push down, will skip pushdown optimization", "error", err)
} else if agg != nil {
return agg, node
func getAggregators(metadata *promqlMetadata, usingRollup bool) (*aggregators, parser.Node) {
if !usingRollup {
agg, node, err := tryPushDown(metadata)
if err != nil {
log.Info("msg", "error while trying to push down, will skip pushdown optimization", "error", err)
} else if agg != nil {
return agg, node
}
}

//fmt.Println("start", metadata.selectHints.Start)
//fmt.Println("end", metadata.selectHints.End)
//fmt.Println("range", metadata.selectHints.Range)
//fmt.Println("end - start in mins", (metadata.selectHints.End-metadata.selectHints.Start)/(1000*60))

defaultAggregators := &aggregators{
timeClause: "array_agg(time)",
valueClause: "array_agg(value)",
Expand Down
79 changes: 68 additions & 11 deletions pkg/pgmodel/querier/query_builder_samples.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package querier

import (
Expand All @@ -8,6 +12,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/timescale/promscale/pkg/pgmodel/common/schema"
pgmodel "github.com/timescale/promscale/pkg/pgmodel/model"
"github.com/timescale/promscale/pkg/pgmodel/querier/rollup"
)

const (
Expand Down Expand Up @@ -103,7 +108,7 @@ const (

// buildSingleMetricSamplesQuery builds a SQL query which fetches the data for
// one metric.
func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{}, parser.Node, TimestampSeries, error) {
func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{}, parser.Node, TimestampSeries, bool, error) {
// The basic structure of the SQL query which this function produces is:
// SELECT
// series.labels
Expand All @@ -127,7 +132,49 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{
// When pushdowns are available, the <array_aggregator> is a pushdown
// function which the promscale extension provides.

qf, node := getAggregators(metadata.promqlMetadata)
// TODO NEXT: Instant query with max_over_time(), min_over_time()

qf, node := getAggregators(metadata.promqlMetadata, metadata.rollupConfig != nil)

filter := metadata.timeFilter
column := filter.column
samplesSchema := filter.schema

var instantQueryAgg string
if metadata.rollupConfig != nil {
samplesSchema = metadata.rollupConfig.SchemaName()

rollupOptimizer := metadata.rollupConfig.GetOptimizer(metadata.metric)
typeOptimizer := rollup.Optimizer(rollupOptimizer.RangeQuery())
if metadata.queryHints.IsInstantQuery {
typeOptimizer = rollupOptimizer.InstantQuery()
}

column = typeOptimizer.RegularColumnName()

// See if we can optimize the query aggregation, like min_over_time(metric[1h])
path := metadata.promqlMetadata.path
if len(path) >= 2 {
// May contain a functional call. Let's check it out.
grandparent := path[len(path)-2]
if callNode, isPromQLFunc := grandparent.(*parser.Call); isPromQLFunc {
fnName := callNode.Func.Name

columnClause := typeOptimizer.GetColumnClause(fnName)
if columnClause != "" {
column = columnClause

if metadata.queryHints.IsInstantQuery {
instantQueryOptimizer := typeOptimizer.(rollup.InstantQuery)
if agg := instantQueryOptimizer.GetAggForInstantQuery(fnName); agg != "" {
instantQueryAgg = agg
node = grandparent // We have already evaluated the aggregation, hence no need to compute in PromQL engine. So, send this as a pushdown response.
}
}
}
}
}
}

var selectors, selectorClauses []string
values := metadata.values
Expand All @@ -137,17 +184,29 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{
var err error
timeClauseBound, values, err = setParameterNumbers(qf.timeClause, values, qf.timeParams...)
if err != nil {
return "", nil, nil, nil, err
return "", nil, nil, nil, false, err
}
selectors = append(selectors, "result.time_array")
selectorClauses = append(selectorClauses, timeClauseBound+" as time_array")

if instantQueryAgg != "" {
selectorClauses = append(selectorClauses, " current_timestamp::timestamptz as time_array")
} else {
selectorClauses = append(selectorClauses, timeClauseBound+" as time_array")
}
}
valueClauseBound, values, err := setParameterNumbers(qf.valueClause, values, qf.valueParams...)
if err != nil {
return "", nil, nil, nil, err
return "", nil, nil, nil, false, err
}
selectors = append(selectors, "result.value_array")
selectorClauses = append(selectorClauses, valueClauseBound+" as value_array")

valueWithoutAggregation := false
if instantQueryAgg != "" {
selectorClauses = append(selectorClauses, instantQueryAgg+" as value_array")
valueWithoutAggregation = true
} else {
selectorClauses = append(selectorClauses, valueClauseBound+" as value_array")
}

orderByClause := "ORDER BY time"
if qf.unOrdered {
Expand Down Expand Up @@ -201,7 +260,6 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{
// The selectHints' time range is calculated by `getTimeRangesForSelector`,
// which determines the correct `scan_start` for the current expression.

filter := metadata.timeFilter
sh := metadata.selectHints
var start, end string
// selectHints are non-nil when the query was initiated through the `query`
Expand All @@ -214,18 +272,17 @@ func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{
}

finalSQL := fmt.Sprintf(template,
pgx.Identifier{filter.schema, filter.metric}.Sanitize(),
pgx.Identifier{samplesSchema, filter.metric}.Sanitize(),
pgx.Identifier{schema.PromDataSeries, filter.seriesTable}.Sanitize(),
strings.Join(cases, " AND "),
start,
end,
strings.Join(selectorClauses, ", "),
strings.Join(selectors, ", "),
orderByClause,
pgx.Identifier{filter.column}.Sanitize(),
column,
)

return finalSQL, values, node, qf.tsSeries, nil
return finalSQL, values, node, qf.tsSeries, valueWithoutAggregation, nil
}

func buildMultipleMetricSamplesQuery(filter timeFilter, series []pgmodel.SeriesID) (string, error) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/pgmodel/querier/query_remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ func (q *queryRemoteRead) Query(query *prompb.Query) ([]*prompb.TimeSeries, erro
}

qrySamples := newQuerySamples(q.pgxQuerier)
sampleRows, _, err := qrySamples.fetchSamplesRows(query.StartTimestampMs, query.EndTimestampMs, nil, nil, nil, matchers)
// TODO after PoC (harkishen): Do not use rollup in case of remote-read. This is because remote-read results are merged with local
// Prometheus data and using rollups here will cause evaluation problems.
sampleRows, _, _, err := qrySamples.fetchSamplesRows(query.StartTimestampMs, query.EndTimestampMs, nil, nil, nil, matchers)
if err != nil {
return nil, err
}
Expand Down
Loading