From 81e3ba89436d95a363e6e8946e1812abc7aaae16 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Patryk=20Ma=C5=82ek?=
Date: Sat, 26 Jun 2021 15:14:40 +0200
Subject: [PATCH] ci: add more linters and fix existing errors

---
 .golangci.yaml                                | 39 ++++++++++++++++
 pkg/Makefile.Common                           |  6 +--
 pkg/extension/sumologicextension/extension.go | 15 +++++--
 .../cascadingfilterprocessor/factory.go       | 11 +++--
 .../cascadingfilterprocessor/processor.go     | 45 +++++++++++++++----
 .../processor_test.go                         | 12 +++--
 .../sampling/numeric_tag_filter_test.go       |  4 +-
 .../sampling/span_properties_filter_test.go   | 18 +++++---
 .../sourceprocessor/source_processor.go       | 18 ++++++--
 .../sumologicsyslogprocessor/processor.go     |  6 ++-
 10 files changed, 137 insertions(+), 37 deletions(-)
 create mode 100644 .golangci.yaml

diff --git a/.golangci.yaml b/.golangci.yaml
new file mode 100644
index 0000000000..32574c5cb9
--- /dev/null
+++ b/.golangci.yaml
@@ -0,0 +1,39 @@
+run:
+  timeout: 3m
+  allow-parallel-runners: true
+
+linters-settings:
+  errcheck:
+    check-type-assertions: false
+    check-blank: true
+
+  gosimple:
+    go: "1.16"
+
+  maligned:
+    # print struct with more effective memory layout or not, false by default
+    suggest-new: true
+
+  unused:
+    go: "1.16"
+
+linters:
+  disable-all: false
+  enable:
+    - gofmt
+    - deadcode
+    - unused
+    - errcheck
+    - goimports
+    - misspell
+    - noctx
+  disable:
+    - maligned
+    - prealloc
+  fast: false
+
+issues:
+  # Maximum issues count per one linter. Set to 0 to disable. Default is 50.
+  max-issues-per-linter: 0
+  # Maximum count of issues with the same text. Set to 0 to disable. Default is 3.
+  max-same-issues: 0
diff --git a/pkg/Makefile.Common b/pkg/Makefile.Common
index 0a2b2946e1..8ae66ec105 100644
--- a/pkg/Makefile.Common
+++ b/pkg/Makefile.Common
@@ -12,8 +12,4 @@ fmt:
 
 .PHONY: lint
 lint:
-	$(LINT) run \
-		--enable deadcode,unused,errcheck,goimports \
-		--allow-parallel-runners \
-		--max-same-issues 10 \
-		--timeout 3m
+	$(LINT) run
diff --git a/pkg/extension/sumologicextension/extension.go b/pkg/extension/sumologicextension/extension.go
index 4e4e2b1ad5..c6e3ab0881 100644
--- a/pkg/extension/sumologicextension/extension.go
+++ b/pkg/extension/sumologicextension/extension.go
@@ -192,6 +192,15 @@ func (se *SumologicExtension) heartbeatLoop() {
 		return
 	}
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go func() {
+		// When the close channel is closed ...
+		<-se.closeChan
+		// ... cancel the ongoing heartbeat request.
+		cancel()
+	}()
+
 	se.logger.Info("Heartbeat API initialized. Starting sending heartbeat requests")
Starting sending hearbeat requests") for { select { @@ -199,7 +208,7 @@ func (se *SumologicExtension) heartbeatLoop() { se.logger.Info("Heartbeat sender turn off") return default: - err := se.sendHeartbeat() + err := se.sendHeartbeat(ctx) if err != nil { se.logger.Error("Heartbeat error", zap.Error(err)) } @@ -212,12 +221,12 @@ func (se *SumologicExtension) heartbeatLoop() { } } -func (se *SumologicExtension) sendHeartbeat() error { +func (se *SumologicExtension) sendHeartbeat(ctx context.Context) error { u, err := url.Parse(se.baseUrl + heartbeatUrl) if err != nil { return fmt.Errorf("unable to parse heartbeat URL %w", err) } - req, err := http.NewRequest(http.MethodPost, u.String(), nil) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil) if err != nil { return fmt.Errorf("unable to create HTTP request %w", err) } diff --git a/pkg/processor/cascadingfilterprocessor/factory.go b/pkg/processor/cascadingfilterprocessor/factory.go index dd401eb634..2736af74ec 100644 --- a/pkg/processor/cascadingfilterprocessor/factory.go +++ b/pkg/processor/cascadingfilterprocessor/factory.go @@ -37,11 +37,16 @@ var ( defaultProbabilisticFilteringRatio = float32(0.2) ) +func init() { + // TODO: this is hardcoding the metrics level + err := view.Register(CascadingFilterMetricViews(configtelemetry.LevelNormal)...) + if err != nil { + panic("failed to register cascadingfilterprocessor: " + err.Error()) + } +} + // NewFactory returns a new factory for the Cascading Filter processor. func NewFactory() component.ProcessorFactory { - // TODO: this is hardcoding the metrics level and skips error handling - _ = view.Register(CascadingFilterMetricViews(configtelemetry.LevelNormal)...) - return processorhelper.NewFactory( typeStr, createDefaultConfig, diff --git a/pkg/processor/cascadingfilterprocessor/processor.go b/pkg/processor/cascadingfilterprocessor/processor.go index 175395f069..d8549524aa 100644 --- a/pkg/processor/cascadingfilterprocessor/processor.go +++ b/pkg/processor/cascadingfilterprocessor/processor.go @@ -211,27 +211,36 @@ func (cfsp *cascadingFilterSpanProcessor) samplingPolicyOnTick() { if trace.SelectedByProbabilisticFilter { selectedByProbabilisticFilterSpans += trace.SpanCount } - _ = stats.RecordWithTags( + err := stats.RecordWithTags( cfsp.ctx, []tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusSampled)}, statCascadingFilterDecision.M(int64(1)), ) + if err != nil { + cfsp.logger.Error("Sampling Policy Evaluation error on first run tick", zap.Error(err)) + } } else { - _ = stats.RecordWithTags( + err := stats.RecordWithTags( cfsp.ctx, []tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusExceededKey)}, statCascadingFilterDecision.M(int64(1)), ) + if err != nil { + cfsp.logger.Error("Sampling Policy Evaluation error on first run tick", zap.Error(err)) + } } } else if provisionalDecision == sampling.SecondChance { trace.FinalDecision = sampling.SecondChance } else { trace.FinalDecision = provisionalDecision - _ = stats.RecordWithTags( + err := stats.RecordWithTags( cfsp.ctx, []tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusNotSampled)}, statCascadingFilterDecision.M(int64(1)), ) + if err != nil { + cfsp.logger.Error("Sampling Policy Evaluation error on first run tick", zap.Error(err)) + } } } @@ -245,17 +254,23 @@ func (cfsp *cascadingFilterSpanProcessor) samplingPolicyOnTick() { if trace.FinalDecision == sampling.SecondChance { trace.FinalDecision = cfsp.updateRate(currSecond, trace.SpanCount) if trace.FinalDecision == sampling.Sampled { - 
+				err := stats.RecordWithTags(
 					cfsp.ctx,
 					[]tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusSecondChanceSampled)},
 					statCascadingFilterDecision.M(int64(1)),
 				)
+				if err != nil {
+					cfsp.logger.Error("Sampling Policy Evaluation error on second run tick", zap.Error(err))
+				}
 			} else {
-				_ = stats.RecordWithTags(
+				err := stats.RecordWithTags(
 					cfsp.ctx,
 					[]tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusSecondChanceExceeded)},
 					statCascadingFilterDecision.M(int64(1)),
 				)
+				if err != nil {
+					cfsp.logger.Error("Sampling Policy Evaluation error on second run tick", zap.Error(err))
+				}
 			}
 		}
 
@@ -282,7 +297,10 @@ func (cfsp *cascadingFilterSpanProcessor) samplingPolicyOnTick() {
 				updateFilteringTag(allSpans)
 			}
 
-			_ = cfsp.nextConsumer.ConsumeTraces(cfsp.ctx, allSpans)
+			err := cfsp.nextConsumer.ConsumeTraces(cfsp.ctx, allSpans)
+			if err != nil {
+				cfsp.logger.Error("Sampling Policy Evaluation error on consuming traces", zap.Error(err))
+			}
 		} else {
 			metrics.decisionNotSampled++
 		}
@@ -367,30 +385,39 @@ func (cfsp *cascadingFilterSpanProcessor) makeProvisionalDecision(id pdata.Trace
 				trace.SelectedByProbabilisticFilter = true
 			}
 
-			_ = stats.RecordWithTags(
+			err := stats.RecordWithTags(
 				policy.ctx,
 				[]tag.Mutator{tag.Insert(tagPolicyDecisionKey, statusSampled)},
 				statPolicyDecision.M(int64(1)),
 			)
+			if err != nil {
+				cfsp.logger.Error("Making provisional decision error", zap.Error(err))
+			}
 		case sampling.NotSampled:
 			if provisionalDecision == sampling.Unspecified {
 				provisionalDecision = sampling.NotSampled
 			}
-			_ = stats.RecordWithTags(
+			err := stats.RecordWithTags(
 				policy.ctx,
 				[]tag.Mutator{tag.Insert(tagPolicyDecisionKey, statusNotSampled)},
 				statPolicyDecision.M(int64(1)),
 			)
+			if err != nil {
+				cfsp.logger.Error("Making provisional decision error", zap.Error(err))
+			}
 		case sampling.SecondChance:
 			if provisionalDecision != sampling.Sampled {
 				provisionalDecision = sampling.SecondChance
 			}
-			_ = stats.RecordWithTags(
+			err := stats.RecordWithTags(
 				policy.ctx,
 				[]tag.Mutator{tag.Insert(tagPolicyDecisionKey, statusSecondChance)},
 				statPolicyDecision.M(int64(1)),
 			)
+			if err != nil {
+				cfsp.logger.Error("Making provisional decision error", zap.Error(err))
+			}
 		}
 	}
diff --git a/pkg/processor/cascadingfilterprocessor/processor_test.go b/pkg/processor/cascadingfilterprocessor/processor_test.go
index f475656248..0dfaa31939 100644
--- a/pkg/processor/cascadingfilterprocessor/processor_test.go
+++ b/pkg/processor/cascadingfilterprocessor/processor_test.go
@@ -51,7 +51,8 @@ func TestSequentialTraceArrival(t *testing.T) {
 		ExpectedNewTracesPerSec: 64,
 		PolicyCfgs:              testPolicy,
 	}
-	sp, _ := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
+	sp, err := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
+	require.NoError(t, err)
 	tsp := sp.(*cascadingFilterSpanProcessor)
 	for _, batch := range batches {
 		assert.NoError(t, tsp.ConsumeTraces(context.Background(), batch))
@@ -75,7 +76,8 @@ func TestConcurrentTraceArrival(t *testing.T) {
 		ExpectedNewTracesPerSec: 64,
 		PolicyCfgs:              testPolicy,
 	}
-	sp, _ := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
+	sp, err := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
+	require.NoError(t, err)
 	tsp := sp.(*cascadingFilterSpanProcessor)
 	for _, batch := range batches {
 		// Add the same traceId twice.
@@ -113,7 +115,8 @@ func TestSequentialTraceMapSize(t *testing.T) {
 		ExpectedNewTracesPerSec: 64,
 		PolicyCfgs:              testPolicy,
 	}
-	sp, _ := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
+	sp, err := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
+	require.NoError(t, err)
 	tsp := sp.(*cascadingFilterSpanProcessor)
 	for _, batch := range batches {
 		if err := tsp.ConsumeTraces(context.Background(), batch); err != nil {
@@ -138,7 +141,8 @@ func TestConcurrentTraceMapSize(t *testing.T) {
 		ExpectedNewTracesPerSec: 64,
 		PolicyCfgs:              testPolicy,
 	}
-	sp, _ := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
+	sp, err := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
+	require.NoError(t, err)
 	tsp := sp.(*cascadingFilterSpanProcessor)
 	for _, batch := range batches {
 		wg.Add(1)
diff --git a/pkg/processor/cascadingfilterprocessor/sampling/numeric_tag_filter_test.go b/pkg/processor/cascadingfilterprocessor/sampling/numeric_tag_filter_test.go
index 590453a217..097b4a357e 100644
--- a/pkg/processor/cascadingfilterprocessor/sampling/numeric_tag_filter_test.go
+++ b/pkg/processor/cascadingfilterprocessor/sampling/numeric_tag_filter_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/consumer/pdata"
 	"go.uber.org/zap"
 )
@@ -77,7 +78,8 @@ func TestNumericTagFilter(t *testing.T) {
 
 	for _, c := range cases {
 		t.Run(c.Desc, func(t *testing.T) {
-			u, _ := uuid.NewRandom()
+			u, err := uuid.NewRandom()
+			require.NoError(t, err)
 			decision := filter.Evaluate(pdata.NewTraceID(u), c.Trace)
 			assert.Equal(t, decision, c.Decision)
 		})
diff --git a/pkg/processor/cascadingfilterprocessor/sampling/span_properties_filter_test.go b/pkg/processor/cascadingfilterprocessor/sampling/span_properties_filter_test.go
index b1851634af..cd319dcf87 100644
--- a/pkg/processor/cascadingfilterprocessor/sampling/span_properties_filter_test.go
+++ b/pkg/processor/cascadingfilterprocessor/sampling/span_properties_filter_test.go
@@ -22,6 +22,7 @@ import (
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/consumer/pdata"
 	"go.uber.org/zap"
 )
@@ -32,10 +33,12 @@ var (
 	minNumberOfSpans     = 2
 )
 
-func newSpanPropertiesFilter(operationNamePattern *string, minDuration *time.Duration, minNumberOfSpans *int) policyEvaluator {
+func newSpanPropertiesFilter(t *testing.T, operationNamePattern *string, minDuration *time.Duration, minNumberOfSpans *int) policyEvaluator {
 	var operationRe *regexp.Regexp
+	var err error
 	if operationNamePattern != nil {
-		operationRe, _ = regexp.Compile(*operationNamePattern)
+		operationRe, err = regexp.Compile(*operationNamePattern)
+		require.NoError(t, err)
 	}
 	return policyEvaluator{
 		logger: zap.NewNop(),
@@ -47,15 +50,16 @@ func newSpanPropertiesFilter(operationNamePattern *string, minDuration *time.Dur
 }
 
 func evaluate(t *testing.T, evaluator policyEvaluator, traces *TraceData, expectedDecision Decision) {
-	u, _ := uuid.NewRandom()
+	u, err := uuid.NewRandom()
+	require.NoError(t, err)
 	decision := evaluator.Evaluate(pdata.NewTraceID(u), traces)
 	assert.Equal(t, expectedDecision, decision)
 }
 
 func TestPartialSpanPropertiesFilter(t *testing.T) {
-	opFilter := newSpanPropertiesFilter(&operationNamePattern, nil, nil)
-	durationFilter := newSpanPropertiesFilter(nil, &minDuration, nil)
-	spansFilter := newSpanPropertiesFilter(nil, nil, &minNumberOfSpans)
+	opFilter := newSpanPropertiesFilter(t, &operationNamePattern, nil, nil)
+	durationFilter := newSpanPropertiesFilter(t, nil, &minDuration, nil)
+	spansFilter := newSpanPropertiesFilter(t, nil, nil, &minNumberOfSpans)
 
 	cases := []struct {
 		Desc string
@@ -122,7 +126,7 @@ func TestSpanPropertiesFilter(t *testing.T) {
 	for _, c := range cases {
 		t.Run(c.Desc, func(t *testing.T) {
 			// Regular match
-			filter := newSpanPropertiesFilter(&operationNamePattern, &minDuration, &minNumberOfSpans)
+			filter := newSpanPropertiesFilter(t, &operationNamePattern, &minDuration, &minNumberOfSpans)
 			evaluate(t, filter, c.Trace, c.Decision)
 
 			// Invert match
diff --git a/pkg/processor/sourceprocessor/source_processor.go b/pkg/processor/sourceprocessor/source_processor.go
index e1c4b2e25c..6d5bb6dfd5 100644
--- a/pkg/processor/sourceprocessor/source_processor.go
+++ b/pkg/processor/sourceprocessor/source_processor.go
@@ -28,6 +28,18 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/sourceprocessor/observability"
 )
 
+var (
+	formatRegex *regexp.Regexp
+)
+
+func init() {
+	var err error
+	formatRegex, err = regexp.Compile(`\%\{(\w+)\}`)
+	if err != nil {
+		panic("failed to parse regex: " + err.Error())
+	}
+}
+
 type sourceTraceKeys struct {
 	annotationPrefix string
 	containerKey     string
@@ -336,14 +348,12 @@ func (stp *sourceTraceProcessor) enrichPodName(atts *pdata.AttributeMap) {
 }
 
 func extractFormat(format string, name string, keys sourceTraceKeys) attributeFiller {
-	r, _ := regexp.Compile(`\%\{(\w+)\}`)
-
 	labels := make([]string, 0)
-	matches := r.FindAllStringSubmatch(format, -1)
+	matches := formatRegex.FindAllStringSubmatch(format, -1)
 	for _, matchset := range matches {
 		labels = append(labels, keys.convertKey(matchset[1]))
 	}
-	template := r.ReplaceAllString(format, "%s")
+	template := formatRegex.ReplaceAllString(format, "%s")
 
 	return attributeFiller{
 		name: name,
diff --git a/pkg/processor/sumologicsyslogprocessor/processor.go b/pkg/processor/sumologicsyslogprocessor/processor.go
index cdb33206a0..d45c326ea1 100644
--- a/pkg/processor/sumologicsyslogprocessor/processor.go
+++ b/pkg/processor/sumologicsyslogprocessor/processor.go
@@ -16,6 +16,7 @@ package sumologicsyslogprocessor
 
 import (
 	"context"
+	"fmt"
 	"regexp"
 	"strconv"
 
@@ -100,7 +101,10 @@ func (ssp *sumologicSyslogProcessor) ProcessLogs(ctx context.Context, ld pdata.L
 			match := ssp.syslogFacilityRegex.FindStringSubmatch(log.Body().StringVal())
 			if match != nil {
-				facility, _ := strconv.Atoi(match[1])
+				facility, err := strconv.Atoi(match[1])
+				if err != nil {
+					return ld, fmt.Errorf("failed to parse: %s, err: %w", match[1], err)
+				}
 				facility = facility / 8
 				value, ok = facilities[facility]
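
Editorial note (not part of the patch): the extension.go hunks above tie the heartbeat HTTP request to a context that is cancelled when the extension's close channel is closed, via http.NewRequestWithContext. Below is a minimal, self-contained sketch of that pattern under the same idea; the URL, channel, and HTTP client here are placeholders rather than code from this repository.

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

func sendHeartbeat(ctx context.Context, url string) error {
	// Build the request with the caller's context so an in-flight request is
	// aborted as soon as the context is cancelled.
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil)
	if err != nil {
		return fmt.Errorf("unable to create HTTP request %w", err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	return nil
}

func main() {
	closeChan := make(chan struct{})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		// When the close channel is closed, cancel the ongoing heartbeat request.
		<-closeChan
		cancel()
	}()

	// Simulate shutdown shortly after start; any pending request is then cancelled.
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(closeChan)
	}()

	if err := sendHeartbeat(ctx, "http://127.0.0.1:0/heartbeat"); err != nil {
		fmt.Println("heartbeat error:", err)
	}
}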