Skip to content

Commit

Permalink
implement rpc metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
yurishkuro committed Jan 29, 2023
1 parent 91816c6 commit 7a030fa
Show file tree
Hide file tree
Showing 19 changed files with 717 additions and 19 deletions.
1 change: 1 addition & 0 deletions examples/hotrod/cmd/customer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var customerCmd = &cobra.Command{
server := customer.NewServer(
net.JoinHostPort("0.0.0.0", strconv.Itoa(customerPort)),
otelExporter,
metricsFactory,
logger,
)
return logError(zapLogger, server.Run())
Expand Down
1 change: 1 addition & 0 deletions examples/hotrod/cmd/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var driverCmd = &cobra.Command{
server := driver.NewServer(
net.JoinHostPort("0.0.0.0", strconv.Itoa(driverPort)),
otelExporter,
metricsFactory,
logger,
)
return logError(zapLogger, server.Run())
Expand Down
2 changes: 1 addition & 1 deletion examples/hotrod/cmd/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var frontendCmd = &cobra.Command{
logger := log.NewFactory(zapLogger)
server := frontend.NewServer(
options,
tracing.Init("frontend", otelExporter, logger),
tracing.Init("frontend", otelExporter, metricsFactory, logger),
logger,
)
return logError(zapLogger, server.Run())
Expand Down
10 changes: 5 additions & 5 deletions examples/hotrod/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (
"time"

"github.com/spf13/cobra"
"github.com/uber/jaeger-lib/metrics"
jexpvar "github.com/uber/jaeger-lib/metrics/expvar"
jprom "github.com/uber/jaeger-lib/metrics/prometheus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/jaegertracing/jaeger/examples/hotrod/services/config"
"github.com/jaegertracing/jaeger/internal/metrics/expvar"
"github.com/jaegertracing/jaeger/internal/metrics/prometheus"
"github.com/jaegertracing/jaeger/pkg/metrics"
)

var (
Expand Down Expand Up @@ -95,10 +95,10 @@ func init() {
func onInitialize() {
switch metricsBackend {
case "expvar":
metricsFactory = jexpvar.NewFactory(10) // 10 buckets for histograms
metricsFactory = expvar.NewFactory(10) // 10 buckets for histograms
logger.Info("Using expvar as metrics backend")
case "prometheus":
metricsFactory = jprom.New().Namespace(metrics.NSOptions{Name: "hotrod", Tags: nil})
metricsFactory = prometheus.New().Namespace(metrics.NSOptions{Name: "hotrod", Tags: nil})
logger.Info("Using Prometheus as metrics backend")
default:
logger.Fatal("unsupported metrics backend " + metricsBackend)
Expand Down
2 changes: 1 addition & 1 deletion examples/hotrod/cmd/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var routeCmd = &cobra.Command{
logger := log.NewFactory(zapLogger)
server := route.NewServer(
net.JoinHostPort("0.0.0.0", strconv.Itoa(routePort)),
tracing.Init("route", otelExporter, logger),
tracing.Init("route", otelExporter, metricsFactory, logger),
logger,
)
return logError(zapLogger, server.Run())
Expand Down
12 changes: 9 additions & 3 deletions examples/hotrod/pkg/tracing/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/examples/hotrod/pkg/log"
"github.com/jaegertracing/jaeger/examples/hotrod/pkg/tracing/rpcmetrics"
"github.com/jaegertracing/jaeger/pkg/metrics"
)

var once sync.Once

// Init initializes OpenTelemetry SDK and uses OTel-OpenTracing Bridge
// to return an OpenTracing-compatible tracer.
func Init(serviceName string, exporterType string, logger log.Factory) opentracing.Tracer {
func Init(serviceName string, exporterType string, metricsFactory metrics.Factory, logger log.Factory) opentracing.Tracer {
once.Do(func() {
otel.SetTextMapPropagator(propagation.TraceContext{})
})
Expand All @@ -49,9 +51,13 @@ func Init(serviceName string, exporterType string, logger log.Factory) opentraci
if err != nil {
logger.Bg().Fatal("cannot create exporter", zap.String("exporterType", exporterType), zap.Error(err))
}
logger.Bg().Info("using " + exporterType + " trace exporter")

rpcmetricsObserver := rpcmetrics.NewObserver(metricsFactory, rpcmetrics.DefaultNameNormalizer)

tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp, sdktrace.WithMaxExportBatchSize(1)),
sdktrace.WithBatcher(exp),
sdktrace.WithSpanProcessor(rpcmetricsObserver),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(serviceName),
Expand All @@ -78,7 +84,7 @@ func createOtelExporter(exporterType string) (sdktrace.SpanExporter, error) {
case "stdout":
exporter, err = stdouttrace.New()
default:
err = fmt.Errorf("unrecognized exporter type %s", exporterType)
return nil, fmt.Errorf("unrecognized exporter type %s", exporterType)
}
return exporter, err
}
3 changes: 3 additions & 0 deletions examples/hotrod/pkg/tracing/rpcmetrics/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Package rpcmetrics implements an OpenTelemetry SpanProcessor that can be used to emit RPC metrics.

This package is copied from jaeger-client-go and adapted to work with OpenTelemtery SDK.
63 changes: 63 additions & 0 deletions examples/hotrod/pkg/tracing/rpcmetrics/endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2023 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rpcmetrics

import "sync"

// normalizedEndpoints is a cache for endpointName -> safeName mappings.
type normalizedEndpoints struct {
names map[string]string
maxSize int
normalizer NameNormalizer
mux sync.RWMutex
}

func newNormalizedEndpoints(maxSize int, normalizer NameNormalizer) *normalizedEndpoints {
return &normalizedEndpoints{
maxSize: maxSize,
normalizer: normalizer,
names: make(map[string]string, maxSize),
}
}

// normalize looks up the name in the cache, if not found it uses normalizer
// to convert the name to a safe name. If called with more than maxSize unique
// names it returns "" for all other names beyond those already cached.
func (n *normalizedEndpoints) normalize(name string) string {
n.mux.RLock()
norm, ok := n.names[name]
l := len(n.names)
n.mux.RUnlock()
if ok {
return norm
}
if l >= n.maxSize {
return ""
}
return n.normalizeWithLock(name)
}

func (n *normalizedEndpoints) normalizeWithLock(name string) string {
norm := n.normalizer.Normalize(name)
n.mux.Lock()
defer n.mux.Unlock()
// cache may have grown while we were not holding the lock
if len(n.names) >= n.maxSize {
return ""
}
n.names[name] = norm
return norm
}
44 changes: 44 additions & 0 deletions examples/hotrod/pkg/tracing/rpcmetrics/endpoints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2023 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rpcmetrics

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNormalizedEndpoints(t *testing.T) {
n := newNormalizedEndpoints(1, DefaultNameNormalizer)

assertLen := func(l int) {
n.mux.RLock()
defer n.mux.RUnlock()
assert.Len(t, n.names, l)
}

assert.Equal(t, "ab_cd", n.normalize("ab^cd"), "one translation")
assert.Equal(t, "ab_cd", n.normalize("ab^cd"), "cache hit")
assertLen(1)
assert.Equal(t, "", n.normalize("xys"), "cache overflow")
assertLen(1)
}

func TestNormalizedEndpointsDoubleLocking(t *testing.T) {
n := newNormalizedEndpoints(1, DefaultNameNormalizer)
assert.Equal(t, "ab_cd", n.normalize("ab^cd"), "fill out the cache")
assert.Equal(t, "", n.normalizeWithLock("xys"), "cache overflow")
}
125 changes: 125 additions & 0 deletions examples/hotrod/pkg/tracing/rpcmetrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright (c) 2023 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rpcmetrics

import (
"sync"

"github.com/jaegertracing/jaeger/pkg/metrics"
)

const (
otherEndpointsPlaceholder = "other"
endpointNameMetricTag = "endpoint"
)

// Metrics is a collection of metrics for an endpoint describing
// throughput, success, errors, and performance.
type Metrics struct {
// RequestCountSuccess is a counter of the total number of successes.
RequestCountSuccess metrics.Counter `metric:"requests" tags:"error=false"`

// RequestCountFailures is a counter of the number of times any failure has been observed.
RequestCountFailures metrics.Counter `metric:"requests" tags:"error=true"`

// RequestLatencySuccess is a latency histogram of successful requests.
RequestLatencySuccess metrics.Timer `metric:"request_latency" tags:"error=false"`

// RequestLatencyFailures is a latency histogram of failed requests.
RequestLatencyFailures metrics.Timer `metric:"request_latency" tags:"error=true"`

// HTTPStatusCode2xx is a counter of the total number of requests with HTTP status code 200-299
HTTPStatusCode2xx metrics.Counter `metric:"http_requests" tags:"status_code=2xx"`

// HTTPStatusCode3xx is a counter of the total number of requests with HTTP status code 300-399
HTTPStatusCode3xx metrics.Counter `metric:"http_requests" tags:"status_code=3xx"`

// HTTPStatusCode4xx is a counter of the total number of requests with HTTP status code 400-499
HTTPStatusCode4xx metrics.Counter `metric:"http_requests" tags:"status_code=4xx"`

// HTTPStatusCode5xx is a counter of the total number of requests with HTTP status code 500-599
HTTPStatusCode5xx metrics.Counter `metric:"http_requests" tags:"status_code=5xx"`
}

func (m *Metrics) recordHTTPStatusCode(statusCode int64) {
if statusCode >= 200 && statusCode < 300 {
m.HTTPStatusCode2xx.Inc(1)
} else if statusCode >= 300 && statusCode < 400 {
m.HTTPStatusCode3xx.Inc(1)
} else if statusCode >= 400 && statusCode < 500 {
m.HTTPStatusCode4xx.Inc(1)
} else if statusCode >= 500 && statusCode < 600 {
m.HTTPStatusCode5xx.Inc(1)
}
}

// MetricsByEndpoint is a registry/cache of metrics for each unique endpoint name.
// Only maxNumberOfEndpoints Metrics are stored, all other endpoint names are mapped
// to a generic endpoint name "other".
type MetricsByEndpoint struct {
metricsFactory metrics.Factory
endpoints *normalizedEndpoints
metricsByEndpoint map[string]*Metrics
mux sync.RWMutex
}

func newMetricsByEndpoint(
metricsFactory metrics.Factory,
normalizer NameNormalizer,
maxNumberOfEndpoints int,
) *MetricsByEndpoint {
return &MetricsByEndpoint{
metricsFactory: metricsFactory,
endpoints: newNormalizedEndpoints(maxNumberOfEndpoints, normalizer),
metricsByEndpoint: make(map[string]*Metrics, maxNumberOfEndpoints+1), // +1 for "other"
}
}

func (m *MetricsByEndpoint) get(endpoint string) *Metrics {
safeName := m.endpoints.normalize(endpoint)
if safeName == "" {
safeName = otherEndpointsPlaceholder
}
m.mux.RLock()
met := m.metricsByEndpoint[safeName]
m.mux.RUnlock()
if met != nil {
return met
}

return m.getWithWriteLock(safeName)
}

// split to make easier to test
func (m *MetricsByEndpoint) getWithWriteLock(safeName string) *Metrics {
m.mux.Lock()
defer m.mux.Unlock()

// it is possible that the name has been already registered after we released
// the read lock and before we grabbed the write lock, so check for that.
if met, ok := m.metricsByEndpoint[safeName]; ok {
return met
}

// it would be nice to create the struct before locking, since Init() is somewhat
// expensive, however some metrics backends (e.g. expvar) may not like duplicate metrics.
met := &Metrics{}
tags := map[string]string{endpointNameMetricTag: safeName}
metrics.Init(met, m.metricsFactory, tags)

m.metricsByEndpoint[safeName] = met
return met
}
Loading

0 comments on commit 7a030fa

Please sign in to comment.