Skip to content

Commit

Permalink
ci: add more linters and fix existing errors
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek committed Jun 26, 2021
1 parent 3965b62 commit 81e3ba8
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 37 deletions.
39 changes: 39 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# golangci-lint configuration.
# Reference: https://golangci-lint.run/usage/configuration/
run:
# Fail a run that takes longer than 3 minutes.
timeout: 3m
# Allow multiple golangci-lint invocations (e.g. per-module Makefiles) to run
# in parallel against the same cache.
allow-parallel-runners: true

# Per-linter settings; these apply only when the corresponding linter is enabled.
linters-settings:
errcheck:
# Do not require checking of type assertions (v, ok := x.(T)).
check-type-assertions: false
# Report errors assigned to the blank identifier (_ = f()).
check-blank: true

gosimple:
# Target Go language version for simplification suggestions.
go: "1.16"

maligned:
# print struct with more effective memory layout or not, false by default
# NOTE: maligned is listed under `disable` below, so this setting is inert
# until the linter is enabled.
suggest-new: true

unused:
# Target Go language version for dead-code analysis.
go: "1.16"

linters:
# Keep the default linter set and add the `enable` list on top of it.
disable-all: false
enable:
- gofmt
- deadcode
- unused
- errcheck
- goimports
- misspell
- noctx
disable:
- maligned
- prealloc
# Run all analyzers, not just the fast subset.
fast: false

issues:
# Maximum issues count per one linter. Set to 0 to disable. Default is 50.
max-issues-per-linter: 0
# Maximum count of issues with the same text. Set to 0 to disable. Default is 3.
max-same-issues: 0
6 changes: 1 addition & 5 deletions pkg/Makefile.Common
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,4 @@ fmt:

.PHONY: lint
lint:
$(LINT) run \
--enable deadcode,unused,errcheck,goimports \
--allow-parallel-runners \
--max-same-issues 10 \
--timeout 3m
$(LINT) run
15 changes: 12 additions & 3 deletions pkg/extension/sumologicextension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,23 @@ func (se *SumologicExtension) heartbeatLoop() {
return
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
// When the close channel is closed ...
<-se.closeChan
// ... cancel the ongoing heartbeat request.
cancel()
}()

se.logger.Info("Heartbeat heartbeat API initialized. Starting sending hearbeat requests")
for {
select {
case <-se.closeChan:
se.logger.Info("Heartbeat sender turn off")
return
default:
err := se.sendHeartbeat()
err := se.sendHeartbeat(ctx)
if err != nil {
se.logger.Error("Heartbeat error", zap.Error(err))
}
Expand All @@ -212,12 +221,12 @@ func (se *SumologicExtension) heartbeatLoop() {
}
}

func (se *SumologicExtension) sendHeartbeat() error {
func (se *SumologicExtension) sendHeartbeat(ctx context.Context) error {
u, err := url.Parse(se.baseUrl + heartbeatUrl)
if err != nil {
return fmt.Errorf("unable to parse heartbeat URL %w", err)
}
req, err := http.NewRequest(http.MethodPost, u.String(), nil)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil)
if err != nil {
return fmt.Errorf("unable to create HTTP request %w", err)
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/processor/cascadingfilterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ var (
defaultProbabilisticFilteringRatio = float32(0.2)
)

// init registers the Cascading Filter metric views with OpenCensus exactly
// once, at package load time. View registration failure leaves the processor
// unable to record its metrics, so it is treated as fatal.
func init() {
// TODO: this is hardcoding the metrics level
if err := view.Register(CascadingFilterMetricViews(configtelemetry.LevelNormal)...); err != nil {
panic("failed to register cascadingfilterprocessor: " + err.Error())
}
}

// NewFactory returns a new factory for the Cascading Filter processor.
func NewFactory() component.ProcessorFactory {
// TODO: this is hardcoding the metrics level and skips error handling
_ = view.Register(CascadingFilterMetricViews(configtelemetry.LevelNormal)...)

return processorhelper.NewFactory(
typeStr,
createDefaultConfig,
Expand Down
45 changes: 36 additions & 9 deletions pkg/processor/cascadingfilterprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,27 +211,36 @@ func (cfsp *cascadingFilterSpanProcessor) samplingPolicyOnTick() {
if trace.SelectedByProbabilisticFilter {
selectedByProbabilisticFilterSpans += trace.SpanCount
}
_ = stats.RecordWithTags(
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusSampled)},
statCascadingFilterDecision.M(int64(1)),
)
if err != nil {
cfsp.logger.Error("Sampling Policy Evaluation error on first run tick", zap.Error(err))
}
} else {
_ = stats.RecordWithTags(
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusExceededKey)},
statCascadingFilterDecision.M(int64(1)),
)
if err != nil {
cfsp.logger.Error("Sampling Policy Evaluation error on first run tick", zap.Error(err))
}
}
} else if provisionalDecision == sampling.SecondChance {
trace.FinalDecision = sampling.SecondChance
} else {
trace.FinalDecision = provisionalDecision
_ = stats.RecordWithTags(
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusNotSampled)},
statCascadingFilterDecision.M(int64(1)),
)
if err != nil {
cfsp.logger.Error("Sampling Policy Evaluation error on first run tick", zap.Error(err))
}
}
}

Expand All @@ -245,17 +254,23 @@ func (cfsp *cascadingFilterSpanProcessor) samplingPolicyOnTick() {
if trace.FinalDecision == sampling.SecondChance {
trace.FinalDecision = cfsp.updateRate(currSecond, trace.SpanCount)
if trace.FinalDecision == sampling.Sampled {
_ = stats.RecordWithTags(
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusSecondChanceSampled)},
statCascadingFilterDecision.M(int64(1)),
)
if err != nil {
cfsp.logger.Error("Sampling Policy Evaluation error on second run tick", zap.Error(err))
}
} else {
_ = stats.RecordWithTags(
err := stats.RecordWithTags(
cfsp.ctx,
[]tag.Mutator{tag.Insert(tagCascadingFilterDecisionKey, statusSecondChanceExceeded)},
statCascadingFilterDecision.M(int64(1)),
)
if err != nil {
cfsp.logger.Error("Sampling Policy Evaluation error on second run tick", zap.Error(err))
}
}
}

Expand All @@ -282,7 +297,10 @@ func (cfsp *cascadingFilterSpanProcessor) samplingPolicyOnTick() {
updateFilteringTag(allSpans)
}

_ = cfsp.nextConsumer.ConsumeTraces(cfsp.ctx, allSpans)
err := cfsp.nextConsumer.ConsumeTraces(cfsp.ctx, allSpans)
if err != nil {
cfsp.logger.Error("Sampling Policy Evaluation error on consuming traces", zap.Error(err))
}
} else {
metrics.decisionNotSampled++
}
Expand Down Expand Up @@ -367,30 +385,39 @@ func (cfsp *cascadingFilterSpanProcessor) makeProvisionalDecision(id pdata.Trace
trace.SelectedByProbabilisticFilter = true
}

_ = stats.RecordWithTags(
err := stats.RecordWithTags(
policy.ctx,
[]tag.Mutator{tag.Insert(tagPolicyDecisionKey, statusSampled)},
statPolicyDecision.M(int64(1)),
)
if err != nil {
cfsp.logger.Error("Making provisional decision error", zap.Error(err))
}
case sampling.NotSampled:
if provisionalDecision == sampling.Unspecified {
provisionalDecision = sampling.NotSampled
}
_ = stats.RecordWithTags(
err := stats.RecordWithTags(
policy.ctx,
[]tag.Mutator{tag.Insert(tagPolicyDecisionKey, statusNotSampled)},
statPolicyDecision.M(int64(1)),
)
if err != nil {
cfsp.logger.Error("Making provisional decision error", zap.Error(err))
}
case sampling.SecondChance:
if provisionalDecision != sampling.Sampled {
provisionalDecision = sampling.SecondChance
}

_ = stats.RecordWithTags(
err := stats.RecordWithTags(
policy.ctx,
[]tag.Mutator{tag.Insert(tagPolicyDecisionKey, statusSecondChance)},
statPolicyDecision.M(int64(1)),
)
if err != nil {
cfsp.logger.Error("Making provisional decision error", zap.Error(err))
}
}
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/processor/cascadingfilterprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func TestSequentialTraceArrival(t *testing.T) {
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testPolicy,
}
sp, _ := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
sp, err := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
require.NoError(t, err)
tsp := sp.(*cascadingFilterSpanProcessor)
for _, batch := range batches {
assert.NoError(t, tsp.ConsumeTraces(context.Background(), batch))
Expand All @@ -75,7 +76,8 @@ func TestConcurrentTraceArrival(t *testing.T) {
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testPolicy,
}
sp, _ := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
sp, err := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
require.NoError(t, err)
tsp := sp.(*cascadingFilterSpanProcessor)
for _, batch := range batches {
// Add the same traceId twice.
Expand Down Expand Up @@ -113,7 +115,8 @@ func TestSequentialTraceMapSize(t *testing.T) {
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testPolicy,
}
sp, _ := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
sp, err := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
require.NoError(t, err)
tsp := sp.(*cascadingFilterSpanProcessor)
for _, batch := range batches {
if err := tsp.ConsumeTraces(context.Background(), batch); err != nil {
Expand All @@ -138,7 +141,8 @@ func TestConcurrentTraceMapSize(t *testing.T) {
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testPolicy,
}
sp, _ := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
sp, err := newTraceProcessor(zap.NewNop(), consumertest.NewNop(), cfg)
require.NoError(t, err)
tsp := sp.(*cascadingFilterSpanProcessor)
for _, batch := range batches {
wg.Add(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -77,7 +78,8 @@ func TestNumericTagFilter(t *testing.T) {

for _, c := range cases {
t.Run(c.Desc, func(t *testing.T) {
u, _ := uuid.NewRandom()
u, err := uuid.NewRandom()
require.NoError(t, err)
decision := filter.Evaluate(pdata.NewTraceID(u), c.Trace)
assert.Equal(t, decision, c.Decision)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)
Expand All @@ -32,10 +33,12 @@ var (
minNumberOfSpans = 2
)

func newSpanPropertiesFilter(operationNamePattern *string, minDuration *time.Duration, minNumberOfSpans *int) policyEvaluator {
func newSpanPropertiesFilter(t *testing.T, operationNamePattern *string, minDuration *time.Duration, minNumberOfSpans *int) policyEvaluator {
var operationRe *regexp.Regexp
var err error
if operationNamePattern != nil {
operationRe, _ = regexp.Compile(*operationNamePattern)
operationRe, err = regexp.Compile(*operationNamePattern)
require.NoError(t, err)
}
return policyEvaluator{
logger: zap.NewNop(),
Expand All @@ -47,15 +50,16 @@ func newSpanPropertiesFilter(operationNamePattern *string, minDuration *time.Dur
}

func evaluate(t *testing.T, evaluator policyEvaluator, traces *TraceData, expectedDecision Decision) {
u, _ := uuid.NewRandom()
u, err := uuid.NewRandom()
require.NoError(t, err)
decision := evaluator.Evaluate(pdata.NewTraceID(u), traces)
assert.Equal(t, expectedDecision, decision)
}

func TestPartialSpanPropertiesFilter(t *testing.T) {
opFilter := newSpanPropertiesFilter(&operationNamePattern, nil, nil)
durationFilter := newSpanPropertiesFilter(nil, &minDuration, nil)
spansFilter := newSpanPropertiesFilter(nil, nil, &minNumberOfSpans)
opFilter := newSpanPropertiesFilter(t, &operationNamePattern, nil, nil)
durationFilter := newSpanPropertiesFilter(t, nil, &minDuration, nil)
spansFilter := newSpanPropertiesFilter(t, nil, nil, &minNumberOfSpans)

cases := []struct {
Desc string
Expand Down Expand Up @@ -122,7 +126,7 @@ func TestSpanPropertiesFilter(t *testing.T) {
for _, c := range cases {
t.Run(c.Desc, func(t *testing.T) {
// Regular match
filter := newSpanPropertiesFilter(&operationNamePattern, &minDuration, &minNumberOfSpans)
filter := newSpanPropertiesFilter(t, &operationNamePattern, &minDuration, &minNumberOfSpans)
evaluate(t, filter, c.Trace, c.Decision)

// Invert match
Expand Down
18 changes: 14 additions & 4 deletions pkg/processor/sourceprocessor/source_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/sourceprocessor/observability"
)

// formatRegex matches "%{key}" placeholder tokens in a source-attribute
// format template; capture group 1 holds the key name (\w+ — letters,
// digits, underscore only).
//
// The pattern is a compile-time constant, so the idiomatic form is a single
// package-level MustCompile: it panics at program start on an invalid
// pattern, which is exactly the behavior the previous var + init() pair
// implemented by hand.
var formatRegex = regexp.MustCompile(`\%\{(\w+)\}`)

type sourceTraceKeys struct {
annotationPrefix string
containerKey string
Expand Down Expand Up @@ -336,14 +348,12 @@ func (stp *sourceTraceProcessor) enrichPodName(atts *pdata.AttributeMap) {
}

func extractFormat(format string, name string, keys sourceTraceKeys) attributeFiller {
r, _ := regexp.Compile(`\%\{(\w+)\}`)

labels := make([]string, 0)
matches := r.FindAllStringSubmatch(format, -1)
matches := formatRegex.FindAllStringSubmatch(format, -1)
for _, matchset := range matches {
labels = append(labels, keys.convertKey(matchset[1]))
}
template := r.ReplaceAllString(format, "%s")
template := formatRegex.ReplaceAllString(format, "%s")

return attributeFiller{
name: name,
Expand Down
6 changes: 5 additions & 1 deletion pkg/processor/sumologicsyslogprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package sumologicsyslogprocessor

import (
"context"
"fmt"
"regexp"
"strconv"

Expand Down Expand Up @@ -100,7 +101,10 @@ func (ssp *sumologicSyslogProcessor) ProcessLogs(ctx context.Context, ld pdata.L
match := ssp.syslogFacilityRegex.FindStringSubmatch(log.Body().StringVal())

if match != nil {
facility, _ := strconv.Atoi(match[1])
facility, err := strconv.Atoi(match[1])
if err != nil {
return ld, fmt.Errorf("failed to parse: %s, err: %w", match[1], err)
}
facility = facility / 8

value, ok = facilities[facility]
Expand Down

0 comments on commit 81e3ba8

Please sign in to comment.