Skip to content

[WIP]feat: support metric exemplars feature #4770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions core/metric/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"github.com/zeromicro/go-zero/core/proc"
)

const exemplarLabelTraceID = "trace_id"

type (
// A HistogramVecOpts is a histogram vector options.
HistogramVecOpts struct {
Expand All @@ -23,6 +25,11 @@ type (
Observe(v int64, labels ...string)
// ObserveFloat allows observing float64 values.
ObserveFloat(v float64, labels ...string)
// ObserveWithExemplar observes a float64 value and attaches the given exemplar labels to it.
ObserveWithExemplar(v float64, exemplarLabels prom.Labels, labels ...string)
// ObserveWithTrace is shorthand for ObserveWithExemplar with traceID as the exemplar label;
// it falls back to ObserveFloat if traceID is empty.
ObserveWithTrace(v float64, traceID string, labels ...string)
close() bool
}

Expand Down Expand Up @@ -68,6 +75,22 @@ func (hv *promHistogramVec) ObserveFloat(v float64, labels ...string) {
})
}

// ObserveWithExemplar records v for the series selected by labels and attaches
// exemplarLabels to the observation as an exemplar.
//
// The observation is made inside the callback handed to update. If the observer
// returned for labels does not implement prom.ExemplarObserver, the value is
// still recorded through plain Observe and the exemplar is dropped, instead of
// panicking on a failed type assertion.
func (hv *promHistogramVec) ObserveWithExemplar(v float64, exemplarLabels prom.Labels, labels ...string) {
	update(func() {
		observer := hv.histogram.WithLabelValues(labels...)
		// Histograms are expected to be ExemplarObservers; the comma-ok form
		// only guards against that assumption breaking.
		if eo, ok := observer.(prom.ExemplarObserver); ok {
			eo.ObserveWithExemplar(v, exemplarLabels)
			return
		}
		observer.Observe(v)
	})
}

// ObserveWithTrace records v for the series selected by labels. A non-empty
// traceID is attached as an exemplar under the exemplarLabelTraceID label;
// an empty traceID results in a plain ObserveFloat call.
func (hv *promHistogramVec) ObserveWithTrace(v float64, traceID string, labels ...string) {
	if traceID != "" {
		exemplar := prom.Labels{exemplarLabelTraceID: traceID}
		hv.ObserveWithExemplar(v, exemplar, labels...)
		return
	}

	// No trace to link to: record the value without an exemplar.
	hv.ObserveFloat(v, labels...)
}

// close unregisters the underlying histogram via prom.Unregister and returns
// the result of that call.
func (hv *promHistogramVec) close() bool {
	unregistered := prom.Unregister(hv.histogram)
	return unregistered
}
68 changes: 68 additions & 0 deletions core/metric/histogram_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package metric

import (
"regexp"
"strings"
"testing"

prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -49,3 +52,68 @@ func TestHistogramObserve(t *testing.T) {
err := testutil.CollectAndCompare(hv.histogram, strings.NewReader(metadata+val))
assert.Nil(t, err)
}

// Test_promHistogramVec_ObserveWithExemplar checks that values recorded through
// ObserveWithExemplar show up in the OpenMetrics output together with their
// exemplar labels.
func Test_promHistogramVec_ObserveWithExemplar(t *testing.T) {
	startAgent()

	vec := NewHistogramVec(&HistogramVecOpts{
		Name:    "counts",
		Help:    "rpc server requests duration(ms).",
		Buckets: []float64{1, 2, 3},
		Labels:  []string{"method"},
	})
	// One unchecked assertion gives access to both close and the raw histogram.
	hv := vec.(*promHistogramVec)
	defer hv.close()

	observations := []struct {
		value    float64
		exemplar string
	}{
		{1.5, "test15"},
		{2.5, "test25"},
		{3.5, "test35"},
	}
	for _, o := range observations {
		vec.ObserveWithExemplar(o.value, prom.Labels{"test": o.exemplar}, "/Users")
	}

	want := `# HELP counts rpc server requests duration(ms).
# TYPE counts histogram
counts_bucket{method="/Users",le="1.0"} 0
counts_bucket{method="/Users",le="2.0"} 1 # {test="test15"} 1.5
counts_bucket{method="/Users",le="3.0"} 2 # {test="test25"} 2.5
counts_bucket{method="/Users",le="+Inf"} 3 # {test="test35"} 3.5
counts_sum{method="/Users"} 7.5
counts_count{method="/Users"} 3
`
	got, err := testutil.CollectAndFormat(hv.histogram, expfmt.TypeOpenMetrics, "counts")
	assert.NoError(t, err)
	// Exemplar timestamps vary per run, so strip them before comparing.
	assert.Equal(t, want, removeTimestamp(string(got)))
}

// Test_promHistogramVec_ObserveWithTrace checks that values recorded through
// ObserveWithTrace show up in the OpenMetrics output with the trace ID attached
// as a trace_id exemplar label.
func Test_promHistogramVec_ObserveWithTrace(t *testing.T) {
	startAgent()

	vec := NewHistogramVec(&HistogramVecOpts{
		Name:    "counts",
		Help:    "rpc server requests duration(ms).",
		Buckets: []float64{1, 2, 3},
		Labels:  []string{"method"},
	})
	// One unchecked assertion gives access to both close and the raw histogram.
	hv := vec.(*promHistogramVec)
	defer hv.close()

	observations := []struct {
		value   float64
		traceID string
	}{
		{1.5, "4bf92f3577b34da6a3ce929d0e4e6b3a"},
		{2.5, "1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d"},
		{3.5, "8e7f6a5d4c3b2a1f0e9d8c7b6a5d4c3b"},
	}
	for _, o := range observations {
		vec.ObserveWithTrace(o.value, o.traceID, "/Users")
	}

	want := `# HELP counts rpc server requests duration(ms).
# TYPE counts histogram
counts_bucket{method="/Users",le="1.0"} 0
counts_bucket{method="/Users",le="2.0"} 1 # {trace_id="4bf92f3577b34da6a3ce929d0e4e6b3a"} 1.5
counts_bucket{method="/Users",le="3.0"} 2 # {trace_id="1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d"} 2.5
counts_bucket{method="/Users",le="+Inf"} 3 # {trace_id="8e7f6a5d4c3b2a1f0e9d8c7b6a5d4c3b"} 3.5
counts_sum{method="/Users"} 7.5
counts_count{method="/Users"} 3
`
	got, err := testutil.CollectAndFormat(hv.histogram, expfmt.TypeOpenMetrics, "counts")
	assert.NoError(t, err)
	// Exemplar timestamps vary per run, so strip them before comparing.
	assert.Equal(t, want, removeTimestamp(string(got)))
}

// openMetricsTimestampRe matches a run of whitespace followed by a number in
// scientific notation (digits, a dot, digits, then an e+NN / e-NN exponent).
// It is compiled once at package scope rather than on every removeTimestamp call.
var openMetricsTimestampRe = regexp.MustCompile(`\s+\d+\.\d+e[+-]\d+`)

// removeTimestamp strips the timestamp from OpenMetrics output so the text can
// be compared against a fixed expectation. For example, it turns
//
//	counts_bucket{method="/Users",le="2.0"} 1 # {test="test15"} 1.5 1.7442025686415942e+09
//
// into
//
//	counts_bucket{method="/Users",le="2.0"} 1 # {test="test15"} 1.5
//
// Every match of the pattern in s is removed, together with the whitespace
// that precedes it; text that does not match is returned unchanged.
func removeTimestamp(s string) string {
	return openMetricsTimestampRe.ReplaceAllString(s, "")
}
3 changes: 3 additions & 0 deletions internal/devserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ type Config struct {
EnableMetrics bool `json:",default=true"`
EnablePprof bool `json:",default=true"`
HealthResponse string `json:",default=OK"`
// EnableOpenMetrics exposes the promhttp.HandlerOpts.EnableOpenMetrics option, which adds the experimental
// OpenMetrics encoding; this is the only way to transmit exemplars.
EnableOpenMetrics bool `json:",optional"`
}
7 changes: 6 additions & 1 deletion internal/devserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http/pprof"
"sync"

prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/prometheus"
Expand Down Expand Up @@ -45,7 +46,11 @@ func (s *Server) addRoutes(c Config) {
if s.config.EnableMetrics {
// enable prometheus global switch
prometheus.Enable()
s.handleFunc(s.config.MetricsPath, promhttp.Handler().ServeHTTP)
s.handleFunc(s.config.MetricsPath, promhttp.InstrumentMetricHandler(
prom.DefaultRegisterer, promhttp.HandlerFor(prom.DefaultGatherer, promhttp.HandlerOpts{
EnableOpenMetrics: s.config.EnableOpenMetrics,
}),
).ServeHTTP)
}

// pprof
Expand Down
3 changes: 2 additions & 1 deletion rest/handler/prometheushandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/zeromicro/go-zero/core/metric"
"github.com/zeromicro/go-zero/core/timex"
"github.com/zeromicro/go-zero/internal/trace"
"github.com/zeromicro/go-zero/rest/internal/response"
)

Expand Down Expand Up @@ -38,7 +39,7 @@ func PrometheusHandler(path, method string) func(http.Handler) http.Handler {
cw := response.NewWithCodeResponseWriter(w)
defer func() {
code := strconv.Itoa(cw.Code)
metricServerReqDur.Observe(timex.Since(startTime).Milliseconds(), path, method, code)
metricServerReqDur.ObserveWithTrace(float64(timex.Since(startTime).Milliseconds()), trace.TraceIDFromContext(r.Context()), path, method, code)
metricServerReqCodeTotal.Inc(path, method, code)
}()

Expand Down
3 changes: 2 additions & 1 deletion zrpc/internal/clientinterceptors/prometheusinterceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/zeromicro/go-zero/core/metric"
"github.com/zeromicro/go-zero/core/timex"
"github.com/zeromicro/go-zero/internal/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -36,7 +37,7 @@ func PrometheusInterceptor(ctx context.Context, method string, req, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
startTime := timex.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
metricClientReqDur.Observe(timex.Since(startTime).Milliseconds(), method)
metricClientReqDur.ObserveWithTrace(float64(timex.Since(startTime).Milliseconds()), trace.TraceIDFromContext(ctx), method)
metricClientReqCodeTotal.Inc(method, strconv.Itoa(int(status.Code(err))))
return err
}
3 changes: 2 additions & 1 deletion zrpc/internal/serverinterceptors/prometheusinterceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/zeromicro/go-zero/core/metric"
"github.com/zeromicro/go-zero/core/timex"
"github.com/zeromicro/go-zero/internal/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -36,7 +37,7 @@ func UnaryPrometheusInterceptor(ctx context.Context, req any,
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
startTime := timex.Now()
resp, err := handler(ctx, req)
metricServerReqDur.Observe(timex.Since(startTime).Milliseconds(), info.FullMethod)
metricServerReqDur.ObserveWithTrace(float64(timex.Since(startTime).Milliseconds()), trace.TraceIDFromContext(ctx), info.FullMethod)
metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err))))
return resp, err
}
Loading