Skip to content

Commit

Permalink
Journald reader: correctly handle journalctl restart (#42595)
Browse files Browse the repository at this point in the history
The journald reader would return an error and an empty message when
journalctl was restarting, which would break the multiline parser.

This commit fixes it by handling the "retry read" in the reader
itself.
  • Loading branch information
belimawr authored Feb 7, 2025
1 parent 95b5dce commit 1cd128b
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- In the `streaming` input, prevent panics on shutdown with a null check and apply a consistent namespace to contextual data in debug logs. {pull}42315[42315]
- Remove erroneous status reporting to Elastic-Agent from the Filestream input {pull}42435[42435]
- Fix truncation of bodies in request tracing by limiting bodies to 10% of the maximum file size. {pull}42327[42327]
- [Journald] Fixes handling of `journalctl` restart. A known symptom was broken multiline messages when there was a restart of journalctl while aggregating the lines. {issue}41331[41331] {pull}42595[42595]

*Heartbeat*

Expand Down
47 changes: 36 additions & 11 deletions filebeat/input/journald/pkg/journalctl/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,10 @@ func (r *Reader) Close() error {
return nil
}

// Next returns the next journal entry. If there is no entry available
// next will block until there is an entry or cancel is cancelled.
//
// If cancel is cancelled, Next returns a zero value JournalEntry
// and ErrCancelled.
func (r *Reader) Next(cancel input.Canceler) (JournalEntry, error) {
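// next reads the next entry from journalctl. It handles any errors from
// journalctl, restarting it as necessary with a backoff strategy. It returns
// a valid journald entry, ErrCancelled when the input is cancelled,
// ErrRestarting when journalctl was restarted and the caller must call next
// again, or an error if journalctl cannot be restarted.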
func (r *Reader) next(cancel input.Canceler) ([]byte, error) {
msg, finished, err := r.jctl.Next(cancel)

// Check if the input has been cancelled
Expand All @@ -273,7 +271,7 @@ func (r *Reader) Next(cancel input.Canceler) (JournalEntry, error) {
// journalctl. Cancelling this canceller only means this Next call was
// cancelled. Because the input has been cancelled, we ignore the message
// and any error it might have returned.
return JournalEntry{}, ErrCancelled
return nil, ErrCancelled
default:
// Three options:
// - Journalctl finished reading messages from previous boots
Expand Down Expand Up @@ -343,15 +341,42 @@ func (r *Reader) Next(cancel input.Canceler) (JournalEntry, error) {
if restart {
if err := r.newJctl(extraArgs...); err != nil {
// If we cannot restart journalctl, there is nothing we can do.
return JournalEntry{}, fmt.Errorf("cannot restart journalctl: %w", err)
return nil, fmt.Errorf("cannot restart journalctl: %w", err)
}

// Return an empty message and wait for the input to call us again
return JournalEntry{}, ErrRestarting
// Return an empty message and wait for the caller to call us again
return nil, ErrRestarting
}
}

return r.handleMessage(msg)
return msg, nil
}

// Next blocks until a journal entry is available and returns it.
//
// If cancel is cancelled, Next returns a zero value JournalEntry
// and ErrCancelled.
func (r *Reader) Next(cancel input.Canceler) (JournalEntry, error) {
	// r.next reports ErrRestarting whenever journalctl is being restarted.
	// That happens in two situations:
	//   - When the reader first starts, journalctl runs without the follow
	//     flag to read messages from all previous boots, and it exits once
	//     all of those messages are read.
	//   - journalctl exited unexpectedly and was restarted.
	// In both cases Reader.Next keeps retrying, so the caller only ever sees
	// a valid journal entry or a non-restart error (such as cancellation).
	for {
		raw, readErr := r.next(cancel)

		switch {
		case readErr == nil:
			return r.handleMessage(raw)
		case errors.Is(readErr, ErrRestarting):
			// journalctl was restarted, read again.
			continue
		default:
			return JournalEntry{}, readErr
		}
	}
}

func (r *Reader) handleMessage(msg []byte) (JournalEntry, error) {
Expand Down
39 changes: 34 additions & 5 deletions filebeat/input/journald/pkg/journalctl/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func TestEventWithNonStringData(t *testing.T) {
var jdEvent []byte

func TestRestartsJournalctlOnError(t *testing.T) {
logp.DevelopmentSetup(logp.ToObserverOutput())
ctx := context.Background()

mock := JctlMock{
Expand Down Expand Up @@ -109,15 +110,43 @@ func TestRestartsJournalctlOnError(t *testing.T) {
}

// In the first call the mock will return an error, simulating journalctl crashing
// so we should get ErrRestarting
// the reader must handle it and only return the next valid entry and no error
entry, err := reader.Next(ctx)
if !errors.Is(err, ErrRestarting) {
t.Fatalf("expecting ErrRestarting when calling Next and journalctl crashed: %s", err)
if err != nil {
t.Fatalf("expecting no error, got: %s", err)
}
if isEntryEmpty(entry) {
t.Fatal("the first call to Next cannot return an empty entry")
}
if !isEntryEmpty(entry) {
t.Fatal("the first call to Next must return an empty JournalEntry because 'journalctl has crashed'")

// We need to assert the reader correctly handled the "crash" from journalctl
// so we look for the log messages, there should be exactly 3:
// - First journalctl start
// - an error with the exit code 42
// - the second journalctl start
// The exact log messages are:
// - starting new mock journalclt ID: 1
// - reader error: 'journalctl exited with code 42', restarting...
// - starting new mock journalclt ID: 2

logs := logp.ObserverLogs().TakeAll()
if len(logs) != 3 {
t.Fatalf("expecting 3 log lines from 'input.journald.reader.journalctl-runner', got %d", len(logs))
}

if logs[0].Message != "starting new mock journalclt ID: 1" {
t.Fatalf("first log message must be the mock starting wit ID 1, got '%s' instead", logs[0].Message)
}

if logs[1].Message != "reader error: 'journalctl exited with code 42', restarting..." {
t.Fatalf("second log message must reader error with journalctl exit code 42, got '%s' instead", logs[1].Message)
}

if logs[2].Message != "starting new mock journalclt ID: 2" {
t.Fatalf("third log message must be the mock starting wit ID 2, got '%s' instead", logs[2].Message)
}

// Call Next a couple more times to ensure we can read past the error
for i := 0; i < 2; i++ {
entry, err := reader.Next(ctx)
if err != nil {
Expand Down

0 comments on commit 1cd128b

Please sign in to comment.