Skip to content

Commit

Permalink
tested writer clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
thepalbi committed Jan 30, 2023
1 parent 5c2b350 commit 12fd129
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 4 deletions.
6 changes: 6 additions & 0 deletions clients/pkg/promtail/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type WAL interface {
Sync() error
Dir() string
Close()
NextSegment() (int, error)
}

type wrapper struct {
Expand Down Expand Up @@ -97,3 +98,8 @@ func (w *wrapper) Sync() error {
// Dir returns whatever directory the wrapped WAL reports through its own Dir
// method.
func (w *wrapper) Dir() string {
	walDir := w.wal.Dir()
	return walDir
}

// NextSegment synchronously closes the current segment by delegating to the
// wrapped WAL's NextSegmentSync, and hands back that call's results untouched.
// It exists mainly so tests can force a segment roll-over.
// NOTE(review): the int is presumably the number of the newly opened segment;
// confirm against the underlying WAL implementation.
func (w *wrapper) NextSegment() (int, error) {
	segment, err := w.wal.NextSegmentSync()
	return segment, err
}
16 changes: 12 additions & 4 deletions clients/pkg/promtail/wal/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (wrt *Writer) start(maxSegmentAge time.Duration) {
for {
select {
case <-trigger.C:
level.Debug(wrt.log).Log("msg", "Running wal old segments cleanup")
if err := wrt.cleanSegments(maxSegmentAge); err != nil {
level.Error(wrt.log).Log("msg", "Error cleaning old segments", "err", err)
}
Expand Down Expand Up @@ -117,7 +118,9 @@ func (wrt *Writer) Stop() {
}

// cleanSegments will remove segments older than maxAge from the WAL directory. If there's just one segment, none will be
// deleted since it's likely there's active readers on it.
// deleted since it's likely there's active readers on it. In case there's multiple segments, each will be deleted if:
// - It's not the last (highest numbered) segment
// - Its last modified date is older than the max allowed age
func (wrt *Writer) cleanSegments(maxAge time.Duration) error {
maxModifiedAt := time.Now().Add(-maxAge)
walDir := wrt.wal.Dir()
Expand All @@ -129,10 +132,15 @@ func (wrt *Writer) cleanSegments(maxAge time.Duration) error {
if len(segments) <= 1 {
return nil
}
// find the most recent, or head segment to avoid cleaning it up
lastSegment := -1
for _, segment := range segments {
// TODO: Should we avoid deleting the last segment as well? Maybe the reader side is far behind, even though it hasn't
// been written in the last hour
if segment.lastModified.Before(maxModifiedAt) {
if lastSegment < segment.number {
lastSegment = segment.number
}
}
for _, segment := range segments {
if segment.lastModified.Before(maxModifiedAt) && segment.number != lastSegment {
// segment is older than allowed age, cleaning up
if err := os.Remove(filepath.Join(walDir, segment.name)); err != nil {
level.Error(wrt.log).Log("msg", "Error old wal segment", "err", err, "segmentNum", segment.number)
Expand Down
142 changes: 142 additions & 0 deletions clients/pkg/promtail/wal/writer_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package wal

import (
"github.com/go-kit/log/level"
"os"
"path/filepath"
"testing"
"time"

Expand Down Expand Up @@ -58,3 +60,143 @@ func TestWriter_EntriesAreWrittenToWALAndForwardedToClients(t *testing.T) {
require.Len(t, readEntries, len(lines), "written lines and seen differ")
require.Equal(t, testLabels, readEntries[0].Labels)
}

// TestWriter_OldSegmentsAreCleanedUp writes a few entries into the WAL, forces
// a roll-over to a second segment, waits for twice the configured max segment
// age, and then checks that the first segment file is gone while the second
// (head) segment file is still present.
func TestWriter_OldSegmentsAreCleanedUp(t *testing.T) {
	logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowDebug())
	walDir := t.TempDir()
	firstSegment := filepath.Join(walDir, "00000000")
	headSegment := filepath.Join(walDir, "00000001")

	segmentTTL := 2 * time.Second

	cfg := Config{
		Dir:           walDir,
		Enabled:       true,
		MaxSegmentAge: segmentTTL,
	}
	writer, err := NewWriter(cfg, logger, prometheus.NewRegistry())
	require.NoError(t, err)
	defer writer.Stop()

	// Push a handful of entries through the writer's channel, then sync.
	streamLabels := model.LabelSet{
		"testing": "log",
	}
	payloads := []string{
		"some line",
		"some other line",
		"some other other line",
	}
	for i := range payloads {
		writer.Chan() <- api.Entry{
			Labels: streamLabels,
			Entry: logproto.Entry{
				Timestamp: time.Now(),
				Line:      payloads[i],
			},
		}
	}

	// Reaching into the writer's WAL directly is a test-only shortcut.
	require.NoError(t, writer.wal.Sync(), "failed to sync wal")

	// Everything sent must be readable back from the WAL directory.
	walEntries, err := ReadWAL(walDir)
	require.NoError(t, err)
	require.Len(t, walEntries, len(payloads), "written lines and seen differ")
	require.Equal(t, streamLabels, walEntries[0].Labels)

	watchAndLogDirEntries(t, walDir)

	// The first segment file has to exist at this point.
	segInfo, err := os.Stat(firstSegment)
	require.NoError(t, err)
	require.GreaterOrEqual(t, segInfo.Size(), int64(0), "first segment size should be >= 0")

	// Roll over to a new segment so the first one stops being the head and
	// becomes a candidate for cleanup.
	_, err = writer.wal.NextSegment()
	require.NoError(t, err, "error closing current segment")

	// Give the cleanup enough time to run: twice the max segment age.
	time.Sleep(2 * segmentTTL)

	watchAndLogDirEntries(t, walDir)

	// The old segment must have been removed...
	_, err = os.Stat(firstSegment)
	require.Error(t, err)
	require.ErrorIs(t, err, os.ErrNotExist, "expected file not exists error")
	// ...while the last, or "head", segment survives.
	_, err = os.Stat(headSegment)
	require.NoError(t, err)
}

// TestWriter_NoSegmentIsCleanedUpIfTheresOnlyOne writes a single entry into the
// WAL, never rolls the segment over, waits for twice the configured max segment
// age, and then checks that the only segment file is still on disk.
func TestWriter_NoSegmentIsCleanedUpIfTheresOnlyOne(t *testing.T) {
	logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowDebug())
	walDir := t.TempDir()
	onlySegment := filepath.Join(walDir, "00000000")

	segmentTTL := 2 * time.Second

	cfg := Config{
		Dir:           walDir,
		Enabled:       true,
		MaxSegmentAge: segmentTTL,
	}
	writer, err := NewWriter(cfg, logger, prometheus.NewRegistry())
	require.NoError(t, err)
	defer writer.Stop()

	// Push one entry through the writer's channel, then sync.
	streamLabels := model.LabelSet{
		"testing": "log",
	}
	payloads := []string{
		"some line",
	}
	for i := range payloads {
		writer.Chan() <- api.Entry{
			Labels: streamLabels,
			Entry: logproto.Entry{
				Timestamp: time.Now(),
				Line:      payloads[i],
			},
		}
	}

	// Reaching into the writer's WAL directly is a test-only shortcut.
	require.NoError(t, writer.wal.Sync(), "failed to sync wal")

	// Everything sent must be readable back from the WAL directory.
	walEntries, err := ReadWAL(walDir)
	require.NoError(t, err)
	require.Len(t, walEntries, len(payloads), "written lines and seen differ")
	require.Equal(t, streamLabels, walEntries[0].Labels)

	watchAndLogDirEntries(t, walDir)

	// The segment file has to exist before the waiting period.
	segInfo, err := os.Stat(onlySegment)
	require.NoError(t, err)
	require.GreaterOrEqual(t, segInfo.Size(), int64(0), "first segment size should be >= 0")

	// Wait twice the max segment age so any cleanup has had a chance to run.
	time.Sleep(2 * segmentTTL)

	watchAndLogDirEntries(t, walDir)

	// With a single segment present, it must not have been deleted.
	_, err = os.Stat(onlySegment)
	require.NoError(t, err)
}

// watchAndLogDirEntries logs, through t, the name of every entry found directly
// under path. If the directory is empty it logs "no dirs found" instead. The
// test is failed immediately when the directory cannot be read.
func watchAndLogDirEntries(t *testing.T, path string) {
	t.Helper()

	entries, err := os.ReadDir(path)
	// Check the error before looking at the entries: a failed read (e.g. a
	// missing directory) yields no entries, and testing for emptiness first
	// would report "no dirs found" and silently drop the error.
	require.NoError(t, err)

	if len(entries) == 0 {
		t.Log("no dirs found")
		return
	}
	for _, entry := range entries {
		t.Logf("dir entry found: %s", entry.Name())
	}
}

0 comments on commit 12fd129

Please sign in to comment.