Skip to content

Skipping unparsable lines in docker input #12268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063]
- Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125]
- Fix goroutine leak on non-explicit finalization of log input. {pull}12164[12164]
- Skipping unparsable log entries from docker json reader {pull}12268[12268]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ func (h *Harvester) Run() error {
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
case ErrInactive:
logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
case reader.ErrLineUnparsable:
logp.Info("Skipping unparsable line in file: %v", h.state.Source)
//line unparsable, go to next line
continue
default:
logp.Err("Read line error: %v; File: %v", err, h.state.Source)
}
Expand Down
9 changes: 9 additions & 0 deletions libbeat/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@

package reader

import (
"errors"
)

// Reader is the interface that wraps the basic Next method for
// getting a new message.
// Next returns the message being read or and error. EOF is returned
// if reader will not return any new message on subsequent calls.
type Reader interface {
Next() (Message, error)
}

var (
//ErrLineUnparsable is error thrown when Next() element from input is corrupted and can not be parsed
ErrLineUnparsable = errors.New("line is unparsable")
)
7 changes: 5 additions & 2 deletions libbeat/reader/readjson/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/reader"
)

Expand Down Expand Up @@ -188,7 +189,8 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
var logLine logLine
err = p.parseLine(&message, &logLine)
if err != nil {
return message, err
logp.Err("Parse line error: %v", err)
return message, reader.ErrLineUnparsable
}

// Handle multiline messages, join partial lines
Expand All @@ -204,7 +206,8 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
}
err = p.parseLine(&next, &logLine)
if err != nil {
return message, err
logp.Err("Parse line error: %v", err)
return message, reader.ErrLineUnparsable
}
message.Content = append(message.Content, next.Content...)
}
Expand Down
28 changes: 19 additions & 9 deletions libbeat/reader/readjson/docker_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestDockerJSON(t *testing.T) {
partial bool
format string
criflags bool
expectedError bool
expectedError error
expectedMessage reader.Message
}{
{
Expand All @@ -53,7 +53,7 @@ func TestDockerJSON(t *testing.T) {
name: "Wrong JSON",
input: [][]byte{[]byte(`this is not JSON`)},
stream: "all",
expectedError: true,
expectedError: reader.ErrLineUnparsable,
expectedMessage: reader.Message{
Bytes: 16,
},
Expand All @@ -62,7 +62,7 @@ func TestDockerJSON(t *testing.T) {
name: "Wrong CRI",
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)},
stream: "all",
expectedError: true,
expectedError: reader.ErrLineUnparsable,
expectedMessage: reader.Message{
Bytes: 37,
},
Expand All @@ -71,7 +71,7 @@ func TestDockerJSON(t *testing.T) {
name: "Wrong CRI",
input: [][]byte{[]byte(`{this is not JSON nor CRI`)},
stream: "all",
expectedError: true,
expectedError: reader.ErrLineUnparsable,
expectedMessage: reader.Message{
Bytes: 25,
},
Expand All @@ -80,7 +80,7 @@ func TestDockerJSON(t *testing.T) {
name: "Missing time",
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
stream: "all",
expectedError: true,
expectedError: reader.ErrLineUnparsable,
expectedMessage: reader.Message{
Bytes: 82,
},
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestDockerJSON(t *testing.T) {
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
stream: "all",
format: "cri",
expectedError: true,
expectedError: reader.ErrLineUnparsable,
expectedMessage: reader.Message{
Bytes: 82,
},
Expand All @@ -217,7 +217,7 @@ func TestDockerJSON(t *testing.T) {
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)},
stream: "all",
format: "docker",
expectedError: true,
expectedError: reader.ErrLineUnparsable,
expectedMessage: reader.Message{
Bytes: 115,
},
Expand Down Expand Up @@ -289,12 +289,21 @@ func TestDockerJSON(t *testing.T) {
[]byte(`{"log":"shutdown...\n","stream`),
},
stream: "stdout",
expectedError: true,
expectedError: reader.ErrLineUnparsable,
expectedMessage: reader.Message{
Bytes: 139,
},
partial: true,
},
{
name: "Corrupted log message line",
input: [][]byte{[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)},
stream: "all",
expectedError: reader.ErrLineUnparsable,
expectedMessage: reader.Message{
Bytes: 97,
},
},
}

for _, test := range tests {
Expand All @@ -303,8 +312,9 @@ func TestDockerJSON(t *testing.T) {
json := New(r, test.stream, test.partial, test.format, test.criflags)
message, err := json.Next()

if test.expectedError {
if test.expectedError != nil {
assert.Error(t, err)
assert.Equal(t, test.expectedError, err)
} else {
assert.NoError(t, err)
}
Expand Down