Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

promtail: add "max_age" field to configure cutoff for journal reading #921

Merged
merged 3 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion docs/promtail-examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ clients:
scrape_configs:
- job_name: journal
journal:
max_age: 12h
path: /var/log/journal
labels:
job: systemd-journal
Expand All @@ -103,6 +104,11 @@ Just like the Docker example, the `scrape_configs` sections holds various
jobs for parsing logs. A job with a `journal` key configures it for systemd
journal reading.

`max_age` is an optional string specifying the earliest entry that will be
read. If unspecified, `max_age` defaults to `7h`. Even if the position in the
journal is saved, if the entry corresponding to that position is older than
the max_age, the position won't be used.

`path` is an optional string specifying the path to read journal entries
from. If unspecified, defaults to the system default (`/var/log/journal`).

Expand Down Expand Up @@ -130,4 +136,4 @@ If running with Docker, that means to bind that path:
docker run -d --name promtail --network loki_network -p 9080:9080 \
-v /var/log/journal:/var/log/journal \
mypromtail-image -config.file=/etc/promtail/my-systemd-journal-config.yaml
```
```
8 changes: 8 additions & 0 deletions pkg/promtail/scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ type Config struct {

// JournalTargetConfig describes systemd journal records to scrape.
type JournalTargetConfig struct {
// MaxAge determines the oldest relative time from process start that will
// be read and sent to Loki. Values like 14h means no entry older than
// 14h will be read. If unspecified, defaults to 7h.
//
// A relative time specified here takes precedence over the saved position;
// if the cursor is older than the MaxAge value, it will not be used.
MaxAge string `yaml:"max_age"`

// Labels optionally holds labels to associate with each record coming out
// of the journal.
Labels model.LabelSet `yaml:"labels"`
Expand Down
109 changes: 104 additions & 5 deletions pkg/promtail/targets/journaltarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,48 @@ const (
// Formatter causes an immediate EOF and induces performance issues
// with how that is handled in sdjournal.
journalEmptyStr = " "

// journalDefaultMaxAgeTime represents the default earliest entry that
// will be read by the journal reader if there is no saved position
// newer than the "max_age" time.
journalDefaultMaxAgeTime = time.Hour * 7
)

type journalReader interface {
io.Closer
Follow(until <-chan time.Time, writer io.Writer) error
}

// Abstracted functions for interacting with the journal, used for mocking in tests:
type journalReaderFunc func(sdjournal.JournalReaderConfig) (journalReader, error)
type journalEntryFunc func(cfg sdjournal.JournalReaderConfig, cursor string) (*sdjournal.JournalEntry, error)

// Default implementations of abstracted functions:
var defaultJournalReaderFunc = func(c sdjournal.JournalReaderConfig) (journalReader, error) {
return sdjournal.NewJournalReader(c)
}

var defaultJournalEntryFunc = func(c sdjournal.JournalReaderConfig, cursor string) (*sdjournal.JournalEntry, error) {
var (
journal *sdjournal.Journal
err error
)

if c.Path != "" {
journal, err = sdjournal.NewJournalFromDir(c.Path)
} else {
journal, err = sdjournal.NewJournal()
}

if err != nil {
return nil, err
} else if err := journal.SeekCursor(cursor); err != nil {
return nil, err
}

return journal.GetEntry()
}

// JournalTarget tails systemd journal entries.
type JournalTarget struct {
logger log.Logger
Expand Down Expand Up @@ -76,6 +105,7 @@ func NewJournalTarget(
relabelConfig,
targetConfig,
defaultJournalReaderFunc,
defaultJournalEntryFunc,
)
}

Expand All @@ -87,6 +117,7 @@ func journalTargetWithReader(
relabelConfig []*relabel.Config,
targetConfig *scrape.JournalTargetConfig,
readerFunc journalReaderFunc,
entryFunc journalEntryFunc,
) (*JournalTarget, error) {

positionPath := fmt.Sprintf("journal-%s", jobName)
Expand All @@ -95,6 +126,9 @@ func journalTargetWithReader(
if readerFunc == nil {
readerFunc = defaultJournalReaderFunc
}
if entryFunc == nil {
entryFunc = defaultJournalEntryFunc
}

until := make(chan time.Time)
t := &JournalTarget{
Expand All @@ -109,6 +143,17 @@ func journalTargetWithReader(
until: until,
}

var maxAge time.Duration
var err error
if targetConfig.MaxAge == "" {
maxAge = journalDefaultMaxAgeTime
} else {
maxAge, err = time.ParseDuration(targetConfig.MaxAge)
}
if err != nil {
return nil, errors.Wrap(err, "parsing journal reader 'max_age' config value")
}

// Default to system path if not defined. Passing an empty string to
// sdjournal is valid but forces reads from the journal to be from
// the local machine id only, which contradicts the default behavior
Expand All @@ -119,12 +164,13 @@ func journalTargetWithReader(
journalPath = "/var/log/journal"
}

var err error
t.r, err = readerFunc(sdjournal.JournalReaderConfig{
Path: journalPath,
Cursor: position,
Formatter: t.formatter,
cfg := t.generateJournalConfig(journalConfigBuilder{
JournalPath: journalPath,
Position: position,
MaxAge: maxAge,
EntryFunc: entryFunc,
})
t.r, err = readerFunc(cfg)
if err != nil {
return nil, errors.Wrap(err, "creating journal reader")
}
Expand All @@ -139,6 +185,59 @@ func journalTargetWithReader(
return t, nil
}

type journalConfigBuilder struct {
JournalPath string
Position string
MaxAge time.Duration
EntryFunc journalEntryFunc
}

// generateJournalConfig generates a journal config by trying to intelligently
// determine if a time offset or the cursor should be used for the starting
// position in the reader.
func (t *JournalTarget) generateJournalConfig(
cb journalConfigBuilder,
) sdjournal.JournalReaderConfig {

cfg := sdjournal.JournalReaderConfig{
Path: cb.JournalPath,
Formatter: t.formatter,
}

// When generating the JournalReaderConfig, we want to preferably
// use the Cursor, since it's guaranteed unique to a given journal
// entry. When we don't know the cursor position (or want to set
// a start time), we'll fall back to the less-precise Since, which
// takes a negative duration back from the current system time.
//
// The presence of Since takes precedence over Cursor, so we only
// ever set one and not both here.

if cb.Position == "" {
cfg.Since = -1 * cb.MaxAge
return cfg
}

// We have a saved position and need to get that entry to see if it's
// older than cb.MaxAge. If it _is_ older, then we need to use cfg.Since
// rather than cfg.Cursor.
entry, err := cb.EntryFunc(cfg, cb.Position)
if err != nil {
level.Error(t.logger).Log("msg", "received error reading saved journal position", "err", err.Error())
cfg.Since = -1 * cb.MaxAge
return cfg
}

ts := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))
if time.Since(ts) > cb.MaxAge {
cfg.Since = -1 * cb.MaxAge
return cfg
}

cfg.Cursor = cb.Position
return cfg
}

func (t *JournalTarget) formatter(entry *sdjournal.JournalEntry) (string, error) {
ts := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))

Expand Down
129 changes: 128 additions & 1 deletion pkg/promtail/targets/journaltarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func (r *mockJournalReader) Follow(until <-chan time.Time, writer io.Writer) err
return nil
}

func newMockJournalEntry(entry *sdjournal.JournalEntry) journalEntryFunc {
return func(c sdjournal.JournalReaderConfig, cursor string) (*sdjournal.JournalEntry, error) {
return entry, nil
}
}

func (r *mockJournalReader) Write(msg string, fields map[string]string) {
allFields := make(map[string]string, len(fields))
for k, v := range fields {
Expand Down Expand Up @@ -94,7 +100,7 @@ func TestJournalTarget(t *testing.T) {
require.NoError(t, err)

jt, err := journalTargetWithReader(logger, client, ps, "test", relabels,
&scrape.JournalTargetConfig{}, newMockJournalReader)
&scrape.JournalTargetConfig{}, newMockJournalReader, newMockJournalEntry(nil))
require.NoError(t, err)

r := jt.r.(*mockJournalReader)
Expand All @@ -110,3 +116,124 @@ func TestJournalTarget(t *testing.T) {
assert.Len(t, client.messages, 10)
require.NoError(t, jt.Stop())
}

func TestJournalTarget_Since(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

initRandom()
dirName := "/tmp/" + randName()
positionsFileName := dirName + "/positions.yml"

// Set the sync period to a really long value, to guarantee the sync timer
// never runs, this way we know everything saved was done through channel
// notifications when target.stop() was called.
ps, err := positions.New(logger, positions.Config{
SyncPeriod: 10 * time.Second,
PositionsFile: positionsFileName,
})
if err != nil {
t.Fatal(err)
}

client := &TestClient{
log: logger,
messages: make([]string, 0),
}

cfg := scrape.JournalTargetConfig{
MaxAge: "4h",
}

jt, err := journalTargetWithReader(logger, client, ps, "test", nil,
&cfg, newMockJournalReader, newMockJournalEntry(nil))
require.NoError(t, err)

r := jt.r.(*mockJournalReader)
require.Equal(t, r.config.Since, -1*time.Hour*4)
}

func TestJournalTarget_Cursor_TooOld(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

initRandom()
dirName := "/tmp/" + randName()
positionsFileName := dirName + "/positions.yml"

// Set the sync period to a really long value, to guarantee the sync timer
// never runs, this way we know everything saved was done through channel
// notifications when target.stop() was called.
ps, err := positions.New(logger, positions.Config{
SyncPeriod: 10 * time.Second,
PositionsFile: positionsFileName,
})
if err != nil {
t.Fatal(err)
}
ps.PutString("journal-test", "foobar")

client := &TestClient{
log: logger,
messages: make([]string, 0),
}

cfg := scrape.JournalTargetConfig{}

entryTs := time.Date(1980, time.July, 3, 12, 0, 0, 0, time.UTC)
journalEntry := newMockJournalEntry(&sdjournal.JournalEntry{
Cursor: "foobar",
Fields: nil,
RealtimeTimestamp: uint64(entryTs.UnixNano()),
})

jt, err := journalTargetWithReader(logger, client, ps, "test", nil,
&cfg, newMockJournalReader, journalEntry)
require.NoError(t, err)

r := jt.r.(*mockJournalReader)
require.Equal(t, r.config.Since, -1*time.Hour*7)
}

func TestJournalTarget_Cursor_NotTooOld(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

initRandom()
dirName := "/tmp/" + randName()
positionsFileName := dirName + "/positions.yml"

// Set the sync period to a really long value, to guarantee the sync timer
// never runs, this way we know everything saved was done through channel
// notifications when target.stop() was called.
ps, err := positions.New(logger, positions.Config{
SyncPeriod: 10 * time.Second,
PositionsFile: positionsFileName,
})
if err != nil {
t.Fatal(err)
}
ps.PutString("journal-test", "foobar")

client := &TestClient{
log: logger,
messages: make([]string, 0),
}

cfg := scrape.JournalTargetConfig{}

entryTs := time.Now().Add(-time.Hour)
journalEntry := newMockJournalEntry(&sdjournal.JournalEntry{
Cursor: "foobar",
Fields: nil,
RealtimeTimestamp: uint64(entryTs.UnixNano() / int64(time.Microsecond)),
})

jt, err := journalTargetWithReader(logger, client, ps, "test", nil,
&cfg, newMockJournalReader, journalEntry)
require.NoError(t, err)

r := jt.r.(*mockJournalReader)
require.Equal(t, r.config.Since, time.Duration(0))
require.Equal(t, r.config.Cursor, "foobar")
}