diff --git a/CHANGELOG.md b/CHANGELOG.md index 439cc42625..1af8310729 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,12 +11,17 @@ This release introduces the following breaking changes: - feat(sumologicextension): use hostname as default collector name [#918] +### Added + +- feat(sourceprocessor): add debug logs for source category filler [#944] + ### Fixed - fix(k8sprocessor): race condition when getting Pod data [#938] [#918]: https://github.com/SumoLogic/sumologic-otel-collector/pull/918 [#938]: https://github.com/SumoLogic/sumologic-otel-collector/pull/938 +[#944]: https://github.com/SumoLogic/sumologic-otel-collector/pull/944 [Unreleased]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.70.0-sumo-0...main ## [v0.70.0-sumo-1] diff --git a/pkg/processor/sourceprocessor/factory.go b/pkg/processor/sourceprocessor/factory.go index 45e35aad4b..ea62e45dc0 100644 --- a/pkg/processor/sourceprocessor/factory.go +++ b/pkg/processor/sourceprocessor/factory.go @@ -89,7 +89,7 @@ func createTracesProcessor( oCfg := cfg.(*Config) - sp := newSourceProcessor(oCfg) + sp := newSourceProcessor(params, oCfg) return processorhelper.NewTracesProcessor( ctx, @@ -110,7 +110,7 @@ func createMetricsProcessor( ) (processor.Metrics, error) { oCfg := cfg.(*Config) - sp := newSourceProcessor(oCfg) + sp := newSourceProcessor(params, oCfg) return processorhelper.NewMetricsProcessor( ctx, params, @@ -130,7 +130,7 @@ func createLogsProcessor( ) (processor.Logs, error) { oCfg := cfg.(*Config) - sp := newSourceProcessor(oCfg) + sp := newSourceProcessor(params, oCfg) return processorhelper.NewLogsProcessor( ctx, params, diff --git a/pkg/processor/sourceprocessor/source_category_filler.go b/pkg/processor/sourceprocessor/source_category_filler.go index 108954f85f..22d6ed16c5 100644 --- a/pkg/processor/sourceprocessor/source_category_filler.go +++ b/pkg/processor/sourceprocessor/source_category_filler.go @@ -19,10 +19,12 @@ import ( "strings" "go.opentelemetry.io/collector/pdata/pcommon" + "go.uber.org/zap" ) // sourceCategoryFiller adds source category attribute to a collection of attributes. type sourceCategoryFiller struct { + logger *zap.Logger valueTemplate string templateAttributes []string prefix string @@ -33,12 +35,13 @@ type sourceCategoryFiller struct { } // newSourceCategoryFiller creates a new sourceCategoryFiller. -func newSourceCategoryFiller(cfg *Config) sourceCategoryFiller { +func newSourceCategoryFiller(cfg *Config, logger *zap.Logger) sourceCategoryFiller { valueTemplate := cfg.SourceCategory templateAttributes := extractTemplateAttributes(valueTemplate) return sourceCategoryFiller{ + logger: logger, valueTemplate: valueTemplate, templateAttributes: templateAttributes, prefix: cfg.SourceCategoryPrefix, @@ -103,6 +106,7 @@ func (f *sourceCategoryFiller) getSourceCategoryFromContainerAnnotation(attribut containerName, found := attributes.Get("k8s.container.name") if !found || containerName.Str() == "" { + f.logger.Debug("Couldn't fill source category from container annotation: k8s.container.name attribute not found.") return "" } @@ -110,9 +114,15 @@ func (f *sourceCategoryFiller) getSourceCategoryFromContainerAnnotation(attribut annotationKey := fmt.Sprintf("%s%s.sourceCategory", containerAnnotationPrefix, containerName.Str()) annotationValue := getAnnotationAttributeValue(f.annotationPrefix, annotationKey, attributes) if annotationValue != "" { + f.logger.Debug("Filled source category from container annotation", + zap.String("annotation", annotationKey), + zap.String("source_category", annotationValue), + zap.String("container", containerName.Str())) return annotationValue } } + + f.logger.Debug("Couldn't fill source category from container annotation: no matching annotation found for container.", zap.String("container", containerName.Str())) return "" } diff --git a/pkg/processor/sourceprocessor/source_category_filler_test.go b/pkg/processor/sourceprocessor/source_category_filler_test.go index 850d78a974..7f789c5648 100644 --- a/pkg/processor/sourceprocessor/source_category_filler_test.go +++ b/pkg/processor/sourceprocessor/source_category_filler_test.go @@ -19,13 +19,14 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" + "go.uber.org/zap" ) func TestNewSourceCategoryFiller(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SourceCategory = "qwerty-%{k8s.namespace.name}-%{k8s.pod.uid}" - filler := newSourceCategoryFiller(cfg) + filler := newSourceCategoryFiller(cfg, zap.NewNop()) assert.Len(t, filler.templateAttributes, 2) assert.Equal(t, "k8s.namespace.name", filler.templateAttributes[0]) @@ -40,7 +41,7 @@ func TestFill(t *testing.T) { attrs.PutStr("k8s.namespace.name", "ns-1") attrs.PutStr("k8s.pod.uid", "123asd") - filler := newSourceCategoryFiller(cfg) + filler := newSourceCategoryFiller(cfg, zap.NewNop()) filler.fill(&attrs) assertAttribute(t, attrs, "_sourceCategory", "kubernetes/source/ns/1/123asd/cat") @@ -56,7 +57,7 @@ func TestFillWithAnnotations(t *testing.T) { attrs.PutStr("k8s.pod.annotation.sumologic.com/sourceCategoryPrefix", "annoPrefix:") attrs.PutStr("k8s.pod.annotation.sumologic.com/sourceCategoryReplaceDash", "#") - filler := newSourceCategoryFiller(cfg) + filler := newSourceCategoryFiller(cfg, zap.NewNop()) filler.fill(&attrs) assertAttribute(t, attrs, "_sourceCategory", "annoPrefix:sc#from#annot#ns#1#123asd") @@ -72,7 +73,7 @@ func TestFillWithContainerAnnotations(t *testing.T) { attrs.PutStr("k8s.pod.annotation.sumologic.com/container-name-2.sourceCategory", "another/source-category") attrs.PutStr("k8s.container.name", "container-name-1") - filler := newSourceCategoryFiller(cfg) + filler := newSourceCategoryFiller(cfg, zap.NewNop()) filler.fill(&attrs) assertAttribute(t, attrs, "_sourceCategory", "kubernetes/my/source/category") @@ -88,7 +89,7 @@ func TestFillWithContainerAnnotations(t *testing.T) { attrs.PutStr("k8s.pod.annotation.sumologic.com/container-name-2.sourceCategory", "another/source-category") attrs.PutStr("k8s.container.name", "container-name-1") - filler := newSourceCategoryFiller(cfg) + filler := newSourceCategoryFiller(cfg, zap.NewNop()) filler.fill(&attrs) assertAttribute(t, attrs, "_sourceCategory", "first_source-category") @@ -104,7 +105,7 @@ func TestFillWithContainerAnnotations(t *testing.T) { attrs.PutStr("k8s.pod.annotation.sumologic.com/container-name-2.sourceCategory", "another/source-category") attrs.PutStr("k8s.container.name", "container-name-2") - filler := newSourceCategoryFiller(cfg) + filler := newSourceCategoryFiller(cfg, zap.NewNop()) filler.fill(&attrs) assertAttribute(t, attrs, "_sourceCategory", "another/source-category") @@ -125,7 +126,7 @@ func TestFillWithContainerAnnotations(t *testing.T) { attrs.PutStr("k8s.pod.annotation.customAnno_prefix:container-name-3.sourceCategory", "THIRD_s-c!") attrs.PutStr("k8s.container.name", "container-name-3") - filler := newSourceCategoryFiller(cfg) + filler := newSourceCategoryFiller(cfg, zap.NewNop()) filler.fill(&attrs) assertAttribute(t, attrs, "_sourceCategory", "THIRD_s-c!") diff --git a/pkg/processor/sourceprocessor/source_processor.go b/pkg/processor/sourceprocessor/source_processor.go index 60fb864236..4db990ba62 100644 --- a/pkg/processor/sourceprocessor/source_processor.go +++ b/pkg/processor/sourceprocessor/source_processor.go @@ -26,6 +26,8 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor" + "go.uber.org/zap" "github.com/SumoLogic/sumologic-otel-collector/pkg/processor/sourceprocessor/observability" ) @@ -45,6 +47,7 @@ type dockerLog struct { } type sourceProcessor struct { + logger *zap.Logger collector string sourceCategoryFiller sourceCategoryFiller sourceNameFiller attributeFiller @@ -85,7 +88,7 @@ func compileRegex(regex string) *regexp.Regexp { return re } -func newSourceProcessor(cfg *Config) *sourceProcessor { +func newSourceProcessor(set processor.CreateSettings, cfg *Config) *sourceProcessor { keys := sourceKeys{ annotationPrefix: cfg.AnnotationPrefix, podKey: cfg.PodKey, @@ -101,10 +104,11 @@ func newSourceProcessor(cfg *Config) *sourceProcessor { } return &sourceProcessor{ + logger: set.Logger, collector: cfg.Collector, keys: keys, sourceHostFiller: createSourceHostFiller(cfg), - sourceCategoryFiller: newSourceCategoryFiller(cfg), + sourceCategoryFiller: newSourceCategoryFiller(cfg, set.Logger), sourceNameFiller: createSourceNameFiller(cfg), exclude: exclude, } diff --git a/pkg/processor/sourceprocessor/source_processor_test.go b/pkg/processor/sourceprocessor/source_processor_test.go index d82e53ba05..270822e651 100644 --- a/pkg/processor/sourceprocessor/source_processor_test.go +++ b/pkg/processor/sourceprocessor/source_processor_test.go @@ -20,9 +20,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest" ) @@ -176,7 +179,7 @@ func TestLogsSourceHostKey(t *testing.T) { pLogs := newLogsDataWithLogs(resourceAttrs, logAttrs) - sp := newSourceProcessor(config) + sp := newSourceProcessor(newProcessorCreateSettings(), config) out, err := sp.ProcessLogs(context.Background(), pLogs) require.NoError(t, err) @@ -208,7 +211,7 @@ func TestLogsSourceHostKey(t *testing.T) { pLogs := newLogsDataWithLogs(resourceAttrs, logAttrs) - sp := newSourceProcessor(config) + sp := newSourceProcessor(newProcessorCreateSettings(), config) out, err := sp.ProcessLogs(context.Background(), pLogs) require.NoError(t, err) @@ -229,7 +232,7 @@ func TestTraceSourceProcessor(t *testing.T) { want := newTraceData(mergedK8sLabels) test := newTraceData(k8sLabels) - rtp := newSourceProcessor(cfg) + rtp := newSourceProcessor(newProcessorCreateSettings(), cfg) td, err := rtp.ProcessTraces(context.Background(), test) assert.NoError(t, err) @@ -241,7 +244,7 @@ func TestTraceSourceProcessorEmpty(t *testing.T) { want := newTraceData(limitedLabelsWithMeta) test := newTraceData(limitedLabels) - rtp := newSourceProcessor(cfg) + rtp := newSourceProcessor(newProcessorCreateSettings(), cfg) td, err := rtp.ProcessTraces(context.Background(), test) assert.NoError(t, err) @@ -317,7 +320,7 @@ func TestTraceSourceFilteringOutByRegex(t *testing.T) { t.Run(tc.name, func(t *testing.T) { test := newTraceDataWithSpans(mergedK8sLabels, k8sLabels) - rtp := newSourceProcessor(tc.cfg) + rtp := newSourceProcessor(newProcessorCreateSettings(), tc.cfg) td, err := rtp.ProcessTraces(context.Background(), test) assert.NoError(t, err) @@ -336,7 +339,7 @@ func TestTraceSourceFilteringOutByExclude(t *testing.T) { want.ResourceSpans().At(0).ScopeSpans(). RemoveIf(func(ptrace.ScopeSpans) bool { return true }) - rtp := newSourceProcessor(cfg) + rtp := newSourceProcessor(newProcessorCreateSettings(), cfg) td, err := rtp.ProcessTraces(context.Background(), test) assert.NoError(t, err) @@ -355,7 +358,7 @@ func TestTraceSourceIncludePrecedence(t *testing.T) { cfg1.Exclude = map[string]string{ "pod": ".*", } - rtp := newSourceProcessor(cfg) + rtp := newSourceProcessor(newProcessorCreateSettings(), cfg) td, err := rtp.ProcessTraces(context.Background(), test) assert.NoError(t, err) @@ -368,7 +371,7 @@ func TestSourceHostAnnotation(t *testing.T) { inputAttributes["pod_annotation_sumologic.com/sourceHost"] = "sh:%{k8s.pod.uid}" inputTraces := newTraceData(inputAttributes) - processedTraces, err := newSourceProcessor(cfg).ProcessTraces(context.Background(), inputTraces) + processedTraces, err := newSourceProcessor(newProcessorCreateSettings(), cfg).ProcessTraces(context.Background(), inputTraces) assert.NoError(t, err) processedAttributes := processedTraces.ResourceSpans().At(0).Resource().Attributes() @@ -380,7 +383,7 @@ func TestSourceNameAnnotation(t *testing.T) { inputAttributes["pod_annotation_sumologic.com/sourceName"] = "sn:%{k8s.pod.name}" inputTraces := newTraceData(inputAttributes) - processedTraces, err := newSourceProcessor(cfg).ProcessTraces(context.Background(), inputTraces) + processedTraces, err := newSourceProcessor(newProcessorCreateSettings(), cfg).ProcessTraces(context.Background(), inputTraces) assert.NoError(t, err) processedAttributes := processedTraces.ResourceSpans().At(0).Resource().Attributes() @@ -393,7 +396,7 @@ func TestSourceCategoryAnnotations(t *testing.T) { inputAttributes["pod_annotation_sumologic.com/sourceCategory"] = "sc-%{k8s.namespace.name}" inputTraces := newTraceData(inputAttributes) - processedTraces, err := newSourceProcessor(cfg).ProcessTraces(context.Background(), inputTraces) + processedTraces, err := newSourceProcessor(newProcessorCreateSettings(), cfg).ProcessTraces(context.Background(), inputTraces) assert.NoError(t, err) processedAttributes := processedTraces.ResourceSpans().At(0).Resource().Attributes() @@ -405,7 +408,7 @@ func TestSourceCategoryAnnotations(t *testing.T) { inputAttributes["pod_annotation_sumologic.com/sourceCategoryPrefix"] = "annot>" inputTraces := newTraceData(inputAttributes) - processedTraces, err := newSourceProcessor(cfg).ProcessTraces(context.Background(), inputTraces) + processedTraces, err := newSourceProcessor(newProcessorCreateSettings(), cfg).ProcessTraces(context.Background(), inputTraces) assert.NoError(t, err) processedAttributes := processedTraces.ResourceSpans().At(0).Resource().Attributes() @@ -417,7 +420,7 @@ func TestSourceCategoryAnnotations(t *testing.T) { inputAttributes["pod_annotation_sumologic.com/sourceCategoryReplaceDash"] = "^" inputTraces := newTraceData(inputAttributes) - processedTraces, err := newSourceProcessor(cfg).ProcessTraces(context.Background(), inputTraces) + processedTraces, err := newSourceProcessor(newProcessorCreateSettings(), cfg).ProcessTraces(context.Background(), inputTraces) assert.NoError(t, err) processedAttributes := processedTraces.ResourceSpans().At(0).Resource().Attributes() @@ -431,7 +434,7 @@ func TestSourceCategoryAnnotations(t *testing.T) { inputAttributes["pod_annotation_sumologic.com/sourceCategoryReplaceDash"] = "^" inputTraces := newTraceData(inputAttributes) - processedTraces, err := newSourceProcessor(cfg).ProcessTraces(context.Background(), inputTraces) + processedTraces, err := newSourceProcessor(newProcessorCreateSettings(), cfg).ProcessTraces(context.Background(), inputTraces) assert.NoError(t, err) processedAttributes := processedTraces.ResourceSpans().At(0).Resource().Attributes() @@ -444,7 +447,7 @@ func TestSourceCategoryAnnotations(t *testing.T) { inputTraces := newTraceData(inputAttributes) cfg.ContainerAnnotations.Enabled = true - processedTraces, err := newSourceProcessor(cfg).ProcessTraces(context.Background(), inputTraces) + processedTraces, err := newSourceProcessor(newProcessorCreateSettings(), cfg).ProcessTraces(context.Background(), inputTraces) assert.NoError(t, err) processedAttributes := processedTraces.ResourceSpans().At(0).Resource().Attributes() @@ -461,7 +464,7 @@ func TestSourceCategoryTemplateWithCustomAttribute(t *testing.T) { config := createDefaultConfig().(*Config) config.SourceCategory = "abc/%{someattr}/123" - processedTraces, err := newSourceProcessor(config).ProcessTraces(context.Background(), traces) + processedTraces, err := newSourceProcessor(newProcessorCreateSettings(), config).ProcessTraces(context.Background(), traces) assert.NoError(t, err) attributes := processedTraces.ResourceSpans().At(0).Resource().Attributes() @@ -476,7 +479,7 @@ func TestSourceCategoryTemplateWithCustomAttribute(t *testing.T) { config := createDefaultConfig().(*Config) config.SourceCategory = "abc/%{some.attr}/123" - processedTraces, err := newSourceProcessor(config).ProcessTraces(context.Background(), traces) + processedTraces, err := newSourceProcessor(newProcessorCreateSettings(), config).ProcessTraces(context.Background(), traces) assert.NoError(t, err) attributes := processedTraces.ResourceSpans().At(0).Resource().Attributes() @@ -490,7 +493,7 @@ func TestSourceCategoryTemplateWithCustomAttribute(t *testing.T) { config := createDefaultConfig().(*Config) config.SourceCategory = "abc/%{nonexistent.attr}/123" - processedTraces, err := newSourceProcessor(config).ProcessTraces(context.Background(), traces) + processedTraces, err := newSourceProcessor(newProcessorCreateSettings(), config).ProcessTraces(context.Background(), traces) assert.NoError(t, err) attributes := processedTraces.ResourceSpans().At(0).Resource().Attributes() @@ -505,7 +508,7 @@ func TestSourceCategoryTemplateWithCustomAttribute(t *testing.T) { config.SourceCategory = "abc/%{_collector}/123" config.Collector = "my-collector" - processedTraces, err := newSourceProcessor(config).ProcessTraces(context.Background(), traces) + processedTraces, err := newSourceProcessor(newProcessorCreateSettings(), config).ProcessTraces(context.Background(), traces) assert.NoError(t, err) attributes := processedTraces.ResourceSpans().At(0).Resource().Attributes() @@ -590,7 +593,7 @@ func TestLogProcessorJson(t *testing.T) { Body(). SetStr(tc.body) - rtp := newSourceProcessor(cfg) + rtp := newSourceProcessor(newProcessorCreateSettings(), cfg) td, err := rtp.ProcessLogs(context.Background(), inputLog) assert.NoError(t, err) @@ -615,3 +618,11 @@ func TestLogProcessorJson(t *testing.T) { }) } } + +func newProcessorCreateSettings() processor.CreateSettings { + return processor.CreateSettings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.NewNop(), + }, + } +}