Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Downsampling based on MaxResolutionWindow #68

Merged
merged 10 commits into from
Dec 19, 2019
Merged
6 changes: 5 additions & 1 deletion cmd/geras/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ func main() {
prometheus.DefaultRegisterer.MustRegister(version.NewCollector("geras"))

// create openTSDBStore and expose its api on a grpc server
srv := store.NewOpenTSDBStore(logger, client, prometheus.DefaultRegisterer, *refreshInterval, storeLabels, allowedMetricNames, blockedMetricNames, *enableMetricSuggestions, *healthcheckMetric)
srv, err := store.NewOpenTSDBStore(logger, client, prometheus.DefaultRegisterer, *refreshInterval, storeLabels, allowedMetricNames, blockedMetricNames, *enableMetricSuggestions, *healthcheckMetric)
if err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
grpcSrv := grpc.NewServer(
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module github.com/G-Research/geras

require (
github.com/G-Research/opentsdb-goclient v0.0.0-20191028155047-1a0d357f6ca7
github.com/G-Research/opentsdb-goclient v0.0.0-20191219200136-ccb48600721a
github.com/go-kit/kit v0.9.0
github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20181025070259-68e3a13e4117
github.com/pkg/errors v0.8.1
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ github.com/Azure/go-autorest v11.2.8+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSW
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/G-Research/opentsdb-goclient v0.0.0-20191028155047-1a0d357f6ca7 h1:LCY7S5FtlqIWHXV0HBX4yGMelG7DCZX90gSPD1TrmqU=
github.com/G-Research/opentsdb-goclient v0.0.0-20191028155047-1a0d357f6ca7/go.mod h1:oVqXUYMDThF8Uz8WH3f80BlB7+Y5BV4wDtqQ06e4m7g=
github.com/G-Research/opentsdb-goclient v0.0.0-20191210204552-9a5d3f5d556d h1:gsMpaf4Y8Zm70kWestox0+M6jmThzV0zXH78wYQVCuE=
github.com/G-Research/opentsdb-goclient v0.0.0-20191210204552-9a5d3f5d556d/go.mod h1:oVqXUYMDThF8Uz8WH3f80BlB7+Y5BV4wDtqQ06e4m7g=
github.com/G-Research/opentsdb-goclient v0.0.0-20191219200136-ccb48600721a h1:T3zoIUSc6BzpUMsiyw7afr3IiTrELVX4u+tkzk7RWbQ=
github.com/G-Research/opentsdb-goclient v0.0.0-20191219200136-ccb48600721a/go.mod h1:oVqXUYMDThF8Uz8WH3f80BlB7+Y5BV4wDtqQ06e4m7g=
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
Expand Down
132 changes: 120 additions & 12 deletions pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ type OpenTSDBStore struct {
enableMetricSuggestions bool
storeLabels []storepb.Label
healthcheckMetric string
aggregateToDownsample map[storepb.Aggr]string
downsampleToAggregate map[string]storepb.Aggr
}

func NewOpenTSDBStore(logger log.Logger, client opentsdb.ClientContext, reg prometheus.Registerer, interval time.Duration, storeLabels []storepb.Label, allowedMetricNames, blockedMetricNames *regexp.Regexp, enableMetricSuggestions bool, healthcheckMetric string) *OpenTSDBStore {
func NewOpenTSDBStore(logger log.Logger, client opentsdb.ClientContext, reg prometheus.Registerer, interval time.Duration, storeLabels []storepb.Label, allowedMetricNames, blockedMetricNames *regexp.Regexp, enableMetricSuggestions bool, healthcheckMetric string) (*OpenTSDBStore, error) {
store := &OpenTSDBStore{
logger: log.With(logger, "component", "opentsdb"),
openTSDBClient: client,
Expand All @@ -52,8 +54,30 @@ func NewOpenTSDBStore(logger log.Logger, client opentsdb.ClientContext, reg prom
blockedMetricNames: blockedMetricNames,
healthcheckMetric: healthcheckMetric,
}
err := store.populateMaps()
if err != nil {
return nil, err
}
store.updateMetrics(context.Background(), logger)
return store
return store, nil
}

func (store *OpenTSDBStore) populateMaps() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given this is not dependent on external data I'd consider just doing this once in init, then the code doesn't have to pass errors around (just panic, as it will run on startup).

store.aggregateToDownsample = map[storepb.Aggr]string{
storepb.Aggr_COUNT: "count",
storepb.Aggr_SUM: "sum",
storepb.Aggr_MIN: "min",
storepb.Aggr_MAX: "max",
storepb.Aggr_COUNTER: "avg",
}
store.downsampleToAggregate = map[string]storepb.Aggr{}
for a, d := range store.aggregateToDownsample {
if _, exists := store.downsampleToAggregate[d]; exists {
return errors.New(fmt.Sprintf("Invalid aggregate/downsample mapping - not reversible for downsample function %s", d))
}
store.downsampleToAggregate[d] = a
}
return nil
}

type internalMetrics struct {
Expand Down Expand Up @@ -209,7 +233,7 @@ func (store *OpenTSDBStore) Series(
if respI.Error != nil {
return respI.Error
}
res, count, err := convertOpenTSDBResultsToSeriesResponse(respI)
res, count, err := convertOpenTSDBResultsToSeriesResponse(respI, store.downsampleToAggregate)
if err != nil {
return err
}
Expand Down Expand Up @@ -417,19 +441,75 @@ func (store *OpenTSDBStore) composeOpenTSDBQuery(req *storepb.SeriesRequest) (op
return opentsdb.QueryParam{}, nil, nil
}

subQueries := make([]opentsdb.SubQuery, len(metricNames))
aggregationCount := 0
needRawAggregation := true
var downsampleSecs int64
if req.MaxResolutionWindow != 0 {
needRawAggregation = false
for _, agg := range req.Aggregates {
switch agg {
case storepb.Aggr_RAW:
needRawAggregation = true
break
case storepb.Aggr_COUNT:
fallthrough
case storepb.Aggr_SUM:
fallthrough
case storepb.Aggr_MIN:
fallthrough
case storepb.Aggr_MAX:
fallthrough
case storepb.Aggr_COUNTER:
aggregationCount++
break
default:
level.Info(store.logger).Log("err", fmt.Sprintf("Unrecognised series aggregator: %v", agg))
needRawAggregation = true
break
}
}
downsampleSecs = req.MaxResolutionWindow / 1000
}
if needRawAggregation {
aggregationCount++
}
subQueries := make([]opentsdb.SubQuery, len(metricNames)*aggregationCount)
for i, mn := range metricNames {
subQueries[i] = opentsdb.SubQuery{
Aggregator: "none",
Metric: mn,
Fiters: tagFilters,
aggregationIndex := 0
if req.MaxResolutionWindow != 0 {
for _, agg := range req.Aggregates {
addAgg := true
var downsample string
if ds, exists := store.aggregateToDownsample[agg]; exists {
downsample = ds
} else {
addAgg = false
}
if addAgg {
subQueries[(i*aggregationCount)+aggregationIndex] = opentsdb.SubQuery{
Aggregator: "none",
Downsample: fmt.Sprintf("%vs-%s", downsampleSecs, downsample),
Metric: mn,
Fiters: tagFilters,
}
aggregationIndex++
}
}
}
if needRawAggregation {
subQueries[(i*aggregationCount)+aggregationIndex] = opentsdb.SubQuery{
Aggregator: "none",
Metric: mn,
Fiters: tagFilters,
}
}
}
query := opentsdb.QueryParam{
Start: req.MinTime,
End: req.MaxTime,
Queries: subQueries,
MsResolution: true,
ShowQuery: true,
}
level.Debug(store.logger).Log("tsdb-query", query.String())
return query, warnings, nil
Expand Down Expand Up @@ -464,7 +544,7 @@ func (store *OpenTSDBStore) checkMetricNames(metricNames []string, fullBlock boo
return allowed, warnings, nil
}

func convertOpenTSDBResultsToSeriesResponse(respI *opentsdb.QueryRespItem) (*storepb.SeriesResponse, int, error) {
func convertOpenTSDBResultsToSeriesResponse(respI *opentsdb.QueryRespItem, downsampleToAggregate map[string]storepb.Aggr) (*storepb.SeriesResponse, int, error) {
seriesLabels := make([]storepb.Label, len(respI.Tags))
i := 0
for k, v := range respI.Tags {
Expand All @@ -473,6 +553,17 @@ func convertOpenTSDBResultsToSeriesResponse(respI *opentsdb.QueryRespItem) (*sto
}
seriesLabels = append(seriesLabels, storepb.Label{Name: "__name__", Value: respI.Metric})

downsampleFunction := "none"
if hyphenIndex := strings.Index(respI.Query.Downsample, "-"); hyphenIndex >= 0 {
downsampleFunction = respI.Query.Downsample[hyphenIndex+1:]
}
var aggregate storepb.Aggr
if v, exists := downsampleToAggregate[downsampleFunction]; exists {
aggregate = v
} else {
aggregate = storepb.Aggr_RAW
}

// Turn datapoints into chunks (Prometheus's tsdb encoding)
dps := respI.GetDataPoints()
chunks := []storepb.AggrChunk{}
Expand All @@ -493,11 +584,28 @@ func convertOpenTSDBResultsToSeriesResponse(respI *opentsdb.QueryRespItem) (*sto
}
a.Append(int64(dp.Timestamp), dp.Value.(float64))
}
chunks = append(chunks, storepb.AggrChunk{
chunk := &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}
aggrChunk := storepb.AggrChunk{
MinTime: minTime,
MaxTime: int64(dps[i-1].Timestamp),
Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()},
})
}
switch aggregate {
case storepb.Aggr_COUNT:
aggrChunk.Count = chunk
case storepb.Aggr_SUM:
aggrChunk.Sum = chunk
case storepb.Aggr_MIN:
aggrChunk.Min = chunk
case storepb.Aggr_MAX:
aggrChunk.Max = chunk
case storepb.Aggr_COUNTER:
aggrChunk.Counter = chunk
case storepb.Aggr_RAW:
fallthrough
default:
aggrChunk.Raw = chunk
}
chunks = append(chunks, aggrChunk)
}
return storepb.NewSeriesResponse(&storepb.Series{
Labels: seriesLabels,
Expand Down
Loading