[pkg/stanza] support gzip compressed log files for file log receiver (#33406)

**Description:** This PR adds support for reading gzip-compressed log files in the filelog receiver. When enabled via the `compression: gzip` setting, the reader wraps the file handle of a compressed file in a `gzip.Reader`.
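
As a minimal sketch of the mechanism (assuming a hypothetical file name and locally tracked offset, not the receiver's actual code), resuming reads of a gzip-compressed file works by layering a `gzip.Reader` over an `io.SectionReader` that starts at the saved byte offset:

```
// Minimal sketch: resume reading a gzip-compressed log file from a saved
// raw-file offset. File name and offset bookkeeping are hypothetical.
package main

import (
	"bufio"
	"compress/gzip"
	"fmt"
	"io"
	"log"
	"os"
)

func main() {
	f, err := os.Open("simple.log.1.gz") // hypothetical input file
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	info, err := f.Stat()
	if err != nil {
		log.Fatal(err)
	}

	var offset int64 // previously persisted raw-file offset; 0 reads from the start

	// The section reader exposes only the bytes from offset to EOF, so the gzip
	// reader decodes exactly the gzip member(s) appended since the last read.
	zr, err := gzip.NewReader(io.NewSectionReader(f, offset, info.Size()-offset))
	if err != nil {
		log.Fatal(err) // io.EOF here would mean nothing new was appended
	}
	defer zr.Close()

	sc := bufio.NewScanner(zr)
	for sc.Scan() {
		fmt.Println(sc.Text())
	}
	if err := sc.Err(); err != nil {
		log.Fatal(err)
	}

	// As in the PR, the offset for a compressed file is advanced to the raw
	// end of file rather than by the length of emitted tokens.
	offset = info.Size()
}
```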

**Link to tracking Issue:** #2328

**Testing:** Added unit tests for the new functionality. Manually tested
using the following configuration for the filelog receiver:

```
  filelog:
    include: [ ./simple.log*.gz ]
    start_at: beginning
    compression: "gzip"
    operators:
      - type: regex_parser
        regex: '^(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$'
        timestamp:
          parse_from: attributes.time
          layout: '%Y-%m-%d %H:%M:%S'
        severity:
          parse_from: attributes.sev
```
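
For reproducibility, a sample input such as `simple.log.1.gz` can be generated with a short Go program (a hypothetical helper, not part of the PR); the line format matches the `regex_parser` above:

```
// Hypothetical generator for a gzip-compressed sample log whose lines match
// the '^(?P<time>...) (?P<sev>...) (?P<msg>...)$' pattern from the config above.
package main

import (
	"compress/gzip"
	"fmt"
	"log"
	"os"
	"time"
)

func main() {
	f, err := os.Create("simple.log.1.gz")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	zw := gzip.NewWriter(f)
	defer zw.Close()

	start := time.Now()
	for i, sev := range []string{"INFO", "WARN", "ERROR"} {
		ts := start.Add(time.Duration(i) * time.Second).Format("2006-01-02 15:04:05")
		if _, err := fmt.Fprintf(zw, "%s %s sample message %d\n", ts, sev, i); err != nil {
			log.Fatal(err)
		}
	}
}
```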

**Documentation:** Added documentation to the filelog receiver README.

---------

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>
Co-authored-by: Dan Jaglowski <jaglows3@gmail.com>
bacherfl and djaglowski authored Jun 14, 2024
1 parent 69e9141 commit 01efffd
Showing 7 changed files with 198 additions and 42 deletions.
27 changes: 27 additions & 0 deletions .chloggen/filelog_receiver_compressed_files.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for gzip compressed log files

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [2328]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 2 additions & 0 deletions pkg/stanza/fileconsumer/config.go
@@ -85,6 +85,7 @@ type Config struct {
FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"`
Header *HeaderConfig `mapstructure:"header,omitempty"`
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
Compression string `mapstructure:"compression,omitempty"`
}

type HeaderConfig struct {
@@ -166,6 +167,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
Attributes: c.Resolver,
HeaderConfig: hCfg,
DeleteAtEOF: c.DeleteAfterRead,
Compression: c.Compression,
}

var t tracker.Tracker
6 changes: 6 additions & 0 deletions pkg/stanza/fileconsumer/config_test.go
@@ -831,6 +831,12 @@ func (c *Config) withHeader(headerMatchPattern, extractRegex string) *Config {
return c
}

// withGzip is a builder-like helper for quickly setting up support for gzip compressed log files
func (c *Config) withGzip() *Config {
c.Compression = "gzip"
return c
}

const mockOperatorType = "mock"

func init() {
66 changes: 66 additions & 0 deletions pkg/stanza/fileconsumer/file_test.go
@@ -4,6 +4,7 @@
package fileconsumer

import (
"compress/gzip"
"context"
"fmt"
"os"
@@ -1528,3 +1529,68 @@ func symlinkTestCreateLogFile(t *testing.T, tempDir string, fileIdx, numLogLines
temp1.Close()
return tokens
}

// TestReadGzipCompressedLogsFromBeginning tests that, when starting from the beginning of a gzip compressed file, we
// read all the lines that are already there
func TestReadGzipCompressedLogsFromBeginning(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir).withGzip()
cfg.StartAt = "beginning"
operator, sink := testManager(t, cfg)

// Create a file, then start
temp := filetest.OpenTempWithPattern(t, tempDir, "*.gz")
writer := gzip.NewWriter(temp)

_, err := writer.Write([]byte("testlog1\ntestlog2\n"))
require.NoError(t, err)

require.NoError(t, writer.Close())

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

sink.ExpectToken(t, []byte("testlog1"))
sink.ExpectToken(t, []byte("testlog2"))
}

// TestReadGzipCompressedLogsFromEnd tests that, when starting at the end of a gzip compressed file, we
// read all the lines that are added afterward
func TestReadGzipCompressedLogsFromEnd(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir).withGzip()
cfg.StartAt = "end"
operator, sink := testManager(t, cfg)

// Create a file, then start
temp := filetest.OpenTempWithPattern(t, tempDir, "*.gz")

appendToLog := func(t *testing.T, content string) {
writer := gzip.NewWriter(temp)
_, err := writer.Write([]byte(content))
require.NoError(t, err)
require.NoError(t, writer.Close())
}

appendToLog(t, "testlog1\ntestlog2\n")

// poll for the first time - this should not lead to emitted
// logs as those were already in the existing file
operator.poll(context.TODO())

// append new content to the log and poll again - this should be picked up
appendToLog(t, "testlog3\n")
operator.poll(context.TODO())
sink.ExpectToken(t, []byte("testlog3"))

// do another iteration to verify correct setting of compressed reader offset
appendToLog(t, "testlog4\n")
operator.poll(context.TODO())
sink.ExpectToken(t, []byte("testlog4"))
}
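
The append-then-poll pattern above works because each closed `gzip.Writer` emits a self-contained gzip member, and Go's `gzip.Reader` transparently decodes concatenated members (multistream mode, its default). A standalone sketch of that property:

```
// Demonstrates that concatenated gzip members form one valid stream, which is
// what lets the tests above append with a fresh gzip.Writer per write.
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"log"
)

func main() {
	var buf bytes.Buffer

	// Each iteration writes an independent gzip member, as appendToLog does.
	for _, chunk := range []string{"testlog1\ntestlog2\n", "testlog3\n"} {
		zw := gzip.NewWriter(&buf)
		if _, err := zw.Write([]byte(chunk)); err != nil {
			log.Fatal(err)
		}
		if err := zw.Close(); err != nil {
			log.Fatal(err)
		}
	}

	zr, err := gzip.NewReader(&buf)
	if err != nil {
		log.Fatal(err)
	}
	out, err := io.ReadAll(zr) // reads across both members
	if err != nil {
		log.Fatal(err)
	}
	fmt.Print(string(out)) // prints testlog1 through testlog3
}
```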
5 changes: 4 additions & 1 deletion pkg/stanza/fileconsumer/internal/reader/factory.go
@@ -42,6 +42,7 @@ type Factory struct {
EmitFunc emit.Callback
Attributes attrs.Resolver
DeleteAtEOF bool
Compression string
}

func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
@@ -73,14 +74,15 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
decoder: decode.New(f.Encoding),
lineSplitFunc: f.SplitFunc,
deleteAtEOF: f.DeleteAtEOF,
compression: f.Compression,
}
r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))

if r.Fingerprint.Len() > r.fingerprintSize {
// User has reconfigured fingerprint_size
shorter, rereadErr := fingerprint.NewFromFile(file, r.fingerprintSize)
if rereadErr != nil {
- return nil, fmt.Errorf("reread fingerprint: %w", err)
+ return nil, fmt.Errorf("reread fingerprint: %w", rereadErr)
}
if !r.Fingerprint.StartsWith(shorter) {
return nil, errors.New("file truncated")
@@ -119,5 +121,6 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
for k, v := range attributes {
r.FileAttributes[k] = v
}

return r, nil
}
37 changes: 36 additions & 1 deletion pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -5,8 +5,10 @@ package reader // import "github.com/open-telemetry/opentelemetry-collector-cont

import (
"bufio"
"compress/gzip"
"context"
"errors"
"io"
"os"

"go.opentelemetry.io/collector/component"
@@ -34,6 +36,7 @@ type Reader struct {
set component.TelemetrySettings
fileName string
file *os.File
reader io.Reader
fingerprintSize int
initialBufferSize int
maxLogSize int
@@ -45,10 +48,42 @@ type Reader struct {
emitFunc emit.Callback
deleteAtEOF bool
needsUpdateFingerprint bool
compression string
}

// ReadToEnd will read until the end of the file
func (r *Reader) ReadToEnd(ctx context.Context) {
switch r.compression {
case "gzip":
// We need to create a gzip reader each time ReadToEnd is called because the underlying
// SectionReader can only read a fixed window (from previous offset to EOF).
info, err := r.file.Stat()
if err != nil {
r.set.Logger.Error("Failed to stat", zap.Error(err))
return
}
currentEOF := info.Size()

// use a gzip Reader with an underlying SectionReader to pick up at the last
// offset of a gzip compressed file
gzipReader, err := gzip.NewReader(io.NewSectionReader(r.file, r.Offset, currentEOF))
if err != nil {
if !errors.Is(err, io.EOF) {
r.set.Logger.Error("Failed to create gzip reader", zap.Error(err))
}
return
} else {
r.reader = gzipReader
}
// Offset tracking in an uncompressed file is based on the length of emitted tokens, but in this case
// we need to set the offset to the end of the file.
defer func() {
r.Offset = currentEOF
}()
default:
r.reader = r.file
}

if _, err := r.file.Seek(r.Offset, 0); err != nil {
r.set.Logger.Error("Failed to seek", zap.Error(err))
return
@@ -154,7 +189,7 @@ func (r *Reader) close() {

// Read from the file and update the fingerprint if necessary
func (r *Reader) Read(dst []byte) (n int, err error) {
n, err = r.file.Read(dst)
n, err = r.reader.Read(dst)
if n == 0 || err != nil {
return
}
