Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TraceQL Metrics] histogram_over_time #3644

Merged
merged 27 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
50a6984
Add lang/parser support for quantile_over_time, fix missing stringify…
mdisibio Apr 24, 2024
7307d4c
First working draft of quantile_over_time implementation
mdisibio Apr 24, 2024
589d83f
Validate the query in the frontend
mdisibio Apr 25, 2024
6abee43
Histogram accumulate jobs by bucket as they come in instead of at the…
mdisibio Apr 25, 2024
923d7d8
Fix language definition to allow both floats or ints for quantiles
mdisibio Apr 25, 2024
2ccc75f
Remove layer of proto->seriesset conversion. Fix roundtrip of __bucke…
mdisibio Apr 25, 2024
e320bbb
Rename to SimpleAdditionCombiner, slight interval calc cleanup
mdisibio Apr 25, 2024
b845743
Fix p0 returning 1 instead of minimum value, comments cleanup
mdisibio Apr 25, 2024
ee0e5bc
Rename frontend param and fix handling of ints
mdisibio Apr 26, 2024
4c1db95
Fix pre-existing bug in metrics optimization when asserting multiple …
mdisibio Apr 26, 2024
656f525
Fix to support 3 flavors of the metrics pipeline: query-frontend, acr…
mdisibio Apr 26, 2024
81deb1b
Merge branch 'main' into quantile-engine
mdisibio Apr 29, 2024
cb31ed0
Update query_range frontend test for new behavior
mdisibio Apr 29, 2024
b49d994
Consolidate histogram code between traceql and traceqlmetrics. quanti…
mdisibio Apr 30, 2024
05c3af4
lint
mdisibio May 2, 2024
dbe0a18
changelog
mdisibio May 2, 2024
eaf1a5c
histogram_over_time
mdisibio May 2, 2024
5c79dc2
Redo histograms to set __bucket label to the actual value instead of …
mdisibio May 3, 2024
3c8507f
Revert all changes to traceqlmetrics package, was getting too noisy
mdisibio May 3, 2024
6f873a2
Merge branch 'quantile-engine' into histogram_over_time
mdisibio May 6, 2024
cbdadd1
Fix after merge
mdisibio May 6, 2024
eb5d548
lint
mdisibio May 6, 2024
402f4cb
Merge branch 'main' into histogram_over_time
mdisibio May 6, 2024
85e7db1
changelog
mdisibio May 6, 2024
77040db
Merge branch 'main' into histogram_over_time
mdisibio May 7, 2024
48020da
Add test for histogram_over_time
mdisibio May 7, 2024
cb03471
Change quantile_over_time test to use more exported methods
mdisibio May 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Consolidate histogram code between traceql and traceqlmetrics. quantile_over_time test, code cleanup
  • Loading branch information
mdisibio committed Apr 30, 2024
commit b49d994c37d1f1e2147c17b986ab6b585a52ed4e
6 changes: 3 additions & 3 deletions pkg/traceql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
if d < 2 {
return Static{}, false
}
return NewStaticInt(int(math.Ceil(math.Log2(float64(d))))), true
return NewStaticInt(Log2Bucket(d)), true
}
default:
// Basic implementation for all other attributes
Expand All @@ -859,7 +859,7 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
if v.N < 2 {
return Static{}, false
}
return NewStaticInt(int(math.Ceil(math.Log2(float64(v.N))))), true
return NewStaticInt(Log2Bucket(uint64(v.N))), true
}
}
}
Expand All @@ -885,7 +885,7 @@ func (a *MetricsAggregate) initFinal(q *tempopb.QueryRangeRequest) {
if a.attr == IntrinsicDurationAttribute {
div = float64(time.Second)
}
a.seriesAgg = NewHistogramCombiner(q, a.floats, div)
a.seriesAgg = NewHistogramAggregator(q, a.floats, div)
default:
// These are simple additions by series
a.seriesAgg = NewSimpleAdditionCombiner(q)
Expand Down
15 changes: 15 additions & 0 deletions pkg/traceql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,28 @@ func newMockSpan(id []byte) *mockSpan {
}
}

// WithStartTime sets the mock span's start time, given in nanoseconds (stored
// in startTimeUnixNanos), and returns the same span so that builder calls can
// be chained.
func (m *mockSpan) WithStartTime(nanos uint64) *mockSpan {
m.startTimeUnixNanos = nanos
return m
}

// WithDuration sets the mock span's duration, given in nanoseconds (stored in
// durationNanos), and returns the same span so that builder calls can be
// chained.
func (m *mockSpan) WithDuration(nanos uint64) *mockSpan {
m.durationNanos = nanos
return m
}

// WithNestedSetInfo stores the given parent ID and left/right values on the
// mock span and returns the same span so that builder calls can be chained.
func (m *mockSpan) WithNestedSetInfo(parentid, left, right int) *mockSpan {
	m.parentID, m.left, m.right = parentid, left, right
	return m
}

// WithSpanString records a string attribute on the mock span under a
// span-scoped attribute key built from key, and returns the same span so that
// builder calls can be chained.
func (m *mockSpan) WithSpanString(key string, value string) *mockSpan {
	spanAttr := NewScopedAttribute(AttributeScopeSpan, false, key)
	m.attributes[spanAttr] = NewStaticString(value)
	return m
}

func (m *mockSpan) WithAttrBool(key string, value bool) *mockSpan {
m.attributes[NewAttribute(key)] = NewStaticBool(value)
return m
Expand Down
18 changes: 14 additions & 4 deletions pkg/traceql/engine_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@
start, end, step uint64
}

func NewHistogramCombiner(req *tempopb.QueryRangeRequest, qs []float64, div float64) *HistogramAggregator {
func NewHistogramAggregator(req *tempopb.QueryRangeRequest, qs []float64, div float64) *HistogramAggregator {
return &HistogramAggregator{
div: div,
qs: qs,
Expand Down Expand Up @@ -908,22 +908,32 @@
labels = append(labels, Label{"p", NewStaticFloat(q)})
s := labels.String()

new := TimeSeries{

Check warning on line 911 in pkg/traceql/engine_metrics.go

View workflow job for this annotation

GitHub Actions / Lint

redefines-builtin-id: redefinition of the built-in function new (revive)
Labels: labels,
Values: make([]float64, len(in.hist)),
}
for i := range in.hist {
new.Values[i] = Percentile(q, in.hist[i]) / h.div
new.Values[i] = Log2Quantile(q, in.hist[i]) / h.div
}
results[s] = new
}
}
return results
}

// Percentile returns the p-value given powers-of-two bucket counts. Uses
// Log2Bucket returns which powers-of-two bucket the value lies in, i.e. the
// smallest n such that v <= 2^n (ceil(log2(v))).
//
// Values below 2 have no bucket and yield -1. The result can be as large as
// 64 (for v > 2^63), so a caller that indexes a 64-element histogram with it
// needs to handle both ends of the range.
//
// The bucket is computed with integer arithmetic only. Converting to float64
// and taking math.Log2 loses precision for large inputs: values slightly
// above a large power of two get rounded onto it and land one bucket too low
// (for example 2^53+1 would yield 53 instead of 54).
func Log2Bucket(v uint64) int {
	if v < 2 {
		return -1
	}

	// ceil(log2(v)) is the number of bits needed to represent v-1.
	bucket := 0
	for rem := v - 1; rem != 0; rem >>= 1 {
		bucket++
	}
	return bucket
}

// Log2Quantile returns the quantile given powers-of-two bucket counts. Uses
// exponential interpolation. The original values are int64 so there are always 64 buckets.
func Percentile(p float64, buckets [64]int) float64 {
// Input comes from Log2Bucket
func Log2Quantile(p float64, buckets [64]int) float64 {
if math.IsNaN(p) ||
p < 0 ||
p > 1 {
Expand Down
130 changes: 130 additions & 0 deletions pkg/traceql/engine_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,133 @@
})
}
}

// TestQuantileOverTime feeds a fixed set of mock spans through three chained
// quantile_over_time aggregates (raw -> sum -> final) and checks that the
// final layer produces the expected quantile series for each value of
// span.foo and each requested quantile (0, 0.5 and 1).
func TestQuantileOverTime(t *testing.T) {
// Query range of 1s..3s with a 1s step.
req := &tempopb.QueryRangeRequest{
Start: uint64(1 * time.Second),
End: uint64(3 * time.Second),
Step: uint64(1 * time.Second),
}

var (
attr = IntrinsicDurationAttribute
qs = []float64{0, 0.5, 1}
by = []Attribute{NewScopedAttribute(AttributeScopeSpan, false, "foo")}
// Durations are set in nanoseconds below; expected values are divided by
// div, so they are expressed in seconds.
div = float64(time.Second)
)

// A variety of spans across times, durations, and series. All durations are powers of 2 for simplicity
in := []Span{
// t=1s, foo=bar: three different durations.
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(128),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(1*time.Second)).WithSpanString("foo", "bar").WithDuration(512),

// t=2s, foo=bar: four spans sharing one duration.
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),
newMockSpan(nil).WithStartTime(uint64(2*time.Second)).WithSpanString("foo", "bar").WithDuration(256),

// t=3s, foo=baz: three spans sharing one duration.
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
newMockSpan(nil).WithStartTime(uint64(3*time.Second)).WithSpanString("foo", "baz").WithDuration(512),
}

// Output series with quantiles per foo
// Prom labels are sorted alphabetically, traceql labels maintain original order.
// Each series carries three values; presumably one per step of the
// requested range (1s, 2s, 3s) - TODO confirm against the interval logic.
out := SeriesSet{
`{p="0.00000", span.foo="bar"}`: TimeSeries{
Labels: []Label{
{Name: "span.foo", Value: NewStaticString("bar")},
{Name: "p", Value: NewStaticFloat(0)},
},
Values: []float64{
0.000000128,
percentileHelper(0, div, 256, 256, 256, 256),
0,
},
},
`{p="0.50000", span.foo="bar"}`: TimeSeries{
Labels: []Label{
{Name: "span.foo", Value: NewStaticString("bar")},
{Name: "p", Value: NewStaticFloat(0.5)},
},
Values: []float64{
0.000000256,
percentileHelper(0.5, div, 256, 256, 256, 256),
0,
},
},
`{p="1.00000", span.foo="bar"}`: TimeSeries{
Labels: []Label{
{Name: "span.foo", Value: NewStaticString("bar")},
{Name: "p", Value: NewStaticFloat(1)},
},
Values: []float64{0.000000512, 0.000000256, 0},
},
`{p="0.00000", span.foo="baz"}`: TimeSeries{
Labels: []Label{
{Name: "span.foo", Value: NewStaticString("baz")},
{Name: "p", Value: NewStaticFloat(0)},
},
Values: []float64{
0, 0,
percentileHelper(0, div, 512, 512, 512),
},
},
`{p="0.50000", span.foo="baz"}`: TimeSeries{
Labels: []Label{
{Name: "span.foo", Value: NewStaticString("baz")},
{Name: "p", Value: NewStaticFloat(0.5)},
},
Values: []float64{
0, 0,
percentileHelper(0.5, div, 512, 512, 512),
},
},
`{p="1.00000", span.foo="baz"}`: TimeSeries{
Labels: []Label{
{Name: "span.foo", Value: NewStaticString("baz")},
{Name: "p", Value: NewStaticFloat(1)},
},
Values: []float64{0, 0, 0.000000512},
},
}

// 3 layers of processing chained together, one per aggregate mode:
// raw (observes spans) -> sum -> final.
// NOTE(review): the original comment maps these onto
// "query-frontend -> queriers -> generators -> blocks"; confirm which
// component runs which mode.
layer1 := newMetricsAggregateQuantileOverTime(attr, qs, by)
layer1.init(req, AggregateModeRaw)

layer2 := newMetricsAggregateQuantileOverTime(attr, qs, by)
layer2.init(req, AggregateModeSum)

layer3 := newMetricsAggregateQuantileOverTime(attr, qs, by)
layer3.init(req, AggregateModeFinal)

// Pass spans to layer 1
for _, s := range in {
layer1.observe(s)
}

// Pass layer 1 to layer 2
// These are partial counts over time by bucket
// The result is round-tripped through its proto form before being observed.
res := layer1.result()
layer2.observeSeries(res.ToProto(req))

// Pass layer 2 to layer 3
// These are summed counts over time by bucket
res = layer2.result()
layer3.observeSeries(res.ToProto(req))

// Layer 3 final results
// The quantiles
final := layer3.result()
require.Equal(t, out, final)
}

func percentileHelper(q, div float64, values ...uint64) float64 {

Check failure on line 416 in pkg/traceql/engine_metrics_test.go

View workflow job for this annotation

GitHub Actions / Lint

`percentileHelper` - `div` always receives `div` (`1e+09`) (unparam)
b := [64]int{}
for _, v := range values {
b[Log2Bucket(v)]++
}
return Log2Quantile(q, b) / div
}
45 changes: 2 additions & 43 deletions pkg/traceqlmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"math"
"strings"

"github.com/grafana/tempo/pkg/traceql"
Expand All @@ -26,7 +25,7 @@ func (m *LatencyHistogram) Record(durationNanos uint64) {
// Increment bucket that matches log2(duration)
var bucket int
if durationNanos >= 2 {
bucket = int(math.Ceil(math.Log2(float64(durationNanos))))
bucket = traceql.Log2Bucket(durationNanos)
}
if bucket >= maxBuckets {
bucket = maxBuckets - 1
Expand All @@ -51,47 +50,7 @@ func (m *LatencyHistogram) Combine(other LatencyHistogram) {

// Percentile returns the estimated latency percentile in nanoseconds.
func (m *LatencyHistogram) Percentile(p float64) uint64 {
if math.IsNaN(p) ||
p < 0 ||
p > 1 ||
m.Count() == 0 {
return 0
}

// Maximum amount of samples to include. We round up to better handle
// percentiles on low sample counts (<100).
maxSamples := int(math.Ceil(p * float64(m.Count())))

// Find the bucket where the percentile falls in
// and the total sample count less than or equal
// to that bucket.
var total, bucket int
for b, count := range m.buckets {
if total+count <= maxSamples {
bucket = b
total += count

if total < maxSamples {
continue
}
}

// We have enough
break
}

// Fraction to interpolate between buckets, sample-count wise.
// 0.5 means halfway
var interp float64
if maxSamples-total > 0 {
interp = float64(maxSamples-total) / float64(m.buckets[bucket+1])
}

// Exponential interpolation between buckets
minDur := math.Pow(2, float64(bucket))
dur := minDur * math.Pow(2, interp)

return uint64(dur)
return uint64(traceql.Log2Quantile(p, m.buckets))
}

// Buckets returns the bucket counts for each power of 2.
Expand Down
Loading