Skip to content

Commit

Permalink
[filelogreceiver]: Add ability to sort by mtime (open-telemetry#28850)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
* Adds a new `mtime` sort type, which will sort files by their modified
time
* Add a feature gate for `mtime` sort type

An optional follow-up performance improvement may be made here, to have
the finder return fs.DirEntry directly to query the mtime without making
an extra call to os.Stat for each file.

**Link to tracking Issue:** open-telemetry#27812

**Testing:**
* Added unit tests for new functionality

**Documentation:** 
* Added new `mode` parameter to filelogreceiver docs
  • Loading branch information
BinaryFissionGames authored and jmsnll committed Nov 12, 2023
1 parent 3f4762f commit 4176227
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 19 deletions.
27 changes: 27 additions & 0 deletions .chloggen/filelogreceiver_sort-by-mtime-rework.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add the ability to order files by mtime, to only read the most recently modified files

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27812]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
8 changes: 7 additions & 1 deletion pkg/stanza/fileconsumer/matcher/internal/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type Option interface {
// Returned error is for explanitory purposes only.
// Returned error is for explanatory purposes only.
// All options will be called regardless of error.
apply([]*item) ([]*item, error)
}
Expand Down Expand Up @@ -49,6 +49,12 @@ type item struct {
}

func newItem(value string, regex *regexp.Regexp) (*item, error) {
if regex == nil {
return &item{
value: value,
}, nil
}

match := regex.FindStringSubmatch(value)
if match == nil {
return nil, fmt.Errorf("'%s' does not match regex", value)
Expand Down
58 changes: 51 additions & 7 deletions pkg/stanza/fileconsumer/matcher/internal/filter/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package filter // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"fmt"
"os"
"sort"
"strconv"
"time"
Expand All @@ -18,24 +19,24 @@ type parseFunc func(string) (any, error)

type compareFunc func(a, b any) bool

type sortOption struct {
type regexSortOption struct {
regexKey string
parseFunc
compareFunc
}

func newSortOption(regexKey string, parseFunc parseFunc, compareFunc compareFunc) (Option, error) {
func newRegexSortOption(regexKey string, parseFunc parseFunc, compareFunc compareFunc) (Option, error) {
if regexKey == "" {
return nil, fmt.Errorf("regex key must be specified")
}
return sortOption{
return regexSortOption{
regexKey: regexKey,
parseFunc: parseFunc,
compareFunc: compareFunc,
}, nil
}

func (o sortOption) apply(items []*item) ([]*item, error) {
func (o regexSortOption) apply(items []*item) ([]*item, error) {
// Special case where sort.Slice will not run the 'less' func.
// We still need to ensure it parses in order to ensure the file should be included.
if len(items) == 1 {
Expand Down Expand Up @@ -80,7 +81,7 @@ func (o sortOption) apply(items []*item) ([]*item, error) {
}

func SortNumeric(regexKey string, ascending bool) (Option, error) {
return newSortOption(regexKey,
return newRegexSortOption(regexKey,
func(s string) (any, error) {
return strconv.Atoi(s)
},
Expand All @@ -94,7 +95,7 @@ func SortNumeric(regexKey string, ascending bool) (Option, error) {
}

func SortAlphabetical(regexKey string, ascending bool) (Option, error) {
return newSortOption(regexKey,
return newRegexSortOption(regexKey,
func(s string) (any, error) {
return s, nil
},
Expand All @@ -118,7 +119,7 @@ func SortTemporal(regexKey string, ascending bool, layout string, location strin
if err != nil {
return nil, fmt.Errorf("load location %s: %w", loc, err)
}
return newSortOption(regexKey,
return newRegexSortOption(regexKey,
func(s string) (any, error) {
return timeutils.ParseStrptime(layout, s, loc)
},
Expand All @@ -130,3 +131,46 @@ func SortTemporal(regexKey string, ascending bool, layout string, location strin
},
)
}

type mtimeSortOption struct{}

type mtimeItem struct {
mtime time.Time
path string
item *item
}

func (m mtimeSortOption) apply(items []*item) ([]*item, error) {
mtimeItems := make([]mtimeItem, 0, len(items))
var errs error
for _, item := range items {
path := item.value
fi, err := os.Stat(path)
if err != nil {
errs = multierr.Append(errs, err)
continue
}

mtimeItems = append(mtimeItems, mtimeItem{
mtime: fi.ModTime(),
path: path,
item: item,
})
}

sort.SliceStable(mtimeItems, func(i, j int) bool {
// This checks if item i > j, in order to reverse the sort (most recently modified file is first in the list)
return mtimeItems[i].mtime.After(mtimeItems[j].mtime)
})

filteredValues := make([]*item, 0, len(items))
for _, mtimeItem := range mtimeItems {
filteredValues = append(filteredValues, mtimeItem.item)
}

return filteredValues, errs
}

func SortMtime() Option {
return mtimeSortOption{}
}
71 changes: 71 additions & 0 deletions pkg/stanza/fileconsumer/matcher/internal/filter/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ package filter

import (
"fmt"
"os"
"path/filepath"
"regexp"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -169,3 +172,71 @@ func TestSort(t *testing.T) {
}
}
}

func TestMTimeFilter(t *testing.T) {
epoch := time.Unix(0, 0)
cases := []struct {
name string
files []string
fileMTimes []time.Time
expectedErr string
expect []string
}{
{
name: "No files",
files: []string{},
fileMTimes: []time.Time{},
expect: []string{},
},
{
name: "Single file",
files: []string{"a.log"},
fileMTimes: []time.Time{epoch},
expect: []string{"a.log"},
},
{
name: "Multiple files",
files: []string{"a.log", "b.log"},
fileMTimes: []time.Time{epoch, epoch.Add(time.Hour)},
expect: []string{"b.log", "a.log"},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
tmpDir := t.TempDir()
items := []*item{}
// Create files with specified mtime
for i, file := range tc.files {
mtime := tc.fileMTimes[i]
fullPath := filepath.Join(tmpDir, file)

f, err := os.Create(fullPath)
require.NoError(t, err)
require.NoError(t, f.Close())
require.NoError(t, os.Chtimes(fullPath, epoch, mtime))

it, err := newItem(fullPath, nil)
require.NoError(t, err)

items = append(items, it)
}

f := SortMtime()
result, err := f.apply(items)
if tc.expectedErr != "" {
require.EqualError(t, err, tc.expectedErr)
} else {
require.NoError(t, err)
}

relativeResult := []string{}
for _, r := range result {
rel, err := filepath.Rel(tmpDir, r.value)
require.NoError(t, err)
relativeResult = append(relativeResult, rel)
}

require.Equal(t, tc.expect, relativeResult)
})
}
}
44 changes: 37 additions & 7 deletions pkg/stanza/fileconsumer/matcher/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"regexp"

"go.opentelemetry.io/collector/featuregate"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher/internal/filter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher/internal/finder"
)
Expand All @@ -16,12 +18,20 @@ const (
sortTypeNumeric = "numeric"
sortTypeTimestamp = "timestamp"
sortTypeAlphabetical = "alphabetical"
sortTypeMtime = "mtime"
)

const (
defaultOrderingCriteriaTopN = 1
)

var mtimeSortTypeFeatureGate = featuregate.GlobalRegistry().MustRegister(
"filelog.mtimeSortType",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, allows usage of `ordering_criteria.mode` = `mtime`."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27812"),
)

type Criteria struct {
Include []string `mapstructure:"include,omitempty"`
Exclude []string `mapstructure:"exclude,omitempty"`
Expand Down Expand Up @@ -63,10 +73,6 @@ func New(c Criteria) (*Matcher, error) {
}, nil
}

if c.OrderingCriteria.Regex == "" {
return nil, fmt.Errorf("'regex' must be specified when 'sort_by' is specified")
}

if c.OrderingCriteria.TopN < 0 {
return nil, fmt.Errorf("'top_n' must be a positive integer")
}
Expand All @@ -75,9 +81,17 @@ func New(c Criteria) (*Matcher, error) {
c.OrderingCriteria.TopN = defaultOrderingCriteriaTopN
}

regex, err := regexp.Compile(c.OrderingCriteria.Regex)
if err != nil {
return nil, fmt.Errorf("compile regex: %w", err)
var regex *regexp.Regexp
if orderingCriteriaNeedsRegex(c.OrderingCriteria.SortBy) {
if c.OrderingCriteria.Regex == "" {
return nil, fmt.Errorf("'regex' must be specified when 'sort_by' is specified")
}

var err error
regex, err = regexp.Compile(c.OrderingCriteria.Regex)
if err != nil {
return nil, fmt.Errorf("compile regex: %w", err)
}
}

var filterOpts []filter.Option
Expand All @@ -101,6 +115,11 @@ func New(c Criteria) (*Matcher, error) {
return nil, fmt.Errorf("timestamp sort: %w", err)
}
filterOpts = append(filterOpts, f)
case sortTypeMtime:
if !mtimeSortTypeFeatureGate.IsEnabled() {
return nil, fmt.Errorf("the %q feature gate must be enabled to use %q sort type", mtimeSortTypeFeatureGate.ID(), sortTypeMtime)
}
filterOpts = append(filterOpts, filter.SortMtime())
default:
return nil, fmt.Errorf("'sort_type' must be specified")
}
Expand All @@ -115,6 +134,17 @@ func New(c Criteria) (*Matcher, error) {
}, nil
}

// orderingCriteriaNeedsRegex returns true if any of the sort options require a regex to be set.
func orderingCriteriaNeedsRegex(sorts []Sort) bool {
for _, s := range sorts {
switch s.SortType {
case sortTypeNumeric, sortTypeAlphabetical, sortTypeTimestamp:
return true
}
}
return false
}

type Matcher struct {
include []string
exclude []string
Expand Down
Loading

0 comments on commit 4176227

Please sign in to comment.