Skip to content

Commit

Permalink
Add logql filter to match stages and drop capability (#1112)
Browse files Browse the repository at this point in the history
* Add logql filter to match stages and drop capability

* use const string instead and remove unused value

* Uses action property instead of drop_entries
  • Loading branch information
cyriltovena authored Oct 15, 2019
1 parent fa18893 commit 1f6f433
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 55 deletions.
15 changes: 7 additions & 8 deletions docs/clients/promtail/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ stages:
2. Change the timestamp of the log line
3. Change the content of the log line
4. Create a metric based on the extracted data
4. **Filtering stages** optionally apply a subset of stages based on some
4. **Filtering stages** optionally apply a subset of stages or drop entries based on some
condition.

Typical pipelines will start with a parsing stage (such as a
Expand All @@ -28,7 +28,7 @@ something with that extracted data. The most common action stage will be a
[labels](./stages/labels.md) stage to turn extracted data into a label.

A common stage will also be the [match](./stages/match.md) stage to selectively
apply stages based on the current labels.
apply stages or drop entries based on a [LogQL stream selector and filter expressions](../../logql.md).

Note that pipelines can not currently be used to deduplicate logs; Loki will
receive the same log line multiple times if, for example:
Expand Down Expand Up @@ -76,9 +76,9 @@ scrape_configs:
source: timestamp

# This stage is only going to run if the scraped target has a label of
# "name" with a value of "nginx".
# "name" with a value of "nginx" and if the log line contains the word "GET"
- match:
selector: '{name="nginx"}'
selector: '{name="nginx"} |= "GET"'
stages:
# This regex stage extracts a new output by matching against some
# values and capturing the rest.
Expand Down Expand Up @@ -126,10 +126,10 @@ scrape_configs:
level:
component:

# This stage will only run if the scraped target has a label of "app"
# and a value of "some-app".
# This stage will only run if the scraped target has a label "app"
# with a value of "some-app" and the log line doesn't contains the word "info"
- match:
selector: '{app="some-app"}'
selector: '{app="some-app"} != "info"'
stages:
# The regex stage tries to extract a Go panic by looking for panic:
# in the log message.
Expand Down Expand Up @@ -215,4 +215,3 @@ Action stages:
Filtering stages:

* [match](./stages/match.md): Conditionally run stages based on the label set.

33 changes: 23 additions & 10 deletions docs/clients/promtail/stages/match.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
# `match` stage

The match stage is a filtering stage that conditionally applies a set of stages
when a log entry matches a configurable [LogQL](../../../logql.md) stream
selector.
or drop entries when a log entry matches a configurable [LogQL](../../../logql.md)
stream selector and filter expressions.

## Schema

```yaml
match:
# LogQL stream selector.
# LogQL stream selector and filter expressions.
selector: <string>

# Names the pipeline. When defined, creates an additional label in
# the pipeline_duration_seconds histogram, where the value is
# concatenated with job_name using an underscore.
[pipieline_name: <string>]
[pipeline_name: <string>]

# When set to drop (default to keep), all entries matching the selector will
# be dropped. Stages must not be defined when dropping entries.
[action: <keep|drop>]

# Nested set of pipeline stages only if the selector
# matches the labels of the log entries:
Expand Down Expand Up @@ -46,40 +50,49 @@ pipeline_stages:
- labels:
app:
- match:
selector: "{app=\"loki\"}"
selector: '{app="loki"}'
stages:
- json:
expressions:
msg: message
- match:
pipeline_name: "app2"
selector: "{app=\"pokey\"}"
selector: '{app="pokey"}'
action: keep
stages:
- json:
expressions:
msg: msg
- match:
selector: '{app="promtail"} |~ ".*noisy error.*"'
action: drop
- output:
source: msg
```
And the given log line:
And given log lines:
```
```json
{ "time":"2012-11-01T22:08:41+00:00", "app":"loki", "component": ["parser","type"], "level" : "WARN", "message" : "app1 log line" }
{ "time":"2012-11-01T22:08:41+00:00", "app":"promtail", "component": ["parser","type"], "level" : "ERROR", "message" : "foo noisy error" }
```

The first stage will add `app` with a value of `loki` into the extracted map,
The first stage will add `app` with a value of `loki` into the extracted map for the first log line,
while the second stage will add `app` as a label (again with the value of `loki`).
The second line will follow the same flow and will be added the label `app` with a value of `promtail`.

The third stage uses LogQL to only execute the nested stages when there is a
label of `app` whose value is `loki`. This matches in our case; the nested
label of `app` whose value is `loki`. This matches the first line in our case; the nested
`json` stage then adds `msg` into the extracted map with a value of `app1 log
line`.

The fourth stage uses LogQL to only executed the nested stages when there is a
label of `app` whose value is `pokey`. This does **not** match in our case, so
the nested `json` stage is not ran.

The fifth stage will drop any entries from the application `promtail` that matches
the regex `.*noisy error`.

The final `output` stage changes the contents of the log line to be the value of
`msg` from the extracted map. In this case, the log line is changed to `app1 log
line`.
59 changes: 49 additions & 10 deletions pkg/logentry/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package stages
import (
"time"

"github.com/prometheus/prometheus/pkg/labels"

"github.com/go-kit/kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/logql"
)
Expand All @@ -19,17 +20,22 @@ const (
ErrSelectorRequired = "selector statement required for match stage"
ErrMatchRequiresStages = "match stage requires at least one additional stage to be defined in '- stages'"
ErrSelectorSyntax = "invalid selector syntax for match stage"
ErrStagesWithDropLine = "match stage configured to drop entries cannot contains stages"
ErrUnknownMatchAction = "match stage action should be 'keep' or 'drop'"
MatchActionKeep = "keep"
MatchActionDrop = "drop"
)

// MatcherConfig contains the configuration for a matcherStage
type MatcherConfig struct {
PipelineName *string `mapstructure:"pipeline_name"`
Selector string `mapstructure:"selector"`
Stages PipelineStages `mapstructure:"stages"`
Action string `mapstructure:"action"`
}

// validateMatcherConfig validates the MatcherConfig for the matcherStage
func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
func validateMatcherConfig(cfg *MatcherConfig) (logql.LogSelectorExpr, error) {
if cfg == nil {
return nil, errors.New(ErrEmptyMatchStageConfig)
}
Expand All @@ -39,14 +45,26 @@ func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
if cfg.Selector == "" {
return nil, errors.New(ErrSelectorRequired)
}
if cfg.Stages == nil || len(cfg.Stages) == 0 {
switch cfg.Action {
case MatchActionKeep, MatchActionDrop:
case "":
cfg.Action = MatchActionKeep
default:
return nil, errors.New(ErrUnknownMatchAction)
}

if cfg.Action == MatchActionKeep && (cfg.Stages == nil || len(cfg.Stages) == 0) {
return nil, errors.New(ErrMatchRequiresStages)
}
matchers, err := logql.ParseMatchers(cfg.Selector)
if cfg.Action == MatchActionDrop && (cfg.Stages != nil && len(cfg.Stages) != 0) {
return nil, errors.New(ErrStagesWithDropLine)
}

selector, err := logql.ParseLogSelector(cfg.Selector)
if err != nil {
return nil, errors.Wrap(err, ErrSelectorSyntax)
}
return matchers, nil
return selector, nil
}

// newMatcherStage creates a new matcherStage from config
Expand All @@ -56,7 +74,7 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg
if err != nil {
return nil, err
}
matchers, err := validateMatcherConfig(cfg)
selector, err := validateMatcherConfig(cfg)
if err != nil {
return nil, err
}
Expand All @@ -67,21 +85,34 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg
nPtr = &name
}

pl, err := NewPipeline(logger, cfg.Stages, nPtr, registerer)
var pl *Pipeline
if cfg.Action == MatchActionKeep {
var err error
pl, err = NewPipeline(logger, cfg.Stages, nPtr, registerer)
if err != nil {
return nil, errors.Wrapf(err, "match stage failed to create pipeline from config: %v", config)
}
}

filter, err := selector.Filter()
if err != nil {
return nil, errors.Wrapf(err, "match stage failed to create pipeline from config: %v", config)
return nil, errors.Wrap(err, "error parsing filter")
}

return &matcherStage{
matchers: matchers,
matchers: selector.Matchers(),
pipeline: pl,
action: cfg.Action,
filter: filter,
}, nil
}

// matcherStage applies Label matchers to determine if the include stages should be run
type matcherStage struct {
matchers []*labels.Matcher
filter logql.Filter
pipeline Stage
action string
}

// Process implements Stage
Expand All @@ -91,7 +122,15 @@ func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]inter
return
}
}
m.pipeline.Process(labels, extracted, t, entry)
if m.filter == nil || m.filter([]byte(*entry)) {
switch m.action {
case MatchActionDrop:
// Adds the drop label to not be sent by the api.EntryHandler
labels[dropLabel] = ""
case MatchActionKeep:
m.pipeline.Process(labels, extracted, t, entry)
}
}
}

// Name implements Stage
Expand Down
Loading

0 comments on commit 1f6f433

Please sign in to comment.