Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for agent level tag #1396

Merged
merged 18 commits into from
Mar 18, 2019
2 changes: 1 addition & 1 deletion cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func CreateCollectorProxy(
}
switch opts.ReporterType {
case reporter.GRPC:
return grpc.NewCollectorProxy(grpcRepOpts, mFactory, logger)
return grpc.NewCollectorProxy(grpcRepOpts, opts.AgentTags, mFactory, logger)
case reporter.TCHANNEL:
return tchannel.NewCollectorProxy(tchanRep, mFactory, logger)
default:
Expand Down
35 changes: 35 additions & 0 deletions cmd/agent/app/reporter/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ package reporter
import (
"flag"
"fmt"
"os"
"strings"

"github.com/spf13/viper"
)

const (
// Whether to use grpc or tchannel reporter.
reporterType = "reporter.type"
// Agent tags
agentTags = "jaeger.tags"
// TCHANNEL is name of tchannel reporter.
TCHANNEL Type = "tchannel"
// GRPC is name of gRPC reporter.
Expand All @@ -35,15 +40,45 @@ type Type string
// Options holds generic reporter configuration.
type Options struct {
ReporterType Type
AgentTags map[string]string
}

// AddFlags adds flags for Options.
func AddFlags(flags *flag.FlagSet) {
flags.String(reporterType, string(GRPC), fmt.Sprintf("Reporter type to use e.g. %s, %s", string(GRPC), string(TCHANNEL)))
flags.String(agentTags, "", fmt.Sprintf(`One or more tags to be added to the Process tags of all spans passing through this agent.
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
Ex: --jaeger.tags=key1=value1,key2=${envVar:defaultValue}`))
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
}

// InitFromViper initializes Options with properties retrieved from Viper.
func (b *Options) InitFromViper(v *viper.Viper) *Options {
b.ReporterType = Type(v.GetString(reporterType))
b.AgentTags = parseAgentTags(v.GetString(agentTags))
return b
}

// Parsing logic borrowed from jaegertracing/jaeger-client-go
func parseAgentTags(agentTags string) map[string]string {
if agentTags == "" {
return nil
}
tagPairs := strings.Split(string(agentTags), ",")
tags := make(map[string]string)
for _, p := range tagPairs {
kv := strings.SplitN(p, "=", 2)
k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1])

if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") {
ed := strings.SplitN(string(v[2:len(v)-1]), ":", 2)
e, d := ed[0], ed[1]
v = os.Getenv(e)
if v == "" && d != "" {
v = d
}
}

tags[k] = v
}

return tags
}
30 changes: 30 additions & 0 deletions cmd/agent/app/reporter/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package reporter

import (
"flag"
"os"
"testing"

"github.com/spf13/cobra"
Expand All @@ -24,6 +25,25 @@ import (
"github.com/stretchr/testify/require"
)

func TestBindFlags_NoJaegerTags(t *testing.T) {
v := viper.New()
command := cobra.Command{}
flags := &flag.FlagSet{}
AddFlags(flags)
command.PersistentFlags().AddGoFlagSet(flags)
v.BindPFlags(command.PersistentFlags())

err := command.ParseFlags([]string{
"--reporter.type=grpc",
})
require.NoError(t, err)

b := &Options{}
b.InitFromViper(v)
assert.Equal(t, Type("grpc"), b.ReporterType)
assert.Equal(t, parseAgentTags(""), b.AgentTags)
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
}

func TestBindFlags(t *testing.T) {
v := viper.New()
command := cobra.Command{}
Expand All @@ -34,10 +54,20 @@ func TestBindFlags(t *testing.T) {

err := command.ParseFlags([]string{
"--reporter.type=grpc",
"--jaeger.tags=key=value,envVar1=${envKey1:defaultVal1},envVar2=${envKey2:defaultVal2}",
})
require.NoError(t, err)

b := &Options{}
os.Setenv("envKey1", "envVal1")
b.InitFromViper(v)

agentTags := make(map[string]string)
agentTags["key"] = "value"
agentTags["envVar1"] = "envVal1"
agentTags["envVar2"] = "defaultVal2"
annanay25 marked this conversation as resolved.
Show resolved Hide resolved

assert.Equal(t, Type("grpc"), b.ReporterType)
assert.Equal(t, agentTags, b.AgentTags)
os.Unsetenv("envKey1")
}
4 changes: 2 additions & 2 deletions cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ProxyBuilder struct {
var systemCertPool = x509.SystemCertPool // to allow overriding in unit test

// NewCollectorProxy creates ProxyBuilder
func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
func NewCollectorProxy(o *Options, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
if len(o.CollectorHostPort) == 0 {
return nil, errors.New("could not create collector proxy, address is missing")
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger)
grpcMetrics := mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}})
return &ProxyBuilder{
conn: conn,
reporter: aReporter.WrapWithMetrics(NewReporter(conn, logger), grpcMetrics),
reporter: aReporter.WrapWithMetrics(NewReporter(conn, agentTags, logger), grpcMetrics),
manager: configmanager.WrapWithMetrics(grpcManager.NewConfigManager(conn), grpcMetrics)}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/agent/app/reporter/grpc/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ iPKnCkzNgxMzQtwdgpAOXIAqXyNibvyOAv1C+3QSMLKbuPEHaIxlCuvl1suX/g25
-----END CERTIFICATE-----`

func TestProxyBuilderMissingAddress(t *testing.T) {
proxy, err := NewCollectorProxy(&Options{}, metrics.NullFactory, zap.NewNop())
proxy, err := NewCollectorProxy(&Options{}, nil, metrics.NullFactory, zap.NewNop())
require.Nil(t, proxy)
assert.EqualError(t, err, "could not create collector proxy, address is missing")
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestProxyBuilder(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
proxy, err := NewCollectorProxy(test.proxyOptions, metrics.NullFactory, zap.NewNop())
proxy, err := NewCollectorProxy(test.proxyOptions, nil, metrics.NullFactory, zap.NewNop())
if test.expectError {
require.Error(t, err)
} else {
Expand All @@ -125,7 +125,7 @@ func TestSystemCertPoolError(t *testing.T) {
_, err := NewCollectorProxy(&Options{
CollectorHostPort: []string{"foo", "bar"},
TLS: true,
}, nil, nil)
}, nil, nil, nil)
assert.Equal(t, fakeErr, err)
}

Expand All @@ -142,7 +142,7 @@ func TestMultipleCollectors(t *testing.T) {
defer s2.Stop()

mFactory := metricstest.NewFactory(time.Microsecond)
proxy, err := NewCollectorProxy(&Options{CollectorHostPort: []string{addr1.String(), addr2.String()}}, mFactory, zap.NewNop())
proxy, err := NewCollectorProxy(&Options{CollectorHostPort: []string{addr1.String(), addr2.String()}}, nil, mFactory, zap.NewNop())
require.NoError(t, err)
require.NotNil(t, proxy)
assert.NotNil(t, proxy.GetReporter())
Expand Down
29 changes: 28 additions & 1 deletion cmd/agent/app/reporter/grpc/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ import (
// Reporter reports data to collector over gRPC.
type Reporter struct {
collector api_v2.CollectorServiceClient
agentTags []model.KeyValue
logger *zap.Logger
sanitizer zipkin2.Sanitizer
}

// NewReporter creates gRPC reporter.
func NewReporter(conn *grpc.ClientConn, logger *zap.Logger) *Reporter {
func NewReporter(conn *grpc.ClientConn, agentTags map[string]string, logger *zap.Logger) *Reporter {
return &Reporter{
collector: api_v2.NewCollectorServiceClient(conn),
agentTags: makeModelKeyValue(agentTags),
logger: logger,
sanitizer: zipkin2.NewChainedSanitizer(zipkin2.StandardSanitizers...),
}
Expand All @@ -63,6 +65,7 @@ func (r *Reporter) EmitZipkinBatch(zSpans []*zipkincore.Span) error {
}

func (r *Reporter) send(spans []*model.Span, process *model.Process) error {
spans, process = addProcessTags(spans, process, r.agentTags)
batch := model.Batch{Spans: spans, Process: process}
req := &api_v2.PostSpansRequest{Batch: batch}
_, err := r.collector.PostSpans(context.Background(), req)
Expand All @@ -71,3 +74,27 @@ func (r *Reporter) send(spans []*model.Span, process *model.Process) error {
}
return err
}

// addTags appends jaeger tags for the agent to every span it sends to the collector.
func addProcessTags(spans []*model.Span, process *model.Process, agentTags []model.KeyValue) ([]*model.Span, *model.Process) {
if process != nil {
process.Tags = append(process.Tags, agentTags...)
return spans, process
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
}
for _, span := range spans {
if len(agentTags) > 0 {
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
span.Process.Tags = append(span.Process.Tags, agentTags...)
}
}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
return spans, nil
}

func makeModelKeyValue(agentTags map[string]string) []model.KeyValue {
tags := []model.KeyValue{}
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
for k, v := range agentTags {
tag := model.String(k, v)
tags = append(tags, tag)
}

return tags
}
92 changes: 88 additions & 4 deletions cmd/agent/app/reporter/grpc/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func TestReporter_EmitZipkinBatch(t *testing.T) {
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
defer conn.Close()
require.NoError(t, err)
rep := NewReporter(conn, zap.NewNop())

rep := NewReporter(conn, nil, zap.NewNop())

tm := time.Unix(158, 0)
a := tm.Unix() * 1000 * 1000
Expand All @@ -71,7 +72,45 @@ func TestReporter_EmitZipkinBatch(t *testing.T) {
{in: &zipkincore.Span{Name: "jonatan", TraceID: 1, ID: 2, Timestamp: &a, Annotations: []*zipkincore.Annotation{{Value: zipkincore.CLIENT_SEND, Host: &zipkincore.Endpoint{ServiceName: "spring"}}}},
expected: model.Batch{
Spans: []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan", Duration: time.Microsecond * 1,
Tags: model.KeyValues{{Key: "span.kind", VStr: "client", VType: model.StringType}}, Process: &model.Process{ServiceName: "spring"}, StartTime: tm.UTC()}}}},
Tags: model.KeyValues{{Key: "span.kind", VStr: "client", VType: model.StringType}},
Process: &model.Process{ServiceName: "spring"}, StartTime: tm.UTC()}}}},
}
for _, test := range tests {
err = rep.EmitZipkinBatch([]*zipkincore.Span{test.in})
if test.err != "" {
assert.EqualError(t, err, test.err)
} else {
assert.Equal(t, 1, len(handler.requests))
assert.Equal(t, test.expected, handler.requests[0].GetBatch())
}
}
}

func TestReporter_EmitZipkinBatch_WithAgentTags(t *testing.T) {
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
handler := &mockSpanHandler{}
s, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer s.Stop()
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
defer conn.Close()
require.NoError(t, err)

agentTags := make(map[string]string)
agentTags["hello"] = "world"
rep := NewReporter(conn, agentTags, zap.NewNop())

tm := time.Unix(158, 0)
a := tm.Unix() * 1000 * 1000
tests := []struct {
in *zipkincore.Span
expected model.Batch
err string
}{
{in: &zipkincore.Span{TraceID: 1, ID: 2, Timestamp: &a, Annotations: []*zipkincore.Annotation{{Host: &zipkincore.Endpoint{ServiceName: "spring"}}}},
expected: model.Batch{
Spans: []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), Duration: time.Microsecond * 1,
Process: &model.Process{ServiceName: "spring", Tags: []model.KeyValue{model.String("hello", "world")}}, StartTime: tm.UTC()}}}},
}
for _, test := range tests {
err = rep.EmitZipkinBatch([]*zipkincore.Span{test.in})
Expand All @@ -93,7 +132,7 @@ func TestReporter_EmitBatch(t *testing.T) {
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
defer conn.Close()
require.NoError(t, err)
rep := NewReporter(conn, zap.NewNop())
rep := NewReporter(conn, nil, zap.NewNop())

tm := time.Unix(158, 0)
tests := []struct {
Expand All @@ -115,10 +154,55 @@ func TestReporter_EmitBatch(t *testing.T) {
}
}

func TestReporter_EmitBatch_WithAgentTags(t *testing.T) {
handler := &mockSpanHandler{}
s, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer s.Stop()
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
defer conn.Close()
require.NoError(t, err)

agentTags := make(map[string]string)
agentTags["hello"] = "world"
rep := NewReporter(conn, agentTags, zap.NewNop())

tm := time.Unix(158, 0)
tests := []struct {
in *jThrift.Batch
expected model.Batch
err string
}{
{in: &jThrift.Batch{Process: &jThrift.Process{ServiceName: "node"}, Spans: []*jThrift.Span{{OperationName: "foo", StartTime: int64(model.TimeAsEpochMicroseconds(tm))}}},
expected: model.Batch{Process: &model.Process{ServiceName: "node", Tags: []model.KeyValue{model.String("hello", "world")}},
Spans: []*model.Span{{OperationName: "foo", StartTime: tm.UTC()}}}},
}
for _, test := range tests {
err = rep.EmitBatch(test.in)
if test.err != "" {
assert.EqualError(t, err, test.err)
} else {
assert.Equal(t, 1, len(handler.requests))
assert.Equal(t, test.expected, handler.requests[0].GetBatch())
}
}
}

func TestReporter_SendFailure(t *testing.T) {
conn, err := grpc.Dial("", grpc.WithInsecure())
require.NoError(t, err)
rep := NewReporter(conn, zap.NewNop())
rep := NewReporter(conn, nil, zap.NewNop())
err = rep.send(nil, nil)
assert.EqualError(t, err, "rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = \"transport: Error while dialing dial tcp: missing address\"")
}

func TestReporter_MakeModelKeyValue(t *testing.T) {
stringTags := make(map[string]string)
stringTags["hello"] = "world"
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
jaegerTags := makeModelKeyValue(stringTags)
expected := []model.KeyValue{model.String("hello", "world")}

assert.Equal(t, 1, len(jaegerTags))
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, expected, jaegerTags)
}
3 changes: 1 addition & 2 deletions cmd/agent/app/reporter/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ func TestMetricsReporter(t *testing.T) {
err := reporter.EmitBatch(&jaeger.Batch{Spans: []*jaeger.Span{{}}})
require.Error(t, err)
}, rep: &noopReporter{err: errors.New("foo")}},
{expectedCounters:
[]metricstest.ExpectedMetric{
{expectedCounters: []metricstest.ExpectedMetric{
{Name: "reporter.batches.failures", Tags: map[string]string{"format": "zipkin"}, Value: 1},
{Name: "reporter.spans.failures", Tags: map[string]string{"format": "zipkin"}, Value: 2},
}, expectedGauges: []metricstest.ExpectedMetric{
Expand Down