Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sampler type tag to collector ingestion metrics #1576

Merged
merged 9 commits into from
Jun 6, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 140 additions & 17 deletions cmd/collector/app/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package app

import (
"strings"
"sync"

"github.com/uber/jaeger-lib/metrics"
Expand All @@ -28,6 +29,25 @@ const (

// otherServices is the catch-all label when number of services exceeds maxServiceNames
otherServices = "other-services"

samplerTypeKey = "sampler_type"
samplerTypeConst = "const"
samplerTypeRemote = "remote"
guanw marked this conversation as resolved.
Show resolved Hide resolved
samplerTypeProbabilistic = "probabilistic"
samplerTypeRateLimiting = "ratelimiting"
samplerTypeLowerBound = "lowerbound"
samplerTypeUnknown = "unknown"
// types of samplers: const, remote, probabilistic, ratelimiting, lowerbound
numOfSamplerTypes = 5

concatenation = "$_$"

otherServicesConstSampler = otherServices + concatenation + samplerTypeConst
otherServicesRemoteSampler = otherServices + concatenation + samplerTypeRemote
otherServicesProbabilisticSampler = otherServices + concatenation + samplerTypeProbabilistic
otherServicesRateLimitingSampler = otherServices + concatenation + samplerTypeRateLimiting
otherServicesLowerBoundSampler = otherServices + concatenation + samplerTypeLowerBound
otherServicesUnknownSampler = otherServices + concatenation + samplerTypeUnknown
)

// SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor
Expand All @@ -50,7 +70,7 @@ type SpanProcessorMetrics struct {
spanCounts SpanCountsByFormat
}

type countsBySvc struct {
guanw marked this conversation as resolved.
Show resolved Hide resolved
type spanCountsBySvc struct {
counts map[string]metrics.Counter // counters per service
debugCounts map[string]metrics.Counter // debug counters per service
factory metrics.Factory
Expand All @@ -59,9 +79,19 @@ type countsBySvc struct {
category string
}

type traceCountsBySvc struct {
counts map[string]metrics.Counter // counters per service
debugCounts map[string]metrics.Counter // debug counters per service
factory metrics.Factory
lock *sync.Mutex
maxServices int // servicesNames * samplerTypes
category string
stringBuilderPool *sync.Pool
}

type metricsBySvc struct {
spans countsBySvc // number of spans received per service
traces countsBySvc // number of traces originated per service
spans spanCountsBySvc // number of spans received per service
traces traceCountsBySvc // number of traces originated per service
}

// InboundTransport identifies the transport used to receive spans.
Expand Down Expand Up @@ -136,19 +166,43 @@ func newMetricsBySvc(factory metrics.Factory, category string) metricsBySvc {
spansFactory := factory.Namespace(metrics.NSOptions{Name: "spans", Tags: nil})
tracesFactory := factory.Namespace(metrics.NSOptions{Name: "traces", Tags: nil})
return metricsBySvc{
spans: newCountsBySvc(spansFactory, category, maxServiceNames),
traces: newCountsBySvc(tracesFactory, category, maxServiceNames),
spans: newSpanCountsBySvc(spansFactory, category, maxServiceNames),
traces: newTraceCountsBySvc(tracesFactory, category, maxServiceNames),
}
}

func newCountsBySvc(factory metrics.Factory, category string, maxServiceNames int) countsBySvc {
return countsBySvc{
counts: map[string]metrics.Counter{
otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "false"}}),
},
debugCounts: map[string]metrics.Counter{
otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "true"}}),
func newTraceCountsBySvc(factory metrics.Factory, category string, maxServices int) traceCountsBySvc {
return traceCountsBySvc{
counts: newTraceCountsOtherServices(factory, category, "false"),
debugCounts: newTraceCountsOtherServices(factory, category, "true"),
factory: factory,
lock: &sync.Mutex{},
maxServices: maxServices + numOfSamplerTypes, // numOfSamplerType is the offset added to maxServices threshold
guanw marked this conversation as resolved.
Show resolved Hide resolved
category: category,
// use sync.Pool to reduce allocation of stringBuilder
stringBuilderPool: &sync.Pool{
New: func() interface{} {
return new(strings.Builder)
},
},
}
}

func newTraceCountsOtherServices(factory metrics.Factory, category string, isDebug string) map[string]metrics.Counter {
return map[string]metrics.Counter{
otherServicesConstSampler: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": isDebug, samplerTypeKey: samplerTypeConst}}),
otherServicesLowerBoundSampler: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": isDebug, samplerTypeKey: samplerTypeLowerBound}}),
otherServicesProbabilisticSampler: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": isDebug, samplerTypeKey: samplerTypeProbabilistic}}),
otherServicesRateLimitingSampler: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": isDebug, samplerTypeKey: samplerTypeRateLimiting}}),
otherServicesRemoteSampler: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": isDebug, samplerTypeKey: samplerTypeRemote}}),
otherServicesUnknownSampler: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": isDebug, samplerTypeKey: samplerTypeUnknown}}),
}
}

func newSpanCountsBySvc(factory metrics.Factory, category string, maxServiceNames int) spanCountsBySvc {
return spanCountsBySvc{
counts: map[string]metrics.Counter{otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "false"}})},
debugCounts: map[string]metrics.Counter{otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "true"}})},
factory: factory,
lock: &sync.Mutex{},
maxServiceNames: maxServiceNames,
Expand Down Expand Up @@ -196,7 +250,9 @@ func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span) {
}
m.countSpansByServiceName(serviceName, span.Flags.IsDebug())
if span.ParentSpanID() == 0 {
m.countTracesByServiceName(serviceName, span.Flags.IsDebug())

m.countTracesByServiceName(serviceName, span.Flags.IsDebug(), span.
GetSamplerType())
}
}

Expand All @@ -207,11 +263,11 @@ func (m metricsBySvc) countSpansByServiceName(serviceName string, isDebug bool)

// countTracesByServiceName counts how many traces are received per service,
// i.e. the counter is only incremented for the root spans.
func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool) {
m.traces.countByServiceName(serviceName, isDebug)
func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool, samplerType string) {
m.traces.countByServiceName(serviceName, isDebug, samplerType)
}

// countByServiceName maintains a map of counters for each service name it's
// traceCountsBySvc.countByServiceName maintains a map of counters for each service name it's
// given and increments the respective counter when called. The service name
// are first normalized to safe-for-metrics format. If the number of counters
// exceeds maxServiceNames, new service names are ignored to avoid polluting
Expand All @@ -221,14 +277,70 @@ func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool)
// total number of stored counters, so if it exceeds say the 90% threshold
// an alert should be raised to investigate what's causing so many unique
// service names.
func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool) {
func (m *traceCountsBySvc) countByServiceName(serviceName string, isDebug bool, samplerType string) {
serviceName = NormalizeServiceName(serviceName)
counts := m.counts
if isDebug {
counts = m.debugCounts
}
var counter metrics.Counter
m.lock.Lock()

// trace counter key is combination of serviceName and samplerType.
key := m.buildKey(serviceName, samplerType)

if c, ok := counts[key]; ok {
counter = c
} else if len(counts) < m.maxServices {
debugStr := "false"
if isDebug {
debugStr = "true"
}
// Only trace metrics have samplerType tag
tags := map[string]string{"svc": serviceName, "debug": debugStr, samplerTypeKey: samplerType}

c := m.factory.Counter(metrics.Options{Name: m.category, Tags: tags})
counts[key] = c
counter = c
} else {
switch samplerType {
case samplerTypeConst:
counter = counts[otherServicesConstSampler]
case samplerTypeRemote:
counter = counts[otherServicesRemoteSampler]
case samplerTypeLowerBound:
counter = counts[otherServicesLowerBoundSampler]
case samplerTypeProbabilistic:
counter = counts[otherServicesProbabilisticSampler]
case samplerTypeRateLimiting:
counter = counts[otherServicesRateLimitingSampler]
default:
counter = counts[otherServicesUnknownSampler]
}
}
m.lock.Unlock()
counter.Inc(1)
}

// spanCountsBySvc.countByServiceName maintains a map of counters for each service name it's
// given and increments the respective counter when called. The service name
// are first normalized to safe-for-metrics format. If the number of counters
// exceeds maxServiceNames, new service names are ignored to avoid polluting
// the metrics namespace and overloading M3.
//
// The reportServiceNameCount() function runs on a timer and will report the
// total number of stored counters, so if it exceeds say the 90% threshold
// an alert should be raised to investigate what's causing so many unique
// service names.
func (m *spanCountsBySvc) countByServiceName(serviceName string, isDebug bool) {
serviceName = NormalizeServiceName(serviceName)
counts := m.counts
if isDebug {
counts = m.debugCounts
}
var counter metrics.Counter
m.lock.Lock()

if c, ok := counts[serviceName]; ok {
counter = c
} else if len(counts) < m.maxServiceNames {
Expand All @@ -246,3 +358,14 @@ func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool) {
m.lock.Unlock()
counter.Inc(1)
}

func (m *traceCountsBySvc) buildKey(serviceName, samplerType string) string {
keyBuilder := m.stringBuilderPool.Get().(*strings.Builder)
keyBuilder.Reset()
keyBuilder.WriteString(serviceName)
keyBuilder.WriteString(concatenation)
keyBuilder.WriteString(samplerType)
key := keyBuilder.String()
m.stringBuilderPool.Put(keyBuilder)
return key
}
50 changes: 46 additions & 4 deletions cmd/collector/app/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,48 @@ func TestProcessorMetrics(t *testing.T) {

assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|sampler_type=unknown|svc=fry|transport=tchannel"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|sampler_type=unknown|svc=fry|transport=tchannel"])
assert.Empty(t, gauges)
}

func TestNewCountsBySvc(t *testing.T) {
func TestNewTraceCountsBySvc(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
metrics := newCountsBySvc(baseMetrics, "not_on_my_level", 3)
metrics := newTraceCountsBySvc(baseMetrics, "not_on_my_level", 3)

metrics.countByServiceName("fry", false, "unknown")
metrics.countByServiceName("leela", false, "unknown")
metrics.countByServiceName("bender", false, "unknown")
metrics.countByServiceName("zoidberg", false, "unknown")

counters, _ := baseMetrics.Backend.Snapshot()
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|sampler_type=unknown|svc=fry"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|sampler_type=unknown|svc=leela"])
assert.EqualValues(t, 2, counters["not_on_my_level|debug=false|sampler_type=unknown|svc=other-services"])

metrics.countByServiceName("zoidberg", true, "remote")
metrics.countByServiceName("bender", true, "const")
metrics.countByServiceName("leela", true, "probabilistic")
metrics.countByServiceName("fry", true, "ratelimiting")
metrics.countByServiceName("leela", true, "remote")
metrics.countByServiceName("fry", true, "const")
metrics.countByServiceName("elzar", true, "lowerbound")
metrics.countByServiceName("url", true, "unknown")

counters, _ = baseMetrics.Backend.Snapshot()
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=remote|svc=zoidberg"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=const|svc=bender"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=probabilistic|svc=other-services"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=ratelimiting|svc=other-services"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=remote|svc=other-services"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=const|svc=other-services"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=lowerbound|svc=other-services"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|sampler_type=unknown|svc=other-services"])
}

func TestNewSpanCountsBySvc(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
metrics := newSpanCountsBySvc(baseMetrics, "not_on_my_level", 3)
metrics.countByServiceName("fry", false)
metrics.countByServiceName("leela", false)
metrics.countByServiceName("bender", false)
Expand All @@ -83,3 +116,12 @@ func TestNewCountsBySvc(t *testing.T) {
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender"])
assert.EqualValues(t, 2, counters["not_on_my_level|debug=true|svc=other-services"])
}

func TestBuildKey(t *testing.T) {
// This test checks if stringBuilder is reset every time buildKey is called.
tc := newTraceCountsBySvc(jaegerM.NullFactory, "received", 100)
key := tc.buildKey("sample-service", "unknown")
assert.Equal(t, "sample-service$_$unknown", key)
key = tc.buildKey("sample-service2", "const")
assert.Equal(t, "sample-service2$_$const", key)
}
4 changes: 2 additions & 2 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ func TestBySvcMetrics(t *testing.T) {
if test.rootSpan {
if test.debug {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|sampler_type=unknown|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|sampler_type=unknown|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
}
}
Expand Down
15 changes: 15 additions & 0 deletions model/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (
SampledFlag = Flags(1)
// DebugFlag is the bit set in Flags in order to define a span as a debug span
DebugFlag = Flags(2)

samplerType = "sampler.type"
samplerTypeUnknown = "unknown"
)

// Flags is a bit map of flags for a span
Expand All @@ -47,6 +50,18 @@ func (s *Span) HasSpanKind(kind ext.SpanKindEnum) bool {
return false
}

// GetSamplerType returns the sampler type for span
func (s *Span) GetSamplerType() string {
// There's no corresponding opentracing-go tag label corresponding to sampler.type
if tag, ok := KeyValues(s.Tags).FindByKey(samplerType); ok {
if tag.VStr == "" {
return samplerTypeUnknown
guanw marked this conversation as resolved.
Show resolved Hide resolved
}
return tag.VStr
guanw marked this conversation as resolved.
Show resolved Hide resolved
}
return samplerTypeUnknown
}

// IsRPCClient returns true if the span represents a client side of an RPC,
// as indicated by the `span.kind` tag set to `client`.
func (s *Span) IsRPCClient() bool {
Expand Down
9 changes: 9 additions & 0 deletions model/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,15 @@ func TestIsDebug(t *testing.T) {
assert.True(t, flags.IsDebug())
}

func TestSamplerType(t *testing.T) {
span := makeSpan(model.String("sampler.type", "lowerbound"))
assert.Equal(t, "lowerbound", span.GetSamplerType())
span = makeSpan(model.String("sampler.type", ""))
assert.Equal(t, "unknown", span.GetSamplerType())
span = makeSpan(model.KeyValue{})
assert.Equal(t, "unknown", span.GetSamplerType())
}

func TestIsSampled(t *testing.T) {
flags := model.Flags(0)
flags.SetSampled()
Expand Down