Skip to content

Commit

Permalink
feat: sliding window and rt quantile metrics (#2356)
Browse files Browse the repository at this point in the history
  • Loading branch information
ev1lQuark authored Jul 14, 2023
1 parent 33e78f2 commit 412bf7b
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 1 deletion.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/vault/sdk v0.7.0
github.com/influxdata/tdigest v0.0.1
github.com/jinzhu/copier v0.3.5
github.com/knadh/koanf v1.5.0
github.com/kr/pretty v0.3.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY=
github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jhump/protoreflect v1.6.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74=
Expand Down Expand Up @@ -1630,8 +1632,10 @@ golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNq
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.8.2 h1:CCXrcPKiGGotvnN6jfUsKk4rRqm7q09/YbKb5xCEvtM=
gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0 h1:OE9mWmgKkjJyEmDAAtGMPjXu+YNeGvK9VTSHY6+Qihc=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
Expand Down
6 changes: 6 additions & 0 deletions metrics/prometheus/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ const (
ipKey = constant.IpKey
methodKey = constant.MethodKey
versionKey = constant.VersionKey
)

const (
providerField = "provider"
consumerField = "consumer"

Expand All @@ -49,3 +51,7 @@ const (
processingField = "processing"
succeedField = "succeed"
)

var (
quantiles = []float64{0.5, 0.9, 0.95, 0.99}
)
17 changes: 17 additions & 0 deletions metrics/prometheus/metric_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package prometheus

import (
"fmt"
"strconv"
"strings"
)

Expand Down Expand Up @@ -49,6 +51,7 @@ type rpcCommonMetrics struct {
rtMillisecondsSum *prometheus.CounterVec
rtMillisecondsAvg *GaugeVecWithSyncMap
rtMillisecondsLast *prometheus.GaugeVec
rtMillisecondsQuantiles *quantileGaugeVec
}

type providerMetrics struct {
Expand All @@ -64,6 +67,7 @@ func (pm *providerMetrics) init(reporterConfig *metrics.ReporterConfig) {
pm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(providerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(providerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(providerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
}

type consumerMetrics struct {
Expand All @@ -79,8 +83,10 @@ func (cm *consumerMetrics) init(reporterConfig *metrics.ReporterConfig) {
cm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(consumerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(consumerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(consumerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
}

// buildMetricsName builds metrics name split by "_".
func buildMetricsName(args ...string) string {
sb := strings.Builder{}
for _, arg := range args {
Expand All @@ -90,3 +96,14 @@ func buildMetricsName(args ...string) string {
res := strings.TrimPrefix(sb.String(), "_")
return res
}

// buildRTQuantilesMetricsNames is only used for building rt quantiles metric names.
func buildRTQuantilesMetricsNames(role string, quantiles []float64) []string {
res := make([]string, 0, len(quantiles))
for _, q := range quantiles {
quantileField := fmt.Sprintf("p%v", strconv.FormatFloat(q*100, 'f', -1, 64))
name := buildMetricsName(role, rtField, milliSecondsField, quantileField)
res = append(res, name)
}
return res
}
47 changes: 46 additions & 1 deletion metrics/prometheus/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

import (
"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
)

func newHistogramVec(name, namespace string, labels []string) *prometheus.HistogramVec {
return prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand Down Expand Up @@ -151,7 +155,7 @@ func newAutoSummaryVec(name, namespace string, labels []string, maxAge int64) *p

type GaugeVecWithSyncMap struct {
GaugeVec *prometheus.GaugeVec
SyncMap *sync.Map
SyncMap *sync.Map // key: labels, value: *atomic.Value
}

func newAutoGaugeVecWithSyncMap(name, namespace string, labels []string) *GaugeVecWithSyncMap {
Expand Down Expand Up @@ -253,3 +257,44 @@ func (gv *GaugeVecWithSyncMap) updateAvg(labels *prometheus.Labels, curValue int
}
}
}

type quantileGaugeVec struct {
gaugeVecSlice []*prometheus.GaugeVec
quantiles []float64
syncMap *sync.Map // key: labels string, value: TimeWindowQuantile
}

// Notice: names and quantiles should be the same length and same order.
func newQuantileGaugeVec(names []string, namespace string, labels []string, quantiles []float64) *quantileGaugeVec {
gvs := make([]*prometheus.GaugeVec, len(names))
for i, name := range names {
gvs[i] = newAutoGaugeVec(name, namespace, labels)
}
gv := &quantileGaugeVec{
gaugeVecSlice: gvs,
quantiles: quantiles,
syncMap: &sync.Map{},
}
return gv
}

func (gv *quantileGaugeVec) updateQuantile(labels *prometheus.Labels, curValue int64) {
key := convertLabelsToMapKey(*labels)
cur := aggregate.NewTimeWindowQuantile(100, 10, 120)
cur.Add(float64(curValue))

updateFunc := func(td *aggregate.TimeWindowQuantile) {
qs := td.Quantiles(gv.quantiles)
for i, q := range qs {
gv.gaugeVecSlice[i].With(*labels).Set(q)
}
}

if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded {
store := actual.(*aggregate.TimeWindowQuantile)
store.Add(float64(curValue))
updateFunc(store)
} else {
updateFunc(cur)
}
}
2 changes: 2 additions & 0 deletions metrics/prometheus/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,13 @@ func (reporter *PrometheusReporter) reportRTMilliseconds(role string, labels *pr
go reporter.provider.rtMillisecondsMin.updateMin(labels, costMs)
go reporter.provider.rtMillisecondsMax.updateMax(labels, costMs)
go reporter.provider.rtMillisecondsAvg.updateAvg(labels, costMs)
go reporter.provider.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
case consumerField:
go reporter.consumer.rtMillisecondsLast.With(*labels).Set(float64(costMs))
go reporter.consumer.rtMillisecondsSum.With(*labels).Add(float64(costMs))
go reporter.consumer.rtMillisecondsMin.updateMin(labels, costMs)
go reporter.consumer.rtMillisecondsMax.updateMax(labels, costMs)
go reporter.consumer.rtMillisecondsAvg.updateAvg(labels, costMs)
go reporter.consumer.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
}
}
42 changes: 42 additions & 0 deletions metrics/util/aggregate/pane.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package aggregate

// pane represents a window over a period of time.
// It uses interface{} to store any type of value.
type pane struct {
startInMs int64
endInMs int64
intervalInMs int64
value interface{}
}

func newPane(intervalInMs, startInMs int64, value interface{}) *pane {
return &pane{
startInMs: startInMs,
endInMs: startInMs + intervalInMs,
intervalInMs: intervalInMs,
value: value,
}
}

func (p *pane) resetTo(startInMs int64, value interface{}) {
p.startInMs = startInMs
p.endInMs = startInMs + p.intervalInMs
p.value = value
}
85 changes: 85 additions & 0 deletions metrics/util/aggregate/quantile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package aggregate

import (
"sync"
"time"
)

import (
"github.com/influxdata/tdigest"
)

// TimeWindowQuantile wrappers sliding window around T-Digest.
//
// It uses T-Digest algorithm to calculate quantile.
// The window is divided into several panes, and each pane's value is a TDigest instance.
type TimeWindowQuantile struct {
compression float64
window *slidingWindow
mux sync.RWMutex
}

func NewTimeWindowQuantile(compression float64, paneCount int, timeWindowSeconds int64) *TimeWindowQuantile {
return &TimeWindowQuantile{
compression: compression,
window: newSlidingWindow(paneCount, timeWindowSeconds*1000),
}
}

// Quantile returns a quantile of the sliding window by merging all panes.
func (t *TimeWindowQuantile) Quantile(q float64) float64 {
return t.mergeTDigests().Quantile(q)
}

// Quantiles returns quantiles of the sliding window by merging all panes.
func (t *TimeWindowQuantile) Quantiles(qs []float64) []float64 {
td := t.mergeTDigests()

res := make([]float64, len(qs))
for i, q := range qs {
res[i] = td.Quantile(q)
}

return res
}

// mergeTDigests merges all panes' TDigests into one TDigest.
func (t *TimeWindowQuantile) mergeTDigests() *tdigest.TDigest {
t.mux.RLock()
defer t.mux.RUnlock()

td := tdigest.NewWithCompression(t.compression)
for _, v := range t.window.values(time.Now().UnixMilli()) {
td.AddCentroidList(v.(*tdigest.TDigest).Centroids())
}
return td
}

// Add adds a value to the sliding window's current pane.
func (t *TimeWindowQuantile) Add(value float64) {
t.mux.Lock()
defer t.mux.Unlock()

t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*tdigest.TDigest).Add(value, 1)
}

func (t *TimeWindowQuantile) newEmptyValue() interface{} {
return tdigest.NewWithCompression(t.compression)
}
60 changes: 60 additions & 0 deletions metrics/util/aggregate/quantile_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package aggregate

import "testing"

func TestAddAndQuantile(t1 *testing.T) {
timeWindowQuantile := NewTimeWindowQuantile(100, 10, 1)
for i := 1; i <= 100; i++ {
timeWindowQuantile.Add(float64(i))
}

type args struct {
q float64
}

tests := []struct {
name string
args args
want float64
}{
{
name: "Quantile: 0.01",
args: args{
q: 0.01,
},
want: 1.5,
},
{
name: "Quantile: 0.99",
args: args{
q: 0.99,
},
want: 99.5,
},
}
for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) {
t := timeWindowQuantile
if got := t.Quantile(tt.args.q); got != tt.want {
t1.Errorf("Quantile() = %v, want %v", got, tt.want)
}
})
}
}
Loading

0 comments on commit 412bf7b

Please sign in to comment.