Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous Promtail stages #2996

Merged
merged 23 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixing tests, adding Stop to the interface.
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena committed Nov 26, 2020
commit aa972c2abf9d09d6efcbe0ddb51662ce609f0bc8
32 changes: 23 additions & 9 deletions pkg/logentry/stages/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -268,10 +270,14 @@ func Test_dropStage_Process(t *testing.T) {
m, err := newDropStage(util.Logger, tt.config, prometheus.DefaultRegisterer)
require.NoError(t, err)
out := processEntries(m, Entry{
Labels: tt.labels,
Line: tt.entry,
Extracted: tt.extracted,
Timestamp: tt.t,
Entry: api.Entry{
Labels: tt.labels,
Entry: logproto.Entry{
Timestamp: tt.t,
Line: tt.entry,
},
},
})
if tt.shouldDrop {
assert.Len(t, out, 0)
Expand All @@ -298,15 +304,23 @@ func TestDropPipeline(t *testing.T) {
require.NoError(t, err)

out := processEntries(pl, Entry{
Labels: model.LabelSet{},
Extracted: map[string]interface{}{},
Line: testMatchLogLineApp1,
Timestamp: time.Now(),
Entry: api.Entry{
Labels: model.LabelSet{},
Entry: logproto.Entry{
Line: testMatchLogLineApp1,
Timestamp: time.Now(),
},
},
}, Entry{
Labels: model.LabelSet{},
Extracted: map[string]interface{}{},
Line: testMatchLogLineApp2,
Timestamp: time.Now(),
Entry: api.Entry{
Labels: model.LabelSet{},
Entry: logproto.Entry{
Line: testMatchLogLineApp2,
Timestamp: time.Now(),
},
},
})

// Only the second line will go through.
Expand Down
22 changes: 16 additions & 6 deletions pkg/logentry/stages/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -70,10 +72,14 @@ func TestNewDocker(t *testing.T) {
t.Fatalf("failed to create Docker parser: %s", err)
}
out := processEntries(p, Entry{
Labels: toLabelSet(tt.labels),
Extracted: map[string]interface{}{},
Line: tt.entry,
Timestamp: tt.t,
Entry: api.Entry{
Labels: toLabelSet(tt.labels),
Entry: logproto.Entry{
Line: tt.entry,
Timestamp: tt.t,
},
},
})[0]

assertLabels(t, tt.expectedLabels, out.Labels)
Expand Down Expand Up @@ -150,10 +156,14 @@ func TestNewCri(t *testing.T) {
}

out := processEntries(p, Entry{
Labels: toLabelSet(tt.labels),
Extracted: map[string]interface{}{},
Line: tt.entry,
Timestamp: tt.t,
Entry: api.Entry{
Labels: toLabelSet(tt.labels),
Entry: logproto.Entry{
Line: tt.entry,
Timestamp: tt.t,
},
},
})[0]

assertLabels(t, tt.expectedLabels, out.Labels)
Expand Down
22 changes: 16 additions & 6 deletions pkg/logentry/stages/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -88,10 +90,14 @@ func TestPipeline_JSON(t *testing.T) {
t.Fatal(err)
}
out := pl.Run(withInboundEntries(Entry{
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
Labels: model.LabelSet{},
Extracted: map[string]interface{}{},
Line: testData.entry,
Timestamp: time.Now(),
Entry: api.Entry{
Labels: model.LabelSet{},
Entry: logproto.Entry{
Line: testData.entry,
Timestamp: time.Now(),
},
},
}))
assert.Equal(t, testData.expectedExtract, (<-out).Extracted)
})
Expand Down Expand Up @@ -366,10 +372,14 @@ func TestJSONParser_Parse(t *testing.T) {
t.Fatalf("failed to create json parser: %s", err)
}
out := p.Run(withInboundEntries(Entry{
Labels: model.LabelSet{},
Extracted: tt.extracted,
Line: tt.entry,
Timestamp: time.Now(),
Entry: api.Entry{
Labels: model.LabelSet{},
Entry: logproto.Entry{
Line: tt.entry,
Timestamp: time.Now(),
},
},
}))
assert.Equal(t, tt.expectedExtract, (<-out).Extracted)
})
Expand Down
5 changes: 4 additions & 1 deletion pkg/logentry/stages/labeldrop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -64,8 +65,10 @@ func Test_dropLabelStage_Process(t *testing.T) {
t.Fatal(err)
}
out := processEntries(st, Entry{
Labels: test.inputLabels,
Extracted: map[string]interface{}{},
Entry: api.Entry{
Labels: test.inputLabels,
},
})[0]
assert.Equal(t, test.expectedLabels, out.Labels)
})
Expand Down
26 changes: 19 additions & 7 deletions pkg/logentry/stages/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -53,10 +55,14 @@ func TestLabelsPipeline_Labels(t *testing.T) {
}

out := processEntries(pl, Entry{
Labels: model.LabelSet{},
Extracted: map[string]interface{}{},
Line: testLabelsLogLine,
Timestamp: time.Now(),
Entry: api.Entry{
Labels: model.LabelSet{},
Entry: logproto.Entry{
Line: testLabelsLogLine,
Timestamp: time.Now(),
},
},
})[0]
assert.Equal(t, expectedLbls, out.Labels)
}
Expand All @@ -71,10 +77,14 @@ func TestLabelsPipelineWithMissingKey_Labels(t *testing.T) {
}
Debug = true
_ = processEntries(pl, Entry{
Labels: model.LabelSet{},
Extracted: map[string]interface{}{},
Line: testLabelsLogLineWithMissingKey,
Timestamp: time.Now(),
Entry: api.Entry{
Labels: model.LabelSet{},
Entry: logproto.Entry{
Line: testLabelsLogLineWithMissingKey,
Timestamp: time.Now(),
},
},
})

expectedLog := "level=debug msg=\"failed to convert extracted label value to string\" err=\"Can't convert <nil> to string\" type=null"
Expand Down Expand Up @@ -192,8 +202,10 @@ func TestLabelStage_Process(t *testing.T) {
t.Fatal(err)
}
out := processEntries(st, Entry{
Labels: test.inputLabels,
Extracted: test.extractedData,
Entry: api.Entry{
Labels: test.inputLabels,
},
})[0]
assert.Equal(t, test.expectedLabels, out.Labels)
})
Expand Down
12 changes: 9 additions & 3 deletions pkg/logentry/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/util"
)

Expand Down Expand Up @@ -209,10 +211,14 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) {
e.Labels[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
return Entry{
Labels: e.Labels,
Extracted: e.Extracted,
Line: string(newLine),
Timestamp: e.Timestamp,
Entry: api.Entry{
Labels: e.Labels,
Entry: logproto.Entry{
Line: string(newLine),
Timestamp: e.Timestamp,
},
},
}, true
}

Expand Down
22 changes: 16 additions & 6 deletions pkg/logentry/stages/match_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -68,10 +70,14 @@ func TestMatchPipeline(t *testing.T) {
out := pl.Run(in)

in <- Entry{
Labels: model.LabelSet{},
Extracted: map[string]interface{}{},
Line: testMatchLogLineApp1,
Timestamp: time.Now(),
Entry: api.Entry{
Labels: model.LabelSet{},
Entry: logproto.Entry{
Line: testMatchLogLineApp1,
Timestamp: time.Now(),
},
},
}
e := <-out

Expand Down Expand Up @@ -163,12 +169,16 @@ func TestMatcher(t *testing.T) {
if s != nil {

out := processEntries(s, Entry{
Labels: toLabelSet(tt.labels),
Extracted: map[string]interface{}{
"test_label": "unimportant value",
},
Line: "foo",
Timestamp: time.Now(),
Entry: api.Entry{
Labels: toLabelSet(tt.labels),
Entry: logproto.Entry{
Line: "foo",
Timestamp: time.Now(),
},
},
})

if tt.shouldDrop {
Expand Down
3 changes: 0 additions & 3 deletions pkg/logentry/stages/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,6 @@ type metricStage struct {

// Process implements Stage
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
if _, ok := labels[dropLabel]; ok {
return
}
for name, collector := range m.metrics {
// There is a special case for counters where we count even if there is no match in the extracted map.
if c, ok := collector.(*metric.Counters); ok {
Expand Down
42 changes: 30 additions & 12 deletions pkg/logentry/stages/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/stretchr/testify/assert"

"github.com/grafana/loki/pkg/logentry/metric"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
)

var testMetricYaml = `
Expand Down Expand Up @@ -116,10 +118,14 @@ func TestMetricsPipeline(t *testing.T) {
}

out := <-pl.Run(withInboundEntries(Entry{
Labels: model.LabelSet{"test": "app"},
Line: testMetricLogLine1,
Timestamp: time.Now(),
Extracted: map[string]interface{}{},
Entry: api.Entry{
Labels: model.LabelSet{"test": "app"},
Entry: logproto.Entry{
Line: testMetricLogLine1,
Timestamp: time.Now(),
},
},
}))
out.Line = testMetricLogLine2
<-pl.Run(withInboundEntries(out))
Expand All @@ -140,10 +146,14 @@ func TestPipelineWithMissingKey_Metrics(t *testing.T) {
}
Debug = true
<-pl.Run(withInboundEntries(Entry{
Labels: model.LabelSet{},
Extracted: map[string]interface{}{},
Line: testMetricLogLineWithMissingKey,
Timestamp: time.Now(),
Entry: api.Entry{
Labels: model.LabelSet{},
Entry: logproto.Entry{
Line: testMetricLogLineWithMissingKey,
Timestamp: time.Now(),
},
},
}))
expectedLog := "level=debug msg=\"failed to convert extracted value to string, can't perform value comparison\" metric=bloki_count err=\"can't convert <nil> to string\""
if !(strings.Contains(buf.String(), expectedLog)) {
Expand Down Expand Up @@ -190,10 +200,14 @@ func TestMetricsWithDropInPipeline(t *testing.T) {
in := make(chan Entry)
out := pl.Run(in)
in <- Entry{
Labels: lbls,
Extracted: map[string]interface{}{},
Line: testMetricLogLine1,
Timestamp: time.Now(),
Entry: api.Entry{
Labels: lbls,
Entry: logproto.Entry{
Line: testMetricLogLine1,
Timestamp: time.Now(),
},
},
}
e := <-out
e.Labels = droppingLabels
Expand Down Expand Up @@ -386,10 +400,14 @@ func TestMetricStage_Process(t *testing.T) {
}

out := processEntries(jsonStage, Entry{
Labels: labelFoo,
Extracted: map[string]interface{}{},
Line: logFixture,
Timestamp: time.Now(),
Entry: api.Entry{
Labels: labelFoo,
Entry: logproto.Entry{
Line: logFixture,
Timestamp: time.Now(),
},
},
})
out[0].Line = regexLogFixture
out = processEntries(regexStage, out...)
Expand Down
Loading