Skip to content

Commit

Permalink
Support adding process tags in OTEL via env variable
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed May 6, 2020
1 parent 1b4bf08 commit 8927e8a
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 41 deletions.
20 changes: 13 additions & 7 deletions cmd/opentelemetry-collector/app/defaults/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/extension/healthcheckextension"
"github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
"github.com/open-telemetry/opentelemetry-collector/receiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/zipkinreceiver"
Expand All @@ -36,7 +37,6 @@ const (
httpThriftBinaryEndpoint = "localhost:14268"
udpThriftCompactEndpoint = "localhost:6831"
udpThriftBinaryEndpoint = "localhost:6832"
httpSamplingEndpoint = "localhost:5778"
)

// CollectorConfig creates default collector configuration.
Expand All @@ -56,17 +56,20 @@ func CollectorConfig(storageType string, zipkinHostPort string, factories config
recTypes = append(recTypes, string(v.Type()))
}
hc := factories.Extensions["health_check"].CreateDefaultConfig()
resProcessor := factories.Processors["resource"].CreateDefaultConfig()
return &configmodels.Config{
Receivers: receivers,
Processors: configmodels.Processors{"resource": resProcessor},
Exporters: exporters,
Extensions: configmodels.Extensions{"health_check": hc},
Service: configmodels.Service{
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: recTypes,
Exporters: expTypes,
InputType: configmodels.TracesDataType,
Receivers: recTypes,
Processors: []string{"resource"},
Exporters: expTypes,
},
},
},
Expand Down Expand Up @@ -125,17 +128,20 @@ func createExporters(storageTypes string, factories config.Factories) (configmod
func AgentConfig(factories config.Factories) *configmodels.Config {
jaegerExporter := factories.Exporters["jaeger"]
hc := factories.Extensions["health_check"].CreateDefaultConfig().(*healthcheckextension.Config)
resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
return &configmodels.Config{
Receivers: createAgentReceivers(factories),
Processors: configmodels.Processors{"resource": resProcessor},
Exporters: configmodels.Exporters{"jaeger": jaegerExporter.CreateDefaultConfig()},
Extensions: configmodels.Extensions{"health_check": hc},
Service: configmodels.Service{
Extensions: []string{"health_check"},
Pipelines: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: []string{"jaeger"},
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Processors: []string{"resource"},
Exporters: []string{"jaeger"},
},
},
},
Expand Down
47 changes: 28 additions & 19 deletions cmd/opentelemetry-collector/app/defaults/default_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter"
"github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -51,9 +52,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
exporterTypes: []string{elasticsearch.TypeStr},
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: []string{elasticsearch.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Processors: []string{"resource"},
Exporters: []string{elasticsearch.TypeStr},
},
},
},
Expand All @@ -63,9 +65,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
exporterTypes: []string{cassandra.TypeStr},
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: []string{cassandra.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Processors: []string{"resource"},
Exporters: []string{cassandra.TypeStr},
},
},
},
Expand All @@ -75,9 +78,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
exporterTypes: []string{kafka.TypeStr},
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: []string{kafka.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Processors: []string{"resource"},
Exporters: []string{kafka.TypeStr},
},
},
},
Expand All @@ -87,9 +91,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
exporterTypes: []string{cassandra.TypeStr, elasticsearch.TypeStr},
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: []string{cassandra.TypeStr, elasticsearch.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Processors: []string{"resource"},
Exporters: []string{cassandra.TypeStr, elasticsearch.TypeStr},
},
},
},
Expand All @@ -99,9 +104,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
exporterTypes: []string{cassandra.TypeStr},
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger", "zipkin"},
Exporters: []string{cassandra.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger", "zipkin"},
Processors: []string{"resource"},
Exporters: []string{cassandra.TypeStr},
},
},
},
Expand All @@ -128,6 +134,7 @@ func TestDefaultCollectorConfig(t *testing.T) {
assert.Equal(t, len(test.pipeline["traces"].Receivers), len(cfg.Receivers))
assert.Equal(t, "jaeger", cfg.Receivers["jaeger"].Name())
assert.Equal(t, len(test.exporterTypes), len(cfg.Exporters))
assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"])

types := []string{}
for _, v := range cfg.Exporters {
Expand All @@ -148,13 +155,15 @@ func TestDefaultAgentConfig(t *testing.T) {
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: []string{"jaeger"},
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Processors: []string{"resource"},
Exporters: []string{"jaeger"},
},
},
}, cfg.Service)
assert.Equal(t, 0, len(cfg.Processors))
assert.Equal(t, 1, len(cfg.Processors))
assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"])
assert.Equal(t, 1, len(cfg.Receivers))
assert.IsType(t, &jaegerreceiver.Config{}, cfg.Receivers["jaeger"])
assert.Equal(t, 1, len(cfg.Exporters))
Expand Down
16 changes: 12 additions & 4 deletions cmd/opentelemetry-collector/app/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (
"flag"

"github.com/open-telemetry/opentelemetry-collector/config"
otelJaegerEexporter "github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter"
otelJaegerreceiver "github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
otelJaegerExporter "github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter"
otelResourceProcessor "github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
otelJaegerReceiver "github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
"github.com/open-telemetry/opentelemetry-collector/service/defaultcomponents"
"github.com/spf13/pflag"
"github.com/spf13/viper"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/jaegerreceiver"
storageCassandra "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
storageEs "github.com/jaegertracing/jaeger/plugin/storage/es"
Expand Down Expand Up @@ -61,16 +63,22 @@ func Components(v *viper.Viper) config.Factories {
factories.Exporters[cassandraExp.Type()] = cassandraExp
factories.Exporters[esExp.Type()] = esExp

jaegerRec := factories.Receivers["jaeger"].(*otelJaegerreceiver.Factory)
jaegerRec := factories.Receivers["jaeger"].(*otelJaegerReceiver.Factory)
factories.Receivers["jaeger"] = &jaegerreceiver.Factory{
Wrapped: jaegerRec,
Viper: v,
}
jaegerExp := factories.Exporters["jaeger"].(*otelJaegerEexporter.Factory)
jaegerExp := factories.Exporters["jaeger"].(*otelJaegerExporter.Factory)
factories.Exporters["jaeger"] = &jaegerexporter.Factory{
Wrapped: jaegerExp,
Viper: v,
}

resourceProc := factories.Processors["resource"].(*otelResourceProcessor.Factory)
factories.Processors["resource"] = &resourceprocessor.Factory{
Wrapped: resourceProc,
Viper: v,
}
return factories
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/jaegerreceiver"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
)
Expand Down Expand Up @@ -51,4 +52,5 @@ func TestComponents(t *testing.T) {
assert.Equal(t, []string{"http://127.0.0.1:9200"}, ec.GetPrimary().Servers)
assert.IsType(t, &jaegerreceiver.Factory{}, factories.Receivers["jaeger"])
assert.IsType(t, &jaegerexporter.Factory{}, factories.Exporters["jaeger"])
assert.IsType(t, &resourceprocessor.Factory{}, factories.Processors["resource"])
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package resourceprocessor

import (
"flag"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/flags"
)

const (
resourceTags = "resource.tags"
resourceTagsLegacy = "jaeger.tags"
)

// Factory wraps resourceprocessor.Factory and makes the default config configurable via viper.
// For instance this enables using flags as default values in the config object.
type Factory struct {
Wrapped *resourceprocessor.Factory
Viper *viper.Viper
}

var _ component.ProcessorFactoryOld = (*Factory)(nil)

// Type returns the type of the receiver.
func (f Factory) Type() configmodels.Type {
return f.Wrapped.Type()
}

// CreateDefaultConfig returns default configuration of Factory.
// This function implements OTEL component.ProcessorFactoryBase interface.
func (f Factory) CreateDefaultConfig() configmodels.Processor {
cfg := f.Wrapped.CreateDefaultConfig().(*resourceprocessor.Config)
for k, v := range getTags(f.Viper) {
cfg.Labels[k] = v
}
return cfg
}

func getTags(v *viper.Viper) map[string]string {
tagsLegacy := flags.ParseJaegerTags(v.GetString(resourceTagsLegacy))
tags := flags.ParseJaegerTags(v.GetString(resourceTags))
for k, v := range tagsLegacy {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}
return tags
}

// CreateTraceProcessor creates resource processor.
// This function implements OTEL component.ProcessorFactoryOld interface.
func (f Factory) CreateTraceProcessor(
logger *zap.Logger,
nextConsumer consumer.TraceConsumerOld,
cfg configmodels.Processor,
) (component.TraceProcessorOld, error) {
return f.Wrapped.CreateTraceProcessor(logger, nextConsumer, cfg)
}

// CreateMetricsProcessor creates a resource processor.
// This function implements component.ProcessorFactoryOld.
func (f Factory) CreateMetricsProcessor(
logger *zap.Logger,
nextConsumer consumer.MetricsConsumerOld,
cfg configmodels.Processor,
) (component.MetricsProcessorOld, error) {
return f.Wrapped.CreateMetricsProcessor(logger, nextConsumer, cfg)
}

// AddFlags adds flags for Options.
func AddFlags(flags *flag.FlagSet) {
flags.String(resourceTagsLegacy, "", "(deprecated, use --resource.tags) One or more tags to be added to the Process tags of all spans passing through this agent. Ex: key1=value1,key2=${envVar:defaultValue}")
flags.String(resourceTags, "", "One or more tags to be added to the Process tags of all spans passing through this agent. Ex: key1=value1,key2=${envVar:defaultValue}")
}
Loading

0 comments on commit 8927e8a

Please sign in to comment.