The `recombine` operator combines consecutive logs into single logs based on simple expression rules.
| Field | Default | Description |
| --- | --- | --- |
| `id` | `recombine` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See `on_error`. |
| `is_first_entry` | | An expression that returns `true` if the entry being processed is the first entry in a multiline series. |
| `is_last_entry` | | An expression that returns `true` if the entry being processed is the last entry in a multiline series. |
| `combine_field` | required | The field from all the entries that will be recombined. |
| `combine_with` | `"\n"` | The string that is put between the combined entries. This can also be an empty string. When using special characters like `\n`, be sure to enclose the value in double quotes: `"\n"`. |
| `max_batch_size` | `1000` | The maximum number of consecutive entries that will be combined into a single entry. |
| `max_unmatched_batch_size` | `100` | The maximum number of consecutive entries that will be combined into a single entry before the match occurs (with `is_first_entry` or `is_last_entry`). For example, `max_unmatched_batch_size=0` combines all unmatched entries into one entry, `max_unmatched_batch_size=1` leaves every entry uncombined until the match occurs, and `max_unmatched_batch_size=100` combines unmatched entries into batches of 100 until the match occurs. |
| `overwrite_with` | `newest` | Whether to use the fields from the `oldest` or the `newest` entry for all the fields that are not combined. |
| `force_flush_period` | `5s` | Flush timeout after which pending entries will be flushed, aborting the wait for further sub-parts to be merged with them. |
| `source_identifier` | `$attributes["file.path"]` | The field used to separate one source of logs from others when combining them. |
| `max_sources` | `1000` | The maximum number of unique sources allowed to be tracked concurrently for separate combining. |
| `max_log_size` | `0` | The maximum size, in bytes, of the combined field. Once the size exceeds the limit, all received entries of the source will be combined and flushed. A `max_log_size` of `0` means no limit. |
Exactly one of `is_first_entry` and `is_last_entry` must be specified.
NOTE: this operator is designed to work with only a single input. It does not keep track of which operator entries are coming from, so it can't combine based on source.
Kubernetes logs in the CRI format have a tag that indicates whether the log entry is part of a longer log line (`P`) or the final entry (`F`). Using this tag, we can recombine the CRI logs back into complete log lines.
Configuration:
```yaml
- type: file_input
  include:
    - ./input.log
- type: regex_parser
  regex: '^(?P<timestamp>[^\s]+) (?P<stream>\w+) (?P<logtag>\w) (?P<message>.*)'
- type: recombine
  combine_field: body.message
  combine_with: ""
  is_last_entry: "body.logtag == 'F'"
  overwrite_with: "newest"
```
Input file:
```
2016-10-06T00:17:09.669794202Z stdout F Single entry log 1
2016-10-06T00:17:10.113242941Z stdout P This is a very very long line th
2016-10-06T00:17:10.113242941Z stdout P at is really really long and spa
2016-10-06T00:17:10.113242941Z stdout F ns across multiple log entries
```
Output logs:
```json
[
  {
    "timestamp": "2020-12-04T13:03:38.41149-05:00",
    "severity": 0,
    "body": {
      "message": "Single entry log 1",
      "logtag": "F",
      "stream": "stdout",
      "timestamp": "2016-10-06T00:17:09.669794202Z"
    }
  },
  {
    "timestamp": "2020-12-04T13:03:38.411664-05:00",
    "severity": 0,
    "body": {
      "message": "This is a very very long line that is really really long and spans across multiple log entries",
      "logtag": "F",
      "stream": "stdout",
      "timestamp": "2016-10-06T00:17:10.113242941Z"
    }
  }
]
```
Some apps output multiple log lines that are in fact a single log record. A common example is a stack trace:
```
java.lang.Exception: Stack trace
    at java.lang.Thread.dumpStack(Thread.java:1336)
    at Main.demo3(Main.java:15)
    at Main.demo2(Main.java:12)
    at Main.demo1(Main.java:9)
    at Main.demo(Main.java:6)
    at Main.main(Main.java:3)
```
To recombine such log lines into a single log record, you need a way to tell where a log record starts or ends. In the example above, the first line differs from the others in that it does not start with whitespace. This can be expressed with the following configuration:
```yaml
- type: recombine
  combine_field: body
  is_first_entry: body matches "^[^\\s]"
```
Given the following input file:
```
Log message 1
Error: java.lang.Exception: Stack trace
 at java.lang.Thread.dumpStack(Thread.java:1336)
 at Main.demo3(Main.java:15)
 at Main.demo2(Main.java:12)
 at Main.demo1(Main.java:9)
 at Main.demo(Main.java:6)
 at Main.main(Main.java:3)
Another log message
```
The following logs will be output:
```json
[
  {
    "timestamp": "2020-12-04T13:03:38.41149-05:00",
    "severity": 0,
    "body": "Log message 1"
  },
  {
    "timestamp": "2020-12-04T13:03:38.41149-05:00",
    "severity": 0,
    "body": "Error: java.lang.Exception: Stack trace\n at java.lang.Thread.dumpStack(Thread.java:1336)\n at Main.demo3(Main.java:15)\n at Main.demo2(Main.java:12)\n at Main.demo1(Main.java:9)\n at Main.demo(Main.java:6)\n at Main.main(Main.java:3)"
  },
  {
    "timestamp": "2020-12-04T13:03:38.41149-05:00",
    "severity": 0,
    "body": "Another log message"
  }
]
```
When `max_unmatched_batch_size` is set to `0`, all log entries are combined into a single entry until the match occurs.
Example configuration:
```yaml
- type: recombine
  combine_field: body
  is_first_entry: body == 'log1'
  max_unmatched_batch_size: 0
```
Input log entries:
```
log2
log3
log4
log1
log5
log6
log7
log1
```
Output log entries:
```
log2\nlog3\nlog4
log1\nlog5\nlog6\nlog7
log1
```
When `max_unmatched_batch_size` is set to `1`, no log entries are combined before the match occurs; each unmatched entry is emitted on its own.
Example configuration:
```yaml
- type: recombine
  combine_field: body
  is_first_entry: body == 'log1'
  max_unmatched_batch_size: 1
```
Input log entries:
```
log2
log3
log4
log1
log5
log6
log7
log1
```
Output log entries:
```
log2
log3
log4
log1\nlog5\nlog6\nlog7
log1
```
When `max_unmatched_batch_size` is set to `3`, unmatched entries are combined into batches of three until the match occurs.
Example configuration with `is_first_entry`:
```yaml
- type: recombine
  combine_field: body
  is_first_entry: body == 'log1'
  max_unmatched_batch_size: 3
```
Input log entries:
```
log2
log3
log4
log5
log1
log6
log7
log1
log8
log1
```
Output log entries:
```
log2\nlog3\nlog4
log5
log1\nlog6\nlog7
log1\nlog8
log1
```
Example configuration with `is_last_entry`:
```yaml
- type: recombine
  combine_field: body
  is_last_entry: body == 'log1'
  max_unmatched_batch_size: 3
```
Input log entries:
```
log2
log3
log4
log5
log1
log6
log7
log1
log8
log1
```
Output log entries:
```
log2\nlog3\nlog4
log5\nlog1
log6\nlog7\nlog1
log8\nlog1
```