Skip to content

Commit

Permalink
[processor/logdeduplicationprocessor] Add logdedupprocessor (open-tel…
Browse files Browse the repository at this point in the history
…emetry#34465)

**Description:**

Starts the donation of the
[logdedupprocessor](https://github.com/observIQ/bindplane-agent/tree/release/v1.58.0/processor/logdeduplicationprocessor)
from ObserveIQ's Bindplane agent on behalf of @BinaryFissionGames.

**Link to tracking Issue:**

- Closes open-telemetry#34118 

**Testing:**

Includes unit tests.

**Documentation:**

---------

Co-authored-by: Brandon Johnson <binaryfissiongames@gmail.com>
Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
  • Loading branch information
3 people authored and f7o committed Sep 12, 2024
1 parent cd61817 commit eb52553
Show file tree
Hide file tree
Showing 26 changed files with 2,174 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add-logdedupe-processor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: logdedupeprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add new logdedupeprocessor processor that deduplicates log entries.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34118]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ processor/groupbyattrsprocessor/ @open-teleme
processor/groupbytraceprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling
processor/intervalprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams @sh0rez @djaglowski
processor/k8sattributesprocessor/ @open-telemetry/collector-contrib-approvers @dmitryax @rmfitzpatrick @fatsheep9146 @TylerHelmuth
processor/logdeduplicationprocessor/ @open-telemetry/collector-contrib-approvers @BinaryFissionGames @MikeGoldsmith @djaglowski
processor/logstransformprocessor/ @open-telemetry/collector-contrib-approvers @djaglowski @dehaansa
processor/metricsgenerationprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9
processor/metricstransformprocessor/ @open-telemetry/collector-contrib-approvers @dmitryax
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ body:
- processor/groupbytrace
- processor/interval
- processor/k8sattributes
- processor/logdeduplication
- processor/logstransform
- processor/metricsgeneration
- processor/metricstransform
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ body:
- processor/groupbytrace
- processor/interval
- processor/k8sattributes
- processor/logdeduplication
- processor/logstransform
- processor/metricsgeneration
- processor/metricstransform
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ body:
- processor/groupbytrace
- processor/interval
- processor/k8sattributes
- processor/logdeduplication
- processor/logstransform
- processor/metricsgeneration
- processor/metricstransform
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ body:
- processor/groupbytrace
- processor/interval
- processor/k8sattributes
- processor/logdeduplication
- processor/logstransform
- processor/metricsgeneration
- processor/metricstransform
Expand Down
1 change: 1 addition & 0 deletions processor/logdeduplicationprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
75 changes: 75 additions & 0 deletions processor/logdeduplicationprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Log DeDuplication Processor
This processor is used to deduplicate logs by detecting identical logs over a range of time and emitting a single log with the count of logs that were deduplicated.

## Supported pipelines
- Logs

## How It Works
1. The user configures the log deduplication processor in the desired logs pipeline.
2. All logs sent to the processor and aggregated over the configured `interval`. Logs are considered identical if they have the same body, resource attributes, severity, and log attributes.
3. After the interval, the processor emits a single log with the count of logs that were deduplicated. The emitted log will have the same body, resource attributes, severity, and log attributes as the original log. The emitted log will also have the following new attributes:

- `log_count`: The count of logs that were deduplicated over the interval. The name of the attribute is configurable via the `log_count_attribute` parameter.
- `first_observed_timestamp`: The timestamp of the first log that was observed during the aggregation interval.
- `last_observed_timestamp`: The timestamp of the last log that was observed during the aggregation interval.

**Note**: The `ObservedTimestamp` and `Timestamp` of the emitted log will be the time that the aggregated log was emitted and will not be the same as the `ObservedTimestamp` and `Timestamp` of the original logs.

## Configuration
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| interval | duration | `10s` | The interval at which logs are aggregated. The counter will reset after each interval. |
| log_count_attribute | string | `log_count` | The name of the count attribute of deduplicated logs that will be added to the emitted aggregated log. |
| timezone | string | `UTC` | The timezone of the `first_observed_timestamp` and `last_observed_timestamp` timestamps on the emitted aggregated log. The available locations depend on the local IANA Time Zone database. [This page](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones) contains many examples, such as `America/New_York`. |
| exclude_fields | []string | `[]` | Fields to exclude from duplication matching. Fields can be excluded from the log `body` or `attributes`. These fields will not be present in the emitted aggregated log. Nested fields must be `.` delimited. If a field contains a `.` it can be escaped by using a `\` see [example config](#example-config-with-excluded-fields).<br><br>**Note**: The entire `body` cannot be excluded. If the body is a map then fields within it can be excluded. |


### Example Config
The following config is an example configuration for the log deduplication processor. It is configured with an aggregation interval of `60 seconds`, a timezone of `America/Los_Angeles`, and a log count attribute of `dedup_count`. It has no fields being excluded.
```yaml
receivers:
filelog:
include: [./example/*.log]
processors:
logdedup:
interval: 60s
log_count_attribute: dedup_count
timezone: 'America/Los_Angeles'
exporters:
googlecloud:

service:
pipelines:
logs:
receivers: [filelog]
processors: [logdedup]
exporters: [googlecloud]
```
### Example Config with Excluded Fields
The following config is an example configuration that excludes the following fields from being considered when searching for duplicate logs:
- `timestamp` field from the body
- `host.name` field from attributes
- `ip` nested attribute inside a map attribute named `src`

```yaml
receivers:
filelog:
include: [./example/*.log]
processors:
logdedup:
exclude_fields:
- body.timestamp
- attributes.host\.name
- attributes.src.ip
exporters:
googlecloud:
service:
pipelines:
logs:
receivers: [filelog]
processors: [logdedup]
exporters: [googlecloud]
```
102 changes: 102 additions & 0 deletions processor/logdeduplicationprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package logdeduplicationprocessor provides a processor that counts logs as metrics.
package logdeduplicationprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/logdeduplicationprocessor"

import (
"errors"
"fmt"
"strings"
"time"

"go.opentelemetry.io/collector/component"
)

// Config defaults
const (
// defaultInterval is the default export interval.
defaultInterval = 10 * time.Second

// defaultLogCountAttribute is the default log count attribute
defaultLogCountAttribute = "log_count"

// defaultTimezone is the default timezone
defaultTimezone = "UTC"

// bodyField is the name of the body field
bodyField = "body"

// attributeField is the name of the attribute field
attributeField = "attributes"
)

// Config errors
var (
errInvalidLogCountAttribute = errors.New("log_count_attribute must be set")
errInvalidInterval = errors.New("interval must be greater than 0")
errCannotExcludeBody = errors.New("cannot exclude the entire body")
)

// Config is the config of the processor.
type Config struct {
LogCountAttribute string `mapstructure:"log_count_attribute"`
Interval time.Duration `mapstructure:"interval"`
Timezone string `mapstructure:"timezone"`
ExcludeFields []string `mapstructure:"exclude_fields"`
}

// createDefaultConfig returns the default config for the processor.
func createDefaultConfig() component.Config {
return &Config{
LogCountAttribute: defaultLogCountAttribute,
Interval: defaultInterval,
Timezone: defaultTimezone,
ExcludeFields: []string{},
}
}

// Validate validates the configuration
func (c Config) Validate() error {
if c.Interval <= 0 {
return errInvalidInterval
}

if c.LogCountAttribute == "" {
return errInvalidLogCountAttribute
}

_, err := time.LoadLocation(c.Timezone)
if err != nil {
return fmt.Errorf("timezone is invalid: %w", err)
}

return c.validateExcludeFields()
}

// validateExcludeFields validates that all the exclude fields
func (c Config) validateExcludeFields() error {
knownExcludeFields := make(map[string]struct{})

for _, field := range c.ExcludeFields {
// Special check to make sure the entire body is not excluded
if field == bodyField {
return errCannotExcludeBody
}

// Split and ensure the field starts with `body` or `attributes`
parts := strings.Split(field, fieldDelimiter)
if parts[0] != bodyField && parts[0] != attributeField {
return fmt.Errorf("an excludefield must start with %s or %s", bodyField, attributeField)
}

// If a field is valid make sure we haven't already seen it
if _, ok := knownExcludeFields[field]; ok {
return fmt.Errorf("duplicate exclude_field %s", field)
}

knownExcludeFields[field] = struct{}{}
}

return nil
}
109 changes: 109 additions & 0 deletions processor/logdeduplicationprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package logdeduplicationprocessor

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
)

func TestCreateDefaultProcessorConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.Equal(t, defaultInterval, cfg.Interval)
require.Equal(t, defaultLogCountAttribute, cfg.LogCountAttribute)
require.Equal(t, defaultTimezone, cfg.Timezone)
require.Equal(t, []string{}, cfg.ExcludeFields)
}

func TestValidateConfig(t *testing.T) {
testCases := []struct {
desc string
cfg *Config
expectedErr error
}{
{
desc: "invalid LogCountAttribute config",
cfg: &Config{
LogCountAttribute: "",
Interval: defaultInterval,
Timezone: defaultTimezone,
ExcludeFields: []string{},
},
expectedErr: errInvalidLogCountAttribute,
},
{
desc: "invalid Interval config",
cfg: &Config{
LogCountAttribute: defaultLogCountAttribute,
Interval: -1,
Timezone: defaultTimezone,
ExcludeFields: []string{},
},
expectedErr: errInvalidInterval,
},
{
desc: "invalid Timezone config",
cfg: &Config{
LogCountAttribute: defaultLogCountAttribute,
Interval: defaultInterval,
Timezone: "not a timezone",
ExcludeFields: []string{},
},
expectedErr: errors.New("timezone is invalid"),
},
{
desc: "invalid exclude entire body",
cfg: &Config{
LogCountAttribute: defaultLogCountAttribute,
Interval: defaultInterval,
Timezone: defaultTimezone,
ExcludeFields: []string{bodyField},
},
expectedErr: errCannotExcludeBody,
},
{
desc: "invalid exclude field body",
cfg: &Config{
LogCountAttribute: defaultLogCountAttribute,
Interval: defaultInterval,
Timezone: defaultTimezone,
ExcludeFields: []string{"not.value"},
},
expectedErr: errors.New("an excludefield must start with"),
},
{
desc: "invalid duplice exclude field",
cfg: &Config{
LogCountAttribute: defaultLogCountAttribute,
Interval: defaultInterval,
Timezone: defaultTimezone,
ExcludeFields: []string{"body.thing", "body.thing"},
},
expectedErr: errors.New("duplicate exclude_field"),
},
{
desc: "valid config",
cfg: &Config{
LogCountAttribute: defaultLogCountAttribute,
Interval: defaultInterval,
Timezone: defaultTimezone,
ExcludeFields: []string{"body.thing", "attributes.otherthing"},
},
expectedErr: nil,
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
err := tc.cfg.Validate()
if tc.expectedErr != nil {
require.ErrorContains(t, err, tc.expectedErr.Error())
} else {
require.NoError(t, err)
}
})
}
}
Loading

0 comments on commit eb52553

Please sign in to comment.