Skip to content

Commit

Permalink
feat(sourceprocessor): add debug logs for source category filler
Browse files Browse the repository at this point in the history
  • Loading branch information
andrzej-stencel committed Feb 8, 2023
1 parent 7909c61 commit ca2274b
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 32 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ This release introduces the following breaking changes:

- feat(sumologicextension): use hostname as default collector name [#918]

### Added

- feat(sourceprocessor): add debug logs for source category filler [#944]

### Fixed

- fix(k8sprocessor): race condition when getting Pod data [#938]

[#918]: https://github.com/SumoLogic/sumologic-otel-collector/pull/918
[#938]: https://github.com/SumoLogic/sumologic-otel-collector/pull/938
[#944]: https://github.com/SumoLogic/sumologic-otel-collector/pull/944
[Unreleased]: https://github.com/SumoLogic/sumologic-otel-collector/compare/v0.70.0-sumo-0...main

## [v0.70.0-sumo-1]
Expand Down
6 changes: 3 additions & 3 deletions pkg/processor/sourceprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func createTracesProcessor(

oCfg := cfg.(*Config)

sp := newSourceProcessor(oCfg)
sp := newSourceProcessor(params, oCfg)

return processorhelper.NewTracesProcessor(
ctx,
Expand All @@ -110,7 +110,7 @@ func createMetricsProcessor(
) (processor.Metrics, error) {
oCfg := cfg.(*Config)

sp := newSourceProcessor(oCfg)
sp := newSourceProcessor(params, oCfg)
return processorhelper.NewMetricsProcessor(
ctx,
params,
Expand All @@ -130,7 +130,7 @@ func createLogsProcessor(
) (processor.Logs, error) {
oCfg := cfg.(*Config)

sp := newSourceProcessor(oCfg)
sp := newSourceProcessor(params, oCfg)
return processorhelper.NewLogsProcessor(
ctx,
params,
Expand Down
12 changes: 11 additions & 1 deletion pkg/processor/sourceprocessor/source_category_filler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.uber.org/zap"
)

// sourceCategoryFiller adds source category attribute to a collection of attributes.
type sourceCategoryFiller struct {
logger *zap.Logger
valueTemplate string
templateAttributes []string
prefix string
Expand All @@ -33,12 +35,13 @@ type sourceCategoryFiller struct {
}

// newSourceCategoryFiller creates a new sourceCategoryFiller.
func newSourceCategoryFiller(cfg *Config) sourceCategoryFiller {
func newSourceCategoryFiller(cfg *Config, logger *zap.Logger) sourceCategoryFiller {

valueTemplate := cfg.SourceCategory
templateAttributes := extractTemplateAttributes(valueTemplate)

return sourceCategoryFiller{
logger: logger,
valueTemplate: valueTemplate,
templateAttributes: templateAttributes,
prefix: cfg.SourceCategoryPrefix,
Expand Down Expand Up @@ -103,16 +106,23 @@ func (f *sourceCategoryFiller) getSourceCategoryFromContainerAnnotation(attribut

containerName, found := attributes.Get("k8s.container.name")
if !found || containerName.Str() == "" {
f.logger.Debug("Couldn't fill source category from container annotation: k8s.container.name attribute not found.")
return ""
}

for _, containerAnnotationPrefix := range f.containerAnnotationsPrefixes {
annotationKey := fmt.Sprintf("%s%s.sourceCategory", containerAnnotationPrefix, containerName.Str())
annotationValue := getAnnotationAttributeValue(f.annotationPrefix, annotationKey, attributes)
if annotationValue != "" {
f.logger.Debug("Filled source category from container annotation",
zap.String("annotation", annotationKey),
zap.String("source_category", annotationValue),
zap.String("container", containerName.Str()))
return annotationValue
}
}

f.logger.Debug("Couldn't fill source category from container annotation: no matching annotation found for container.", zap.String("container", containerName.Str()))
return ""
}

Expand Down
15 changes: 8 additions & 7 deletions pkg/processor/sourceprocessor/source_category_filler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ import (

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.uber.org/zap"
)

func TestNewSourceCategoryFiller(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SourceCategory = "qwerty-%{k8s.namespace.name}-%{k8s.pod.uid}"

filler := newSourceCategoryFiller(cfg)
filler := newSourceCategoryFiller(cfg, zap.NewNop())

assert.Len(t, filler.templateAttributes, 2)
assert.Equal(t, "k8s.namespace.name", filler.templateAttributes[0])
Expand All @@ -40,7 +41,7 @@ func TestFill(t *testing.T) {
attrs.PutStr("k8s.namespace.name", "ns-1")
attrs.PutStr("k8s.pod.uid", "123asd")

filler := newSourceCategoryFiller(cfg)
filler := newSourceCategoryFiller(cfg, zap.NewNop())
filler.fill(&attrs)

assertAttribute(t, attrs, "_sourceCategory", "kubernetes/source/ns/1/123asd/cat")
Expand All @@ -56,7 +57,7 @@ func TestFillWithAnnotations(t *testing.T) {
attrs.PutStr("k8s.pod.annotation.sumologic.com/sourceCategoryPrefix", "annoPrefix:")
attrs.PutStr("k8s.pod.annotation.sumologic.com/sourceCategoryReplaceDash", "#")

filler := newSourceCategoryFiller(cfg)
filler := newSourceCategoryFiller(cfg, zap.NewNop())
filler.fill(&attrs)

assertAttribute(t, attrs, "_sourceCategory", "annoPrefix:sc#from#annot#ns#1#123asd")
Expand All @@ -72,7 +73,7 @@ func TestFillWithContainerAnnotations(t *testing.T) {
attrs.PutStr("k8s.pod.annotation.sumologic.com/container-name-2.sourceCategory", "another/source-category")
attrs.PutStr("k8s.container.name", "container-name-1")

filler := newSourceCategoryFiller(cfg)
filler := newSourceCategoryFiller(cfg, zap.NewNop())
filler.fill(&attrs)

assertAttribute(t, attrs, "_sourceCategory", "kubernetes/my/source/category")
Expand All @@ -88,7 +89,7 @@ func TestFillWithContainerAnnotations(t *testing.T) {
attrs.PutStr("k8s.pod.annotation.sumologic.com/container-name-2.sourceCategory", "another/source-category")
attrs.PutStr("k8s.container.name", "container-name-1")

filler := newSourceCategoryFiller(cfg)
filler := newSourceCategoryFiller(cfg, zap.NewNop())
filler.fill(&attrs)

assertAttribute(t, attrs, "_sourceCategory", "first_source-category")
Expand All @@ -104,7 +105,7 @@ func TestFillWithContainerAnnotations(t *testing.T) {
attrs.PutStr("k8s.pod.annotation.sumologic.com/container-name-2.sourceCategory", "another/source-category")
attrs.PutStr("k8s.container.name", "container-name-2")

filler := newSourceCategoryFiller(cfg)
filler := newSourceCategoryFiller(cfg, zap.NewNop())
filler.fill(&attrs)

assertAttribute(t, attrs, "_sourceCategory", "another/source-category")
Expand All @@ -125,7 +126,7 @@ func TestFillWithContainerAnnotations(t *testing.T) {
attrs.PutStr("k8s.pod.annotation.customAnno_prefix:container-name-3.sourceCategory", "THIRD_s-c!")
attrs.PutStr("k8s.container.name", "container-name-3")

filler := newSourceCategoryFiller(cfg)
filler := newSourceCategoryFiller(cfg, zap.NewNop())
filler.fill(&attrs)

assertAttribute(t, attrs, "_sourceCategory", "THIRD_s-c!")
Expand Down
8 changes: 6 additions & 2 deletions pkg/processor/sourceprocessor/source_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.uber.org/zap"

"github.com/SumoLogic/sumologic-otel-collector/pkg/processor/sourceprocessor/observability"
)
Expand All @@ -45,6 +47,7 @@ type dockerLog struct {
}

type sourceProcessor struct {
logger *zap.Logger
collector string
sourceCategoryFiller sourceCategoryFiller
sourceNameFiller attributeFiller
Expand Down Expand Up @@ -85,7 +88,7 @@ func compileRegex(regex string) *regexp.Regexp {
return re
}

func newSourceProcessor(cfg *Config) *sourceProcessor {
func newSourceProcessor(set processor.CreateSettings, cfg *Config) *sourceProcessor {
keys := sourceKeys{
annotationPrefix: cfg.AnnotationPrefix,
podKey: cfg.PodKey,
Expand All @@ -101,10 +104,11 @@ func newSourceProcessor(cfg *Config) *sourceProcessor {
}

return &sourceProcessor{
logger: set.Logger,
collector: cfg.Collector,
keys: keys,
sourceHostFiller: createSourceHostFiller(cfg),
sourceCategoryFiller: newSourceCategoryFiller(cfg),
sourceCategoryFiller: newSourceCategoryFiller(cfg, set.Logger),
sourceNameFiller: createSourceNameFiller(cfg),
exclude: exclude,
}
Expand Down
Loading

0 comments on commit ca2274b

Please sign in to comment.